在上一篇文章《深化了解 go chan》中,咱们讲解了 chan 相关的一些概念、原理等东西, 今日让咱们再深化一下,读一下它的源码,看看底层实践上是怎样完结的。

全体规划

咱们能够从以下三个角度看 chan 的规划(源码位于 runtime/chan.go,结构体 hchan 便是 chan 的底层数据结构):

  • 存储:chan 里边的数据是经过一个环形行列来存储的(实践上是一个数组,可是咱们视作环形行列来操作。无缓冲 chan 不必存储,会直接从 sender 仿制到 receiver
  • 发送:数据发送到 chan 的时分,假如 chan 满了,则会将发送数据的协程挂起,将其放入一个协程行列中,chan 空闲的时分会唤醒这个协程行列。假如 chan 没满,则发送行列为空。
  • 接纳:从 chan 中接纳数据的时分,假如 chan 是空的,则会将接纳数据的协程挂起,将其放入一个协程行列中,当 chan 有数据的时分会唤醒这个协程行列。假如 chan 有数据,则接纳行列为空。

文中一些比较要害的名词解释:

  • sender: 标明测验写入 changoroutine
  • receiver: 标明测验从 chan 读取数据的 goroutine
  • sendq 是一个行列,存储那些测验写入 channel 但被堵塞的 goroutine
  • recvq 是一个行列,存储那些测验读取 channel 但被堵塞的 goroutine
  • g 标明一个协程。
  • gopark 是将协程挂起的函数,协程状况:_Grunning => _Gwaiting
  • goready 是将协程改为可运转状况的函数,协程状况: _Gwaiting => _Grunnable

现在,假设咱们有下面这样的一段代码,经过这段代码,咱们能够大约看一下 chan 的总体规划:

package main
func main() {
   // 创立一个缓冲区巨细为 9 的 chan
   ch := make(chan int, 9)
   // 往 chan 写入 [1,2,3,4,5,6,7]
   for i := 0; i < 7; i++ {
      ch <- i + 1
   }
   // 将 1 从缓冲区移出来
   <-ch
}

现在,咱们的 chan 大约长得像下面这个姿态,后边会具体展开将这个图中的一切元素:

go chan 设计与实现

上图为了阐明而在 recvq 和 sendq 都画了 3 个 G,但实践上 recvq 和 sendq 至少有一个为空。由于不行能有协程正在等候接纳数据的时分,还有协程的数据由于发不出去数据而堵塞。

数据结构

在底层,go 是运用 hchan 这个结构体来标明 chan 的,下面是结构体的界说:

type hchan struct {
   qcount   uint           // 缓冲区(环形行列)元素个数
   dataqsiz uint           // 缓冲区的巨细(最多可容纳的元素个数)
   buf      unsafe.Pointer // 指向缓冲区进口的指针(从 buf 开端 qcount * elemsize 巨细的内存便是缓冲区所用的内存)
   elemsize uint16         // chan 对应类型元素的巨细(首要用以核算第 i 个元素的内存地址)
   closed   uint32         // chan 是否现已封闭(0-未封闭,1-已封闭)
   elemtype *_type         // chan 的元素类型
   sendx    uint           // chan 发送操作处理到的方位
   recvx    uint           // chan 接纳操作处理到的方位
   recvq    waitq          // 等候接纳数据的协程行列(双向链表)
   sendq    waitq          // 等候发送数据的协程行列(双向链表)
   // 锁
   lock mutex
}

waitq 的数据结构如下:

type waitq struct {
   first *sudog
   last  *sudog
}

waitq 用来保存堵塞在等候或接纳数据的协程列表(是一个双向链表),在免除堵塞的时分,需求唤醒这两个行列中的数据。

对应上图各字段具体阐明

hchan,关于 hchan 这个结构体,咱们知道,在 go 里边,结构体字段是存储在一段接连的内存上的(能够看看《深化了解 go unsafe》),所以图中用了接连的一段单元格标明。

下面是各字段阐明:

  • qcount: 写入 chan 缓冲区元素个数。咱们的代码往 chan 中存入了 7 个数,然后从中取出了一个数,终究还剩 6 个,因而 qcount6
  • dataqsiz: hchan 缓冲区的长度。它在内存中是接连的一段内存,是一个数组,是经过 make 创立的时分传入的,是 9
  • bufhchan 缓冲区指针。指向了一个数组,这个数组便是用来保存发送到 chan 的数据的。
  • sendxrecvx:写、读操作的下标。指向了 buf 指向的数组中的下标,sendx 是下一个发送操作保存的下标,recvx 是下一个接纳操作的下标。
  • recvqsendq: 堵塞在 chan 读写上的协程列表。底层是双向链表,链表的元素是 sudogsudog 是一个对 g 的封装),咱们能够简略地了解为 recvqsendq 的元素便是 g(协程)。

