Go 语言实现——网络 IO

Go 的 net 包提供的是阻塞的接口,但在底层实现上却是非阻塞的,类似于 Python 的 gevent 库。

Go 在 runtime 底层实现了一个 netpoller,net 包里的网络函数在需要阻塞的地方会调用 runtime 提供的接口向 netpoller 注册对应 fd 的读写事件并将当前 goroutine 从 running 状态切换成 waiting 状态挂起,netpoller 后台有一个线程不断的 poll 所有注册的 fd,在事件触发后再将其关联的 goroutine 从 waiting 状态切换成 runnable 状态并加到全局 goroutine 运行队列中去。

从非阻塞到阻塞相关的封装主要涉及以下三个结构体:

// net/fd_unix.go
// 封装了 socket fd 相关的 connect、read、write、close 等接口
// 提供给 net 包更上层的封装使用
type netFD struct {
    pfd poll.FD
}

// internal/poll/fd_unix.go
// 对 runtime netpoller 进一步的封装
type FD struct {
    Sysfd int
    pd pollDesc
}

// runtime/netpoll.go
// 底层 netpoller 用数据结构
type pollDesc struct {
    link *pollDesc
    fd      uintptr
    rg      uintptr // pdReady, pdWait, 等待读事件的 goroutine 或 nil
    wg      uintptr // pdReady, pdWait, 等待写事件的 goroutine 或 nil
}

netFD 这个结构体由 socket 接口创建:

// net/sock_posix.go
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (fd *netFD, err error) {
    // sysSocket 函数里会调用 socket 系统调用创建一个新的网络 fd
    // 并将其设置成非阻塞的返回。
    sysfd, err := sysSocket(family, sotype, proto)
    // 将网络 fd 封装成 netFD 结构体返回。
    return &netFD{
        pfd: poll.FD{
            Sysfd:         sysfd,
        },
    }, nil
}

因为 socket 创建的 netFD 是非阻塞的,netFD 实现各个接口时在调用本该阻塞函数的地方后面需要插入等待 fd 相关事件发生的代码,从而将非阻塞的 fd 操作转换成阻塞的形式,以 netFD.connect 为例:

func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, ret error) {
    // connectFunc = syscall.Connect
    switch err := connectFunc(fd.pfd.Sysfd, ra); err {
    case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR:
    case nil, syscall.EISCONN:
        // 连接成功,向 netpoller 注册这个 fd 的所有读写事件,返回
        if err := fd.pfd.Init(fd.net, true); err != nil {
            return nil, err
        }
        return nil, nil
    default:
        return nil, os.NewSyscallError("connect", err)
    }
    // 向 netpoller 注册这个 fd 的所有读写事件
    if err := fd.pfd.Init(fd.net, true); err != nil {
        return nil, err
    }

    for {
        // 挂起本 goroutine,等待 fd 的写事件
        if err := fd.pfd.WaitWrite(); err != nil {
            return nil, err
        }
        nerr, err := getsockoptIntFunc(fd.pfd.Sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR)
        if err != nil {
            return nil, os.NewSyscallError("getsockopt", err)
        }
        switch err := syscall.Errno(nerr); err {
        case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR:
            // 还没连接上,继续等待
        case syscall.EISCONN:
            // 连接已经建立,返回
            return nil, nil
        default:
            return nil, os.NewSyscallError("getsockopt", err)
        }
    }
}

这里,和 “注册事件,设置 callback,return” 的方式不一样, fd.pfd.WaitWrite() 这个函数调用会调用 Go 调度的接口“将 goroutine 挂起,直到等待的事件发生后再从挂起的地方继续向下执行”。

// internal/poll/fd_unix.go
func (fd *FD) Init(net string, pollable bool) error {
    return fd.pd.init(fd)
}
// internal/poll/fd_poll_runtime.go
func (pd *pollDesc) init(fd *FD) error {
    // var serverInit sync.Once
    // 如果 netpoller 还没初始化,先初始化
    serverInit.Do(runtime_pollServerInit)
    // 初始化 fd.pd 结构体,并调用 epoll_ctrl 注册 fd.Sysfd 相关的事件
    // EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET
    // runtime_pollOpen 是 runtime/netpoll.go 提供的接口
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        return syscall.Errno(errno)
    }
    return nil
}

// internal/poll/fd_unix.go
func (fd *FD) WaitWrite() error {
    return fd.pd.waitWrite(fd.isFile)
}
// internal/poll/fd_poll_runtime.go
func (pd *pollDesc) waitWrite(isFile bool) error {
    return pd.wait('w', isFile)
}
func (pd *pollDesc) waitCanceled(mode int) {
    runtime_pollWaitCanceled(pd.runtimeCtx, mode)
}
// runtime/netpoll.go
func poll_runtime_pollWaitCanceled(pd *pollDesc, mode int) {
    for !netpollblock(pd, int32(mode), true) {
    }
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        old := *gpp
        // 如果等待的事件已发生,直接返回
        // netpoller 会 poll fd 的所有事件,并不只是当前等待的事件
        if old == pdReady {
            *gpp = 0
            return true
        }
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }

    if waitio || netpollcheckerr(pd, mode) == 0 {
        // func netpollblockcommit () {
        //   return atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
        // }
        // 调用 netpollblockcommit 将当前 goroutine 的 g 指针 赋值给 gpp
        // 如果 gpp 不是 pdWait,赋值失败,说明事件已经发生了,直接返回
        // 否则挂起当前 goroutine,直到下次调度到执行再返回。
        // gopark 是 runtime/proc.go 中提供的。
        gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
    }
    old := atomic.Xchguintptr(gpp, 0)
    // 返回等待的事件是不是已经 ready
    return old == pdReady
}

最后,sysmon 中每隔一段时间会调用 netpoll poll 所有注册的 fd,并将有事件发生的 fd 的关联的 goroutine 重新加入到可执行队列中。

func sysmon() {
    for {
        lastpoll := int64(atomic.Load64(&sched.lastpoll))
        now := nanotime()
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
            atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            // poll 是否有 fd 有事件发生,返回有事件发生 fd 关联的 goroutine 列表,每 10ms 执行一次
            gp := netpoll(false)
            if gp != nil {
                // 将 goroutine 改成 runnable 状态并插入到全局 goroutine 队列去
                injectglist(gp)
            }
        }
    }
}