通道是什么?

通道 是用于协程间交流的通信载体。严格地来说,通道就是数据传输的管道,数据通过这根管道被“传入”或被“读出”。 因此协程可以发送数据到通道中,而另一个协程可以从该通道中读取数据。

声明一个通道

chan
cnil
:=
type of `c` is chan int
value of `c` is 0xc0420160c0

注意看通道C的值。它是一个指针内存地址。通道变量默认是一个指针。多数情况下,当你想要和一个协程沟通的时候,你可以给函数或者方法传递一个通道作为参数。当从协程接收到通道参数后,你不需要再对其进行解引用就可以从通道接收或者发送数据。

通道的读写操作

<-
c <- data
datacdatacdatac
<- c
cc
var data int
data = <- c

现在来自int类型通道c的数据就可以被存储到int类型变量data中。

:=
data := <- c

Go语言将根据通道C的数据类型来自动判断data变量的数据类型。

time.Sleepconcurrency chapter

通道实战

通道已经介绍差不多了,让我们结合协程来实战一下。

让我们一步步剖析下上面程序的执行步骤。

greetccmain startedgreetmain goroutinecJohngreetmain stopped

死锁

正如上面所说,当通道读写数据时,所在协程会阻塞并且调度控制权会转移到其他未阻塞的协程。

如果当前协程正在从一个没有任何值的通道中读取数据,那么当前协程会阻塞并且等待其他协程往此通道写入值。因此,读操作将被阻塞。类似的,如果你发送数据到一个通道,它将阻塞当前协程直到有其他协程从通道中读取数据。此时写操作将阻塞.

下面是一个主线程在进行通道操作的时候造成死锁的例子

上面的程序将抛出下面的运行时错误

main() started
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        program.Go:10 +0xfd
exit status 2

fatal error: all goroutines are asleep — deadlock!. 这是在说所有协程都进入休眠状态,没有协程是可被调度的。

☛ 关闭通道

val, ok := <- channeloktrueokfalseclose(channel)
c <- "John"greet<-cc<-ccmainclose(c)
for

☛ For循环

forfor{}
squares
val, ok := <-coksquaresclose(c)oktruevalokfalsebreak
main() started
0 true
1 true
4 true
9 true
16 true
25 true
36 true
49 true
64 true
81 true
0 false <-- loop broke!
main() stopped

