go 语言可利用流水型模型提高并发能力, 充分利用计算机的性能。本文主要介绍流水线模型的搭建, 以及并发情况下如何优雅的退出。
什么是流水线模型
流水线模型由多个阶段组成, 不同阶段通过 channel 进行通信, 每个阶段可由多个功能相同的 goroutine 构成。每个阶段的 goroutine 都有一下特征:
- 从上游的 channle 中接收数据
- 处理数据, 通常会产生新的数据
- 通过 channel 处理后的数据送到下游
除了开始阶段「只有输入」和结束阶段「只有输出」外, 其余每个阶段均会使用到发送 channel 和接收 channel.
使用流线求平方
下面通过一个计算切片中每个元素平方的例子来介绍一下流水线。开始阶段, 新启一个 goroutine 将切片数据输出到 channel 中
func producer(input ...int64) <-chan int64 { out := make(chan int64) go func() { // 记得关闭 channel, 不关闭会造成泄漏 defer close(out) for _, k := range input { out <- k } }() return out }
处理阶段, 从 channel 中读取数据, 对每个数据求平台, 输出到 channel 中
func process(input <-chan int64) <-chan int64 { out := make(chan int64) go func() { defer close(out) // 接收 channel 数据, 直到 channel 被关闭 for v := range input { out <- v * v } }() return out }
结束阶段, 将 channel 输出
func end(input <-chan int64) <-chan int64 { out := make(chan int64) go func() { defer close(out) // 接收 channel 数据, 直到 channel 被关闭 for v := range input { out <- v } }() return out }
使用 mian 函数进行调用
func main() { p := producer(1, 2, 3, 4, 5, 6) proc := process(p) e := end(proc) fmt.Println("goroutines: ", runtime.NumGoroutine()) for v := range e { fmt.Println(v) } time.Sleep(time.Second) fmt.Println("goroutines: ", runtime.NumGoroutine()) }
扇入「Fan-In」和扇出「Fan-Out」
在上节的例子中,每个阶段均创建了一个 goroutine 来接收, 处理和发送数据。在生产环境中, 多个 goroutine 可以从同一个 channel 中去数据进行消耗处理「扇出」, 还需要有一个 goroutine 能够将多个 channel 统一收集到一个 channel 中进行输出「扇入」。
在上节的例子中, 我们可以运行多次 process 函数来并发处理数据。同时, 新定一个一个函数 merge 用于接收多个 goroutine 输出的 channel
// 扇入 func merge(inputs ...<-chan int64) <-chan int64 { out := make(chan int64) var wg sync.WaitGroup wg.Add(len(inputs)) for _, input := range inputs { // 并发接收多个 channel 数据, 扇入到一个 channel go func(input <-chan int64) { defer wg.Done() for v := range input { out <- v } }(input) } go func() { // 所有 channel 处理完之后关闭 out wg.Wait() close(out) }() return out } func main() { p := producer(1, 2, 3, 4, 5, 6) // 扇出, 多个 goroutine 处理同一个 channel proc1 := process(p) proc2 := process(p) // 扇入, 多个 channel 输出到一个 channel res := merge(proc1, proc2) fmt.Println("goroutines: ", runtime.NumGoroutine()) for v := range res { fmt.Println(v) } time.Sleep(time.Second) fmt.Println("goroutines: ", runtime.NumGoroutine()) }
退出流水线
在上节的例子中, 如果 main 函数没有全部消费 res, 则流水线中的所有 channel 都无法被 close, 从而导致所有有的 goroutine 都无法退出, 造成资源泄漏。
func main() { p := producer(1, 2, 3, 4, 5, 6) proc1 := process(p) proc2 := process(p) res := merge(proc1, proc2) fmt.Println("goroutines: ", runtime.NumGoroutine()) fmt.Println(<-res) time.Sleep(time.Second) // goroutine 仍然为 7 fmt.Println("goroutines: ", runtime.NumGoroutine()) }
当下有出现异常, 或者不在需要上游数据时, 需要使用一个 channel 来通知各个阶段的 goroutine, 关闭 channel, 退出 goroutine, 释放系统资源。在 main 函数中新建一个 channel, 用于和不同阶段的 goroutine 发送信号
func producer(done <-chan struct{}, input ...int64) <-chan int64 { out := make(chan int64) go func() { defer close(out) for _, k := range input { select { case out <- k: case <-done: // 退出 return } } }() return out } func process(done <-chan struct{}, input <-chan int64) <-chan int64 { out := make(chan int64) go func() { defer close(out) // 接收 channel 数据, 直到 channel 被关闭 for v := range input { select { case out <- v * v: case <-done: return } } }() return out } func merge(done <-chan struct{}, inputs ...<-chan int64) <-chan int64 { out := make(chan int64) var wg sync.WaitGroup wg.Add(len(inputs)) for _, input := range inputs { // 并发接收多个 channel 数据, 扇入到一个 channel go func(input <-chan int64) { defer wg.Done() for v := range input { select { case out <- v: case <-done: return } } }(input) } go func() { // 所有 channel 处理完之后关闭 out wg.Wait() close(out) }() return out } func main() { done := make(chan struct{}) //defer close(done) p := producer(done, 1, 2, 3, 4, 5, 6) proc1 := process(done, p) proc2 := process(done, p) res := merge(done, proc1, proc2) fmt.Println("goroutines: ", runtime.NumGoroutine()) fmt.Println(<-res) // 退出流水线 close(done) time.Sleep(time.Second) fmt.Println("goroutines: ", runtime.NumGoroutine()) }
结论
我们可以把流水线模型理解为一套并发的模式, 将一个任务拆分为不同的子任务「并发处理」, 最后将子任务的结果收集在一起输出。只不过在 go 语言中, 我们可以快速的创建 goroutine, 通过 channel 在 goroutine 之间进行通信。如果使用其他编程语言, 可能需要使用全局变量, 读写锁来实现这种模式。
欢迎添加微信公众号「洛小妍」交流学习