g 和 sudog 是什么?

上面提到了 gsudogg 是底层用来标明协程的结构体,而 sudog 是对 g 的封装,记录了一些额外的信息,比方相关的 hchan

在 go 里边,协程调度的模型是 GMP 模型,G 代表协程、M 代表线程、P 标明协程调度器。我上图里边的 G 便是代表协程(当然,实践上是 sudog)。 还有一个下面会提到的便是 g0g0 标明 P 上启动的第一个协程。

GMP 模型是别的一个庞大的话题了,大家能够自行去了解一下,对了解本文也很有好处。由于在 chan 堵塞的时分实践上也是一个协程调度的进程。 具体来说,便是从 g 的栈切换到 g0 的栈,然后从头进行协程调度。这个时分 g 由于从运转状况修正为了等候状况,所以在协程调度中不会将它调度来履行, 而是会去找其他可履行的协程来履行。

创立 chan

咱们的 make(chan int, 9) 终究会调用 makechan 办法:

// chantype 是 chan 元素类型,size 是缓冲区巨细
func makechan(t *chantype, size int) *hchan {
   elem := t.elem
   // compiler checks this but be safe.
   // 检查元素个数是否合法(不能超过 1<<16 个)
   if elem.size >= 1<<16 {
      throw("makechan: invalid channel element type")
   }
   // 判别内存是否对齐
   if hchanSize%maxAlign != 0 || elem.align > maxAlign {
      throw("makechan: bad alignment")
   }
   // mem 是 chan 缓冲区(环形行列)所需求的内存巨细
   // mem = 元素巨细 * 元素个数
   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError("makechan: size out of range"))
   }
   // 界说 hchan
   var c *hchan
   switch {
   case mem == 0:
      // 行列或许元素巨细是 0(比方 make(chan int, 0))
      // 只需求分配 hchan 所需求的内存
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      // ...
   case elem.ptrdata == 0:
      // elem 类型里边不包含指针
      // 分配的内存 = hchan 所需内存 + 缓冲区内存
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      // 分配的是接连的一段内存,缓冲区内存在 hchan 后边
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // 元素类型里边包含指针
      c = new(hchan)
      // buf 需求别的分配内存
      c.buf = mallocgc(mem, elem, true)
   }
   // 单个元素的巨细
   c.elemsize = uint16(elem.size)
   // 元素类型
   c.elemtype = elem
   // 缓冲区巨细
   c.dataqsiz = uint(size)
   // ...
}

