0. 简介
上篇博客提到了goroutine
有关体系调用的调度进行了叙述,关于IO密集型的拜访,每次请求都或许导致一次M的创立,这其实是不能承受的。Go SDK
为了处理网络IO密集型的运用场景,开发了网络轮询器
这一关键组件。
网络轮询器使用了操作体系提供的IO多路复用模型来提升网络设备的使用率以及程序的功能,下面,咱们将分别介绍几种常见的IO模型以及网络轮询器在以Linux为比方的体系上的完成。
1. IO多路复用
所谓IO多路复用便是运用select/poll/epoll
这一系列的多路选择器,完成一个线程监控多个文件句柄,一旦某个文件句柄安排妥当(ready)
,就能够告诉到对应运用程序的读写操作;没有文件句柄安排妥当时,就会堵塞运用程序,从而释放CPU资源。
1.1 select&poll
select
完成多路复用的方法是,将需求监听的描绘符都放到一个文件描绘符调集,然后调用select
函数将文件描绘符调集复制到内核里,让内核来查看是否有事情产生,查看的方法很粗暴,便是经过遍历文件描绘符调集的方法,当查看到有事情产生后,将此描绘符标记为可读或可写,接着再把整个文件描绘符调集复制回用户态里,然后用户态还需求再经过遍历的办法找到可读或可写的描绘符,然后再对其处理。
所以,关于select
方法,在时刻上,需求进行两遍遍历,一次在内核态,一次在用户态,所以其时刻复杂度是O(N);在空间上,会产生两次文件描绘符调集的复制,一次由用户空间复制到内核空间,一次由内核空间复制到用户空间,可是由于select
运用的是固定长度的bitsmap,默许最多1024个文件描绘符。
poll
和select
没有本质区别,只是不再运用bitsmap来存储关注的描绘符,而是采用链表的方法存储,突破了文件描绘符的个数约束,可是随着文件描绘符的个数添加,其O(N)的时刻复杂度也会使得功率越来越低下。
1.2 epoll
int s = socket(AF_INET, SOCK_STREAM, 0);
bind(s, ...);
listen(s, ...)
int epfd = epoll_create(...);
epoll_ctl(epfd, ...); // 将一切需求监听的socket添加到epfd中
while(1) {
int n = epoll_wait(...);
for(events){
// 处理逻辑
}
}
以上是一个很经典的epoll
运用逻辑:先用epoll_create
创立一个epfd
目标,然后经过epoll_ctl
将需求监控的文件描绘符放到epfd
中,终究调用epoll_wait
等候数据。
比较于select
和poll
,epoll
很好地处理了前二者在时刻和空间上功率问题:
-
epoll
在内核中运用红黑树跟踪文件描绘符调集,大大缩减了时刻,比较于select/poll
遍历调集的 O(N) 的时刻复杂度,红黑树的时刻复杂度是 O(logN); -
epoll
运用事情驱动的机制(前缀e
应该便是event
的首字母),当某个文件描绘符有消息时,当用户调用epoll_wait
函数时,只会回来有事情产生的文件描绘符的个数,在空间上也大大节省了。
插个题外话,网上不少文章耳食之言,以为epoll
运用mmap
进行内存映射,从而节省了内存复制,这完全是过错的,能够查看源码,或许参阅epoll以及Epoll到底用没用到mmap之一。
1.3 磁盘IO是否适用多路复用IO
需求阐明的是,以下评论均指产生在Linux体系中的多路复用;国外文件一般不称磁盘IO为disk io
,而是regular file
。
咱们先说定论:磁盘IO不适用多路复用IO!!!
多路复用IO最好搭配非堵塞IO运用,Linux手册有以下一段话:
Under Linux, select() may report a socket file descriptor as “ready for reading”, while nevertheless a subsequent read blocks. This could for example happen when data has arrived but upon examination has wrong checksum and is discarded. There may be other circumstances in which a file descriptor is spuriously reported as ready. Thus it may be safer to use O_NONBLOCK on sockets that should not block.
谷歌翻译如下:
在 Linux 下,select() 或许会将套接字文件描绘符陈述为“准备好读取”,但随后的读取会堵塞。 例如,这或许产生在数据已抵达但查看时校验和过错并被丢弃的状况下。 或许存在文件描绘符被虚假陈述为安排妥当的其他状况。 因此,在不应堵塞的套接字上运用 O_NONBLOCK 或许更安全。
简略来说,至少多路复用的select
方法被官方推荐运用O_NONBLOCK
来设置成非堵塞IO。不过,磁盘IO并不支撑非堵塞IO,依据Non-blocking I/O with regular files:
Regular files arealwaysreadable and they are alsoalwayswriteable. This is clearly stated in the relevant POSIX specifications.I cannot stress this enough. Putting a regular file in non-blocking has ABSOLUTELY no effectsother than changing one bit in the file flags.
简略而言,便是磁盘IO永远是可读可写的,非堵塞IO对其没有效果。
可是以上还不是磁盘IO不支撑多路复用的最首要原因,请看以下剖析:
select运用磁盘IO无意义,由于其永远是ready的:
File descriptors associated with regular files always select true for ready to read, ready to write, and error conditions.
留意,以上标注来源于Linux手册,可是我本地man select
却并没有这段话,只能贴出链接。
epoll直接不支撑磁盘IO:
假如说,select
和poll
只是对磁盘IO不起作用,由于其状况运用是ready,所以每次循环读都会得到成果,那么epoll
做的更加决绝:不支撑磁盘IO的句柄,直接报EPERM
,经过man epoll_ctl
得到:
EPERM The target file fd does not support epoll. This error can occur if fd refers to, for example, a regular file or a directory.
在Linux环境上,Go
中的网络轮询器的底层是依据epoll
完成的,所以其根本不支撑磁盘IO的多路复用,任何磁盘IO的操作都会向上篇博客中描绘的那样,堕入内核调用。并且在os.file_unix.go
中,在创立文件时会测验将其归入到网络轮询器的体系中,有如下描绘,有些状况下会无法将文件注册到网络轮询器中,比方Linux体系中的磁盘文件。
// An error here indicates a failure to register
// with the netpoll system. That can happen for
// a file descriptor that is not supported by
// epoll/kqueue; for example, disk files on
// Linux systems. We assume that any real error
// will show up in later I/O.
所以,在《Go 言语设计与完成》的6.6节,作者重复提及网络轮询器不仅用于监控网络IO,还能用于监控文件IO,这个定论是过错的!至少在Linux环境下,磁盘IO不能运用多路复用。我看有许多小伙伴的博客转述了其观点,恐怕会形成更大的错误传达。
2. Go netpoller
Go
是一门跨渠道的编程言语,而不同渠道针对IO多路复用也有不同的完成方法,Go netpoller
经过在底层对Linux下的epoll
、freeBSD或许MacOS下的kqueue
或许Windows下的iocp
进行封装,完成运用同步编程形式达到异步执行的效果,完成Go
言语的网络轮询器,提升网络IO密集型运用的功率。这儿需求阐明的是,kqueue
支撑对磁盘IO做多路复用,可是现在大多数的服务器都是Linux体系的,所以咱们以下评论只针对Linux体系。
2.1 场景
以下,运用Go
言语完成一个典型的TCP echo server
:
package main
import (
"fmt"
"net"
)
func main() {
lis, err := net.Listen("tcp", ":8080")
if err != nil {
panic(err)
}
for {
conn, err := lis.Accept()
if err != nil {
panic(err)
}
go func() {
defer conn.Close()
handle(conn)
}()
}
}
func handle(conn net.Conn) {
buf := make([]byte, 1024)
for {
n, err := conn.Read(buf)
if err != nil {
fmt.Printf("read err: %+v\n", err)
return
}
_, _ = conn.Write(buf[:n])
}
}
上述形式是典型的goroutine-per-connection
形式,下面,咱们就来看看Go
调度器是怎样做到这种形式的。
咱们能够经过以下指令反编译代码
$ go build -gcflags "-N -l" -o main main.go
$ objdump -d main >> main.i
2.2 net.Listen
main.main -> net.Listen -> net.(*ListenConfig).Listen -> net.(*sysListener).listenTCP -> net.internetSocket -> net.socket -> net.sysSocket
,留意这个net.sysSocket
在net.sock_cloexec.go
中,其调用的函数如下,能够发现,其运用的IO对错堵塞IO。
s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
其实所谓socketFunc
便是体系调用socket
。
var socketFunc func(int, int, int) (int, error) = syscall.Socket
以上是整个体系调用的进程,net.socket
函数在经过net.sysSocket
创立完socket句柄后,会经过newFD
函数创立net.netFD
目标,而这个目标包括着最重要的网络轮询器的重要目标poll.FD
。
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
s, err := sysSocket(family, sotype, proto)
if err != nil {
return nil, err
}
if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
poll.CloseFunc(s)
return nil, err
}
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
case syscall.SOCK_DGRAM:
if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
}
if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
然后,由于咱们树立的是TCP衔接,所以会走入到listenStream
办法,里面调用了体系调用bind
和listen
(能够看到,Go
中的net.Listen
封装了本来C言语树立tcp服务时调用体系调用的socket -> bind -> listen
三个步骤)。
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
var err error
if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
return err
}
var lsa syscall.Sockaddr
if lsa, err = laddr.sockaddr(fd.family); err != nil {
return err
}
if ctrlFn != nil {
c, err := newRawConn(fd)
if err != nil {
return err
}
if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
return err
}
}
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
然后,以上函数调用了fd.init()
来初始化咱们的句柄操作,为了这部分能继续讲下去,也是咱们网络轮询器的重点,接下来,咱们需求介绍一下Go
网络轮询器几个重要的结构体。
2.3 数据结构以及fd.init()
做了什么
net.netFD
// Network file descriptor.
type netFD struct {
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
net.netFD
是网络描绘符,其包括了重要的网络轮询器的结构体pfd
,类型是poll.FD
。
poll.FD
// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc
// Writev cache.
iovecs *[]syscall.Iovec
// Semaphore signaled when file is closed.
csema uint32
// Non-zero if this file has been set to blocking mode.
isBlocking uint32
// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool
// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool
// Whether this is a file rather than a network socket.
isFile bool
}
poll.FD
定义如上,其包括体系的描绘符Sysfd
和轮询器的重要结构pd
,类型是poll.pollDesc
。
pollDesc
type pollDesc struct {
runtimeCtx uintptr
}
这儿的这个struct只包括一个指针,其实这是为了进行不同体系兼容而设置的,3.2中终究剖析到fd.init()
函数进行初始化,终究会调用到以下函数:
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}
以上init
函数,在编译条件是linux
的景象下,会定位到runtime.netpoll.go
中以下函数:
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lock(&netpollInitLock)
if netpollInited == 0 {
netpollinit()
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}
能够看到,以上操作终究又会调用到netpollinit
函数,而这个函数会依据体系不同编译不同文件,比方Linux体系中是runtime.netpoll_epoll.go
,若是Mac,则会定位到runtime.netpoll_kqueue.go
,咱们仍旧选择Linux渠道进行剖析:
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd < 0 {
epfd = epollcreate(1024)
if epfd < 0 {
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
closeonexec(epfd)
}
r, w, errno := nonblockingPipe()
if errno != 0 {
println("runtime: pipe failed with", -errno)
throw("runtime: pipe failed")
}
ev := epollevent{
events: _EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
if errno != 0 {
println("runtime: epollctl failed with", -errno)
throw("runtime: epollctl failed")
}
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
其间需求留意一点,netpollBreakRd
和netpollBreakWr
是注册的管道,用于唤醒epollwait
函数的,将netpollBreakRd
注册到epoll
中,假如需求唤醒epollwait
堵塞,则使用netpollBreakWr
在管道这端写入即可。
终究需求留意的是,在以上init
中,还会调用runtime_pollOpen(uintptr(fd.Sysfd))
将listenFD
注册到epoll
中。
剖析完以上代码能够发现,在创立tcp
服务并且监听的进程中,同时初进行了epoll
的相关操作,并且预置了能够唤醒epollwait
堵塞的管道。
2.4 Listener.Accept
Listener.Accept
的调用进程为Listener.Accept -> net.(*TCPListener).Accept -> net.(*TCPListener).accept -> net.(*netFD).accept -> poll.(*FD).Accept -> poll.accept -> syscall.Accept
。在poll.(*FD).Accept
中,做如下处理:
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { // 这儿的fd,便是之前listen的fd
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
s, rsa, errcall, err := accept(fd.Sysfd) // 运用listen时的fd接纳链接,由于listen fd设置为非堵塞,所以必定回来
if err == nil { // 假如没有错,阐明真的有衔接,回来
return s, rsa, "", err
}
switch err {
case syscall.EINTR:
continue
case syscall.EAGAIN: // 假如回来EAGAIN,判别能够运用poll体系后运用waitRead等候
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}
而pollDesc.waitRead
函数是怎样做到等候的呢,其内部调用了pollDesc.wait -> runtime_pollWait -> runtime.poll_runtime_pollWait
:
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
// 进入 netpollblock 判别是否有等候的IO事情产生
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return pollNoError
}
netpollblock
函数如下:
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to pdWait
for {
// Consume notification if already ready.
if gpp.CompareAndSwap(pdReady, 0) { // 假如当时IO现已ready,那么直接回来true
return true
}
if gpp.CompareAndSwap(0, pdWait) { // 假如没有,则跳出循环
break
}
// Double check that this isn't corrupt; otherwise we'd loop
// forever.
if v := gpp.Load(); v != pdReady && v != 0 {
throw("runtime: double wait")
}
}
// need to recheck error states after setting gpp to pdWait
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
if waitio || netpollcheckerr(pd, mode) == pollNoError {
// waitio = false;
// netpollcheckerr(pd, mode) == pollNoError 此刻一般是建立的,所以gopark住当时的goroutine
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent pdReady notification
old := gpp.Swap(0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
经过以上剖析能够看到,在Go
中调用Listener.Accept
函数后,会首先调用一次体系调用的accept
,假如有衔接请求则树立衔接;假如没有衔接请求,那么内核回来EAGAIN
过错,那么会经过一系列调用,终究经过gopark
函数挂起协程,等候接入事情的到来再唤醒协程。
在被唤醒之后,让咱们回到net.(*netFD).accept
函数,接下来会将accept
体系操作回来的句柄重新生成一个FD,并且相同注册到epoll
中,为之后的read、write操作做衬托。
func (fd *netFD) accept() (netfd *netFD, err error) {
d, rsa, errcall, err := fd.pfd.Accept()
if err != nil {
if errcall != "" {
err = wrapSyscallError(errcall, err)
}
return nil, err
}
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
2.4 conn.Read/conn.Write
经过前面的剖析咱们知道,net.Listen
操作回来的FD被注册到epoll
是为了Listener.Accept
,而Listener.Accept
操作回来的FD注册到epoll
中是为了conn.Read/conn.Write
的。
和Listener.Accept
相同,咱们以conn.Read
为例,阅历如下调用:conn.Read -> net.(*conn).Read -> net.(*netFD).Read -> poll.(*FD).Read
:
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p) // 测验去读一次,假如读到则回来,由于对错堵塞IO,所以读不到也会回来EAGAIN
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil { // 相同调用waitRead等候事情
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
能够发现,读的时候和Accept操作相似,也会先测验读一次,没读到之后相同调用waitRead
挂起协程,营建堵塞读的假象。写的进程很相似,咱们就不剖析了。
以上,咱们剖析了accept
和读写操作时,使用poller
机制营建出同步堵塞调用的假象,实际上体系运用多路IO的作业进程,那么,协程什么时候会被唤醒呢?
2.5 协程的唤醒
实际上,关于网络轮询器,是经过runtime.netpoll
函数去唤醒的,比方之前咱们提过的findrunnable
函数,就会调用runtime.netpoll
函数去查看是否有ready
的读写事情,详细调度咱们就不剖析了,下面看看runtime.netpoll
是怎样作业的:
func netpoll(delay int64) gList {
if epfd == -1 {
return gList{}
}
var waitms int32 // 以下逻辑讲delay转换为 epollwait 的 timeout 值
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
waitms = 1e9
}
var events [128]epollevent
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}
}
goto retry
}
var toRun gList // goroutine链表,终究回来给调用方
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd { // 时刻过长会被经过管道操作中止 epollwait
if ev.events != _EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
atomic.Store(&netpollWakeSig, 0)
}
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.setEventErr(ev.events == _EPOLLERR)
netpollready(&toRun, pd, mode) // 将g加入到行列中
}
}
return toRun
}
其实以上作业首要便是把现已安排妥当的goroutine
加入到toRun
行列中,而runtime.findrunnable
有如下代码处理安排妥当的
if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
...
list := netpoll(delay) // block until new work is available
atomic.Store64(&sched.pollUntil, 0)
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
if faketime != 0 && list.empty() {
// Using fake time and nothing is ready; stop M.
// When all M's stop, checkdead will call timejump.
stopm()
goto top
}
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ == nil { // 假如没有P,将list行列中的一切g加到全局行列中
injectglist(&list)
} else { // 否则pop出一个g来运转,剩余的依据策略,放置到本P的本地行列或许全局行列中
acquirep(_p_)
if !list.empty() {
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
}
经过以上剖析,咱们知道了被park
住的协程是如何被唤醒的。
3. 小结
Go netpoller
使用多路IO复+非堵塞IO+Go runtime scheduler
打造的原生网络模型,能够防止每次调用都堕入内核中,并且为上层封装了堵塞IO接口,提供goroutine-per-connection
这种简略优雅的网络模型,大大提升了网络处理的能力,能够说,Go
是一门适用于网络IO密集型运用的言语。
4. 参阅文献
9.2 I/O 多路复用:select/poll/epoll
Go netpoller原生网络模型之源码全面解密
Epoll到底用没用到mmap之一