点击上方蓝色“ Go语言中文网 ”关注, 每天一起学 Go

欢迎来到 Golang 系列教程[1]的第 23 篇。

什么是缓冲信道?

在上一教程里,我们讨论的主要是无缓冲信道。我们在 Channel 的教程里详细讨论了,无缓冲信道的发送和接收过程是阻塞的。

我们还可以创建一个有缓冲(Buffer)的信道。只在缓冲已满的情况,才会阻塞向缓冲信道(Buffered Channel)发送数据。同样,只有在缓冲为空的时候,才会阻塞从缓冲信道接收数据。

make
ch := make(chan type, capacity)
capacity

我们开始编写代码,创建一个缓冲信道。

示例一

package main

import (
 "fmt"
)


func main() {
 ch := make(chan string, 2)
 ch "naveen"
 ch "paul"
 fmt.Println( fmt.Println(}

在线运行程序[2]

在上面程序里的第 9 行,我们创建了一个缓冲信道,其容量为 2。由于该信道的容量为 2,因此可向它写入两个字符串,而且不会发生阻塞。在第 10 行和第 11 行,我们向信道写入两个字符串,该信道并没有发生阻塞。我们又在第 12 行和第 13 行分别读取了这两个字符串。该程序输出:

naveen
paul

示例二

我们再看一个缓冲信道的示例,其中有一个并发的 Go 协程来向信道写入数据,而 Go 主协程负责读取数据。该示例帮助我们进一步理解,在向缓冲信道写入数据时,什么时候会发生阻塞。

package main

import (
 "fmt"
 "time"
)

func write(ch chan int) {
 for i := 0; i 5; i++ {
  ch   fmt.Println("successfully wrote", i, "to ch")
 }
 close(ch)
}
func main() {
 ch := make(chan int, 2)
 go write(ch)
 time.Sleep(2 * time.Second)
 for v := range ch {
  fmt.Println("read value", v,"from ch")
  time.Sleep(2 * time.Second)

 }
}

在线运行程序[3]

chchwritewritewritechwritechch
successfully wrote 0 to ch
successfully wrote 1 to ch
writechchchch
read value 0 from ch
successfully wrote 2 to ch
write
successfully wrote 0 to ch
successfully wrote 1 to ch
read value 0 from ch
successfully wrote 2 to ch
read value 1 from ch
successfully wrote 3 to ch
read value 2 from ch
successfully wrote 4 to ch
read value 3 from ch
read value 4 from ch

死锁

package main

import (
 "fmt"
)

func main() {
 ch := make(chan string, 2)
 ch "naveen"
 ch "paul"
 ch "steve"
 fmt.Println( fmt.Println(}

在线运行程序[4]

在上面程序里,我们向容量为 2 的缓冲信道写入 3 个字符串。当在程序控制到达第 3 次写入时(第 11 行),由于它超出了信道的容量,因此这次写入发生了阻塞。现在想要这次写操作能够进行下去,必须要有其它协程来读取这个信道的数据。但在本例中,并没有并发协程来读取这个信道,因此这里会发生死锁(deadlock)。程序会在运行时触发 panic,信息如下:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
 /tmp/sandbox274756028/main.go:11 +0x100

长度 vs 容量

make

缓冲信道的长度是指信道中当前排队的元素个数。

代码可以把一切解释得很清楚。:)

package main

import (
 "fmt"
)

func main() {
 ch := make(chan string, 3)
 ch "naveen"
 ch "paul"
 fmt.Println("capacity is", cap(ch))
 fmt.Println("length is", len(ch))
 fmt.Println("read value",  fmt.Println("new length is", len(ch))
}

在线运行程序[5]

在上面的程序里,我们创建了一个容量为 3 的信道,于是它可以保存 3 个字符串。接下来,我们分别在第 9 行和第 10 行向信道写入了两个字符串。于是信道有两个字符串排队,因此其长度为 2。在第 13 行,我们又从信道读取了一个字符串。现在该信道内只有一个字符串,因此其长度变为 1。该程序会输出:

capacity is 3
length is 2
read value naveen
new length is 1

WaitGroup

WaitGroupWaitGroup
WaitGroupWaitGroup

理论说完了,我们编写点儿代码吧。:)

package main

import (
 "fmt"
 "sync"
 "time"
)

func process(i int, wg *sync.WaitGroup) {
 fmt.Println("started Goroutine ", i)
 time.Sleep(2 * time.Second)
 fmt.Printf("Goroutine %d ended\n", i)
 wg.Done()
}

func main() {
 no := 3
 var wg sync.WaitGroup
 for i := 0; i   wg.Add(1)
  go process(i, &wg)
 }
 wg.Wait()
 fmt.Println("All go routines finished executing")
}

在线运行程序[6]

WaitGroupWaitGroupWaitGroupAddintWaitGroupAddWaitGroupDone()Wait()
wg.Add(1)processwg.Wait()processwg.Donewg.Done()
wgwgWaitGroupmain

该程序输出:

started Goroutine  2
started Goroutine  0
started Goroutine  1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended
All go routines finished executing

由于 Go 协程的执行顺序不一定,因此你的输出可能和我不一样。:)

工作池的实现

缓冲信道的重要应用之一就是实现工作池[8]

一般而言,工作池就是一组等待任务分配的线程。一旦完成了所分配的任务,这些线程可继续等待任务的分配。

我们会使用缓冲信道来实现工作池。我们工作池的任务是计算所输入数字的每一位的和。例如,如果输入 234,结果会是 9(即 2 + 3 + 4)。向工作池输入的是一列伪随机数。

我们工作池的核心功能如下:

  • 创建一个 Go 协程池,监听一个等待作业分配的输入型缓冲信道。
  • 将作业添加到该输入型缓冲信道中。
  • 作业完成后,再将结果写入一个输出型缓冲信道。
  • 从输出型缓冲信道读取并打印结果。

我们会逐步编写这个程序,让代码易于理解。

第一步就是创建一个结构体,表示作业和结果。

type Job struct {
 id       int
 randomno int
}
type Result struct {
 job         Job
 sumofdigits int
}
Jobidrandomnorandomno
Resultjobsumofdigits

第二步是分别创建用于接收作业和写入结果的缓冲信道。

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
jobsresults
digitsdigits
func digits(number int) int {
 sum := 0
 no := number
 for no != 0 {
  digit := no % 10
  sum += digit
  no /= 10
 }
 time.Sleep(2 * time.Second)
 return sum
}

然后,我们写一个创建工作协程的函数。

func worker(wg *sync.WaitGroup) {
 for job := range jobs {
  output := Result{job, digits(job.randomno)}
  results  }
 wg.Done()
}
jobsjobdigitsResultresultsworkerWaitGroupwgjobsDone()
createWorkerPool
func createWorkerPool(noOfWorkers int) {
 var wg sync.WaitGroup
 for i := 0; i   wg.Add(1)
  go worker(&wg)
 }
 wg.Wait()
 close(results)
}
wg.Add(1)WaitGroupworkerwgwg.Wait()resultsresults