创立 chan 的进程首要便是给 hchan 分配内存的进程:

  • 非缓冲 chan,只需求分配 hchan 结构体所需求的内存,无需分配环形行列内存(数据会直接从 sender 仿制到 receiver
  • 缓冲 chan(不包含指针),分配 hchan 所需求的内存和环形行列所需求的内存,其间 buf 会紧挨着 hchan
  • 缓冲 chan(含指针),hchan 和环形行列所需求的内存单独进行分配

对应到文章最初的图便是,底下的 hchanbuf 那两段内存。

发送数据

<- 语法糖

在《深化了解 go chan》中,咱们说也过,<- 这个操作符号是一种语法糖, 实践上,<- 会被编译成一个函数调用,关于发送操作而言,c <- x 会编译为对下面的函数的调用:

// elem 是被发送到 chan 的数据的指针。
// 关于 ch <- x,ch 对应参数中的 c,unsafe.Pointer(&x) 对应参数中的 elem。
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

别的,关于 select 里边的调用,chansend 会回来一个布尔值给 select 用来判别是否是要选中当时 case 分支。 假如 chan 发送成功,则回来 true,则 select 的那个分支得以履行。(select...case 本质上是 if...else,回来 false 标明判别失利。)

chansend 第二个参数的意义

chansend 第二个参数 true 标明是一个堵塞调用,别的一种是在 select 里边的发送操作,在 select 中的操作对错堵塞的。

package main
func main() {
	ch := make(chan int, 2)
	ch <- 1 // 假如 ch 满了,会堵塞
	select {
	case ch <- 3: // 非堵塞
	}
}

select 中对 chan 的读写对错堵塞的,不会导致当时协程堵塞,假如是由于 chan 满或许空无法发送或接纳, 则不会导致堵塞在 case 的某一个分支上,还能够持续判别其他 case 分支。

select 中的 send 完结:

// go 代码:
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// 实践效果:
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
// select 里边往 chan 发送数据的分支,回来的 selected 标明当时的分支是否被选中
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}

chansend 发送完结

  1. 发送到 nil chanselect 中发送不堵塞,其他状况堵塞)

假如是在 selectcase 里边发送,则不会堵塞,其他状况会导致当时 goroutine 挂起,永久堵塞

go chan 设计与实现

示例代码:

// 下面的代码运转会报错:
var ch chan int
// 发送到 nil chan 会永久堵塞
ch <- 1
select {
// 这个发送失利,可是不会堵塞,可持续判别其他分支。
case ch <- 3:
}
  1. 发送到满了的 chanselect 中发送不堵塞,其他状况堵塞)

关于无缓冲而且又没有 receiver,或许是有缓冲可是缓冲满了的状况,发送也会堵塞(咱们称其为 full,也便是满了,满了的 chan 是放不下任何数据了的,所以就无法再往 chan 发送数据了):

receiver 标明等候从 chan 接纳数据的协程。

go chan 设计与实现

关于满了的 chan,什么时分能够再次发送呢?那便是receiver 接纳数据的时分chan 之所以会满便是由于没有 receiver,也便是没有从 chan 接纳数据的协程。

A. 关于无缓冲的 chan,在满了的状况下,当有 receiver 来读取数据的时分,数据会直接从 sender 仿制到 receiver 中:

go chan 设计与实现

B. 关于有缓冲,可是缓冲满了的状况(图中 chan 满了,而且有两个 g 正在等候写入 chan):

go chan 设计与实现

这个发送进程大约如下:

  • receiverchan 中获取到 chan 队头元素,然后 chan 的队头元素出队。
  • 发送行列 sendq 仇人元素出队,将其要发送的数据写入到 chan 缓冲中。最后,sendq 只剩下一个等候写入 chang

示例代码:

package main
// 留意:以下代码可能不能正常履行,仅仅为了描绘问题。
func main() {
   // 状况 2.A.
   var ch1 = make(chan int) // 无缓冲的 chan
   ch1 <- 1                 // 堵塞
   select {
   // 不堵塞,可是不会履行这个分支
   case ch1 <- 1:
   }
   // 状况 2.B.
   var ch2 = make(chan int, 1) // 有缓冲,缓冲区容量为 1
   ch2 <- 1                    // 1 写入之后,ch2 的缓冲区满了
   go func() {
      ch2 <- 2 // 堵塞,调用 gopark 挂起
   }()
   go func() {
      ch2 <- 3 // 堵塞
   }()
   select {
   // 不堵塞,可是不会履行这个分支
   case ch2 <- 4:
   }
}
  1. 发送到有缓冲,可是缓冲还没满的 chan(不堵塞,发送成功)

这种状况比较简略,便是将 sender 要发送的数据写入到 chan 缓冲区:

go chan 设计与实现

示例代码:

var ch = make(chan int, 1)
// 不堵塞,1 写入 chan 缓冲区
ch <- 1

chansend 源码解读

堵塞形式下,在发送的进程中,假如遇到无法发送成功的状况,会调用 gopark 来将协程挂起,然后当时协程堕入堵塞状况。

