在上一篇文章 《Go 并发:Channel 通道》中,我们讨论了通道的一些基本特性,需要指出的是,之前讨论的通道都是不带缓冲的通道,也就是说从通道接收数据和往通道发送数据都会阻塞,直到有其他 goroutine 往通道发送数据或从通道接收数据。
在这篇文章中,我们继续讨论带缓冲的通道的使用。
带缓冲的通道
可以创建带缓冲的通道。往通道发送数据时,只有当缓冲满时才会阻塞。同理,从通道接收数据时,只有当缓冲为空时才会阻塞。
make
ch := make(chan type, capacity)
capacitycapacitycapacity
例子一
package main
import "fmt"
func main() {
ch := make(chan string, 2)
ch <- "lee"
ch <- "leo"
fmt.Println(<-ch)
fmt.Println(<-ch)
}
ch := make(chan string, 2)
lee
leo
例子二
接着我们来实现一个更复杂点的例子。
package main
import (
"fmt"
"time"
)
func write(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i
fmt.Println("wrote ", i, "to ch")
}
close(ch)
}
func main() {
ch := make(chan int, 2)
go write(ch)
time.Sleep(2 * time.Second)
for v := range ch {
fmt.Println("read value", v, "from ch")
time.Sleep(2 * time.Second)
}
}
writewritewritewrite
wrote 0 to ch
wrote 1 to ch
writechwrite
read value 0 from ch
wrote 2 to ch
write
wrote 0 to ch
wrote 1 to ch
read value 0 from ch
wrote 2 to ch
wrote 3 to ch
read value 1 from ch
read value 2 from ch
wrote 4 to ch
read value 3 from ch
read value 4 from ch
死锁
带缓冲的通道同样会发生死锁的情况。例如以下代码:
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch <- "leo"
ch <- "lee"
ch <- "hao"
fmt.Println(<-ch)
fmt.Println(<-ch)
}
运行程序,将会发生报错:
fatal error: all goroutines are asleep - deadlock!
在上面的程序中,我们往一个缓冲大小为 2 的通道连续发送了3 个字符串数据。由于通道缓冲大小为 2,且不存其他 goroutine 从该通道接收数据,故程序会出现死锁的问题。
关闭带缓冲的通道
在上一篇文章 《Go 并发:Channel 通道》中,我们讨论了关闭通道的操作。同样地,我们可以对带缓冲的通道进行关闭操作。
当关闭带缓冲的通道后,可以继续从该通道接收数据,直到缓冲的数据读取完毕,这时,通道将会返回“零值”。
以程序来说明这一过程:
package main
import (
"fmt"
)
func main() {
ch := make(chan int, 5)
ch <- 5
ch <- 6
close(ch)
n, open := <-ch
fmt.Printf("Received: %d, open: %t\n", n, open)
n, open = <-ch
fmt.Printf("Received: %d, open: %t\n", n, open)
n, open = <-ch
fmt.Printf("Received: %d, open: %t\n", n, open)
}
在上面的程序中,我们创建了一个带缓冲的通道,并往该通道发送数字 5 和 6 ,然后关闭通道。关闭通道后,可以继续从该通道接收数据,当接收到数字 5 和 6 后,最后接收到数字 0 ,即该整型通道的零值。程序输出:
Received: 5, open: true
Received: 6, open: true
Received: 0, open: false
同样可以使用 for range 循环来改写上面的程序:
package main
import (
"fmt"
)
func main() {
ch := make(chan int, 5)
ch <- 5
ch <- 6
close(ch)
for n := range ch {
fmt.Println("Received:", n)
}
}
程序输出:
Received: 5
Received: 6
长度和容量
make
带缓冲通道的长度是指缓冲中实际数据的数量。
说得有点拗,通过程序来说明长度和容量的区别。
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 3)
ch <- "leo"
ch <- "lee"
fmt.Println("capacity is", cap(ch))
fmt.Println("length is", len(ch))
fmt.Println("read value", <-ch)
fmt.Println("new length is", len(ch))
}
运行程序,得到输出:
capacity is 3
length is 2
read value leo
new length is 1
在创建通道时,我们指定了缓冲的容量大小为 3,所以容量输出为 3。由于往通道中发送了两个数据,所以长度输出为 2。从通道接收一个数据后,这时通道的缓冲长度为 1。
WaitGroup
WaitGroupWaitGroup
WaitGroupWaitGroup
package main
import (
"fmt"
"sync"
"time"
)
func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
wg.Done()
}
func main() {
no := 3
var wg sync.WaitGroup
for i := 0; i < no; i++ {
wg.Add(1)
go process(i, &wg)
}
wg.Wait()
fmt.Println("All go routines finished executing")
}
WaitGroupWaitGroupAddDoneWait
WaitGroupwgprocessprocessAddwgprocessDonewg
wg.Wait()process
started Goroutine 1
started Goroutine 0
started Goroutine 2
Goroutine 2 ended
Goroutine 0 ended
Goroutine 1 ended
All go routines finished executing
工作线程池实现
最后我们来看下如何实现一个工作线程池(worker pool)。工作线程池是一组用来执行任务的线程的集合,这些线程可以不断执行任务队列的任务,一旦执行完毕一个任务,马上可以执行另一个任务。
与传统线程池不同的是,我们使用带缓冲的通道与 goroutine 来实现工作线程池。其执行的任务是计算一个数包含的所有数字的和。例如,数 234 的输出为 (2 + 3 + 4)= 9。
下面的工作线程池需要实现的功能:
- 创建一个 goroutine 池,这些 goroutine 监听任务通道,以等待任务的分配
- 往工作任务通道发送工作任务
- 任务执行完毕将结果发送至存放结果的通道
- 从存放结果的通道接收结果,并打印出来
完整的源代码如下:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
jobsresults
allocatejobsresultdonecreateWorkerPoolworkerworkerresultsWaitGroupreslutsresultdone
运行程序,得到输出:
Job id 1, input random no 636 , sum of digits 15
Job id 9, input random no 150 , sum of digits 6
Job id 2, input random no 407 , sum of digits 11
Job id 3, input random no 983 , sum of digits 20
Job id 4, input random no 895 , sum of digits 22
...
total time taken 20.009264 seconds
即 100 个任务,每个执行 2 秒,由于工作线程数量为 10 ,每次可同时执行 10 个任务,实际总执行时间为 20 秒。
如果将工作线程数量调整为 20 ,可以看到总执行时间为 10 秒左右。
参考资料
- https://golangbot.com/buffered-channels-worker-pools/