当通道被关闭后我们在主线程中还可以读取0值数据。因为这个通道是用来传输int类型数据,默认情况下int的空值是0被返回。(注:从已经关闭的通道接收数据或者正在接收数据时,将会接收到通道类型的零值

for range
for val := range cfor{}range
main() started
0
1
4
9
16
25
36
49
64
81
main() stopped
for range

☛ 通道容量

make

当缓冲区参数不是0的时候。协程将不会阻塞除非缓冲区被填满。 当缓冲区满了之后,想要再往缓冲区发送数据只有等到有其他协程从缓冲区接收数据, (此时的发送协程是阻塞的). 有一点需要注意, 读缓冲区的操作是渴望式读取. 意味着一旦读操作开始它将读取缓冲区所有数据,直到缓冲区为空。原理上来说读操作的协程将不会阻塞直到缓冲区为空.

我们可以用以下语法定义有缓冲通道

c := make(chan Type, n)
Typen

上面说了当前协程不会阻塞直到缓冲区满了,让我们用代码实践一下

c33

让我们再加一个值

c <- 4squares

通道的长度和容量

lencap

你是否很疑惑上面的程序为什么没有发生死锁。这是因为这个c通道容量为3,但只盛放了2个数据。Go就不用去阻塞主线程去调度其他协程。 你也可以在主线程中去读取这些数据, 因为 虽然通道没有放满,也不会阻止你去从通道读取数据.

下面是一个很酷的例子

来测试一下自己的是否理解

for range

缓冲区通道像 Pythagoras Cup, 感兴趣的可以观看这个视频 Pythagoras Cup.

多协程协同工作

让我们写两个协程,一个用来计算数字的平方,另一个用来计算数字的立方。

下面来一步步梳理下程序的执行流程。

squarecubeccnumcsquareChancubeChansquarecubetestNumbsquareChan:=

下面是程序的运行结果

[main] main() started
[main] sent testNum to squareChan
[square] reading
[main] resuming
[main] sent testNum to cubeChan
[cube] reading
[main] resuming
[main] reading from channels
[main] sum of square and cube of 3  is 36
[main] main() stopped

☛ 单向通道

目前为止,我们已经学习到可以双向传递数据的通道,或者说,我们可以对通道做读操作写操作。但是事实上我们也可以创建单向通道。比如只读通道只允许读操作,只写通道只允许写操作。

单向通道也可以使用make函数创建,不过需要额外加一个箭头语法。

roc := make(<-chan int)
soc := make(chan<- int)
rocsoc

但是单向通道有什么作用呢?使用单向通道可以 提高程序的类型安全性, 使得程序不容易出错。

但是假如你在一个协程中只需要读操作某通道,但是在主线程中却需要读写操作这个通道该怎么办呢?

幸运的是Go提供了一个简单的语法去把双向通道转化为单向通道

greetc"invalid operation: roc <- "some text" (send to receive-only type <-chan string)"

☛ 匿名协程

goroutines

这是之前的例子

下面是我们用匿名协程修改后的例子

☛ 通道类型的通道

通道也是一种类型可以像其他类型一样被用在很多地方: 比如结构体元素,函数参数, 函数返回值,甚至是作为其他通道的数据传输类型. 下面我们了解下怎么使用通道作为通道的数据传输类型。

☛ Select

selectswitchcase

让我们来看下面的例子,讨论下其执行原理

selectswitch

如果所有的case语句(通道操作)被阻塞,那么select语句将阻塞直到这些case条件的一个不阻塞(通道操作),case块执行。如果有多个case块(通道操作)都没有阻塞,那么运行时将随机选择一个不阻塞的case块立即执行。

chan1chan2selectcase
selectservice1service2service13 秒chan1service25 秒chan2service1service2main

上述程序真实模拟了一个数百万请求的服务器负载均衡的例子,它从多个有效服务中返回其中一个响应。 使用协程,通道和select语句, 我们可以向多个服务器请求数据并获取其中最快响应的那个。

Sleep

以下是上面程序的运行结果 (你可能有不一样的结果)

main() started 0s
service2() started 481µs
Response from service 2 Hello from service 2 981.1µs
main() stopped 981.1µs

有时也会出现这样的结果

main() started 0s
service1() started 484.8µs
Response from service 1 Hello from service 1 984µs
main() stopped 984µs
chan1chan2

为了证明当所有case块都是非阻塞的时候,golang会随机选择一个代码块执行打印reaponse,我们使用缓冲通道来改造程序。

以下是上面程序的运行结果

main() started 0s
Response from chan2 Value 1 0s
main() stopped 1.0012ms

也可能是这种结果

main() started 0s
Response from chan1 Value 1 0s
main() stopped 1.0012ms

在上面的程序中,两个通道在其缓冲区中都有两个值。因为我们向容量为2的缓冲区通道分别发送了两个值, 所以这些通道发送操作不会阻塞并且会执行下面的select块。 select块中的所有case操作都不会阻塞,因为每个通道中都有两个值,而我们的case操作只需要取出其中一个值。因此,go运行时会随机选择一个case操作并执行其中的代码。

default
switchselectdefaultdefaultselect发送接收
selectdefault
defaultselectdefaultselect
defaultselecttime.Sleep

以下是上面程序的执行结果

main() started 0s
service1() started 0s
service2() started 0s
Response from service 1 Hello from service 1 3.0001805s
main() stopped 3.0001805s

有时可能是这种结果

main() started 0s
service1() started 0s
service2() started 0s
Response from service 2 Hello from service 2 3.0000957s
main() stopped 3.0000957s

Deadlock

defaultdefault
default
nil
nilnilselect
select (no cases)selectselect{}mainservicenilchan send (nil chan)default
casedefaultservicenil

☛ 添加超时操作

defaultdefaulttimeAfter

上面的程序在2秒后返回以下结果

main() started 0s
No response received 2.0010958s
main() stopped 2.0010958s
<-time.After(2 * time.Second)
10 * time.Secondservice1

☛ 空 select

for{}select{}selectselect{}case
selectmainservice
main() started
Hello from service!
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select (no cases)]:
main.main()
        program.Go:16 +0xba
