本文首要研究一下gost的GenericTaskPool

GenericTaskPool

gost/sync/task_pool.go

// GenericTaskP作业总结ool represents an generic task p公积金ool.
type Ge公积金nericTaskPool interface {
// AddTask wait idle wor作业细胞ker add task
AddTask(t task) bool
// AddTaskAlways add task to queues or do it immediatel工商银行y
AddTaskAlways(t task)
// AddTaskBalance add task to idle queue
AddTaskBalance(t task)
// Close use to close the task pool
Close()
// IsClosed use to check pool status.
IsClosed() bool
}

Gener宫崎骏icTaskPool接口界说了AddTask、AddTaskAlways、AddTaskBalance、Close、IsClosed接口

TaskPool

gost/sync/googletask_pool.go

type TaskPoo宫外孕l struct {
TaskPoolOptions
idx    u作业总结int32 // round robin index
qArray []chan task
wg     sync.狗狗币WaitGroup
once sync.Once
done chan struct{}
}
// return false when the p宫颈癌ool is stop
func (p *TaskPool)狗狗币 AddTask(t task) (ok bool) {
idx := atomic.AddUint32(&p.idx, 1)
id := idx % uint32(p.tQNumber)
select {宫崎骏
case <-p.done:
return公积金 false
default:
p.qArray[id] <- t
return true
}
}
func (p *TaskPool) AddTaskAlways(t task) {
id := atomic.AddUint32(&p.idx, 1) % uint32(p.tQNum作业细胞ber)
select {
case p.qArray[id] <- t:
r作业总结eturn
default:
goSafely(t)
}
}
// do it immediately when no idle queue
func (狗狗币p *TaskPool) AddTaskBa作业总结lance(t t作业总结ask) {
length := l作业细胞en(p.qArr宫外孕ay)
//作业总结 tr公积金y len/2 times to作业总结 lookup idle queue
for i := 0; i &lt公积金; length公积金/2; i++ {
select {
case p.qArray[rand.Intn(length)] <- t:
return
default:
continue
}
}
goSaf作业细胞el狗狗币y(t)
}
// check whether the session has been closed.
func (p *Tas公积金kPool) IsClosed公积金() bool {
s宫崎骏elect {
case <-p.done:
return true
default:狗狗币
return false
}
}
func (p *TaskPool) C公积金lose() {
p.stop(工商银行)
p.wg.Wait()
for i工商银行 := range p.qArray {
close(p.qArray[i])
}
}

TaskPo作业细胞ol界说了TaskPoolOptions、idx、qArray、wg、once、done特征;它完成了GenericTaskPool接口;AddTask办法在pogoogleol是done的时分会回来false,公积金其他状况会递加idx,然后依据tQNumber核算id,往qArraygoogle[id]写入task;AddTaskAlways办法会疏忽pool的关闭信息;AddTaskBalance办法会尝试len/2次随机往qArray写入task,都写入不成功则goSafely实施;IsClosed首要是读取done的channel信息;Close办法实作业细胞行stop及wg.Wait(),毕竟遍历qArray挨个实施close

NewTaskPool

gost/sync/task_pool.go

func NewgoogleTaskPool(opts ...TaskPoolOption) GenericTaskPool {
var tOpts TaskPo宫外孕olOpti宫外孕ons
for _, opt := range opts {
opt(&tOpts)
}
tOpt枸杞s.validate()
p := &TaskPo作业总结ol{
TaskPoolOptions: t宫崎骏Opts,
qArray:          make([]chan tas狗狗币k, t作业总结Opts.tQNumber),
done:            make(chan struct{}),
}
for i := 0; i < p.tQNumber; i++ {
p.qArray[i] = make(chan task,工商银行 p.tQLen)
}
p.start()
return p
}

NewTaskPool经过TaskPoolOptions来创立TaskPool

TaskPoolOption

gost/s作业总结ync/options.go

const (
defaultTa工商银行skQNumber = 10
defaultTaskQLen    = 128
)
/////////////////////////////////////////
// Task Po公积金ol Options
///////////////////////////////google//////////
// TaskPoolOptions is optional se公积金ttings for t作业总结ask pool
type TaskPoolOptions struct {
tQLen      int //狗狗币 task queue length. b宫颈癌uffer s宫外孕ize per queue
tQNumber   int // task queue number. number of queue
tQPoolSizgooglee int // task pool size. number of workers
}
func (o *TaskPoolOptions) validate() {
i宫颈癌f o.tQPoolSize < 1 {
panic公积金(fmt.Sprintf("illegal pool size %d", o.tQPoolSize))
}
if o.tQLen < 1 {
o.tQLen = defaultTaskQLen
}
if o.tQNumber < 1 {
o.tQNumber = defaultTaskQNumber
}
iGof o.tQNumber > o.tQPoolSize {
o.tQNumber = o.tQPoolSize
}
}
type TaskPoolOption func(*TaskPoolOptions)
// WithTaskPoolTaskPoolSize set @size of the task queue pool sizeGo
func Wit枸杞hTaskPoolTaskPoolSize(size int作业细胞) TaskPoolOption {
return fugooglenc(o *TaskPoolOptions) {
o.tQPoolSize = size
}
}
// WithTaskPoolTaskQueueLength set @length of the task queue length
func WithTaskPoolTaskQueueLength(length int)宫外孕 TaskPoolOption {
return func(o *TaskPoolOptions) {
o.tQLen = length
}
}
// WithTaskPoolTaskQueueNumber set @nu工商银行mber of the task queue number
func WithTaskPoolTaskQueueNumber(number int) TaskPoolOption {
return func(o *Ta宫颈癌skPoolOptiogooglens) {
o.tQNumb工商银行er = number
}
}

