// Reference: github.com/panjf2000/ants
// A minimal goroutine pool: Submit hands tasks to a bounded set of reusable
// worker goroutines, a background reaper retires idle workers, and Close shuts
// everything down. main runs a short demonstration.
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	_ "github.com/panjf2000/ants" // retained from the original file; blank import, nothing from it is referenced here
)

// idleCheckInterval is how often the reaper scans the idle list.
const idleCheckInterval = 5 * time.Second

// Errors returned by Submit.
var (
	errPoolFull   = errors.New("pool max")
	errPoolClosed = errors.New("pool closed")
	errNilTask    = errors.New("nil task")
)

// pool is a goroutine pool with an upper bound on the number of workers.
type pool struct {
	// cap is the maximum number of worker goroutines.
	cap int32
	// run is the number of workers currently counted against cap. It is
	// only changed while lock is held; atomic operations are used for every
	// access, as in the original code.
	run int32
	// nonblocking makes Submit fail with errPoolFull instead of waiting when
	// the pool is at capacity and no worker is idle. The zero value (wait)
	// is the default.
	nonblocking bool
	// idleWorkers holds workers waiting for a task, in the order they were
	// returned by backPool. Submit takes from the end, the reaper from the
	// front.
	idleWorkers []*worker
	// idleTimeoutSec is the idle time in seconds after which the reaper
	// retires a worker. With the zero value every idle worker is retired on
	// each scan.
	idleTimeoutSec uint32
	// closed is set by Close. Guarded by lock.
	closed bool

	lock  sync.Mutex
	cond  *sync.Cond // signalled when a worker becomes idle, a slot frees up, or the pool closes
	close chan struct{}
	wg    sync.WaitGroup // counts the reaper and every live worker
}

// NewPool creates a pool that runs at most cap workers and starts its idle
// reaper goroutine.
//
// cap should be positive: with cap <= 0 no worker is ever started, so Submit
// waits forever (or returns errPoolFull when nonblocking is set).
func NewPool(cap int32) *pool {
	p := &pool{
		cap:   cap,
		close: make(chan struct{}),
	}
	p.cond = sync.NewCond(&p.lock)
	p.wg.Add(1) // the reaper; released in checkIdleWork
	go p.checkIdleWork()
	return p
}

// checkIdleWork is the reaper loop. It scans the idle list every
// idleCheckInterval and returns once the pool is closed.
func (p *pool) checkIdleWork() {
	defer p.Done()
	ticker := time.NewTicker(idleCheckInterval)
	defer ticker.Stop()
	for {
		select {
		case <-p.close:
			return
		case <-ticker.C:
			p.reapIdle()
		}
	}
}

// reapIdle retires the leading run of idle workers whose idle time has reached
// idleTimeoutSec. It is a separate function so that the deferred Unlock runs
// after every scan rather than never.
func (p *pool) reapIdle() {
	p.lock.Lock()
	defer p.lock.Unlock()
	if p.closed {
		return
	}

	now := uint32(time.Now().Unix())
	expired := 0
	for expired < len(p.idleWorkers) &&
		now-p.idleWorkers[expired].lastWorkTs >= p.idleTimeoutSec {
		expired++
	}
	fmt.Printf("checkIdleWork del=%d\n", expired)
	if expired == 0 {
		return
	}

	for j := 0; j < expired; j++ {
		atomic.AddInt32(&p.run, -1)
		// A nil task tells the worker to exit. The worker is idle, so it is
		// (or is about to be) receiving on its task channel.
		p.idleWorkers[j].task <- nil
		p.idleWorkers[j] = nil // drop the reference held by the backing array
	}
	p.idleWorkers = p.idleWorkers[expired:]
	p.cond.Broadcast() // slots were freed; let waiting submitters re-check
	fmt.Printf("checkIdleWork now run=%d, del=%d\n", atomic.LoadInt32(&p.run), expired)
}

