Go完结后台使命调度体系

<大众号:仓库future>

一、背景

平常咱们在开发API的时分,前端传递过来的大批数据需求通过后端处理,如果后端处理的速度快,前端呼应就快,反之则很慢,影响用户体会。针对这种场景咱们一般都是后台异步处理,不需求前端等待所有的都履行完才回来。为了处理这一问题,需求咱们自己完结后台使命调度体系。

二、使命调度器完结

Go实现后台任务调度系统

poll.go

package poller
import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"
)
type Poller struct {
	routineGroup *goroutineGroup // 并发控制
	workerNum    int // 记录一起在运转的最大goroutine数
	sync.Mutex
	ready  chan struct{} // 某个goroutine现已预备好了
	metric *metric // 计算当时在运转中的goroutine数量
}
func NewPoller(workerNum int) *Poller {
	return &Poller{
		routineGroup: newRoutineGroup(),
		workerNum:    workerNum,
		ready:        make(chan struct{}, 1),
		metric:       newMetric(),
	}
}
// 调度器
func (p *Poller) schedule() {
	p.Lock()
	defer p.Unlock()
	if int(p.metric.BusyWorkers()) >= p.workerNum {
		return
	}
	select {
	case p.ready <- struct{}{}: // 只要满足当时goroutine数量小于最大goroutine数量 那么就通知poll去调度goroutine履行使命
	default:
	}
}
func (p *Poller) Poll(ctx context.Context) error {
	for {
		// step01
		p.schedule() // 调度
		select {
		case <-p.ready: // goroutine预备好之后 这儿就会有消息
		case <-ctx.Done():
			return nil
		}
	LOOP:
		for {
			select {
			case <-ctx.Done():
				break LOOP
			default:
				// step02
				task, err := p.fetch(ctx) // 获取使命
				if err != nil {
					log.Println("fetch task error:", err.Error())
					break
				}
				fmt.Println(task)
				p.metric.IncBusyWorker() // 当时正在运转的goroutine+1
				// step03
				p.routineGroup.Run(func() { // 履行使命
					if err := p.execute(ctx, task); err != nil {
						log.Println("execute task error:", err.Error())
					}
				})
				break LOOP
			}
		}
	}
}
func (p *Poller) fetch(ctx context.Context) (string, error) {
	time.Sleep(1000 * time.Millisecond)
	return "task", nil
}
func (p *Poller) execute(ctx context.Context, task string) error {
	defer func() {
		p.metric.DecBusyWorker() // 履行完结之后 goroutine数量-1
		p.schedule() // 从头调度下一个goroutine去履行使命 这一步是有必要的
	}()
	return nil
}

metric.go

package poller
import "sync/atomic"
type metric struct {
	busyWorkers uint64
}
func newMetric() *metric {
	return &metric{}
}
func (m *metric) IncBusyWorker() uint64 {
	return atomic.AddUint64(&m.busyWorkers, 1)
}
func (m *metric) DecBusyWorker() uint64 {
	return atomic.AddUint64(&m.busyWorkers, ^uint64(0))
}
func (m *metric) BusyWorkers() uint64 {
	return atomic.LoadUint64(&m.busyWorkers)
}

goroutine_group.go

package poller
import "sync"
type goroutineGroup struct {
	waitGroup sync.WaitGroup
}
func newRoutineGroup() *goroutineGroup {
	return new(goroutineGroup)
}
func (g *goroutineGroup) Run(fn func()) {
	g.waitGroup.Add(1)
	go func() {
		defer g.waitGroup.Done()
		fn()
	}()
}
func (g *goroutineGroup) Wait() {
	g.waitGroup.Wait()
}

三、测试

package main
import (
	"context"
	"fmt"
	"ta/poller"
	"go.uber.org/goleak"
	"testing"
)
func TestMain(m *testing.M)  {
	fmt.Println("start")
	goleak.VerifyTestMain(m)
}
func TestPoller(t *testing.T) {
	producer := poller.NewPoller(5)
	producer.Poll(context.Background())
}

成果:

Go实现后台任务调度系统

四、总结

我们用别的方法也能够完结,核心关键便是控制并发节奏,防止很多请求打到task service,在这儿起到核心作用的便是schedule,它控制着整个使命体系的调度。一起还封装了WaitGroup,这在大多数开源代码中都比较常见,我们能够去尝试。另外便是test case必定得跟上,防止goroutine走漏。

来自于我的大众号:仓库future,请我们多多支撑