Go 语言实现——网络 IO ======================= Go 的 net 包提供的是阻塞的接口,但在底层实现上却是非阻塞的,类似于 Python 的 gevent 库。 Go 在 runtime 底层实现了一个 netpoller,net 包里的网络函数在需要阻塞的地方会调用 runtime 提供的接口向 netpoller 注册对应 fd 的读写事件并将当前 goroutine 从 running 状态切换成 waiting 状态挂起,netpoller 后台有一个线程不断的 poll 所有注册的 fd,在事件触发后再将其关联的 goroutine 从 waiting 状态切换成 runnable 状态并加到全局 goroutine 运行队列中去。 从非阻塞到阻塞相关的封装主要涉及以下三个结构体: .. code-block:: go // net/fd_unix.go // 封装了 socket fd 相关的 connect、read、write、close 等接口 // 提供给 net 包更上层的封装使用 type netFD struct { pfd poll.FD } // internal/poll/fd_unix.go // 对 runtime netpoller 进一步的封装 type FD struct { Sysfd int pd pollDesc } // runtime/netpoll.go // 底层 netpoller 用数据结构 type pollDesc struct { link *pollDesc fd uintptr rg uintptr // pdReady, pdWait, 等待读事件的 goroutine 或 nil wg uintptr // pdReady, pdWait, 等待写事件的 goroutine 或 nil } netFD 这个结构体由 socket 接口创建: .. code-block:: go // net/sock_posix.go func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (fd *netFD, err error) { // sysSocket 函数里会调用 socket 系统调用创建一个新的网络 fd // 并将其设置成非阻塞的返回。 sysfd, err := sysSocket(family, sotype, proto) // 将网络 fd 封装成 netFD 结构体返回。 return &netFD{ pfd: poll.FD{ Sysfd: sysfd, }, }, nil } 因为 socket 创建的 netFD 是非阻塞的,netFD 实现各个接口时在调用本该阻塞函数的地方后面需要插入等待 fd 相关事件发生的代码,从而将非阻塞的 fd 操作转换成阻塞的形式,以 netFD.connect 为例: .. code-block:: go func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, ret error) { // connectFunc = syscall.Connect switch err := connectFunc(fd.pfd.Sysfd, ra); err { case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR: case nil, syscall.EISCONN: // 连接成功,向 netpoller 注册这个 fd 的所有读写事件,返回 if err := fd.pfd.Init(fd.net, true); err != nil { return nil, err } return nil, nil default: return nil, os.NewSyscallError("connect", err) } // 向 netpoller 注册这个 fd 的所有读写事件 if err := fd.pfd.Init(fd.net, true); err != nil { return nil, err } for { // 挂起本 goroutine,等待 fd 的写事件 if err := fd.pfd.WaitWrite(); err != nil { return nil, err } nerr, err := getsockoptIntFunc(fd.pfd.Sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR) if err != nil { return nil, os.NewSyscallError("getsockopt", err) } switch err := syscall.Errno(nerr); err { case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR: // 还没连接上,继续等待 case syscall.EISCONN: // 连接已经建立,返回 return nil, nil default: return nil, os.NewSyscallError("getsockopt", err) } } } 这里,和 “注册事件,设置 callback,return” 的方式不一样, fd.pfd.WaitWrite() 这个函数调用会调用 Go 调度的接口“将 goroutine 挂起,直到等待的事件发生后再从挂起的地方继续向下执行”。 .. code-block:: go // internal/poll/fd_unix.go func (fd *FD) Init(net string, pollable bool) error { return fd.pd.init(fd) } // internal/poll/fd_poll_runtime.go func (pd *pollDesc) init(fd *FD) error { // var serverInit sync.Once // 如果 netpoller 还没初始化,先初始化 serverInit.Do(runtime_pollServerInit) // 初始化 fd.pd 结构体,并调用 epoll_ctrl 注册 fd.Sysfd 相关的事件 // EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET // runtime_pollOpen 是 runtime/netpoll.go 提供的接口 ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) if errno != 0 { return syscall.Errno(errno) } return nil } // internal/poll/fd_unix.go func (fd *FD) WaitWrite() error { return fd.pd.waitWrite(fd.isFile) } // internal/poll/fd_poll_runtime.go func (pd *pollDesc) waitWrite(isFile bool) error { return pd.wait('w', isFile) } func (pd *pollDesc) waitCanceled(mode int) { runtime_pollWaitCanceled(pd.runtimeCtx, mode) } // runtime/netpoll.go func poll_runtime_pollWaitCanceled(pd *pollDesc, mode int) { for !netpollblock(pd, int32(mode), true) { } } func netpollblock(pd *pollDesc, mode int32, waitio bool) bool { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } for { old := *gpp // 如果等待的事件已发生,直接返回 // netpoller 会 poll fd 的所有事件,并不只是当前等待的事件 if old == pdReady { *gpp = 0 return true } if atomic.Casuintptr(gpp, 0, pdWait) { break } } if waitio || netpollcheckerr(pd, mode) == 0 { // func netpollblockcommit () { // return atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp))) // } // 调用 netpollblockcommit 将当前 goroutine 的 g 指针 赋值给 gpp // 如果 gpp 不是 pdWait,赋值失败,说明事件已经发生了,直接返回 // 否则挂起当前 goroutine,直到下次调度到执行再返回。 // gopark 是 runtime/proc.go 中提供的。 gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5) } old := atomic.Xchguintptr(gpp, 0) // 返回等待的事件是不是已经 ready return old == pdReady } 最后,*sysmon* 中每隔一段时间会调用 netpoll poll 所有注册的 fd,并将有事件发生的 fd 的关联的 goroutine 重新加入到可执行队列中。 .. code-block:: go func sysmon() { for { lastpoll := int64(atomic.Load64(&sched.lastpoll)) now := nanotime() if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now)) // poll 是否有 fd 有事件发生,返回有事件发生 fd 关联的 goroutine 列表,每 10ms 执行一次 gp := netpoll(false) if gp != nil { // 将 goroutine 改成 runnable 状态并插入到全局 goroutine 队列去 injectglist(gp) } } } }