关于Golang的设计模式
2025-02-05
Golang 中的并行设计模式:errgroup 与 MapReduce 实践指南
概述
在 Go 语言开发中,高效处理并发任务是提升系统性能的关键。咱主要对比两种典型的并行模式(errgroup
和MapReduce
),分析它们的差异与适用场景。
1. errgroup 模式
设计理念
- 轻量级并行控制:基于标准库实现,适合简单并行任务
- 快速失败机制:任一子任务失败立即取消所有任务
- Context 集成:天然支持超时和手动取消
典型实现
func FetchAllData() (*Result, error) {
g, ctx := errgroup.WithContext(context.Background())
var resA DataA
var resB DataB
g.Go(func() error {
data, err := serviceA.Call(ctx)
resA = data
return err
})
g.Go(func() error {
data, err := serviceB.Call(ctx)
resB = data
return err
})
if err := g.Wait(); err != nil {
return nil, err
}
return &Result{A: resA, B: resB}, nil
}
特性
- 代码直白简单
- 需预定义结果变量
- 适合固定数量任务
2.MapReduce 模式
设计理念
- 结构化数据流:明确分为生成(Generate)→ 映射(Map)→ 归约(Reduce)三阶段
- 工作池机制:内置 Worker 控制并发数
- 弹性扩展:天然支持动态任务列表
典型实现
type Result struct {
A DataA
B DataB
}
func ProcessDynamicTasks() (*Result, error) {
return MapReduce(
// Generate阶段:动态生成任务
func(ch chan<- string) {
for _, task := range config.GetTasks() {
ch <- task
}
},
// Map阶段:并行处理任务
func(task string, writer Writer[any], cancel func(error)) {
data, err := process(task)
if err != nil {
cancel(err)
return
}
writer.Write(data)
},
// Reduce阶段:聚合结果
func(ch <-chan any, writer Writer[*Result], cancel func(error)) {
res := &Result{}
for data := range ch {
switch v := data.(type) {
case DataA: res.A = v
case DataB: res.B = v
}
}
writer.Write(res)
},
WithWorkers(10),
)
}
特性
- 自动处理结果收集
- 支持动态任务扩展
- 内置 panic 恢复机制
这里参考了 Golang 的微服务框架:GoZero 的 MR 模式实现,更多细节可参考GoZero。
场景对比
适合使用 errgroup 的场景(业务逻辑层面)
固定数量的微服务调用
// 并发调用用户服务、订单服务、支付服务
services := []func() error{getUser, getOrder, getPayment}
批量资源初始化
// 并行初始化数据库、缓存、消息队列
inits := []func() error{initDB, initCache, initMQ}
快速失败型任务
// 任一健康检查失败立即返回
checks := []func() error{checkAPI, checkDB, checkDisk}
适合使用 MapReduce 的场景(微服务层面)
数据处理流水线
// 日志处理:收集→解析→统计
GenerateLogs → ParseLog → CountMetrics
动态任务执行
// 根据用户输入动态生成分析任务
userSelectedTasks := getDynamicTasks()
大规模数据批处理
// 处理 10 万条数据:分片 → 处理 → 聚合
WithWorkers(100) // 使用 100 个 worker
我自己的一个实际工作中的设计
我称之为多级生产者-消费者模式,即将一个大的单体应用拆分成多个模块,每个模块都是生产者和消费者,通过 Channel 进行模块之间的通信,层层传递,每个模块都是一个独立的 goroutine。
之前在开发中遇到了一个 task,需要 5 分钟进行一次,从数据库中拉去信息提交给 FFmpeg 进行处理,本地做 fingerprints,然后提交到云端,最后再保存。本身这个 task 并不难,因为单个 batch 可能数据量很多,要满足 5 分钟一次的话,FFmpeg 的部分可能会出现上一 batch 还没处理完,下一 batch 就来了的情况,所以我将这个 task 拆分成了几个模块。
系统架构概览
实现了从直播流中实时识别背景音乐的完整流水线,核心流程包含四个阶段:
- 直播流获取:定期从数据库获取直播流信息
- 音频提取:使用 FFmpeg 提取音频片段
- 音乐识别:调用 ACRCloud API 进行识别
- 结果存储:批量存储识别结果到数据库
核心并行设计模式
1. 多级生产者-消费者模式
// 三级缓冲通道设计
liveStreamQueue := make(chan m.Live, 500) // 原始直播流队列
audioChannel := make(chan m.AudioData, 200) // 音频数据通道
recognitionChannel := make(chan m.RecognitionResult, 300) // 识别结果通道
...
func (p *Pipeline) Run(ctx context.Context) {
go p.fetchLiveStreams(ctx)
// 通过liveStreamQueue传递数据
go p.dispatchExtractors(ctx)
// 通过audioChannel传递数据
go p.dispatchRecognizers(ctx)
// 通过recognitionChannel传递数据
go p.storeResults(ctx)
}
这种设计模式不同于 errgroup 的简单并行,也不同于 MapReduce 的结构化流程。
虽然设计起来稍微有一点点复杂,难点主要在于各个 channel 的 load balance,但是这种设计模式可以很好的将一个大任务拆分成多个小任务,每个小任务都是一个独立的 goroutine,可以很好的利用多核 CPU 的优势,提高整体的处理速度。