0. 简介

上篇博客提到了goroutine有关体系调用的调度进行了叙述,关于IO密集型的拜访,每次请求都或许导致一次M的创立,这其实是不能承受的。Go SDK为了处理网络IO密集型的运用场景,开发了网络轮询器这一关键组件。

网络轮询器使用了操作体系提供的IO多路复用模型来提升网络设备的使用率以及程序的功能,下面,咱们将分别介绍几种常见的IO模型以及网络轮询器在以Linux为比方的体系上的完成。

1. IO多路复用

所谓IO多路复用便是运用select/poll/epoll这一系列的多路选择器,完成一个线程监控多个文件句柄,一旦某个文件句柄安排妥当(ready),就能够告诉到对应运用程序的读写操作;没有文件句柄安排妥当时,就会堵塞运用程序,从而释放CPU资源。

1.1 select&poll

select完成多路复用的方法是,将需求监听的描绘符都放到一个文件描绘符调集,然后调用select函数将文件描绘符调集复制到内核里,让内核来查看是否有事情产生,查看的方法很粗暴,便是经过遍历文件描绘符调集的方法,当查看到有事情产生后,将此描绘符标记为可读或可写,接着再把整个文件描绘符调集复制回用户态里,然后用户态还需求再经过遍历的办法找到可读或可写的描绘符,然后再对其处理。

所以,关于select方法,在时刻上,需求进行两遍遍历,一次在内核态,一次在用户态,所以其时刻复杂度是O(N);在空间上,会产生两次文件描绘符调集的复制,一次由用户空间复制到内核空间,一次由内核空间复制到用户空间,可是由于select运用的是固定长度的bitsmap,默许最多1024个文件描绘符。

pollselect没有本质区别,只是不再运用bitsmap来存储关注的描绘符,而是采用链表的方法存储,突破了文件描绘符的个数约束,可是随着文件描绘符的个数添加,其O(N)的时刻复杂度也会使得功率越来越低下。

1.2 epoll

int s = socket(AF_INET, SOCK_STREAM, 0);
bind(s, ...);
listen(s, ...)
int epfd = epoll_create(...);
epoll_ctl(epfd, ...); // 将一切需求监听的socket添加到epfd中
while(1) {
    int n = epoll_wait(...);
    for(events){
        // 处理逻辑
    }
}

以上是一个很经典的epoll运用逻辑:先用epoll_create创立一个epfd目标,然后经过epoll_ctl将需求监控的文件描绘符放到epfd中,终究调用epoll_wait等候数据。

比较于selectpollepoll很好地处理了前二者在时刻和空间上功率问题:

  1. epoll在内核中运用红黑树跟踪文件描绘符调集,大大缩减了时刻,比较于select/poll遍历调集的 O(N) 的时刻复杂度,红黑树的时刻复杂度是 O(logN)
  2. epoll运用事情驱动的机制(前缀e应该便是event的首字母),当某个文件描绘符有消息时,当用户调用epoll_wait函数时,只会回来有事情产生的文件描绘符的个数,在空间上也大大节省了。

插个题外话,网上不少文章耳食之言,以为epoll运用mmap进行内存映射,从而节省了内存复制,这完全是过错的,能够查看源码,或许参阅epoll以及Epoll到底用没用到mmap之一。

1.3 磁盘IO是否适用多路复用IO

需求阐明的是,以下评论均指产生在Linux体系中的多路复用;国外文件一般不称磁盘IO为disk io,而是regular file

咱们先说定论:磁盘IO不适用多路复用IO!!!

多路复用IO最好搭配非堵塞IO运用,Linux手册有以下一段话:

Under Linux, select() may report a socket file descriptor as “ready for reading”, while nevertheless a subsequent read blocks. This could for example happen when data has arrived but upon examination has wrong checksum and is discarded. There may be other circumstances in which a file descriptor is spuriously reported as ready. Thus it may be safer to use O_NONBLOCK on sockets that should not block.

谷歌翻译如下:

在 Linux 下,select() 或许会将套接字文件描绘符陈述为“准备好读取”,但随后的读取会堵塞。 例如,这或许产生在数据已抵达但查看时校验和过错并被丢弃的状况下。 或许存在文件描绘符被虚假陈述为安排妥当的其他状况。 因此,在不应堵塞的套接字上运用 O_NONBLOCK 或许更安全。