// Done releases one count on the pool's WaitGroup. Every pool goroutine calls
// it exactly once when it exits.
func (p *pool) Done() {
	p.wg.Done()
}

// Close shuts the pool down: it wakes blocked submitters, tells workers and
// the reaper to stop, and waits for them to exit. A worker that is running a
// task finishes it first. Calling Close more than once is safe.
func (p *pool) Close() {
	fmt.Printf("pool Close\n")
	p.lock.Lock()
	if !p.closed {
		p.closed = true
		close(p.close)
		p.idleWorkers = nil
		p.cond.Broadcast()
	}
	p.lock.Unlock()
	p.wg.Wait()
}

// backPool returns w to the idle list after it has finished a task and wakes
// one waiting submitter. After Close the worker is not re-listed; it exits on
// its next loop iteration.
func (p *pool) backPool(w *worker) {
	p.lock.Lock()
	if !p.closed {
		p.idleWorkers = append(p.idleWorkers, w)
	}
	p.lock.Unlock()
	p.cond.Signal()
	fmt.Printf("backPool\n")
}

// Submit schedules task on the pool. It reuses the most recently idled worker
// if there is one, otherwise starts a new worker while fewer than cap are
// running, otherwise waits for a worker to become free.
//
// It returns errNilTask for a nil task, errPoolClosed once Close has been
// called, and errPoolFull when the pool is saturated and nonblocking is set.
func (p *pool) Submit(task func()) error {
	if task == nil {
		// nil is the internal "exit" signal for workers and must never be
		// handed over as a task.
		return errNilTask
	}

	// Every decision and hand-off happens under the lock. Close also needs
	// the lock, so a worker chosen here cannot exit before it receives task.
	p.lock.Lock()
	defer p.lock.Unlock()
	for {
		if p.closed {
			return errPoolClosed
		}
		if last := len(p.idleWorkers) - 1; last >= 0 {
			w := p.idleWorkers[last]
			p.idleWorkers[last] = nil
			p.idleWorkers = p.idleWorkers[:last]
			w.task <- task
			fmt.Printf("use idle\n")
			return nil
		}
		if atomic.LoadInt32(&p.run) < p.cap {
			fmt.Printf("use new\n")
			w := &worker{
				pool: p,
				task: make(chan func()),
			}
			p.wg.Add(1)
			atomic.AddInt32(&p.run, 1)
			w.run()
			w.task <- task
			return nil
		}
		if p.nonblocking {
			return errPoolFull
		}
		fmt.Printf("pool max, wait\n")
		// Wait releases the lock while sleeping; the loop re-checks every
		// condition after each wakeup.
		p.cond.Wait()
	}
}

// worker is a single reusable goroutine owned by a pool.
type worker struct {
	pool *pool
	// task delivers work to the goroutine; a nil value asks it to exit.
	task chan func()
	// lastWorkTs is the Unix time in seconds at which the last task finished.
	lastWorkTs uint32
}

// run starts the worker goroutine. It executes tasks until it receives a nil
// task or the pool is closed, returning itself to the pool after each task.
func (w *worker) run() {
	go func() {
		defer w.pool.Done()
		for {
			select {
			case <-w.pool.close:
				return
			case taskFunc := <-w.task:
				if taskFunc == nil {
					return
				}
				taskFunc()
				w.lastWorkTs = uint32(time.Now().Unix())
				w.pool.backPool(w)
			}
		}
	}()
}

func main() {
	p := NewPool(20)
	for i := 0; i < 5; i++ {
		// The task runs later on a worker goroutine; copy i so each task
		// prints its own index (required before Go 1.22).
		idx := i
		err := p.Submit(func() {
			fmt.Printf("work %d ..\n", idx)
			time.Sleep(time.Second * 4)
		})
		if err != nil {
			fmt.Printf("submit %d: %v\n", idx, err)
		}
	}
	time.Sleep(time.Second * 10)
	p.Close()
}