非堵塞形式下(select),在发送进程中,任何无法发送的状况,都会直接回来 false,标明发送失利。

// 参数阐明:
// c 标明 hchan 实例
// ep 标明要发送的数据所在的地址
// block 是否是堵塞形式(select 句子的 case 里边的发送对错堵塞形式,其他状况是堵塞形式)
// 非堵塞形式下,遇到无法发送的状况,会回来 false。堵塞形式下,遇到无法发送的状况,协程会挂起。
// 回来值:标明是否发送成功。false 的时分,假如是 select 的 case,则标明没有选中这个 case。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   // 状况 1:nil chan
   if c == nil {
      // select 句子里边发送数据到 chan 的操作失利,直接回来 false,标明当时的 case 没有被选中。
      if !block {
         // select 分支没有被选中
         return false
      }
      // 堵塞形式,协程挂起
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }
   // ... 其他代码...
   // 不获取锁的状况下快速失利。select 中 chan 满了的时分无法发送成功,直接回来 false,协程无需挂起。
   // 场景:非堵塞形式、chan 未封闭、chan 已满(无缓冲且没有接纳数据的协程、或许有缓冲可是缓冲区满)
   if !block && c.closed == 0 && full(c) {
      return false
   }
   // ... 其他代码...
   // 获取锁
   lock(&c.lock)
   // 假如 chan 现已封闭,则开释锁并 panic,不能往一个现已封闭的 chan 发送数据
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }
   // 状况 2.A,又或许是有缓冲可是缓冲区空,有一个正在等候接纳数据的 receiver。
   // 假如有协程在等候接纳数据(阐明 chan 缓冲区空、或许 chan 是无缓冲的)
   // 则直接将元素传递给这个接纳数据的协程,这样就避免了 sender -> chan -> receiver 这个数据仿制的进程,效率更高。
   // 回来 true 标明 select 的分支能够履行(发送成功)
   if sg := c.recvq.dequeue(); sg != nil {
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }
   // 状况 3,发送到缓冲 chan,且 chan 未满
   // 没有协程在等候接纳数据。
   // 缓冲区还有空余,则将数据写入到 chan 的缓冲区
   if c.qcount < c.dataqsiz {
      // 获取写入的地址
      qp := chanbuf(c, c.sendx)
      // 经过内存仿制的方式写入
      typedmemmove(c.elemtype, qp, ep)
      // 写入的下标指向下一个方位
      c.sendx++
      // 假如到超出环形行列尾了,则指向第一个方位
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      // chan 里边的元素个数加上 1
      c.qcount++
      // 开释锁
      unlock(&c.lock)
      // 发送成功,回来 true
      return true
   }
   // 没有协程在接纳数据,而且缓冲区满了。
   // 假如是 select 句子里边的发送,则开释锁,回来 false
   if !block {
      unlock(&c.lock)
      return false
   }
   // 发不出去,当时协程堵塞。
   // 堵塞形式下,缓冲区满了,需求将当时协程挂起。
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   mysg.elem = ep // chan 要操作的元素指针
   mysg.waitlink = nil
   mysg.g = gp           // sudog 上的 g 特点
   mysg.isSelect = false // 假如是 select,上面现已回来了,因而这里是 false
   mysg.c = c            // sudog 上的 c 特点
   gp.waiting = mysg     // g 正在等候的 sudog
   gp.param = nil        // 当通道操作唤醒被堵塞的 goroutine 时,它将 param 设置为指向已完结的堵塞操作的 sudog
   c.sendq.enqueue(mysg) // 将 sudog 放入发送行列
   // 在 chan 读写上堵塞的标志
   gp.parkingOnChan.Store(true)
   // 最要害的一步:将当时协程挂起
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
   // 保证 ep 指向的地址不被垃圾收回器收回
   KeepAlive(ep)
   // ...被唤醒了之后的一些收尾操作...
   return true
}
// 参数阐明:c 是 chan 实例,sg 是等候接纳数据的 g,ep 是被发送进 chan 的数据,unlockf 是开释锁的函数。
// 空 chan 上发送,会直接发送给等候接纳数据的协程。
// ep 指向的值会被仿制到 sg 中(ep -> sg,ep 是被发送的值,sg 是要接纳数据的 g)。
// 接纳数据的协程会被唤醒。
// 通道 c 有必要是空的而且获取了锁。send 会经过 unlockf 来开释锁。
// sg 有必要已从 c 中退出行列(从 recvq 这个接纳行列中移除)。
// ep 有必要不能为 nil,一起指向堆或许调用者的栈。
// sg 是接纳行列上的 g。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // ...其他代码...
   // 假如没有疏忽回来值,将值直接从 ep 仿制到 sg 中
   if sg.elem != nil {
      sendDirect(c.elemtype, sg, ep)
      sg.elem = nil
   }
   gp := sg.g
   // 开释锁
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   // 最要害的一步:唤醒等候行列中的那个接纳到数据的 g
   //(也便是之前由于接纳不到数据而被堵塞的那个 g)
   goready(gp, skip+1)
}
// 参数:t 是 chan 的元素类型,sg 是接纳数据的 g(协程),src 是被发送的数据的指针。
// 场景:无缓冲 chan、有缓冲可是缓冲区没数据。
// 效果:将数据直接从发送数据的协程仿制到接纳数据的协程。
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
   dst := sg.elem
   typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
   // 将 ep 的值直接仿制到 sg 中
   memmove(dst, src, t.size)
}
// full 陈述 c 上的发送是否会堵塞(即通道已满)。
func full(c *hchan) bool {
   // c.dataqsiz 是不行变的(创立 chan 后不会再去修正)
   // 因而在 chan 操作期间的任何时间读取都是安全的。
   if c.dataqsiz == 0 {
      // 假如对错缓冲 chan,则看接纳行列有没有数据,有则标明满了(没有正在发送的 g)
      return c.recvq.first == nil
   }
   // 假如是缓冲 chan,只需求比较实践元素总数跟缓冲区容量即可
   return c.qcount == c.dataqsiz
}