简略来说,至少多路复用的select方法被官方推荐运用O_NONBLOCK来设置成非堵塞IO。不过,磁盘IO并不支撑非堵塞IO,依据Non-blocking I/O with regular files:

Regular files arealwaysreadable and they are alsoalwayswriteable. This is clearly stated in the relevant POSIX specifications.I cannot stress this enough. Putting a regular file in non-blocking has ABSOLUTELY no effectsother than changing one bit in the file flags.

简略而言,便是磁盘IO永远是可读可写的,非堵塞IO对其没有效果。

可是以上还不是磁盘IO不支撑多路复用的最首要原因,请看以下剖析:

select运用磁盘IO无意义,由于其永远是ready的

File descriptors associated with regular files always select true for ready to read, ready to write, and error conditions.

留意,以上标注来源于Linux手册,可是我本地man select却并没有这段话,只能贴出链接。

epoll直接不支撑磁盘IO

假如说,selectpoll只是对磁盘IO不起作用,由于其状况运用是ready,所以每次循环读都会得到成果,那么epoll做的更加决绝:不支撑磁盘IO的句柄,直接报EPERM,经过man epoll_ctl得到:

EPERM The target file fd does not support epoll. This error can occur if fd refers to, for example, a regular file or a directory.

在Linux环境上,Go中的网络轮询器的底层是依据epoll完成的,所以其根本不支撑磁盘IO的多路复用,任何磁盘IO的操作都会向上篇博客中描绘的那样,堕入内核调用。并且在os.file_unix.go中,在创立文件时会测验将其归入到网络轮询器的体系中,有如下描绘,有些状况下会无法将文件注册到网络轮询器中,比方Linux体系中的磁盘文件。

// An error here indicates a failure to register
// with the netpoll system. That can happen for
// a file descriptor that is not supported by
// epoll/kqueue; for example, disk files on
// Linux systems. We assume that any real error
// will show up in later I/O.

所以,在《Go 言语设计与完成》的6.6节,作者重复提及网络轮询器不仅用于监控网络IO,还能用于监控文件IO,这个定论是过错的!至少在Linux环境下,磁盘IO不能运用多路复用。我看有许多小伙伴的博客转述了其观点,恐怕会形成更大的错误传达。

2. Go netpoller

Go是一门跨渠道的编程言语,而不同渠道针对IO多路复用也有不同的完成方法,Go netpoller经过在底层对Linux下的epoll、freeBSD或许MacOS下的kqueue或许Windows下的iocp进行封装,完成运用同步编程形式达到异步执行的效果,完成Go言语的网络轮询器,提升网络IO密集型运用的功率。这儿需求阐明的是,kqueue支撑对磁盘IO做多路复用,可是现在大多数的服务器都是Linux体系的,所以咱们以下评论只针对Linux体系。

2.1 场景

以下,运用Go言语完成一个典型的TCP echo server

package main
import (
   "fmt"
   "net"
)
func main() {
   lis, err := net.Listen("tcp", ":8080")
   if err != nil {
      panic(err)
   }
   for {
      conn, err := lis.Accept()
      if err != nil {
         panic(err)
      }
      go func() {
         defer conn.Close()
         handle(conn)
      }()
   }
}
func handle(conn net.Conn) {
   buf := make([]byte, 1024)
   for {
      n, err := conn.Read(buf)
      if err != nil {
         fmt.Printf("read err: %+v\n", err)
         return
      }
      _, _ = conn.Write(buf[:n])
   }
}

上述形式是典型的goroutine-per-connection形式,下面,咱们就来看看Go调度器是怎样做到这种形式的。

咱们能够经过以下指令反编译代码

$ go build -gcflags "-N -l" -o main main.go
$ objdump -d main >> main.i

2.2 net.Listen

main.main -> net.Listen -> net.(*ListenConfig).Listen -> net.(*sysListener).listenTCP -> net.internetSocket -> net.socket -> net.sysSocket,留意这个net.sysSocketnet.sock_cloexec.go中,其调用的函数如下,能够发现,其运用的IO对错堵塞IO。

