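0. Introduction

The previous post described how goroutines are scheduled around system calls. For IO-intensive workloads, every request may cause a new M to be created, which is not acceptable. To handle network-IO-intensive scenarios, the Go SDK developed a key component: the network poller.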


1. IO multiplexing


1.1 select&poll




1.2 epoll

int s = socket(AF_INET, SOCK_STREAM, 0);
bind(s, ...);
listen(s, ...);
int epfd = epoll_create(...);
epoll_ctl(epfd, ...); // add every socket that needs to be monitored to epfd
while (1) {
    int n = epoll_wait(...);
    // handle the n ready events
}



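  1. epoll keeps the set of monitored file descriptors in a red-black tree inside the kernel, so adding, removing or looking up a descriptor costs O(logN), whereas select/poll have to walk the whole set on every call, which is O(N).
  2. epoll is event-driven (the e prefix presumably stands for event). When a file descriptor becomes active, a call to epoll_wait returns only the number of descriptors that actually have events, together with those events, which also saves a lot of space.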
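
To make the second point concrete, here is a minimal Go sketch (Linux only) that registers several pipes with one epoll instance and makes only one of them readable. The helper names `readyFDs` and `epollWait` are made up for this illustration; the `syscall.Epoll*` calls are the thin wrappers from Go's standard `syscall` package.

```go
package main

import (
	"fmt"
	"syscall"
)

// epollWait retries epoll_wait when a signal interrupts it (EINTR).
func epollWait(epfd int, events []syscall.EpollEvent, msec int) (int, error) {
	for {
		n, err := syscall.EpollWait(epfd, events, msec)
		if err != syscall.EINTR {
			return n, err
		}
	}
}

// readyFDs (hypothetical helper) registers the read ends of n pipes with one
// epoll instance, writes a byte to pipe number hot, and returns the
// descriptors reported by epoll_wait plus the read end it expects to see.
func readyFDs(n, hot int) (ready []int, want int, err error) {
	epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	if err != nil {
		return nil, -1, err
	}
	defer syscall.Close(epfd)

	readEnds := make([]int, 0, n)
	writeEnds := make([]int, 0, n)
	for i := 0; i < n; i++ {
		var p [2]int
		if err := syscall.Pipe(p[:]); err != nil {
			return nil, -1, err
		}
		defer syscall.Close(p[0])
		defer syscall.Close(p[1])
		ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(p[0])}
		if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, p[0], &ev); err != nil {
			return nil, -1, err
		}
		readEnds = append(readEnds, p[0])
		writeEnds = append(writeEnds, p[1])
	}

	// Only one of the n registered descriptors becomes readable.
	if _, err := syscall.Write(writeEnds[hot], []byte("x")); err != nil {
		return nil, -1, err
	}

	events := make([]syscall.EpollEvent, n)
	got, err := epollWait(epfd, events, 1000)
	if err != nil {
		return nil, -1, err
	}
	for _, ev := range events[:got] {
		ready = append(ready, int(ev.Fd))
	}
	return ready, readEnds[hot], nil
}

func main() {
	ready, want, err := readyFDs(3, 1)
	if err != nil {
		panic(err)
	}
	fmt.Println("reported by epoll_wait:", ready, "expected:", want)
}
```

Although three descriptors are registered, the events slice filled by epoll_wait should contain only the one that was written to, so the caller never has to scan the idle ones.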


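1.3 Does IO multiplexing apply to disk IO?

Note that everything below refers to multiplexing on Linux only. English-language documentation usually does not speak of "disk io"; it talks about regular files instead.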



Under Linux, select() may report a socket file descriptor as “ready for reading”, while nevertheless a subsequent read blocks. This could for example happen when data has arrived but upon examination has wrong checksum and is discarded. There may be other circumstances in which a file descriptor is spuriously reported as ready. Thus it may be safer to use O_NONBLOCK on sockets that should not block.


In short, at least for select, the manual itself recommends setting O_NONBLOCK so that the IO is non-blocking. Disk IO, however, does not support non-blocking IO. According to "Non-blocking I/O with regular files":

Regular files are always readable and they are also always writeable. This is clearly stated in the relevant POSIX specifications. I cannot stress this enough. Putting a regular file in non-blocking has ABSOLUTELY no effects other than changing one bit in the file flags.
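
A quick way to see this from Go is to open a regular file with O_NONBLOCK and read it through the raw system call. This is only a sketch under my own assumptions: `readRegularNonblock` is a made-up helper, and the file is a small temporary file on a local filesystem.

```go
package main

import (
	"fmt"
	"os"
	"syscall"
)

// readRegularNonblock (hypothetical helper) writes payload to a temporary
// regular file, reopens it with O_NONBLOCK and reads it back with a single
// raw read(2). If the flag mattered, the read could fail with EAGAIN.
func readRegularNonblock(payload []byte) ([]byte, error) {
	f, err := os.CreateTemp("", "nonblock-*")
	if err != nil {
		return nil, err
	}
	defer os.Remove(f.Name())
	if _, err := f.Write(payload); err != nil {
		f.Close()
		return nil, err
	}
	if err := f.Close(); err != nil {
		return nil, err
	}

	fd, err := syscall.Open(f.Name(), syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
	if err != nil {
		return nil, err
	}
	defer syscall.Close(fd)

	buf := make([]byte, len(payload)+1)
	n, err := syscall.Read(fd, buf)
	if err != nil {
		return nil, err
	}
	return buf[:n], nil
}

func main() {
	data, err := readRegularNonblock([]byte("hello"))
	fmt.Printf("data=%q err=%v\n", data, err)
}
```

The read returns the file contents directly; there is no "not ready yet" state for the kernel to report.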




File descriptors associated with regular files always select true for ready to read, ready to write, and error conditions.

Note that the quote above comes from the Linux manual, but my local man select does not contain this passage, so I can only post the link.


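If select and poll are merely useless for disk IO, because the reported state is always ready and every pass of the loop gets a result, then epoll is more decisive: it does not support disk IO handles at all and fails with EPERM. From man epoll_ctl: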

EPERM The target file fd does not support epoll. This error can occur if fd refers to, for example, a regular file or a directory.


// An error here indicates a failure to register
// with the netpoll system. That can happen for
// a file descriptor that is not supported by
// epoll/kqueue; for example, disk files on
// Linux systems. We assume that any real error
// will show up in later I/O.
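
The EPERM behaviour is easy to check. Below is a minimal sketch (Linux only) that tries to register a temporary regular file with epoll; `regularFileRejected` is a name I made up for the illustration.

```go
package main

import (
	"fmt"
	"os"
	"syscall"
)

// regularFileRejected (hypothetical helper) reports whether epoll_ctl
// refuses to register a regular file with EPERM.
func regularFileRejected() (bool, error) {
	epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	if err != nil {
		return false, err
	}
	defer syscall.Close(epfd)

	f, err := os.CreateTemp("", "epoll-regular-*")
	if err != nil {
		return false, err
	}
	defer os.Remove(f.Name())
	defer f.Close()

	fd := int(f.Fd())
	ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(fd)}
	err = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, fd, &ev)
	if err != nil && err != syscall.EPERM {
		return false, err // some unrelated failure
	}
	return err == syscall.EPERM, nil
}

func main() {
	rejected, err := regularFileRejected()
	fmt.Println("regular file rejected with EPERM:", rejected, "err:", err)
}
```

This is the same failure that the Go source comment above describes: registration with the netpoll system fails for disk files on Linux, and the os package simply falls back to ordinary blocking IO.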

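So when section 6.6 of 《Go 语言设计与实现》 (Go Language Design and Implementation) repeatedly states that the network poller is used to monitor not only network IO but also file IO, that conclusion is wrong! At least on Linux, disk IO cannot use multiplexing. I have seen many blog posts repeat this claim, and I am afraid the mistake will spread even further.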

2. Go netpoller

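Go is a cross-platform language, and each platform implements IO multiplexing differently. Go netpoller wraps epoll on Linux, kqueue on FreeBSD and macOS, and IOCP on Windows, so that code written in a synchronous style is executed asynchronously. This is what implements Go's network poller and improves the efficiency of network-IO-intensive applications. Note that kqueue does support multiplexing for disk IO, but most servers today run Linux, so the discussion below covers Linux only.

2.1 Scenario

Below is a typical TCP echo server written in Go: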

package main

import (
   "fmt"
   "net"
)

func main() {
   lis, err := net.Listen("tcp", ":8080")
   if err != nil {
      panic(err)
   }
   for {
      conn, err := lis.Accept()
      if err != nil {
         panic(err)
      }
      go func() {
         defer conn.Close()
         handle(conn)
      }()
   }
}

func handle(conn net.Conn) {
   buf := make([]byte, 1024)
   for {
      n, err := conn.Read(buf)
      if err != nil {
         fmt.Printf("read err: %+v\n", err)
         return
      }
      _, _ = conn.Write(buf[:n])
   }
}
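
To try the server without a second terminal, the following self-contained sketch starts the same kind of echo loop on an ephemeral loopback port and sends one message through it. `echoRoundTrip` and the 127.0.0.1:0 address are my own choices for the illustration, not part of the server above.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// handle echoes everything it reads back to the peer until an error occurs.
func handle(conn net.Conn) {
	defer conn.Close()
	buf := make([]byte, 1024)
	for {
		n, err := conn.Read(buf)
		if err != nil {
			return
		}
		if _, err := conn.Write(buf[:n]); err != nil {
			return
		}
	}
}

// echoRoundTrip (hypothetical helper) starts an echo server on a free
// loopback port, sends msg to it and returns what comes back.
func echoRoundTrip(msg string) (string, error) {
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer lis.Close()

	go func() {
		for {
			conn, err := lis.Accept()
			if err != nil {
				return // listener closed
			}
			go handle(conn) // one goroutine per connection
		}
	}()

	conn, err := net.Dial("tcp", lis.Addr().String())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	if _, err := conn.Write([]byte(msg)); err != nil {
		return "", err
	}
	reply := make([]byte, len(msg))
	if _, err := io.ReadFull(conn, reply); err != nil {
		return "", err
	}
	return string(reply), nil
}

func main() {
	reply, err := echoRoundTrip("ping")
	fmt.Printf("reply=%q err=%v\n", reply, err)
}
```

Every Accept, Read and Write here looks like a blocking call, which is exactly the programming model the rest of this post traces down into the runtime.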



$ go build -gcflags "-N -l" -o main main.go
$ objdump -d main >> main.i

2.2 net.Listen

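The call chain is main.main -> net.Listen -> net.(*ListenConfig).Listen -> net.(*sysListener).listenTCP -> net.internetSocket -> net.socket -> net.sysSocket. Note that net.sysSocket lives in sock_cloexec.go of package net. It calls the function below, and the flags show that the socket is created for non-blocking IO.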

s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)


var socketFunc        func(int, int, int) (int, error)  = syscall.Socket
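
What SOCK_NONBLOCK means in practice can be seen with the raw system calls. The sketch below (Linux only) creates a listening socket the same way and calls accept while no client is connecting; `acceptWouldBlock` and the loopback address are assumptions made for this illustration.

```go
package main

import (
	"fmt"
	"syscall"
)

// acceptWouldBlock (hypothetical helper) creates a non-blocking listening
// socket and reports whether accept(2) returns EAGAIN when no client is
// waiting, instead of blocking the calling thread.
func acceptWouldBlock() (bool, error) {
	s, err := syscall.Socket(syscall.AF_INET,
		syscall.SOCK_STREAM|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, 0)
	if err != nil {
		return false, err
	}
	defer syscall.Close(s)

	// Port 0 lets the kernel pick a free port on the loopback interface.
	addr := &syscall.SockaddrInet4{Port: 0, Addr: [4]byte{127, 0, 0, 1}}
	if err := syscall.Bind(s, addr); err != nil {
		return false, err
	}
	if err := syscall.Listen(s, 16); err != nil {
		return false, err
	}

	nfd, _, err := syscall.Accept(s)
	if err == nil {
		syscall.Close(nfd) // unexpected: somebody connected
		return false, nil
	}
	if err != syscall.EAGAIN {
		return false, err
	}
	return true, nil
}

func main() {
	wouldBlock, err := acceptWouldBlock()
	fmt.Println("accept returned EAGAIN:", wouldBlock, "err:", err)
}
```

That immediate EAGAIN is the signal the poll.FD methods shown later rely on to decide when to park the goroutine.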


func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
   s, err := sysSocket(family, sotype, proto)
   if err != nil {
      return nil, err
   }
   if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
      return nil, err
   }
   if fd, err = newFD(s, family, sotype, net); err != nil {
      return nil, err
   }
   if laddr != nil && raddr == nil {
      switch sotype {
      case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
         if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
            return nil, err
         }
         return fd, nil
      case syscall.SOCK_DGRAM:
         if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
            return nil, err
         }
         return fd, nil
      }
   }
   if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
      return nil, err
   }
   return fd, nil
}

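Next, because we are setting up a TCP listener, control flows into the listenStream method, which issues the bind and listen system calls. (As you can see, Go's net.Listen wraps the three steps socket -> bind -> listen that a C program performs to start a TCP server.)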

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
   var err error
   if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
      return err
   }
   var lsa syscall.Sockaddr
   if lsa, err = laddr.sockaddr(fd.family); err != nil {
      return err
   }
   if ctrlFn != nil {
      c, err := newRawConn(fd)
      if err != nil {
         return err
      }
      if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
         return err
      }
   }
   if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
      return os.NewSyscallError("bind", err)
   }
   if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
      return os.NewSyscallError("listen", err)
   }
   if err = fd.init(); err != nil {
      return err
   }
   lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
   fd.setAddr(fd.addrFunc()(lsa), nil)
   return nil
}


2.3 Data structures and what fd.init() does


// Network file descriptor.
type netFD struct {
   pfd poll.FD
   // immutable until Close
   family      int
   sotype      int
   isConnected bool // handshake completed or use of association with peer
   net         string
   laddr       Addr
   raddr       Addr
}



// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
   // Lock sysfd and serialize access to Read and Write methods.
   fdmu fdMutex
   // System file descriptor. Immutable until Close.
   Sysfd int
   // I/O poller.
   pd pollDesc
   // Writev cache.
   iovecs *[]syscall.Iovec
   // Semaphore signaled when file is closed.
   csema uint32
   // Non-zero if this file has been set to blocking mode.
   isBlocking uint32
   // Whether this is a streaming descriptor, as opposed to a
   // packet-based descriptor like a UDP socket. Immutable.
   IsStream bool
   // Whether a zero byte read indicates EOF. This is false for a
   // message based socket connection.
   ZeroReadIsEOF bool
   // Whether this is a file rather than a network socket.
   isFile bool
}



type pollDesc struct {
   runtimeCtx uintptr
}


func (pd *pollDesc) init(fd *FD) error {
   serverInit.Do(runtime_pollServerInit)
   ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
   if errno != 0 {
      return errnoErr(syscall.Errno(errno))
   }
   pd.runtimeCtx = ctx
   return nil
}


func poll_runtime_pollServerInit() {
   netpollGenericInit()
}
func netpollGenericInit() {
   if atomic.Load(&netpollInited) == 0 {
      lockInit(&netpollInitLock, lockRankNetpollInit)
      lock(&netpollInitLock)
      if netpollInited == 0 {
         netpollinit()
         atomic.Store(&netpollInited, 1)
      }
      unlock(&netpollInitLock)
   }
}


func netpollinit() {
   epfd = epollcreate1(_EPOLL_CLOEXEC)
   if epfd < 0 {
      epfd = epollcreate(1024)
      if epfd < 0 {
         println("runtime: epollcreate failed with", -epfd)
         throw("runtime: netpollinit failed")
      }
      closeonexec(epfd)
   }
   r, w, errno := nonblockingPipe()
   if errno != 0 {
      println("runtime: pipe failed with", -errno)
      throw("runtime: pipe failed")
   }
   ev := epollevent{
      events: _EPOLLIN,
   }
   *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
   errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
   if errno != 0 {
      println("runtime: epollctl failed with", -errno)
      throw("runtime: epollctl failed")
   }
   netpollBreakRd = uintptr(r)
   netpollBreakWr = uintptr(w)
}




2.4 Listener.Accept

The call chain of Listener.Accept is Listener.Accept -> net.(*TCPListener).Accept -> net.(*TCPListener).accept -> net.(*netFD).accept -> poll.(*FD).Accept -> poll.accept -> syscall.Accept. poll.(*FD).Accept does the following:

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { // fd here is the fd created earlier by listen
   if err := fd.readLock(); err != nil {
      return -1, nil, "", err
   }
   defer fd.readUnlock()
   if err := fd.pd.prepareRead(fd.isFile); err != nil {
      return -1, nil, "", err
   }
   for {
      s, rsa, errcall, err := accept(fd.Sysfd) // accept on the listening fd; it is non-blocking, so this call always returns immediately
      if err == nil { // no error means a connection really arrived, so return it
         return s, rsa, "", err
      }
      switch err {
      case syscall.EINTR:
         continue
      case syscall.EAGAIN: // on EAGAIN, check that the fd can use the poller, then wait with waitRead
         if fd.pd.pollable() {
            if err = fd.pd.waitRead(fd.isFile); err == nil {
               continue
            }
         }
      case syscall.ECONNABORTED:
         // This means that a socket on the listen
         // queue was closed before we Accept()ed it;
         // it's a silly error, so try again.
         continue
      }
      return -1, nil, errcall, err
   }
}

How does pollDesc.waitRead manage to wait? Internally it calls pollDesc.wait -> runtime_pollWait -> runtime.poll_runtime_pollWait.

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
   errcode := netpollcheckerr(pd, int32(mode))
   if errcode != pollNoError {
      return errcode
   }
   // As for now only Solaris, illumos, and AIX use level-triggered IO.
   if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
      netpollarm(pd, mode)
   }
   // enter netpollblock to check whether the awaited IO event has occurred
   for !netpollblock(pd, int32(mode), false) {
      errcode = netpollcheckerr(pd, int32(mode))
      if errcode != pollNoError {
         return errcode
      }
      // Can happen if timeout has fired and unblocked us,
      // but before we had a chance to run, timeout has been reset.
      // Pretend it has not happened and retry.
   }
   return pollNoError
}


func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
   gpp := &pd.rg
   if mode == 'w' {
      gpp = &pd.wg
   }
   // set the gpp semaphore to pdWait
   for {
      // Consume notification if already ready.
      if gpp.CompareAndSwap(pdReady, 0) { // the IO is already ready, so return true right away
         return true
      }
      if gpp.CompareAndSwap(0, pdWait) { // otherwise publish pdWait and leave the loop
         break
      }
      // Double check that this isn't corrupt; otherwise we'd loop
      // forever.
      if v := gpp.Load(); v != pdReady && v != 0 {
         throw("runtime: double wait")
      }
   }
   // need to recheck error states after setting gpp to pdWait
   // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
   // do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
   if waitio || netpollcheckerr(pd, mode) == pollNoError {
      // waitio is false here;
      // netpollcheckerr(pd, mode) == pollNoError normally holds at this point, so gopark parks the current goroutine
      gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
   }
   // be careful to not lose concurrent pdReady notification
   old := gpp.Swap(0)
   if old > pdWait {
      throw("runtime: corrupted polldesc")
   }
   return old == pdReady
}
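
The rg/wg word is a small state machine: 0 (idle), pdReady, or pdWait. To get a feel for it outside the runtime, here is a simplified user-space sketch. It is not the runtime's code: `waitSlot`, the three `st*` constants and the buffered channel standing in for gopark/goready are all assumptions of mine, and it supports only one waiter at a time.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const (
	stIdle  uint32 = 0 // no waiter, no pending notification
	stReady uint32 = 1 // a readiness notification is pending
	stWait  uint32 = 2 // a goroutine is parked waiting for IO
)

// waitSlot is a hypothetical stand-in for pollDesc.rg / pollDesc.wg.
type waitSlot struct {
	state uint32
	wake  chan struct{} // replaces gopark/goready in this sketch
}

func newWaitSlot() *waitSlot {
	return &waitSlot{wake: make(chan struct{}, 1)}
}

// block follows the shape of netpollblock: consume a pending notification,
// or publish stWait and park until the poller side wakes us up.
func (s *waitSlot) block() bool {
	for {
		if atomic.CompareAndSwapUint32(&s.state, stReady, stIdle) {
			return true
		}
		if atomic.CompareAndSwapUint32(&s.state, stIdle, stWait) {
			break
		}
	}
	<-s.wake // "gopark"
	old := atomic.SwapUint32(&s.state, stIdle)
	return old == stReady
}

// ready plays the poller's role: mark the slot ready and wake the waiter
// if one is parked.
func (s *waitSlot) ready() {
	for {
		old := atomic.LoadUint32(&s.state)
		if old == stReady {
			return // notification already pending
		}
		if atomic.CompareAndSwapUint32(&s.state, old, stReady) {
			if old == stWait {
				s.wake <- struct{}{} // "goready"
			}
			return
		}
	}
}

func main() {
	s := newWaitSlot()
	done := make(chan bool, 1)
	go func() { done <- s.block() }()
	s.ready()
	fmt.Println("waiter saw IO ready:", <-done)
}
```

Whichever side runs first, the waiter ends up observing the ready state exactly once, which is the property the compare-and-swap loop in netpollblock is there to guarantee.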



func (fd *netFD) accept() (netfd *netFD, err error) {
   d, rsa, errcall, err := fd.pfd.Accept()
   if err != nil {
      if errcall != "" {
         err = wrapSyscallError(errcall, err)
      }
      return nil, err
   }
   if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
      return nil, err
   }
   if err = netfd.init(); err != nil {
      return nil, err
   }
   lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
   netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
   return netfd, nil
}

2.4 conn.Read/conn.Write


As with Listener.Accept, we take conn.Read as the example. It goes through the following calls: conn.Read -> net.(*conn).Read -> net.(*netFD).Read -> poll.(*FD).Read

func (fd *FD) Read(p []byte) (int, error) {
   if err := fd.readLock(); err != nil {
      return 0, err
   }
   defer fd.readUnlock()
   if len(p) == 0 {
      // If the caller wanted a zero byte read, return immediately
      // without trying (but after acquiring the readLock).
      // Otherwise syscall.Read returns 0, nil which looks like
      // io.EOF.
      // TODO(bradfitz): make it wait for readability? (Issue 15735)
      return 0, nil
   }
   if err := fd.pd.prepareRead(fd.isFile); err != nil {
      return 0, err
   }
   if fd.IsStream && len(p) > maxRW {
      p = p[:maxRW]
   }
   for {
      n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p) // try one read; the fd is non-blocking, so if nothing is available the call still returns, with EAGAIN
      if err != nil {
         n = 0
         if err == syscall.EAGAIN && fd.pd.pollable() {
            if err = fd.pd.waitRead(fd.isFile); err == nil { // likewise, wait for the event with waitRead
               continue
            }
         }
      }
      err = fd.eofError(n, err)
      return n, err
   }
}



2.5 Waking up goroutines


func netpoll(delay int64) gList {
   if epfd == -1 {
      return gList{}
   }
   var waitms int32 // the logic below converts delay into the timeout value for epollwait
   if delay < 0 {
      waitms = -1
   } else if delay == 0 {
      waitms = 0
   } else if delay < 1e6 {
      waitms = 1
   } else if delay < 1e15 {
      waitms = int32(delay / 1e6)
   } else {
      // An arbitrary cap on how long to wait for a timer.
      // 1e9 ms == ~11.5 days.
      waitms = 1e9
   }
   var events [128]epollevent
retry:
   n := epollwait(epfd, &events[0], int32(len(events)), waitms)
   if n < 0 {
      if n != -_EINTR {
         println("runtime: epollwait on fd", epfd, "failed with", -n)
         throw("runtime: netpoll failed")
      }
      // If a timed sleep was interrupted, just return to
      // recalculate how long we should sleep now.
      if waitms > 0 {
         return gList{}
      }
      goto retry
   }
   var toRun gList // list of goroutines, eventually returned to the caller
   for i := int32(0); i < n; i++ {
      ev := &events[i]
      if ev.events == 0 {
         continue
      }
      if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd { // a write to the pipe interrupts an epollwait that would otherwise block for too long
         if ev.events != _EPOLLIN {
            println("runtime: netpoll: break fd ready for", ev.events)
            throw("runtime: netpoll: break fd ready for something unexpected")
         }
         if delay != 0 {
            // netpollBreak could be picked up by a
            // nonblocking poll. Only read the byte
            // if blocking.
            var tmp [16]byte
            read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
            atomic.Store(&netpollWakeSig, 0)
         }
         continue
      }
      var mode int32
      if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
         mode += 'r'
      }
      if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
         mode += 'w'
      }
      if mode != 0 {
         pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
         pd.setEventErr(ev.events == _EPOLLERR)
         netpollready(&toRun, pd, mode) // add the g to the list
      }
   }
   return toRun
}
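
The delay handling at the top of netpoll is easy to misread, so here it is pulled out into a standalone function. `epollTimeoutMs` is a name I made up; the thresholds are copied from the code above, with delay in nanoseconds and the result in milliseconds as epoll_wait expects.

```go
package main

import "fmt"

// epollTimeoutMs (hypothetical helper) mirrors the delay-to-waitms logic at
// the top of netpoll: delay is in nanoseconds, the result in milliseconds.
func epollTimeoutMs(delay int64) int32 {
	switch {
	case delay < 0:
		return -1 // block indefinitely
	case delay == 0:
		return 0 // poll without blocking
	case delay < 1e6:
		return 1 // round sub-millisecond delays up to 1 ms
	case delay < 1e15:
		return int32(delay / 1e6)
	default:
		return 1e9 // arbitrary cap, about 11.5 days
	}
}

func main() {
	for _, d := range []int64{-1, 0, 500, 2500000, 1e15} {
		fmt.Println(d, "ns ->", epollTimeoutMs(d), "ms")
	}
}
```

Rounding short delays up to 1 ms instead of down to 0 keeps a short timer from turning the call into a non-blocking poll.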


if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
   list := netpoll(delay) // block until new work is available
   atomic.Store64(&sched.pollUntil, 0)
   atomic.Store64(&sched.lastpoll, uint64(nanotime()))
   if faketime != 0 && list.empty() {
      // Using fake time and nothing is ready; stop M.
      // When all M's stop, checkdead will call timejump.
      stopm()
      goto top
   }
   _p_ = pidleget()
   if _p_ == nil { // no idle P: put every g in list on the global queue
      injectglist(&list)
   } else { // otherwise pop one g to run; the rest go to this P's local queue or the global queue, depending on policy
      acquirep(_p_)
      if !list.empty() {
         gp := list.pop()
         injectglist(&list)
         casgstatus(gp, _Gwaiting, _Grunnable)
         if trace.enabled {
            traceGoUnpark(gp, 0)
         }
         return gp, false
      }
      if wasSpinning {
         _g_.m.spinning = true
         atomic.Xadd(&sched.nmspinning, 1)
      }
      goto top
   }
}


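3. Summary

Go netpoller is a native network model built from IO multiplexing + non-blocking IO + the Go runtime scheduler. It avoids tying up an M in a blocking system call for every IO operation, while still exposing a blocking-IO interface to the upper layers. The result is the simple and elegant goroutine-per-connection model, which greatly improves network handling capacity. It is fair to say that Go is a language well suited to network-IO-intensive applications.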

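4. References

9.2 I/O 多路复用:select/poll/epoll (9.2 I/O multiplexing: select/poll/epoll)

Go netpoller原生网络模型之源码全面解密 (A complete source-level walkthrough of the Go netpoller native network model)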
