golang channel 实现原理

用过 go 的都知道 channel,无需多言,直接开整!

1 中心数据结构

golang channel 实现原理

1.1 hchan

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    lock mutex
}

hchan: channel 数据结构

(1)qcount:当时 channel 中存在多少个元素;

(2)dataqsize: 当时 channel 能存放的元素容量;

(3)buf:channel 中用于存放元素的环形缓冲区;

(4)elemsize:channel 元素类型的巨细;

(5)closed:标识 channel 是否封闭;

(6)elemtype:channel 元素类型;

(7)sendx:发送元素进入环形缓冲区的 index;

(8)recvx:接纳元素所在的环形缓冲区的 index;

(9)recvq:因接纳而堕入堵塞的协程行列;

(10)sendq:因发送而堕入堵塞的协程行列;

1.2 waitq

type waitq struct {
    first *sudog
    last  *sudog
}

waitq:堵塞的协程行列

(1)first:行列头部

(2)last:行列尾部

1.3 sudog

type sudog struct {
    g *g
    next *sudog
    prev *sudog
    elem unsafe.Pointer // data element (may point to stack)
    // ...
    isSelect bool
    c        *hchan 
}

sudog:用于包装协程的节点

(1)g:goroutine,协程;

(2)next:行列中的下一个节点;

(3)prev:行列中的前一个节点;

(4)elem: 读取/写入 channel 的数据的容器;

(5)isSelect:标识当时协程是否处在 select 多路复用的流程中;

(6)c:标识与当时 sudog 交互的 chan.

2 结构器函数

golang channel 实现原理

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    // ...
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }
    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)
    return
}

(1)判别请求内存空间巨细是否越界,mem 巨细为 element 类型巨细与 element 个数相乘后得到,仅当无缓冲型 channel 时,因个数为 0 导致巨细为 0;

(2)依据类型,初始 channel,分为 无缓冲型、有缓冲元素为 struct 型、有缓冲元素为 pointer 型 channel;

(3)假使为无缓冲型,则仅请求一个巨细为默认值 96 的空间;

(4)如若有缓冲的 struct 型,则一次性分配好 96 + mem 巨细的空间,而且调整 chan 的 buf 指向 mem 的开始方位;

(5)假使为有缓冲的 pointer 型,则分别请求 chan 和 buf 的空间,两者无需接连;

(6)对 channel 的其他字段进行初始化,包括元素类型巨细、元素类型、容量以及锁的初始化.

3 写流程

3.1 两类反常情况处理

func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

(1)关于未初始化的 chan,写入操作会引发死锁;

(2)关于已封闭的 chan,写入操作会引发 panic.

3.2 case1:写时存在堵塞读协程

golang channel 实现原理

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...
    lock(&c.lock)
    // ...
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    // ..

(1)加锁;

(2)从堵塞度协程行列中取出一个 goroutine 的封装目标 sudog;

(3)在 send 办法中,会基于 memmove 办法,直接将元素拷贝交给 sudog 对应的 goroutine;

(4)在 send 办法中会完结解锁动作.

3.3 case2:写时无堵塞读协程但环形缓冲区仍有空间

golang channel 实现原理

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...
    lock(&c.lock)
    // ...
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }
    // ...
}

(1)加锁;

(2)将当时元素添加到环形缓冲区 sendx 对应的方位;

(3)sendx++;

(4)qcount++;

(4)解锁,回来.

3.4 case3:写时无堵塞读协程且环形缓冲区无空间

golang channel 实现原理

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...
    lock(&c.lock)
    // ...
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    mysg.g = gp
    mysg.c = c
    gp.waiting = mysg
    c.sendq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    gp.waiting = nil
    closed := !mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

(1)加锁;

(2)结构封装当时 goroutine 的 sudog 目标;

(3)完结指针指向,树立 sudog、goroutine、channel 之间的指向关系;

(4)把 sudog 添加到当时 channel 的堵塞写协程行列中;

(5)park 当时协程;

(6)假使协程从 park 中被唤醒,则收回 sudog(sudog能被唤醒,其对应的元素必然已经被读协程取走);

(7)解锁,回来

3.5 写流程全体串联

golang channel 实现原理

4 读流程