s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)

其实所谓socketFunc便是体系调用socket

var socketFunc        func(int, int, int) (int, error)  = syscall.Socket

以上是整个体系调用的进程,net.socket函数在经过net.sysSocket创立完socket句柄后,会经过newFD函数创立net.netFD目标,而这个目标包括着最重要的网络轮询器的重要目标poll.FD

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
   s, err := sysSocket(family, sotype, proto)
   if err != nil {
      return nil, err
   }
   if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
      poll.CloseFunc(s)
      return nil, err
   }
   if fd, err = newFD(s, family, sotype, net); err != nil {
      poll.CloseFunc(s)
      return nil, err
   }
   if laddr != nil && raddr == nil {
      switch sotype {
      case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
         if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
            fd.Close()
            return nil, err
         }
         return fd, nil
      case syscall.SOCK_DGRAM:
         if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
            fd.Close()
            return nil, err
         }
         return fd, nil
      }
   }
   if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
      fd.Close()
      return nil, err
   }
   return fd, nil
}

然后,由于咱们树立的是TCP衔接,所以会走入到listenStream办法,里面调用了体系调用bindlisten(能够看到,Go中的net.Listen封装了本来C言语树立tcp服务时调用体系调用的socket -> bind -> listen三个步骤)。

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
   var err error
   if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
      return err
   }
   var lsa syscall.Sockaddr
   if lsa, err = laddr.sockaddr(fd.family); err != nil {
      return err
   }
   if ctrlFn != nil {
      c, err := newRawConn(fd)
      if err != nil {
         return err
      }
      if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
         return err
      }
   }
   if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
      return os.NewSyscallError("bind", err)
   }
   if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
      return os.NewSyscallError("listen", err)
   }
   if err = fd.init(); err != nil {
      return err
   }
   lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
   fd.setAddr(fd.addrFunc()(lsa), nil)
   return nil
}

然后,以上函数调用了fd.init()来初始化咱们的句柄操作,为了这部分能继续讲下去,也是咱们网络轮询器的重点,接下来,咱们需求介绍一下Go网络轮询器几个重要的结构体。

2.3 数据结构以及fd.init()做了什么

net.netFD

// Network file descriptor.
type netFD struct {
   pfd poll.FD
   // immutable until Close
   family      int
   sotype      int
   isConnected bool // handshake completed or use of association with peer
   net         string
   laddr       Addr
   raddr       Addr
}

net.netFD是网络描绘符,其包括了重要的网络轮询器的结构体pfd,类型是poll.FD

poll.FD

// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
   // Lock sysfd and serialize access to Read and Write methods.
   fdmu fdMutex
   // System file descriptor. Immutable until Close.
   Sysfd int
   // I/O poller.
   pd pollDesc
   // Writev cache.
   iovecs *[]syscall.Iovec
   // Semaphore signaled when file is closed.
   csema uint32
   // Non-zero if this file has been set to blocking mode.
   isBlocking uint32
   // Whether this is a streaming descriptor, as opposed to a
   // packet-based descriptor like a UDP socket. Immutable.
   IsStream bool
   // Whether a zero byte read indicates EOF. This is false for a
   // message based socket connection.
   ZeroReadIsEOF bool
   // Whether this is a file rather than a network socket.
   isFile bool
}

poll.FD定义如上,其包括体系的描绘符Sysfd和轮询器的重要结构pd,类型是poll.pollDesc

pollDesc

type pollDesc struct {
   runtimeCtx uintptr
}

这儿的这个struct只包括一个指针,其实这是为了进行不同体系兼容而设置的,3.2中终究剖析到fd.init()函数进行初始化,终究会调用到以下函数:

func (pd *pollDesc) init(fd *FD) error {
   serverInit.Do(runtime_pollServerInit)
   ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
   if errno != 0 {
      return errnoErr(syscall.Errno(errno))
   }
   pd.runtimeCtx = ctx
   return nil
}

以上init函数,在编译条件是linux的景象下,会定位到runtime.netpoll.go中以下函数:

func poll_runtime_pollServerInit() {
   netpollGenericInit()
}
func netpollGenericInit() {
   if atomic.Load(&netpollInited) == 0 {
      lockInit(&netpollInitLock, lockRankNetpollInit)
      lock(&netpollInitLock)
      if netpollInited == 0 {
         netpollinit()
         atomic.Store(&netpollInited, 1)
      }
      unlock(&netpollInitLock)
   }
}

能够看到,以上操作终究又会调用到netpollinit函数,而这个函数会依据体系不同编译不同文件,比方Linux体系中是runtime.netpoll_epoll.go,若是Mac,则会定位到runtime.netpoll_kqueue.go,咱们仍旧选择Linux渠道进行剖析:

func netpollinit() {
   epfd = epollcreate1(_EPOLL_CLOEXEC)
   if epfd < 0 {
      epfd = epollcreate(1024)
      if epfd < 0 {
         println("runtime: epollcreate failed with", -epfd)
         throw("runtime: netpollinit failed")
      }
      closeonexec(epfd)
   }
   r, w, errno := nonblockingPipe()
   if errno != 0 {
      println("runtime: pipe failed with", -errno)
      throw("runtime: pipe failed")
   }
   ev := epollevent{
      events: _EPOLLIN,
   }
   *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
   errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
   if errno != 0 {
      println("runtime: epollctl failed with", -errno)
      throw("runtime: epollctl failed")
   }
   netpollBreakRd = uintptr(r)
   netpollBreakWr = uintptr(w)
}

其间需求留意一点,netpollBreakRdnetpollBreakWr是注册的管道,用于唤醒epollwait函数的,将netpollBreakRd注册到epoll中,假如需求唤醒epollwait堵塞,则使用netpollBreakWr在管道这端写入即可。

终究需求留意的是,在以上init中,还会调用runtime_pollOpen(uintptr(fd.Sysfd))listenFD注册到epoll中。

剖析完以上代码能够发现,在创立tcp服务并且监听的进程中,同时初进行了epoll的相关操作,并且预置了能够唤醒epollwait堵塞的管道。

2.4 Listener.Accept

Listener.Accept的调用进程为Listener.Accept -> net.(*TCPListener).Accept -> net.(*TCPListener).accept -> net.(*netFD).accept -> poll.(*FD).Accept -> poll.accept -> syscall.Accept。在poll.(*FD).Accept中,做如下处理:

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { // 这儿的fd,便是之前listen的fd
   if err := fd.readLock(); err != nil {
      return -1, nil, "", err
   }
   defer fd.readUnlock()
   if err := fd.pd.prepareRead(fd.isFile); err != nil {
      return -1, nil, "", err
   }
   for {
      s, rsa, errcall, err := accept(fd.Sysfd) // 运用listen时的fd接纳链接,由于listen fd设置为非堵塞,所以必定回来
      if err == nil { // 假如没有错,阐明真的有衔接,回来
         return s, rsa, "", err
      }
      switch err {
      case syscall.EINTR:
         continue
      case syscall.EAGAIN: // 假如回来EAGAIN,判别能够运用poll体系后运用waitRead等候
         if fd.pd.pollable() {
            if err = fd.pd.waitRead(fd.isFile); err == nil {
               continue
            }
         }
      case syscall.ECONNABORTED:
         // This means that a socket on the listen
         // queue was closed before we Accept()ed it;
         // it's a silly error, so try again.
         continue
      }
      return -1, nil, errcall, err
   }
}

pollDesc.waitRead函数是怎样做到等候的呢,其内部调用了pollDesc.wait -> runtime_pollWait -> runtime.poll_runtime_pollWait

//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
   errcode := netpollcheckerr(pd, int32(mode))
   if errcode != pollNoError {
      return errcode
   }
   // As for now only Solaris, illumos, and AIX use level-triggered IO.
   if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
      netpollarm(pd, mode)
   }
   // 进入 netpollblock 判别是否有等候的IO事情产生
   for !netpollblock(pd, int32(mode), false) {
      errcode = netpollcheckerr(pd, int32(mode))
      if errcode != pollNoError {
         return errcode
      }
      // Can happen if timeout has fired and unblocked us,
      // but before we had a chance to run, timeout has been reset.
      // Pretend it has not happened and retry.
   }
   return pollNoError
}