TaskPoolOptions界说了tQLen、tQNumber、tQPoolSize特征,供给了WithTaskPoolTa宫崎骏skPoolSize、WithTaskPoolTaskQueueLengthGo、WithTaskPool作业总结TaskQugoogleeueNumbergoogle、validate办法

start

gost/sync/task_pool.go

func (p *Tas公积金kPool) start() {
for公积金 i := 0; i < p.tQPoolSize; i++ {
p.wg.公积金Add(1)
workerID := i
q := p.qArray[worke宫颈癌rID%p.tQNumber]
p.safeRun(workerID, q)
}
}
f作业总结unc (p *TaskPool) safeRun(workerID int, q chan task) {
gxruntime.GoS狗狗币afely(nil, false作业总结,
func() {
err := p.run(int(workerID), q)
if err != nil {
// log error to std作业细胞e作业细胞rr
log.Printf("gost/TaskPool.run error: %s", err.Error())
}
},
nil,
)
}

start办法依据tQPoolSize挨个实施saf工商银行eRun;safeRun办法经过公积金GoSafely实施p.run(int(workerID), q)

run

gost/sync/task_pool.go

// worker
func (p *TaskPool) run(id int, q chan task) error {
defer p.wg.Done()
var (
ok bool
t  task
)
for {狗狗币
select {
case <-p.done:
if 0 < len(q) {
return fmt.Errorf("task worker %d exit now while its taskgoogle buf枸杞fer length %d is grea作业细胞ter than 0",
id, le枸杞n(q))
}
return nil
case t, ok = <-q:
if ok {
func() {
defer func() {
if r := recover(); r != nil {
fmt.Fprintf(os.Stderr, "%s gorouti枸杞ne panic: %vn%sn",
time.Now(), r, string(debug.Sta公积金ck()))
}
}()
t()
}()
}
}
}
}

run办法经过for循环进行select,若读取到p.done则退出循环;若是读取到task则实施task

taskPoolSimple

g作业总结ost/sync/task_pool.go

type taskPoolSimple struct {
work chan tagooglesk     // task channel
sem  chan struct{Go} // gr po宫外孕ol size
wg syn工商银行c.WaitGroup
once sync.Once
done chan struct{}
}

taskPoGoolSimple界说了w狗狗币ork、sem、wg、once、done特征;它完成了GenericTa工商银行skPool接口;googleAddTask办法先判别done,之后写入work及sem;AddTaskAlways办法在select不到channel的时分会实施goSafely;AddTask公积金Balance办法实践实施的是AddTaskAlways工商银行;IsClosed办法读取done信息;Close办法实施stop及wg.Wait()

作业细胞

gost/sync/task_pool_test.go

func TestTaskPool(t *testing.T) {
numCPU := runtime.NumCPU()
//task宫颈癌Cnt := int64(numCPU * numCPU公积金 * 100)
tp := NewTaskPool(
W狗狗币ithTaskPoolTaskPoolSize(1),
WithTaskPoolTaskQueueNumber(1),作业细胞
With狗狗币TaskPoolTaskQueueLength(1),
)
//task, cnt := newCoun宫外孕tTask()
task, _ := newCountTask()
var wg sync.WaitGroup
for i := 0; i < numCPU*nu宫外孕m作业总结CPU; i++ {
wg.Add(1)
go宫颈癌 func() {
for j := 0; j < 100; j++ {
ok := tp.Ad作业总结dTask(task)
if !ok {
t.Log(j)
}
}
wg.Done()
}()
}
wg.W宫崎骏ait()
tp.Close()
//if taskCnt != atomic.LoadInt64(cnt) {
//	//tgoogle.Error("want ", taskCnt, " g宫颈癌ot ", *cnt)
//公积金}
}
func TestTaskPoolSimple(t *testing.T) {
numCPU := runtime.NumCPU()
taskCnt := int64(numCPU * numCPU * 100)
tp := NewTaskPoolSimple(1)
task, cnt := newCountTask()
var wg s宫崎骏ync.WaitGroup
for i := 0; i < numGoCPU*numCPU; i++ {
wg.Add(1)
go func() {
fo作业细胞r j := 0; j < 100; j++ {
ok := tp.AddTask(task)
if !ok {
t.Log(j)
}
}
wg.Done()
}()
}
wg.Wait()
cntValue := atomic.LoadInt64(cnt)
if taskCnt != cntValu宫外孕e {
t.Error("want ", taskCnt, " got ", cntVal枸杞ue)
}
}

TaskP宫颈癌o工商银行olSimple的创立比较简单,只需求供给size参数即可;TaskPool的创立需Go求供给T宫颈癌askPoolOption,有WithTaskPoolTaskPoolSize、WithTaskPoolTaskQueueNumber、WithTaskPoolTaskQueue宫崎骏Length这些option

小结

gost的GenericTaskPool接口宫颈癌界说了AddTask、AddTaskAlways、AddTaskBalance、Close、IsClosed接口;这里有TaskPool、taskPoolS公积金imple两个完成。

doc

  • gost枸杞