4.1 反常 case1:读空 channel

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    if c == nil {
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    // ...
}

(1)park 挂起,引起死锁;

4.2 反常 case2:channel 已封闭且内部无元素

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    lock(&c.lock)
    if c.closed != 0 {
        if c.qcount == 0 {
            unlock(&c.lock)
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
        // The channel has been closed, but the channel's buffer have data.
    } 
    // ...

(1)直接解锁回来即可

4.3 case3:读时有堵塞的写协程

golang channel 实现原理

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    lock(&c.lock)
    // Just found waiting sender with not closed.
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
     }
}

(1)加锁;

(2)从堵塞写协程行列中获取到一个写协程;

(3)假使 channel 无缓冲区,则直接读取写协程元素,并唤醒写协程;

(4)假使 channel 有缓冲区,则读取缓冲区头部元素,并将写协程元素写入缓冲区尾部后唤醒写写成;

(5)解锁,回来.

4.4 case4:读时无堵塞写协程且缓冲区有元素

golang channel 实现原理

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    lock(&c.lock)
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

(1)加锁;

(2)获取到 recvx 对应方位的元素;

(3)recvx++

(4)qcount–

(5)解锁,回来

4.5 case5:读时无堵塞写协程且缓冲区无元素

golang channel 实现原理

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    lock(&c.lock)
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    gp.waiting = mysg
    mysg.g = gp
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    gp.waiting = nil
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
}

(1)加锁;

(2)结构封装当时 goroutine 的 sudog 目标;

(3)完结指针指向,树立 sudog、goroutine、channel 之间的指向关系;

(4)把 sudog 添加到当时 channel 的堵塞读协程行列中;

(5)park 当时协程;

(6)假使协程从 park 中被唤醒,则收回 sudog(sudog能被唤醒,其对应的元素必然已经被写入);

(7)解锁,回来

4.6 读流程全体串联

golang channel 实现原理

5 堵塞与非堵塞形式

在上述源码剖析流程中,均是以堵塞形式为主线进行讲述,忽略非堵塞形式的有关处理逻辑.

此处说明两个问题:

(1)非堵塞形式下,流程逻辑有何差异?

(2)何时会进入非堵塞形式?

5.1 非堵塞形式逻辑差异

非堵塞形式下,读/写 channel 办法经过一个 bool 型的响应参数,用以标识是否读取/写入成功.

(1)一切需要使得当时 goroutine 被挂起的操作,在非堵塞形式下都会回来 false;

(2)一切是的当时 goroutine 会进入死锁的操作,在非堵塞形式下都会回来 false;

(3)一切能当即完结读取/写入操作的条件下,非堵塞形式下会回来 true.

5.2 何时进入非堵塞形式

默认情况下,读/写 channel 都是堵塞形式,只要在 select 句子组成的多路复用分支中,与 channel 的交互会变成非堵塞形式:

ch := make(chan int)
select{
  case <- ch:
  default:
}

5.3 代码一览

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
	return chanrecv(c, elem, false)
}

在 select 句子包裹的多路复用分支中,读和写 channel 操作会被汇编为 selectnbrecv 和 selectnbsend 办法,底层同样复用 chanrecv 和 chansend 办法,但此刻由于第三个入参 block 被设置为 false,导致后续会走进非堵塞的处理分支.

6 两种读 channel 的协议

读取 channel 时,能够依据第二个 bool 型的回来值用以判别当时 channel 是否已处于封闭状态:

ch := make(chan int, 2)
got1 := <- ch
got2,ok := <- ch

实现上述功能的原因是,两种格式下,读 channel 操作会被汇编成不同的办法:

func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

7 封闭

golang channel 实现原理

func closechan(c *hchan) {
    if c == nil {
        panic(plainError("close of nil channel"))
    }
    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }
    c.closed = 1
    var glist gList
    // release all readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        glist.push(gp)
    }
    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        glist.push(gp)
    }
    unlock(&c.lock)
    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)

(1)封闭未初始化过的 channel 会 panic;

(2)加锁;

(3)重复封闭 channel 会 panic;

(4)将堵塞读协程行列中的协程节点一致添加到 glist;

(5)将堵塞写协程行列中的协程节点一致添加到 glist;

(6)唤醒 glist 当中的一切协程.