go 语言可利用流水型模型提高并发能力, 充分利用计算机的性能。本文主要介绍流水线模型的搭建, 以及并发情况下如何优雅的退出。

什么是流水线模型

图片说明
流水线模型由多个阶段组成, 不同阶段通过 channel 进行通信, 每个阶段可由多个功能相同的 goroutine 构成。每个阶段的 goroutine 都有一下特征:

  • 从上游的 channle 中接收数据
  • 处理数据, 通常会产生新的数据
  • 通过 channel 处理后的数据送到下游
    除了开始阶段「只有输入」和结束阶段「只有输出」外, 其余每个阶段均会使用到发送 channel 和接收 channel.

使用流线求平方

下面通过一个计算切片中每个元素平方的例子来介绍一下流水线。开始阶段, 新启一个 goroutine 将切片数据输出到 channel 中

func producer(input ...int64) <-chan int64 {
   out := make(chan int64)

   go func() {
      // 记得关闭 channel, 不关闭会造成泄漏
      defer close(out)
      for _, k := range input {
         out <- k
      }
   }()

   return out
}

处理阶段, 从 channel 中读取数据, 对每个数据求平台, 输出到 channel 中

func process(input <-chan int64) <-chan int64 {
   out := make(chan int64)

   go func() {
      defer close(out)
      // 接收 channel 数据, 直到 channel 被关闭
      for v := range input {
         out <- v * v
      }
   }()

   return out
}

结束阶段, 将 channel 输出

func end(input <-chan int64) <-chan int64 {
   out := make(chan int64)

   go func() {
      defer close(out)
      // 接收 channel 数据, 直到 channel 被关闭
      for v := range input {
         out <- v
      }
   }()

   return out
}

使用 mian 函数进行调用

func main() {
   p := producer(1, 2, 3, 4, 5, 6)
   proc := process(p)
   e := end(proc)
   fmt.Println("goroutines: ", runtime.NumGoroutine())
   for v := range e {
      fmt.Println(v)
   }
   time.Sleep(time.Second)
   fmt.Println("goroutines: ", runtime.NumGoroutine())
}

扇入「Fan-In」和扇出「Fan-Out」

图片说明
在上节的例子中,每个阶段均创建了一个 goroutine 来接收, 处理和发送数据。在生产环境中, 多个 goroutine 可以从同一个 channel 中去数据进行消耗处理「扇出」, 还需要有一个 goroutine 能够将多个 channel 统一收集到一个 channel 中进行输出「扇入」。
在上节的例子中, 我们可以运行多次 process 函数来并发处理数据。同时, 新定一个一个函数 merge 用于接收多个 goroutine 输出的 channel

// 扇入
func merge(inputs ...<-chan int64) <-chan int64 {
   out := make(chan int64)
   var wg sync.WaitGroup
   wg.Add(len(inputs))

   for _, input := range inputs {
      // 并发接收多个 channel 数据, 扇入到一个 channel
      go func(input <-chan int64) {
         defer wg.Done()
         for v := range input {
            out <- v
         }
      }(input)
   }

   go func() {
      // 所有 channel 处理完之后关闭 out
      wg.Wait()
      close(out)
   }()

   return out
}

func main() {
   p := producer(1, 2, 3, 4, 5, 6)
   // 扇出, 多个 goroutine 处理同一个 channel
   proc1 := process(p)
   proc2 := process(p)
   // 扇入, 多个 channel 输出到一个 channel
   res := merge(proc1, proc2)
   fmt.Println("goroutines: ", runtime.NumGoroutine())
   for v := range res {
      fmt.Println(v)
   }
   time.Sleep(time.Second)
   fmt.Println("goroutines: ", runtime.NumGoroutine())
}

退出流水线

在上节的例子中, 如果 main 函数没有全部消费 res, 则流水线中的所有 channel 都无法被 close, 从而导致所有有的 goroutine 都无法退出, 造成资源泄漏。

func main() {
   p := producer(1, 2, 3, 4, 5, 6)
   proc1 := process(p)
   proc2 := process(p)
   res := merge(proc1, proc2)
   fmt.Println("goroutines: ", runtime.NumGoroutine())
   fmt.Println(<-res)
   time.Sleep(time.Second)
   // goroutine 仍然为 7
   fmt.Println("goroutines: ", runtime.NumGoroutine())
}

当下有出现异常, 或者不在需要上游数据时, 需要使用一个 channel 来通知各个阶段的 goroutine, 关闭 channel, 退出 goroutine, 释放系统资源。在 main 函数中新建一个 channel, 用于和不同阶段的 goroutine 发送信号

func producer(done <-chan struct{}, input ...int64) <-chan int64 {
   out := make(chan int64)

   go func() {
      defer close(out)
      for _, k := range input {
         select {
         case out <- k:
         case <-done: // 退出
            return
         }
      }
   }()

   return out
}

func process(done <-chan struct{}, input <-chan int64) <-chan int64 {
   out := make(chan int64)

   go func() {
      defer close(out)
      // 接收 channel 数据, 直到 channel 被关闭
      for v := range input {
         select {
         case out <- v * v:
         case <-done:
            return
         }
      }
   }()

   return out
}

func merge(done <-chan struct{}, inputs ...<-chan int64) <-chan int64 {
   out := make(chan int64)
   var wg sync.WaitGroup
   wg.Add(len(inputs))

   for _, input := range inputs {
      // 并发接收多个 channel 数据, 扇入到一个 channel
      go func(input <-chan int64) {
         defer wg.Done()
         for v := range input {
            select {
            case out <- v:
            case <-done:
               return
            }
         }
      }(input)
   }

   go func() {
      // 所有 channel 处理完之后关闭 out
      wg.Wait()
      close(out)
   }()

   return out
}

func main() {
   done := make(chan struct{})
   //defer close(done)
   p := producer(done, 1, 2, 3, 4, 5, 6)
   proc1 := process(done, p)
   proc2 := process(done, p)
   res := merge(done, proc1, proc2)
   fmt.Println("goroutines: ", runtime.NumGoroutine())
   fmt.Println(<-res)
   // 退出流水线
   close(done)
   time.Sleep(time.Second)
   fmt.Println("goroutines: ", runtime.NumGoroutine())
}

结论

我们可以把流水线模型理解为一套并发的模式, 将一个任务拆分为不同的子任务「并发处理」, 最后将子任务的结果收集在一起输出。只不过在 go 语言中, 我们可以快速的创建 goroutine, 通过 channel 在 goroutine 之间进行通信。如果使用其他编程语言, 可能需要使用全局变量, 读写锁来实现这种模式。

欢迎添加微信公众号「洛小妍」交流学习