netpollblock函数如下:

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
   gpp := &pd.rg
   if mode == 'w' {
      gpp = &pd.wg
   }
   // set the gpp semaphore to pdWait
   for {
      // Consume notification if already ready.
      if gpp.CompareAndSwap(pdReady, 0) { // 假如当时IO现已ready,那么直接回来true
         return true
      }
      if gpp.CompareAndSwap(0, pdWait) { // 假如没有,则跳出循环
         break
      }
      // Double check that this isn't corrupt; otherwise we'd loop
      // forever.
      if v := gpp.Load(); v != pdReady && v != 0 {
         throw("runtime: double wait")
      }
   }
   // need to recheck error states after setting gpp to pdWait
   // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
   // do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
   if waitio || netpollcheckerr(pd, mode) == pollNoError {
   // waitio = false;
   // netpollcheckerr(pd, mode) == pollNoError 此刻一般是建立的,所以gopark住当时的goroutine
      gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
   }
   // be careful to not lose concurrent pdReady notification
   old := gpp.Swap(0)
   if old > pdWait {
      throw("runtime: corrupted polldesc")
   }
   return old == pdReady
}

经过以上剖析能够看到,在Go中调用Listener.Accept函数后,会首先调用一次体系调用的accept,假如有衔接请求则树立衔接;假如没有衔接请求,那么内核回来EAGAIN过错,那么会经过一系列调用,终究经过gopark函数挂起协程,等候接入事情的到来再唤醒协程。

在被唤醒之后,让咱们回到net.(*netFD).accept函数,接下来会将accept体系操作回来的句柄重新生成一个FD,并且相同注册到epoll中,为之后的read、write操作做衬托。

func (fd *netFD) accept() (netfd *netFD, err error) {
   d, rsa, errcall, err := fd.pfd.Accept()
   if err != nil {
      if errcall != "" {
         err = wrapSyscallError(errcall, err)
      }
      return nil, err
   }
   if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
      poll.CloseFunc(d)
      return nil, err
   }
   if err = netfd.init(); err != nil {
      netfd.Close()
      return nil, err
   }
   lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
   netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
   return netfd, nil
}

2.4 conn.Read/conn.Write

经过前面的剖析咱们知道,net.Listen操作回来的FD被注册到epoll是为了Listener.Accept,而Listener.Accept操作回来的FD注册到epoll中是为了conn.Read/conn.Write的。

Listener.Accept相同,咱们以conn.Read为例,阅历如下调用:conn.Read -> net.(*conn).Read -> net.(*netFD).Read -> poll.(*FD).Read

func (fd *FD) Read(p []byte) (int, error) {
   if err := fd.readLock(); err != nil {
      return 0, err
   }
   defer fd.readUnlock()
   if len(p) == 0 {
      // If the caller wanted a zero byte read, return immediately
      // without trying (but after acquiring the readLock).
      // Otherwise syscall.Read returns 0, nil which looks like
      // io.EOF.
      // TODO(bradfitz): make it wait for readability? (Issue 15735)
      return 0, nil
   }
   if err := fd.pd.prepareRead(fd.isFile); err != nil {
      return 0, err
   }
   if fd.IsStream && len(p) > maxRW {
      p = p[:maxRW]
   }
   for {
      n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p) // 测验去读一次,假如读到则回来,由于对错堵塞IO,所以读不到也会回来EAGAIN
      if err != nil {
         n = 0
         if err == syscall.EAGAIN && fd.pd.pollable() {
            if err = fd.pd.waitRead(fd.isFile); err == nil { // 相同调用waitRead等候事情
               continue
            }
         }
      }
      err = fd.eofError(n, err)
      return n, err
   }
}

能够发现,读的时候和Accept操作相似,也会先测验读一次,没读到之后相同调用waitRead挂起协程,营建堵塞读的假象。写的进程很相似,咱们就不剖析了。

以上,咱们剖析了accept和读写操作时,使用poller机制营建出同步堵塞调用的假象,实际上体系运用多路IO的作业进程,那么,协程什么时候会被唤醒呢?

