Go 语言实现——网络 IO¶
Go 的 net 包提供的是阻塞的接口,但在底层实现上却是非阻塞的,类似于 Python 的 gevent 库。
Go 在 runtime 底层实现了一个 netpoller,net 包里的网络函数在需要阻塞的地方会调用 runtime 提供的接口向 netpoller 注册对应 fd 的读写事件并将当前 goroutine 从 running 状态切换成 waiting 状态挂起,netpoller 后台有一个线程不断的 poll 所有注册的 fd,在事件触发后再将其关联的 goroutine 从 waiting 状态切换成 runnable 状态并加到全局 goroutine 运行队列中去。
从非阻塞到阻塞相关的封装主要涉及以下三个结构体:
// net/fd_unix.go
// 封装了 socket fd 相关的 connect、read、write、close 等接口
// 提供给 net 包更上层的封装使用
type netFD struct {
pfd poll.FD
}
// internal/poll/fd_unix.go
// 对 runtime netpoller 进一步的封装
type FD struct {
Sysfd int
pd pollDesc
}
// runtime/netpoll.go
// 底层 netpoller 用数据结构
type pollDesc struct {
link *pollDesc
fd uintptr
rg uintptr // pdReady, pdWait, 等待读事件的 goroutine 或 nil
wg uintptr // pdReady, pdWait, 等待写事件的 goroutine 或 nil
}
netFD 这个结构体由 socket 接口创建:
// net/sock_posix.go
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (fd *netFD, err error) {
// sysSocket 函数里会调用 socket 系统调用创建一个新的网络 fd
// 并将其设置成非阻塞的返回。
sysfd, err := sysSocket(family, sotype, proto)
// 将网络 fd 封装成 netFD 结构体返回。
return &netFD{
pfd: poll.FD{
Sysfd: sysfd,
},
}, nil
}
因为 socket 创建的 netFD 是非阻塞的,netFD 实现各个接口时在调用本该阻塞函数的地方后面需要插入等待 fd 相关事件发生的代码,从而将非阻塞的 fd 操作转换成阻塞的形式,以 netFD.connect 为例:
func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, ret error) {
// connectFunc = syscall.Connect
switch err := connectFunc(fd.pfd.Sysfd, ra); err {
case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR:
case nil, syscall.EISCONN:
// 连接成功,向 netpoller 注册这个 fd 的所有读写事件,返回
if err := fd.pfd.Init(fd.net, true); err != nil {
return nil, err
}
return nil, nil
default:
return nil, os.NewSyscallError("connect", err)
}
// 向 netpoller 注册这个 fd 的所有读写事件
if err := fd.pfd.Init(fd.net, true); err != nil {
return nil, err
}
for {
// 挂起本 goroutine,等待 fd 的写事件
if err := fd.pfd.WaitWrite(); err != nil {
return nil, err
}
nerr, err := getsockoptIntFunc(fd.pfd.Sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR)
if err != nil {
return nil, os.NewSyscallError("getsockopt", err)
}
switch err := syscall.Errno(nerr); err {
case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR:
// 还没连接上,继续等待
case syscall.EISCONN:
// 连接已经建立,返回
return nil, nil
default:
return nil, os.NewSyscallError("getsockopt", err)
}
}
}
这里,和 “注册事件,设置 callback,return” 的方式不一样, fd.pfd.WaitWrite() 这个函数调用会调用 Go 调度的接口“将 goroutine 挂起,直到等待的事件发生后再从挂起的地方继续向下执行”。
// internal/poll/fd_unix.go
func (fd *FD) Init(net string, pollable bool) error {
return fd.pd.init(fd)
}
// internal/poll/fd_poll_runtime.go
func (pd *pollDesc) init(fd *FD) error {
// var serverInit sync.Once
// 如果 netpoller 还没初始化,先初始化
serverInit.Do(runtime_pollServerInit)
// 初始化 fd.pd 结构体,并调用 epoll_ctrl 注册 fd.Sysfd 相关的事件
// EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET
// runtime_pollOpen 是 runtime/netpoll.go 提供的接口
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
return syscall.Errno(errno)
}
return nil
}
// internal/poll/fd_unix.go
func (fd *FD) WaitWrite() error {
return fd.pd.waitWrite(fd.isFile)
}
// internal/poll/fd_poll_runtime.go
func (pd *pollDesc) waitWrite(isFile bool) error {
return pd.wait('w', isFile)
}
func (pd *pollDesc) waitCanceled(mode int) {
runtime_pollWaitCanceled(pd.runtimeCtx, mode)
}
// runtime/netpoll.go
func poll_runtime_pollWaitCanceled(pd *pollDesc, mode int) {
for !netpollblock(pd, int32(mode), true) {
}
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := *gpp
// 如果等待的事件已发生,直接返回
// netpoller 会 poll fd 的所有事件,并不只是当前等待的事件
if old == pdReady {
*gpp = 0
return true
}
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
if waitio || netpollcheckerr(pd, mode) == 0 {
// func netpollblockcommit () {
// return atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
// }
// 调用 netpollblockcommit 将当前 goroutine 的 g 指针 赋值给 gpp
// 如果 gpp 不是 pdWait,赋值失败,说明事件已经发生了,直接返回
// 否则挂起当前 goroutine,直到下次调度到执行再返回。
// gopark 是 runtime/proc.go 中提供的。
gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
}
old := atomic.Xchguintptr(gpp, 0)
// 返回等待的事件是不是已经 ready
return old == pdReady
}
最后,sysmon 中每隔一段时间会调用 netpoll poll 所有注册的 fd,并将有事件发生的 fd 的关联的 goroutine 重新加入到可执行队列中。
func sysmon() {
for {
lastpoll := int64(atomic.Load64(&sched.lastpoll))
now := nanotime()
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
// poll 是否有 fd 有事件发生,返回有事件发生 fd 关联的 goroutine 列表,每 10ms 执行一次
gp := netpoll(false)
if gp != nil {
// 将 goroutine 改成 runnable 状态并插入到全局 goroutine 队列去
injectglist(gp)
}
}
}
}