Go Worker Pool 并发模型实践
wxk1991 Lv5

Go Worker Pool 并发模型实践

Go 启动 goroutine 很容易,但并发不是越多越好。任务很多时,如果来一个任务就启动一个 goroutine,内存、数据库连接、下游接口都可能被打爆。

Worker Pool 的作用就是限制并发,让任务有序进入固定数量的 worker。


一、最小 Worker Pool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type Job struct {
ID int
}

func worker(id int, jobs <-chan Job) {
for job := range jobs {
fmt.Printf("worker %d handle job %d\n", id, job.ID)
}
}

func main() {
jobs := make(chan Job, 100)

for i := 0; i < 5; i++ {
go worker(i, jobs)
}

for i := 0; i < 100; i++ {
jobs <- Job{ID: i}
}

close(jobs)
}

这里最多只有 5 个 worker 同时处理任务,不会无限创建 goroutine。


二、等待所有任务完成

实际项目需要等待任务处理结束,可以配合 sync.WaitGroup

1
2
3
4
5
6
7
8
9
10
11
12
var wg sync.WaitGroup

for i := 0; i < workerCount; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(id, jobs)
}(i)
}

close(jobs)
wg.Wait()

close(jobs) 表示没有更多任务,worker 读完 channel 后自然退出。


三、加入 context 取消

长期运行的任务最好支持取消:

1
2
3
4
5
6
7
8
9
10
11
12
13
func worker(ctx context.Context, id int, jobs <-chan Job) {
for {
select {
case <-ctx.Done():
return
case job, ok := <-jobs:
if !ok {
return
}
handle(job)
}
}
}

这样服务关闭、请求超时、用户取消时,worker 能及时退出。


四、处理错误结果

如果任务会失败,可以增加结果 channel:

1
2
3
4
5
6
type Result struct {
JobID int
Err error
}

results := make(chan Result, 100)

worker 处理完后写入结果:

1
results <- Result{JobID: job.ID, Err: err}

调用方统一消费结果,决定重试、记录日志或返回响应。


五、不要让队列无限增长

channel 缓冲不是越大越好。缓冲太大,只是把压力藏起来。

如果任务生产速度持续大于消费速度,应该做限流、拒绝、降级或扩容,而不是无限堆积任务。


六、实践建议

Worker Pool 适合批量任务、爬虫、消息消费、图片处理、异步导出等场景。

核心不是“并发处理”,而是“可控并发”。把 worker 数、队列长度、超时、失败重试都设计清楚,Go 的并发能力才能真正发挥出来。