布景

Golang 里边,我们常常运用 channel 进行协程之间的通讯。这里有一个经典的场景,也便是出产者顾客模式,出产者协程不断地往 Channel 里边塞元素,而顾客协程不断地消费这些元素。

【Golang】如何实现实现带优先级的select

写成代码便是如下:

package main
import (
	"fmt"
	"sync"
	"time"
)
func main() {
	ch := make(chan int, 10)
	var wg sync.WaitGroup
	wg.Add(2)
	go producer(ch, &wg)
	go consumer(ch, &wg)
	wg.Wait()
}
// 出产者
func producer(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	i := 0
	for {
		select {
		case ch <- i:
		default:
			// 丢弃
			log.Println("discard")
		}
		i++
		time.Sleep(time.Second)
	}
}
// 顾客
func consumer(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	consume := func(i int) {
		fmt.Println(i)
		time.Sleep(time.Millisecond * 700)
	}
	for {
		i := <-ch
		consume(i) // 消费元素
	}
}

出产者不断产生元素,顾客消费元素。出产者不会等候顾客消费完毕(不然或许影响其他使命),假如 channel 现已满了,也便是说明顾客消费不过来,出产者就会丢弃这个使命。

出产者均匀一秒生成1个,顾客0.7秒消费一个。正常情况下顾客是消费得过来的,可是许多时候顾客协程还需求做一些守时使命,比方一些守时整理作业。假如这个整理作业每2秒触发一次,整理时刻一般需求1.5秒,也便是假如每次都做每一秒有0.75秒会被整理作业占有了,可是它不是一定要十分及时的,能够等空闲时再进行。 如下代码:

// 顾客
func consumer(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	t := time.NewTicker(time.Second * 2)
        defer t.Stop()
	consume := func(i int) {
		fmt.Println(i)
		time.Sleep(time.Millisecond * 700)
	}
	clear := func() {
		fmt.Println("clear")
		time.Sleep(time.Millisecond * 1500)
	}
	for {
		select {
		case i := <-ch:
			consume(i) // 消费元素:
		case <-t.C:
			clear() // 整理
		}
	}
}

运转程序到第15秒的时候,出产者发现 channel满了,所以开端丢包:

0
1
clear
2
3
4
5
6
clear
7
clear
8
clear
9
clear
clear
10
clear
11
12
13
14
clear
15
clear
clear
discard
16
clear
discard
discard

解决方案

已然整理使命的优先级并不高,那么它就不应该堵塞消费元素流程,而是应该在空闲时才去履行。由于 Golang 里边,假如 select 两个 case 都一起满意,会随机选一个履行,因而榜首想到的或许会运用如下代码完成优先级case:

// 顾客
func consumer(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	t := time.NewTicker(time.Second * 2)
        defer t.Stop()
	consume := func(i int) {
		fmt.Println(i)
		time.Sleep(time.Millisecond * 700)
	}
	clear := func() {
		fmt.Println("clear")
		time.Sleep(time.Millisecond * 1500)
	}
	for {
		select {
		case i := <-ch:
			consume(i) // 消费元素
			continue   // 或许还有元素,不走整理逻辑
		default:
		}
		// 没有元素才走整理逻辑
		select {
		case <-t.C:
			clear() // 整理
		default:
		}
	}
}

假如运转这个程序,能够发现它能够满意优先级的需求,先消费元素,空闲时再履行整理使命。

可是,在没有元素能够消费,也没有整理使命能够履行的时候,这里的for将会不断地循环,浪费CPU资源。

其实,能够运用下面的方法完成优先级case,它能够在没有元素安排妥当的时候堵塞在 select,而不是不断循环:

// 顾客
func consumer(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	t := time.NewTicker(time.Second * 2)
        defer t.Stop()
	consume := func(i int) {
		fmt.Println(i)
		time.Sleep(time.Millisecond * 700)
	}
	clear := func() {
		fmt.Println("clear")
		time.Sleep(time.Millisecond * 1500)
	}
	for {
		select {
		case i := <-ch:
			consume(i) // 消费元素
		case <-t.C:
		priority:
			for { // 整理前先把元素消费完
				select {
				case i := <-ch:
					consume(i) // 消费元素
				default:
					break priority // 注:这里会越过这个循环,而不是再次履行
				}
			}
			clear() // 整理
		}
	}
}

这里的关键是在触发整理case的时候,先去把channel里边的元素消费完,再进行整理,然后保证能够留下足够的channel缓冲区给出产者放置出产的元素。

一个封装

上面那段优先级case代码其实挺常用的,可是几乎都是模板代码,特别是需求在两个地方写consume(i),因而我们能够封装一下这段代码,方便运用,减少犯错:

// 优先级select ch1 的使命先履行完毕后才会履行 ch2 里边的使命
func PrioritySelect[T1, T2 any](ch1 <-chan T1, f1 func(T1), ch2 <-chan T2, f2 func(T2)) {
	for {
		select {
		case a := <-ch1:
			f1(a)
		case b := <-ch2:
		priority:
			for {
				select {
				case a := <-ch1:
					f1(a)
				default:
					break priority
				}
			}
			f2(b)
		}
	}
}

这样,我们的顾客代码就能够简化为:

// 顾客
func consumer(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	t := time.NewTicker(time.Second * 2)
        defer t.Stop()
	consume := func(i int) {
		fmt.Println(i)
		time.Sleep(time.Millisecond * 700)
	}
	clear := func(time.Time) {
		fmt.Println("clear")
		time.Sleep(time.Millisecond * 1500)
	}
	PrioritySelect(ch, consume, t.C, clear)
}

代码

pselect