现在我们已经有了工作池,我们继续编写一个函数,把作业分配给工作者。

func allocate(noOfJobs int) {
 for i := 0; i   randomno := rand.Intn(999)
  job := Job{i, randomno}
  jobs  }
 close(jobs)
}
allocateJobijobsjobjobs
results
func result(done chan bool) {
 for result := range results {
  fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
 }
 done true
}
resultresultsjobidresultdonedone
main()
func main() {
 startTime := time.Now()
 noOfJobs := 100
 go allocate(noOfJobs)
 done := make(chan bool)
 go result(done)
 noOfWorkers := 10
 createWorkerPool(noOfWorkers)
  endTime := time.Now()
 diff := endTime.Sub(startTime)
 fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
mainendTimestartTime
noOfJobsallocatejobs
doneresult
createWorkerPoolmaindone

为了便于参考,下面是整个程序。我还引用了必要的包。

package main

import (
 "fmt"
 "math/rand"
 "sync"
 "time"
)

type Job struct {
 id       int
 randomno int
}
type Result struct {
 job         Job
 sumofdigits int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

func digits(number int) int {
 sum := 0
 no := number
 for no != 0 {
  digit := no % 10
  sum += digit
  no /= 10
 }
 time.Sleep(2 * time.Second)
 return sum
}
func worker(wg *sync.WaitGroup) {
 for job := range jobs {
  output := Result{job, digits(job.randomno)}
  results  }
 wg.Done()
}
func createWorkerPool(noOfWorkers int) {
 var wg sync.WaitGroup
 for i := 0; i   wg.Add(1)
  go worker(&wg)
 }
 wg.Wait()
 close(results)
}
func allocate(noOfJobs int) {
 for i := 0; i   randomno := rand.Intn(999)
  job := Job{i, randomno}
  jobs  }
 close(jobs)
}
func result(done chan bool) {
 for result := range results {
  fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
 }
 done true
}
func main() {
 startTime := time.Now()
 noOfJobs := 100
 go allocate(noOfJobs)
 done := make(chan bool)
 go result(done)
 noOfWorkers := 10
 createWorkerPool(noOfWorkers)
  endTime := time.Now()
 diff := endTime.Sub(startTime)
 fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

在线运行程序[9]

为了更精确地计算总时间,请在你的本地机器上运行该程序。

该程序输出:

Job id 1, input random no 636, sum of digits 15
Job id 0, input random no 878, sum of digits 23
Job id 9, input random no 150, sum of digits 6
...
total time taken  20.01081009 seconds

程序总共会打印 100 行,对应着 100 项作业,然后最后会打印一行程序消耗的总时间。你的输出会和我的不同,因为 Go 协程的运行顺序不一定,同样总时间也会因为硬件而不同。在我的例子中,运行程序大约花费了 20 秒。

mainnoOfWorkers
...
total time taken  10.004364685 seconds
mainnoOfJobsnoOfWorkers

本教程到此结束。祝你愉快。

上一教程 - 信道

下一教程 - Select[10]

推荐阅读

  • Go 经典入门系列 22:Channel

福利 我为大家整理了一份 从入门到进阶的Go学习资料礼包 ,包含学习建议:入门看什么,进阶看什么。 关注公众号 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。

bb3a3a679049f2080c354e334a7a11d6.png