接纳数据

<- 语法糖

在发送数据的那一节咱们提到了,ch <- x 编译之后,实践上是对 chansend1 的函数调用。同样的,在接纳数据的时分, <- 这个操作符也会根据不同状况编译成不同的函数调用:

// elem 是用来保存从 c 中接纳到的值的地址的指针
// <- c 编译器处理之后实践上便是下面的这个函数调用。(从通道接纳,可是疏忽接纳到的值)
func chanrecv1(c *hchan, elem unsafe.Pointer) {
   chanrecv(c, elem, true)
}
// received 标明是否是从 chan 中接纳到的(假如 chan 封闭,则接纳到的是零值,received 是 false)
// v, ok := <-c 编译之后的函数(从通道接纳,第一个 v 对应 elem,第二个 ok 对应 received)
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
   _, received = chanrecv(c, elem, true)
   return
}
// select 里边的接纳操作:
//
// select {
// case v, ok = <-c:
//    ... foo
// default:
//    ... bar
// }
//
// 实践 go 完结
//
// if selected, ok = selectnbrecv(&v, c); selected {
//    ... foo
// } else {
//    ... bar
// }
//
// select 里边从 chan 接纳数据的分支,回来的 selected 标明当时的分支是否被选中,received 标明是否有数据被接纳到
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
   return chanrecv(c, elem, false)
}

还需求再提醒一下的是:chan 封闭之后,而且 chan 缓冲区一切数据被接纳完之后,received 才会是 false,并不是一封闭 received 马上回来 false

chanrecv 函数 block 参数的意义

chansend 中的 block 参数的效果相同,用来判别是否是 select 形式下的接纳操作,假如是,则在需求堵塞的时分不会堵塞,取而代之的是直接回来。

chanrecv 接纳数据完结

  1. nil chan 接纳(select 中接纳不堵塞,其他状况堵塞)

nil chan 中读取的时分,假如是堵塞形式,会调用 gopark 将协程堵塞起来。

go chan 设计与实现

示例代码:

var ch chan int
<-ch
  1. 从空 chan 接纳(select 中接纳不堵塞,其他状况堵塞)

go chan 设计与实现

判别空的条件为:无缓冲而且没有等候发送数据的 g,或许有缓冲可是缓冲区无数据。

示例代码:

package main
// 留意:以下代码履行不了,仅仅展现一下实践中对应的代码
func main() {
	// 状况 1,无缓冲的 chan,空的
	var ch1 = make(chan int)
	<-ch1 // 堵塞
	select {
	// 不堵塞,可是该分支不会履行
	case <-ch1:
	}
	// 状况 2,有缓冲的 chan,空的
	var ch2 = make(chan int, 1)
	<-ch2 // 堵塞
	select {
	// 不堵塞,可是该分支不会履行
	case <-ch2:
	}
}
  1. 从缓冲区满的 chan 接纳(不会堵塞,这个时分 sendq 一定不为空)

这种状况不会堵塞,上面现已有图了,这里不再贴了。

  1. 从缓冲区不满的 chan 接纳(不会堵塞)

go chan 设计与实现

示例代码:

package main
func main() {
	var ch = make(chan int, 2)
	ch <- 1
	// 从缓冲区没满的 chan 接纳
	<-ch
}

chanrecv 源码解读

chanrecv 函数:

  • 参数:cchan 实例,ep 是用来接纳数据的指针,block 标明是否是堵塞形式。
  • 回来值:selected 标明 select 句子的 case 是否被选中,received 标明接纳到的值是否有用。
  • 功用:从 c 这个通道接纳数据,一起将接纳到的数据写入到 ep 里。

概览:

  • ep 可能是 nil,这意味着接纳到的值被疏忽了(对应 <-c 这种形式的接纳)。
  • 假如对错堵塞形式,而且通道无数据,回来 (false, false),也便是 select 句子中的 case 不会被选中。
  • 不然,假如 c 封闭了,会对 ep 指向的地址设置零值,然后回来 (true, false)。假如是 select 句子,意味被选中,
  • 可是 receivedfalse 标明回来的数不是通道封闭之前发送的。
  • 不然,将从通道中获取到的值写入 ep 指向的地址,而且回来 (true, true)
  • 一个非 nilep 有必要指向堆或许调用者的栈。
