go 语言可利用流水型模型提高并发能力, 充分利用计算机的性能。本文主要介绍流水线模型的搭建, 以及并发情况下如何优雅的退出。
什么是流水线模型

流水线模型由多个阶段组成, 不同阶段通过 channel 进行通信, 每个阶段可由多个功能相同的 goroutine 构成。每个阶段的 goroutine 都有一下特征:
- 从上游的 channle 中接收数据
- 处理数据, 通常会产生新的数据
- 通过 channel 处理后的数据送到下游
除了开始阶段「只有输入」和结束阶段「只有输出」外, 其余每个阶段均会使用到发送 channel 和接收 channel.
使用流线求平方
下面通过一个计算切片中每个元素平方的例子来介绍一下流水线。开始阶段, 新启一个 goroutine 将切片数据输出到 channel 中
func producer(input ...int64) <-chan int64 {
out := make(chan int64)
go func() {
// 记得关闭 channel, 不关闭会造成泄漏
defer close(out)
for _, k := range input {
out <- k
}
}()
return out
}处理阶段, 从 channel 中读取数据, 对每个数据求平台, 输出到 channel 中
func process(input <-chan int64) <-chan int64 {
out := make(chan int64)
go func() {
defer close(out)
// 接收 channel 数据, 直到 channel 被关闭
for v := range input {
out <- v * v
}
}()
return out
}结束阶段, 将 channel 输出
func end(input <-chan int64) <-chan int64 {
out := make(chan int64)
go func() {
defer close(out)
// 接收 channel 数据, 直到 channel 被关闭
for v := range input {
out <- v
}
}()
return out
}使用 mian 函数进行调用
func main() {
p := producer(1, 2, 3, 4, 5, 6)
proc := process(p)
e := end(proc)
fmt.Println("goroutines: ", runtime.NumGoroutine())
for v := range e {
fmt.Println(v)
}
time.Sleep(time.Second)
fmt.Println("goroutines: ", runtime.NumGoroutine())
}扇入「Fan-In」和扇出「Fan-Out」

在上节的例子中,每个阶段均创建了一个 goroutine 来接收, 处理和发送数据。在生产环境中, 多个 goroutine 可以从同一个 channel 中去数据进行消耗处理「扇出」, 还需要有一个 goroutine 能够将多个 channel 统一收集到一个 channel 中进行输出「扇入」。
在上节的例子中, 我们可以运行多次 process 函数来并发处理数据。同时, 新定一个一个函数 merge 用于接收多个 goroutine 输出的 channel
// 扇入
func merge(inputs ...<-chan int64) <-chan int64 {
out := make(chan int64)
var wg sync.WaitGroup
wg.Add(len(inputs))
for _, input := range inputs {
// 并发接收多个 channel 数据, 扇入到一个 channel
go func(input <-chan int64) {
defer wg.Done()
for v := range input {
out <- v
}
}(input)
}
go func() {
// 所有 channel 处理完之后关闭 out
wg.Wait()
close(out)
}()
return out
}
func main() {
p := producer(1, 2, 3, 4, 5, 6)
// 扇出, 多个 goroutine 处理同一个 channel
proc1 := process(p)
proc2 := process(p)
// 扇入, 多个 channel 输出到一个 channel
res := merge(proc1, proc2)
fmt.Println("goroutines: ", runtime.NumGoroutine())
for v := range res {
fmt.Println(v)
}
time.Sleep(time.Second)
fmt.Println("goroutines: ", runtime.NumGoroutine())
}退出流水线
在上节的例子中, 如果 main 函数没有全部消费 res, 则流水线中的所有 channel 都无法被 close, 从而导致所有有的 goroutine 都无法退出, 造成资源泄漏。
func main() {
p := producer(1, 2, 3, 4, 5, 6)
proc1 := process(p)
proc2 := process(p)
res := merge(proc1, proc2)
fmt.Println("goroutines: ", runtime.NumGoroutine())
fmt.Println(<-res)
time.Sleep(time.Second)
// goroutine 仍然为 7
fmt.Println("goroutines: ", runtime.NumGoroutine())
}当下有出现异常, 或者不在需要上游数据时, 需要使用一个 channel 来通知各个阶段的 goroutine, 关闭 channel, 退出 goroutine, 释放系统资源。在 main 函数中新建一个 channel, 用于和不同阶段的 goroutine 发送信号
func producer(done <-chan struct{}, input ...int64) <-chan int64 {
out := make(chan int64)
go func() {
defer close(out)
for _, k := range input {
select {
case out <- k:
case <-done: // 退出
return
}
}
}()
return out
}
func process(done <-chan struct{}, input <-chan int64) <-chan int64 {
out := make(chan int64)
go func() {
defer close(out)
// 接收 channel 数据, 直到 channel 被关闭
for v := range input {
select {
case out <- v * v:
case <-done:
return
}
}
}()
return out
}
func merge(done <-chan struct{}, inputs ...<-chan int64) <-chan int64 {
out := make(chan int64)
var wg sync.WaitGroup
wg.Add(len(inputs))
for _, input := range inputs {
// 并发接收多个 channel 数据, 扇入到一个 channel
go func(input <-chan int64) {
defer wg.Done()
for v := range input {
select {
case out <- v:
case <-done:
return
}
}
}(input)
}
go func() {
// 所有 channel 处理完之后关闭 out
wg.Wait()
close(out)
}()
return out
}
func main() {
done := make(chan struct{})
//defer close(done)
p := producer(done, 1, 2, 3, 4, 5, 6)
proc1 := process(done, p)
proc2 := process(done, p)
res := merge(done, proc1, proc2)
fmt.Println("goroutines: ", runtime.NumGoroutine())
fmt.Println(<-res)
// 退出流水线
close(done)
time.Sleep(time.Second)
fmt.Println("goroutines: ", runtime.NumGoroutine())
}结论
我们可以把流水线模型理解为一套并发的模式, 将一个任务拆分为不同的子任务「并发处理」, 最后将子任务的结果收集在一起输出。只不过在 go 语言中, 我们可以快速的创建 goroutine, 通过 channel 在 goroutine 之间进行通信。如果使用其他编程语言, 可能需要使用全局变量, 读写锁来实现这种模式。
欢迎添加微信公众号「洛小妍」交流学习