Golang 并发形式:扇入、扇出

个人喜爱 Golang 的最突出原因之一是:咱们能够轻松构建高可用且非堵塞的程序。

在本系列文章中,我将测验回忆 Golang 中可用的形式。我将采用每种形式,并具体讨论它们合适的位置以及怎么有效地运用它们。

什么是扇入扇出。这是一种将数据从多个流或从一个流汇聚到多个流或管道的单一数据流的办法。

generate函数

为了讨论这个形式,咱们首要需求一个数据源。这是一个能够用作数据源的数据生成器。

func generate( data string) <-chan string{
    channel := make(chan string)
    go func() {
        for {
            channel <- data
            time.Sleep(time.Duration(100*time.Millisecond))
        }
    }()
    return channel
}

上面的函数显然返回一个只接纳通道。

该函数的顾客只能从通道接纳数据。

注意,在这个函数里面,channel的定义简化运用make定义并创立。但是经过在响应类型前面加<-,咱们使它限定为一个只接纳通道。

扇入

现在咱们有了数据源,让咱们来创立形式中的“扇子”。让咱们看看这个函数:

func fanIn() {
    c1 := generate("Hello")
    c2 := generate("There")
    fanin := make(chan string)
    go func() {
        for {
            select {
            case str1 := <-c1: fanin <- str1
            case str2 := <-c2: fanin <- str2
            }
        }
    }()
    go func() {
        for {
        fmt.Println(<-fanin)
        }
    }()
}

分析

  1. 在第 2、3 行中,咱们创立了 2 个数据生成器c1和c2。
  2. 在第 5 行中,咱们正在创立fanin通道,它将成为从c1和c2获取数据的汇聚通道。
  3. 在第 9 行和第 10 行中,根据通道c1和c2的数据可用性,将挑选恰当的状况并将相同的数据推送到通道fanin。

有用的场景

想想咱们有必要兼并一切事件的场景

扇出

关于扇出功能,咱们需求一组接受器,咱们的生成器函数将在其中继续发送要处理的音讯或作业。

关于这种状况,让咱们将生成器函数更改为有一些推迟。

package main
import (
"fmt"
"time"
)
func generate(data string) <- chan string {
    channel := make(chan string)
    go func() {
        for {
            channel <- data
            time.Sleep(1000)
        }
    }()
    return channel
}
type Processor struct {
    jobChannel chan string
    done chan *Worker
    workers []*Worker
}
type Worker struct{
    name string
}
func (w * Worker) processJob(data string, done chan *Worker) {
    // Use the data and process the job
    go func() {
        fmt.Println("Working on data ", data, w.name)
        time.Sleep(3000)
        done <- w
    }()
}
func GetProcessor() *Processor {
    p := &Processor{
        jobChannel: make(chan string),
        workers: make([]*Worker, 5),
        done: make( chan *Worker),
    }
    for i := 0; i < 5; i++ {
        w := &Worker{name : fmt.Sprintf("<Worker - %d>", i)}
        p.workers[i] = w
    }
    p.startProcess()
    return p
}
func (p *Processor) startProcess() {
    go func() {
        for {
            select {
            default :
                if len(p.workers) > 0 {
                    w := p.workers[0]
                    p.workers = p.workers[1:]
                    w.processJob( <- p.jobChannel, p.done)
                }
            case w := <- p.done:
                p.workers = append(p.workers, w)
            }
        }
    }()
}
func (p *Processor) postJob(jobs <-chan string) {
    p.jobChannel <- <-jobs
}
func main() {
    source := generate("data string")
    process := GetProcessor()
    for i := 0; i < 12; i++ {
        process.postJob(source)
    }
}

让咱们逐行讨论。

  1. 在第 21 和 26 行中,咱们声明了一个Processor和一个Worker结构。

    处理器有一个工人列表,将用作后台进程来处理来自生成器函数(数据源)的数据

  2. 第 40 行定义了一个函数来创立Processor的实例并在第 50 行开始处理。
  3. 咱们在第 73 行运用postJob办法与处理器实例进行交互,这发生在第 85 行。咱们正在向处理器实例发送11条音讯进行处理。
  4. 在第 74 行,咱们从生成器获取音讯并将其传送到处理器实例中的jobChannel通道。
  5. startProcess办法中,咱们有 2 个挑选。在第 62 行,咱们在postJob办法中获取生成器在第 74 行发送的音讯,仅当有作业人员时(第 59 行)。
  6. 咱们在第 61 行挑选 worker (它始终是处理器实例中 worker 切片的顶部 worker)。

    在实际场景中,咱们应该构建一个基于优先级队列的作业池,以便作业均匀分布而且处理器不被堵塞。

    此设置也不是背压感知的。如果没有作业,第 62 行会堵塞。在这些状况下,请保证增加背压处理。

  7. 在第 62 行中,咱们将数据提供给在第 61 行中挑选的 worker,并发送处理器实例的完成通道。
  8. worker 在第 32 行的单独 goroutine 中进行处理,并经过done通道告诉处理器实例。
  9. worker 的信号在第 64 行被捕获,而且 worker 被再次增加到 worker 列表中。

如果咱们运转代码,咱们将看到下面的结果:

Golang 并发模式:扇入、扇出

到这里,咱们就结束对扇入和扇出形式的学习。我将在接下来的帖子中发布另一个规划形式。

快乐学习和分享。

翻译自:Golang Concurrency Patterns : Fan in, Fan out