// 从 c 读取数据,写入到 ep 指向的地址。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   // ...
   // c 是 nil chan
   if c == nil {
      // select 里边的 case 不会被选中
      if !block {
         return
      }
      // 堵塞形式时,协程挂起
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      // 在实践履行的时分,假如其他协程都履行完了,只剩下这一个协程(又或许悉数协程都是睡眠状况,而且无法被唤醒的那种),那么会报错:
      // "fatal error: all goroutines are asleep - deadlock!"
      throw("unreachable")
   }
   // 假如对错堵塞形式(select),而且 c 是空的
   if !block && empty(c) {
      // chan 未封闭,而且是空的,回来 false,false
      if atomic.Load(&c.closed) == 0 {
         return
      }
      // chan 现已封闭,而且 chan 是空的
      if empty(c) {
         // ...
         // 回来一个零值
         if ep != nil {
            typedmemclr(c.elemtype, ep)
         }
         // select 分支被选中,可是回来值是无效的,是一个零值
         return true, false
      }
   }
   // ...
   // 获取锁
   lock(&c.lock)
   // chan 已封闭
   if c.closed != 0 {
      // chan 现已封闭,一起也没有数据
      if c.qcount == 0 {
         // ...
         // 开释锁
         unlock(&c.lock)
         if ep != nil {
            // 设置零值
            typedmemclr(c.elemtype, ep)
         }
         // select 的分支被选中,可是回来值无效
         return true, false
      }
   } else {
      // chan 未封闭,而且有一个等候发送的元素(对应状况:chan 是满的或许无缓冲而且没有 receiver)
      // 假如无缓冲:则将元素直接从 sender 仿制到 receiver 中。
      // 不然:意味着 c 的缓冲区满了,从环形行列中接纳值,将 sg 需求发送的值添加到环形行列尾,
      //        实践上这个时分,行列头和行列尾都是同一个方位,由于行列满了。
      //    只不过,行列头和行列尾指向的方位会发生变化(都加 1,然后对缓冲区长度取模)。
      if sg := c.sendq.dequeue(); sg != nil {
         // 找到一个 sender。
         // 假如无缓冲,直接从 sender 仿制到 receiver
         // 不然,环形行列仇人元素仿制给 receiver,sender 要发送的元素仿制进环形行列队尾。
         recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
         // select 分支被选中,接纳成功,而且接纳的值是有用的。
         return true, true
      }
   }
   // 缓冲区有数据,而且缓冲区没满
   if c.qcount > 0 {
      // qp 是被接纳元素的地址
      qp := chanbuf(c, c.recvx)
      // ...
      // 将 qp 指向的值仿制到 ep
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      // 清空行列中 ep 的空间(设置为零值)
      typedmemclr(c.elemtype, qp)
      // 被接纳的下标指向下一个元素
      c.recvx++
      // 环形行列,回到最初
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      // 缓冲区长度减 1
      c.qcount--
      // 开释锁
      unlock(&c.lock)
      // select 分支被选中,而且接纳的值是有用的。
      return true, true
   }
   // 缓冲区空的,而且对错堵塞(select)
   if !block {
      // 开释锁
      unlock(&c.lock)
      // 回来 false,false
      return false, false
   }
   // 缓冲区空,而且是堵塞形式,一起没有等候发送的 g
   // 没有 sender,堵塞
   gp := getg()
   mysg := acquireSudog()
   // ...
   // c 的 recvq,也便是等候接纳的行列,在队尾添加当时的 g
   c.recvq.enqueue(mysg)
   // ...
   // g 挂起,等候下一个发送数据的协程
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
   // ... 被唤醒后的操作 ...
   return true, success
}
// recv 处理缓冲区已满的 chan 的接纳操作(或许无缓冲,这个函数处理这两种状况)。
// 有两部分:
//  1. 等候发送数据的协程(sender),会将其要发送的数据放入 chan 中,然后这个协程会被唤醒
//  2. 被接纳协程接纳的值会写入到 ep 中
//
// 关于同步 chan(无缓冲 chan),两个值是同一个。
// 关于异步 chan,接纳者从 chan 的缓冲区获取数据,发送方的输入放入 chan 缓冲区。
// 通道 c 有必要已满并锁定。recv 会运用 unlockf 来解锁 c。
// sg 有必要现已从 c 中移除(准确来说是 c.sendq)。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // 假如无缓冲区
   if c.dataqsiz == 0 {
      // ...
      // 直接将 sender 的要发送的值仿制到 ep
      if ep != nil {
         recvDirect(c.elemtype, sg, ep)
      }
   } else {
      // 有缓冲区,可是缓冲区满了。
      // 从行列头获取元素,即将发送的值放入行列尾。(实践上操作的是同一个方位,由于环形行列满了)
      // 需求获取的值的指针地址
      qp := chanbuf(c, c.recvx)
      // ...
      // 假如需求接纳值,则将 qp 仿制到 ep(没有疏忽回来值)
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      // 即将发送的值写入到 qp(sendq 仇人元素要发送的值写入到 qp,也便是 chan 刚刚空出来的方位)
      typedmemmove(c.elemtype, qp, sg.elem)
      // 行列头、尾指针移动
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
   }
   sg.elem = nil
   gp := sg.g
   // 开释锁
   unlockf()
   // ...
   // 唤醒协程(这个被唤醒的协程是之前由于发送不出去被堵塞的协程)
   goready(gp, skip+1)
}
// 将数据直接从 sender 仿制到 receiver
// 场景:发送到无缓冲的 chan
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
   src := sg.elem
   typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
   // dst 是 receiver 栈里保存接纳值的地址,src 是 sender 栈里要被发送的值的地址
   memmove(dst, src, t.size)
}

封闭 chan

chan 封闭的进程比较简略,修正 closed 为 1,然后唤醒发送行列和接纳行列里边的 g,假如发送行列有 g,被唤醒之后会 panic,由于不能往一个现已封闭的 chan 发送数据。