exit status 2

☛ WaitGroup

有一种业务场景是你需要知道所有的协程是否已执行完成他们的任务。这个和只需要随机选择一个条件为true的select不同,他需要你满足所有的条件都是 `true` 才可以激活主线程继续执行。 这里的 条件 指的是非阻塞的通道操作。

结构体

让我们来通过一个例子了解上面的概念

sync.WaitGroupwgnoCopystate1semaAddWaitDone
AdddeltaintAdd计数器
Wait计数器
Done
wgDonewgwg
forwg.Wait()Done

下面是程序运行结果

main() started
Service called on instance 2
Service called on instance 3
Service called on instance 1
main() stopped

因为协程执行顺序的不同,上面的结果可能和你会有出入。

Addint

☛ 工作池

工作池WaitGroup

所以工作池其实就是维护了多个工作协程,这些协程的功能是可以收到任务,执行任务并返回结果。他们完成任务后我们就可以收到结果。这些协程使用相同的通道来达到自己的目的。

让我们看下面一个例子:

下面我来一步步解释执行原理

sqrWorkertasksresultsidtasksresultstasksresultsqrWorkertasksresultsidtasksresultsresultsresultsfor rangeresultsresults
time.Sleep()

因为系统运行速度的不同,你可能得到的运行结果和我的不一样。因为即使全部工作协程都被阻塞的时间在微秒级别,主线程都有可能会得到重新调度的机会。

WaitGroup
wg.Wait()resultsresultswaitGroup

☛ Mutex

互斥是Go中一个简单的概念。在我解释它之前,先要明白什么是竞态条件。 goroutines 都有自己的独立的调用栈,因此他们之间不分享任何数据。但是有一种情况是数据存放在堆上,并且被多个goroutines使用。 多个goroutines试图去操作一个内存区域的数据会造成意想不到的后果.。下面我们展示一个简单的例子:

iWaitGroupwg.Wait()
value of i after 1000 operations is 937

是不是很惊讶,为什么得到的值会小于1000? 似乎有些goroutines 没有执行。事实上是我们的程序出现了竞态条件, 让我们看下可能会发生的事情,

i = i + 1
  • (1) 得到 i 的值
  • (2) 给 i 的值加1
  • (3) 更新 i 的值

让我们想象一下多个goroutines在这三个步骤之间调用会发生什么. 例如让我们从1000个goroutines中拿出两个举个例子,标明 G1 和 G2。

i1i
concurrencyi = i + 1

有关答案请点击这个链接 stackoverflow. 在任何情况下,都不应该依赖Go的调度算法,而应该实现自己的逻辑来同步不同的goroutine.

实现方法之一就是使用我们上面提到的互斥锁。互斥锁是一个编程概念,它保证了在同一时间只能有一个线程或者协程去操作同一个数据。当一个协程想要操作数据的时候,必须获取该数据的一个锁,操作完成后必须释放锁,如果没有获取到该数据的锁,那么就不能操作这个数据。

syncmutex.Lock()i = i + 1mutext.Unlock()

让我们使用互斥锁修改上面的例子

mim.Lock()m.Unlock()
value of i after 1000 operations is 1000

从上面的结果来看互斥锁可以帮助我们解决竞态条件。 但首要规则是避免goroutine之间共享资源.

Go run -race program.Go

并发应用场景

这有很多使用并发的方式可以简化我们的编程开发。下面介绍一些使用并发加速编程的技巧和概念。

生成器

使用通道,我们可以更好地实现生成器。如果生成器的计算开销很大,那么我们也可以并发生成数据。这样,生成数据的逻辑就不会阻塞主程序。比如生成斐波那契数列。

fibfib只读for rangefib

