基本介绍

WaitGroup是go用来做任务编列的一个并发原语,它要处理的便是并发 – 等候的问题:

当有一个 goroutine A 在查看点(checkpoint)等候一组 goroutine 悉数完结,假如这些 goroutine 还没悉数完结,goroutine A 就会堵塞在查看点,直到一切 goroutine 都完结后才干继续履行

试想假如没有WaitGroup,想要在协程A比及其他协程履行完结后能立马履行,只能不断轮询其他协程是否履行结束,这样的问题是:

  1. 及时性差:轮询距离越高,及时性越差
  1. 无谓的空轮训,糟蹋系统资源

而用WaitGroup时,协程A只用堵塞,直到其他协程履行结束后,再告诉协程A

其他言语也供给了相似的工具,例如Java的CountDownLatch

运用

Waitgroup供给了3个办法:

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
  • Add:增加计数值
  • Done:削减计数值
  • Wait:调用这个办法的 goroutine 会一直堵塞,直到 WaitGroup 的计数值变为 0

源码剖析

type WaitGroup struct {
   // 防止复制
   noCopy noCopy
   // 64位环境下,高32位是计数值,低32位记载waiter的数量
   state1 uint64
   // 用于信号量
   state2 uint32
}

Add

func (wg *WaitGroup) Add(delta int) {
   // 获取状况值,信号量
   statep, semap := wg.state()
   // 将参数delta左32位,加到statep中,即给计数值加上delta
   state := atomic.AddUint64(statep, uint64(delta)<<32)
   // 加后的计数值
   v := int32(state >> 32)
   // waiter的数量
   w := uint32(state)
   // 加后不能是负值
   if v < 0 {
      panic( "sync: negative WaitGroup counter" )
   }
   // 有waiter的状况下,当前协程又加了计数值,panic
   // 即有waiter的状况下,不能再给waitgroup增加计数值了
   if w != 0 && delta > 0 && v == int32(delta) {
      panic( "sync: WaitGroup misuse: Add called concurrently with Wait" )
   }
   // 假如加完后v大于0,或许加完后v等于0,但没有等候者,直接回来
   if v > 0 || w == 0 {
      return
   }
   // 接下来便是v等于0,且w大于0的状况
   // 再次查看是否有Add和Wait并发调用的状况
   if *statep != state {
      panic( "sync: WaitGroup misuse: Add called concurrently with Wait" )
   }
   // 将计数值和waiter数量清0
   *statep = 0
   // 唤醒一切的waiter
   for ; w != 0; w-- {
      runtime_Semrelease(semap, false, 0)
   }
}
  • 由于state高32位保存计数值,因而需求将参数delta左移32位后加到state上才正确
  • 假如加完后v大于0,或许加完后v等于0,但没有等候者,直接回来

    • v大于0:表明自己不是最终一个调用Done的协程,不必自己来释放waiter,直接回来
    • v等于0,但没有等候者:由于没有等候者,也就不必释放等候者,也直接回来
  • 不然便是v等于0,且w大于0的状况:

    • 自己是最终一个调用Done的,且还有等候者,那就唤醒一切等候者

Done

Done内部调用Add,仅仅参数传-1,表明削减计数值

func (wg *WaitGroup) Done() {
   wg.Add(-1)
}

Wait

func (wg *WaitGroup) Wait() {
   statep, semap := wg.state()
   for {
      state := atomic.LoadUint64(statep)
      // v:计数值
      v := int32(state >> 32)
      w := uint32(state)
      // 假如计数值为0,自己不需求比及,直接回来
      if v == 0 {
         return
   }
      // 增加waiter计数值
 if atomic.CompareAndSwapUint64(statep, state, state+1) {
         // 自己在信号量上堵塞
         runtime_Semacquire(semap)
         // 查看Waitgroup是否在wait回来前被重用
         if *statep != 0 {
            panic( "sync: WaitGroup is reused before previous Wait has returned" )
         }
         return
      }
   }
}
  • 假如计数值为0,当前不需求堵塞,直接回来
  • 不然将waiter数量加1,假如添加成功,就把自己堵塞到信号量上
  • 被唤醒时,假如statep不为0,表明该waitgroup是否在wait回来前被重用了,panic

注意事项

通过源码剖析能够看出,Waitgroup有以下运用注意事项:

  1. 计数器的值有必要大于等于0

    1. 一开端调用Add时,不能传负数
    2. 调用Done的次数不能过多,导致超过了 WaitGroup 的计数值
    3. 因而运用 WaitGroup 的正确姿势是,预先确定好 WaitGroup 的计数值,然后调用相同次数的 Done 完结相应的任务
  1. 确保在希望的Add调用完结后,再调用Wait,不然Wait发现计数值为0时不会堵塞

    1. 最好在一个协程中,按次序先调Add,再调Wait
  1. 需求重用时,需求在前一组调用Wait结束后,再开端新一轮的运用

    1. WaitGroup 是能够重用的。只需 WaitGroup 的计值康复到零值的状况,那么它就能够被看作是新创建的 WaitGroup,被重复运用,而不能在前一组没运用完的状况下又运用