go chan 设计与实现

// 封闭 chan
func closechan(c *hchan) {
   // 不能封闭 nil chan
   if c == nil {
      panic(plainError("close of nil channel"))
   }
   // 敞开锁
   lock(&c.lock)
   if c.closed != 0 {
      // chan 现已封闭,panic,不能重复封闭。开释锁
      unlock(&c.lock)
      panic(plainError("close of closed channel"))
   }
   // ...
   // 设置 closed 标志
   c.closed = 1
   // gList 用来保存堵塞在 chan 上的 g(链表,包含了 sender 和 receiver)
   var glist gList
   // 开释一切等候读取 chan 的协程(免除堵塞状况)
   for {
      // recvq 队头元素出队
      sg := c.recvq.dequeue()
      if sg == nil {
         // sendq 现已没有元素了
         break
      }
      // 封闭之后,从 chan 接纳到的都是零值
      if sg.elem != nil {
         typedmemclr(c.elemtype, sg.elem)
         sg.elem = nil
      }
      // ...
      glist.push(gp)
   }
   // 开释一切正在等候写入 chan 的协程(免除堵塞状况,这些协程会 panic)
   for {
      // sendq 队头元素出队
      sg := c.sendq.dequeue()
      if sg == nil {
         // sendq 现已没有元素了
         break
      }
      // ...
      glist.push(gp)
   }
   // 开释锁
   unlock(&c.lock)
   // 将一切等候的协程修正为安排妥当态
   for !glist.empty() {
      gp := glist.pop()
      gp.schedlink = 0
      // g 状况修正为可运转状况
      goready(gp, 3)
   }
}

关于实践开发的效果

在上一篇文章和本文中,花了很大的篇幅来叙述 chan 的规划、完结与运用,这么多东西对咱们有什么用呢?

其间十分重要的一个效果是,清楚地了解 chan 的工作机制,便于咱们对程序实践运转状况进行分析, 尤其是一些十分隐晦的读写 chan 场景,究竟稍有不小心就会导致协程泄漏,这对进程影响可能对错常大的。

比方下面的这种代码:

package main
import (
   "fmt"
   "runtime"
   "time"
)
func main() {
   for i := 0; i < 10000; i++ {
      time.Sleep(time.Second)
      go func() {
         // 永久堵塞,协程泄漏
         var ch chan int
         ch <- 1
      }()
      // 咱们会看到协程数量逐渐增长。
      // 可是这部分挂起的协程永久不会被调度。
      fmt.Printf("goroutine count: %d\n", runtime.NumGoroutine())
   }
   time.Sleep(time.Hour)
}

tips:在 chan 读写的地方需求留意自己的写法会不会让 goroutine 永久堕入堵塞,或许长时间堵塞。

总结

  • chan 底层是 hchan 结构体。
  • go 语法里边的 <- 不过是语法糖,在编译的时分,会编译成 hchan 相关的办法调用。终究都会调用 chansend 或许 chanrecvselect...case 里边的 chan 读写终究也会编译为对 chansendchanrecv 的调用。
  • chan 总体规划:维护了三个行列:
    • hchan.buf: chan 中暂存 sender 发送数据的行列(在有 receiver 读取的时分会从这个行列中仿制到 receiver 中)
    • hchan.recvq: 接纳行列,存储那些测验读取 channel 但被堵塞的 goroutine
    • hchan.sendq: 发送行列,存储那些测验写入 channel 但被堵塞的 goroutine
  • 读写 chan 的协程堵塞是经过 gopark 完结的,而从堵塞态转换为可运转状况是经过 goready 完结的。
  • chan 读写操作堵塞的时分,假如是在 select 句子中,则会直接回来(标明当时的分支没有被选中),不然,会调用 gopark 挂起当时协程。
  • 在封闭 chan 的时分,会调用 goready 唤醒堵塞在发送或许接纳操作上的 g(协程)。
  • 无缓冲 chan 的操作有点特别,关于无缓冲 chan,有必要一起有 senderreceiver 才干发送和接纳成功,不然另一边都会堕入堵塞(当然,select 不会堵塞)。