扇入 & 扇出

扇入是一种多路复用的策略,把几个通道的输入整合到一个输出的通道。扇出是一种多路分解策略,将单个通道的数据分散到多个通道。

package main

import (
    "fmt"
    "sync"
)
// 从切片中读取元素,写入intput channel
func getInputChan() <-chan int {
    //实例化容量为100,int类型的 input channel
    input := make(chan int, 100)

    // 用来写到通道的数据
    numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}

    // 启动协程,把数字写到input channel
    go func() {
        for num := range numbers {
            input <- num
        }
        // 数据写入完毕,关闭channel
        close(input)
    }()

    return input
}

// 把从input channel 读到的数字,做平方运算,再写入output channel
func getSquareChan(input <-chan int) <-chan int {
    // 实例化容量为100的int类型的output channel
    output := make(chan int, 100)

    // 启动协程
    go func() {
        // 遍历input channel,把读到的数字,平方运算,写入output
        for num := range input {
            output <- num * num
        }

        //关闭output channel
        close(output)
    }()

    return output
}

//  返回 对`outputsChan` 通道合并之后的通道
//  这会产生扇入通道
// 这是一个可变参数的函数
func merge(outputsChan ...<-chan int) <-chan int {
    // 申明  WaitGroup
    var wg sync.WaitGroup

    // 实例化 merged 通道
    merged := make(chan int, 100)

    // 增加一个计数器,计数器的参数为outputsChan的长度
  // 因为我们将会创建多个goroutine,其中goroutine的数量就是要准备进行合并的通道的数量
    wg.Add(len(outputsChan))

    // 从sc channel读取数据,写入到merged 通道
    output := func(sc <-chan int) {
        // 遍历 
        for sqr := range sc {
            merged <- sqr
        }
     //一旦sc通道关闭,
    //在`WaitGroup`上调用`Done`来递减计数器
        wg.Done()
    }

    //把`output`函数运行为groutines,
    // 启动n个协程
    //其中n等于作为函数参数接收的通道数
    //这里我们在`outputsChan`上使用`for range`循环,因此无需手动告诉`n`
    for _, optChan := range outputsChan {
        go output(optChan)
    }

    // 一旦完成,运行goroutine关闭 merged 通道
    go func() {
        // 等到WaitGroup结束
        wg.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    //步骤1:获取输入数字通道
    //通过调用`getInputChan`函数,它运行一个goroutine,它将数字发送到返回的通道
    chanInputNums := getInputChan()

    //步骤2:对多个goroutine进行 `扇出` 平方操作
  // 这可以通过多次调用`getSquareChan`函数来完成,其中单个函数调用返回一个通道,该通道发送由`chanInputNums`通道提供的数字的平方
   //`getSquareChan`函数在内部运行goroutine,同时运行平方操作
    chanOptSqr1 := getSquareChan(chanInputNums)
    chanOptSqr2 := getSquareChan(chanInputNums)

    //步骤3:扇入(合并)`chanOptSqr1`和`chanOptSqr2`输出到合并频道
    // 这是通过调用`merge`函数实现的,该函数将多个通道作为参数
    // 并使用`WaitGroup`和多个goroutines来接收平方数,我们可以发送平方数
    //到 `merged` 通道,并关闭
    chanMergedSqr := merge(chanOptSqr1, chanOptSqr2)

    //步骤4:计算0到9之间的所有整数的平方再求和,大约是'285`
    //这是通过在`chanMergedSqr`上使用`for range`循环来完成的
    sqrSum := 0

    //运行直到`chanMergedSqr`或合并频道关闭
    //当所有goroutines推送到合并频道完成时,在`merge`函数中发生
    for num := range chanMergedSqr {
        sqrSum += num
    }

    //步骤5:当`for循环'完成执行时,在`chanMergedSqr`通道关闭之后打印总和
    fmt.Println("Sum of squares between 0-9 is", sqrSum)
}

我不打算解释上述程序是如何工作的,因为我在程序中添加了注释来解释这一点。 以上程序产生以下结果

Sum of squares between 0-9 is 285