2.5 协程的唤醒

实际上,关于网络轮询器,是经过runtime.netpoll函数去唤醒的,比方之前咱们提过的findrunnable函数,就会调用runtime.netpoll函数去查看是否有ready的读写事情,详细调度咱们就不剖析了,下面看看runtime.netpoll是怎样作业的:

func netpoll(delay int64) gList {
   if epfd == -1 {
      return gList{}
   }
   var waitms int32 // 以下逻辑讲delay转换为 epollwait 的 timeout 值
   if delay < 0 {
      waitms = -1
   } else if delay == 0 {
      waitms = 0
   } else if delay < 1e6 {
      waitms = 1
   } else if delay < 1e15 {
      waitms = int32(delay / 1e6)
   } else {
      // An arbitrary cap on how long to wait for a timer.
      // 1e9 ms == ~11.5 days.
      waitms = 1e9
   }
   var events [128]epollevent
retry:
   n := epollwait(epfd, &events[0], int32(len(events)), waitms)
   if n < 0 {
      if n != -_EINTR {
         println("runtime: epollwait on fd", epfd, "failed with", -n)
         throw("runtime: netpoll failed")
      }
      // If a timed sleep was interrupted, just return to
      // recalculate how long we should sleep now.
      if waitms > 0 {
         return gList{}
      }
      goto retry
   }
   var toRun gList // goroutine链表,终究回来给调用方
   for i := int32(0); i < n; i++ {
      ev := &events[i]
      if ev.events == 0 {
         continue
      }
      if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd { // 时刻过长会被经过管道操作中止 epollwait
         if ev.events != _EPOLLIN {
            println("runtime: netpoll: break fd ready for", ev.events)
            throw("runtime: netpoll: break fd ready for something unexpected")
         }
         if delay != 0 {
            // netpollBreak could be picked up by a
            // nonblocking poll. Only read the byte
            // if blocking.
            var tmp [16]byte
            read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
            atomic.Store(&netpollWakeSig, 0)
         }
         continue
      }
      var mode int32
      if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
         mode += 'r'
      }
      if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
         mode += 'w'
      }
      if mode != 0 {
         pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
         pd.setEventErr(ev.events == _EPOLLERR)
         netpollready(&toRun, pd, mode) // 将g加入到行列中
      }
   }
   return toRun
}

其实以上作业首要便是把现已安排妥当的goroutine加入到toRun行列中,而runtime.findrunnable有如下代码处理安排妥当的

if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
   ...
   list := netpoll(delay) // block until new work is available
   atomic.Store64(&sched.pollUntil, 0)
   atomic.Store64(&sched.lastpoll, uint64(nanotime()))
   if faketime != 0 && list.empty() {
      // Using fake time and nothing is ready; stop M.
      // When all M's stop, checkdead will call timejump.
      stopm()
      goto top
   }
   lock(&sched.lock)
   _p_ = pidleget()
   unlock(&sched.lock)
   if _p_ == nil { // 假如没有P,将list行列中的一切g加到全局行列中
      injectglist(&list)
   } else { // 否则pop出一个g来运转,剩余的依据策略,放置到本P的本地行列或许全局行列中
      acquirep(_p_)
      if !list.empty() {
         gp := list.pop()
         injectglist(&list)
         casgstatus(gp, _Gwaiting, _Grunnable)
         if trace.enabled {
            traceGoUnpark(gp, 0)
         }
         return gp, false
      }
      if wasSpinning {
         _g_.m.spinning = true
         atomic.Xadd(&sched.nmspinning, 1)
      }
      goto top
   }
}

经过以上剖析,咱们知道了被park住的协程是如何被唤醒的。

3. 小结

Go netpoller使用多路IO复+非堵塞IO+Go runtime scheduler打造的原生网络模型,能够防止每次调用都堕入内核中,并且为上层封装了堵塞IO接口,提供goroutine-per-connection这种简略优雅的网络模型,大大提升了网络处理的能力,能够说,Go是一门适用于网络IO密集型运用的言语。

4. 参阅文献

9.2 I/O 多路复用:select/poll/epoll

Go netpoller原生网络模型之源码全面解密

Epoll到底用没用到mmap之一