今天这篇文章主要是针对 Go channel 的重点分析,一开始写的时候以为范围不会太大,但洋洋洒洒还是写破了万字,成为了一篇覆盖面较广和有一定深度的长文分析。
大家觉得不错的话,欢迎关注煎鱼和三连一波 ✍️。
接下来和煎鱼一起正式开始 Go channel 的学习之旅!
go
Channel 自然而然就成为了 Go 语言开发者中必须要明白明了的一个 “东西” 了,更别提实际工程应用和日常面试了,属于必知必会。
本文目录:
什么是 channel
在 Go 语言中,channel 可以称其为通道,也可以叫管道。channel 主要常见于与 goroutine+select 搭配使用,再结合语录的描述。可以知道 channel 就是用于 goroutine 的数据通信:
演示代码如下:
func main() {
ch := make(chan string)
go func() {
ch <- "煎鱼"
}()
msg := <-ch
fmt.Println(msg)
}
chch
在此 channel 承载着一个衔接器的桥梁:
这也是 channel 的经典思想了,不要通过共享内存来通信,而是通过通信来实现内存共享(Do not communicate by sharing memory; instead, share memory by communicating)。
从模式上来看,其就是在多个 goroutine 借助 channel 来传输数据,实现了跨 goroutine 间的数据传输,多者独立运行,不需要强关联,更不影响对方的 goroutine 状态。不存在 goroutine1 对 goroutine2 进行直传的情况。
这里思考一个问题,那 goroutine1 和 goroutine2 又怎么互相知道自己的数据 ”到“ 了呢?
channel 基本特性
chan<-
chan Tchan <- T<- chan T
channel 中还分为 “无缓冲 channel” 和 “缓冲 channel”。
演示代码如下:
// 无缓冲
ch1 := make(chan int)
// 缓冲区为 3
ch2 := make(chan int, 3)
接下来我们进一步展开这两类来看。
无缓冲 channel
无缓冲的 channel(unbuffered channel),其缓冲区大小则默认为 0。在功能上其接受者会阻塞等待并阻塞应用程序,直至收到通信和接收到数据。
这种常用于两个 goroutine 间互相同步等待的应用场景:
unbuffered channel(via @William Kennedy)
缓冲 channel
有缓存的 channel(buffered channel),其缓存区大小是根据所设置的值来调整。在功能上,若缓冲区未满则不会阻塞,会源源不断的进行传输。当缓冲区满了后,发送者就会阻塞并等待。而当缓冲区为空时,接受者就会阻塞并等待,直至有新的数据:
buffered channel(via @William Kennedy)
在实际的应用场景中,两者根据业务情况选用就可以了,不需要太过纠结于两者是否有性能差距,没意义。
channel 本质
channel 听起来实现了一个非常酷的东西,也是日常工作中常常会被面试官问到的问题。
但其实 channel 并没有那么的 "神秘",就是一个环形队列的配合。
接下来我们一步步的剖开 channel,看看里面到底是什么,怎么实现的跨 goroutine 通信,数据结构又是什么,两者又如何实现数据传输的?
基本原理
mutex
channel 是一个有锁的环形队列:
数据结构
hchan 结构体是 channel 在运行时的具体表现形式:
// src/runtime/chan.go
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq
lock mutex
}
-
qcount:队列中的元素总数量。
-
dataqsiz:循环队列的长度。
-
buf:指向长度为 dataqsiz 的底层数组,仅有当 channel 为缓冲型的才有意义。
-
elemsize:能够接受和发送的元素大小。
-
closed:是否关闭。
-
elemtype:能够接受和发送的元素类型。
-
sendx:已发送元素在循环队列中的索引位置。
-
recvx:已接收元素在循环队列中的索引位置。
-
recvq:接受者的 sudog 等待队列(缓冲区不足时阻塞等待的 goroutine)。
-
sendq:发送者的 sudog 等待队列。
recvqsendqruntime.waitq
type waitq struct {
first *sudog
last *sudog
}
firstlastruntime.sudog
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer
...
}
-
g:指向当前的 goroutine。
-
next:指向下一个 g。
-
prev:指向上一个 g。
-
elem:数据元素,可能会指向堆栈。
sudog 是 Go 语言中用于存放协程状态为阻塞的 goroutine 的双向链表抽象,你可以直接理解为一个正在等待的 goroutine 就可以了。
在后续的实现原理分析中,基本围绕着上述数据结构进行大量的讨论,建议可以认真思考一下。
channel 实现原理
在了解了 channel 的基本原理后,我们进入到与应用工程中更紧密相关的部分,那就是 channel 的四大块操作,分别是:“创建、发送、接收、关闭”。
我们将针对这四块进行细致的分析和讲解。因此接下来的内容比较庞大,内容上将分为两个角度来讲述,分别是先从源码角度进行分析,再进行图示汇总。以便于大家更好的理解和思考
创建 chan
创建 channel 的演示代码:
ch := make(chan string)
runtime.makechanruntime.makechan64
// 通用创建方法
func makechan(t *chantype, size int) *hchan
// 类型为 int64 的进行特殊处理
func makechan64(t *chantype, size int64) *hchan
hchan
makechan
源码如下:
// src/runtime/chan.go
func makechan(t *chantype, size int) *hchan {
elem := t.elem
mem, _ := math.MulUintptr(elem.size, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
return c
}
创建 channel 的逻辑主要分为三大块:
mallocgchchan
mallocgcclose
makechanhchanbuf
发送数据
channel 发送数据的演示代码:
go func() {
ch <- "煎鱼"
}()
runtime.chansend1
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
chansend
前置处理
在第一部分中,我们先看看 chan 发送的一些前置判断和处理:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if !block && c.closed == 0 && full(c) {
return false
}
// 省略一些调试相关
...
}
func full(c *hchan) bool {
if c.dataqsiz == 0 {
return c.recvq.first == nil
}
return c.qcount == c.dataqsiz
}
chansendgoparkpanic
紧接着会对非阻塞的 channel 进行一个上限判断,看看是否快速失败。
失败的场景如下:
-
若非阻塞且未关闭,同时底层数据 dataqsiz 大小为 0(缓冲区无元素),则会返回失败。。
-
若是 qcount 与 dataqsiz 大小相同(缓冲区已满)时,则会返回失败。
上互斥锁
在完成了 channel 的前置判断后,即将在进入发送数据的处理前,channel 会进行上锁:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
lock(&c.lock)
}
上锁后就能保住并发安全。另外我们也可以考虑到,这种场景会相对依赖单元测试的覆盖,因为一旦没考虑周全,漏上锁了,基本就会出问题。
直接发送
在正式开始发送前,加锁之后,会对 channel 进行一次状态判断(是否关闭):
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
}
这种情况是最为基础的,也就是当前 channel 有正在阻塞等待的接收方,那么只需要直接发送就可以了。
缓冲发送
非直接发送,那么就考虑第二种场景,判断 channel 缓冲区中是否还有空间:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
}
qcountdataqsiz
chanbuftypedmemmove
至此针对缓冲区的数据操作完成。但若没有走进缓冲区处理的逻辑,则会判断当前是否阻塞 channel,若为非阻塞,将会解锁并直接返回失败。
配合图示如下:
阻塞发送
在进行了各式各样的层层筛选后,接下来进入阻塞等待发送的过程:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
}
getgacquireSudogsudogc.sendq.enqueuesudoggoparkKeepAlive
配合图示如下:
在当前 goroutine 被挂起后,其将会在 channel 能够发送数据后被唤醒:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 从这里开始唤醒,并恢复阻塞的发送操作
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
唤醒 goroutine(调度器在停止 g 时会记录运行线程和方法内执行的位置)并完成 channel 的阻塞数据发送动作后。进行基本的参数检查,确保是符合要求的(纵深防御),接着开始取消 mysg 上的 channel 绑定和 sudog 的释放。
至此完成所有类别的 channel 数据发送管理。
接收数据
channel 接受数据的演示代码:
msg := <-ch
msg, ok := <-ch
runtime.chanrecv1runtime.chanrecv2runtime.chanrecv
需要注意,发送和接受 channel 是相对的,也就是其核心实现也是相对的。因此在理解时也可以结合来看。
前置处理
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
chanrecv
场景如下:
gopark
而接下来对于非阻塞模式的 channel 会进行快速失败检查,检测 channel 是否已经准备好接收。
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}
其分以下几种情况:
-
无缓冲区:循环队列为 0 及等待队列 sendq 内没有 goroutine 正在等待。
-
有缓冲区:缓冲区数组为空。
随后会对 channel 的 closed 状态进行判断,因为 channel 是无法重复打开的,需要确定当前 channel 是否为未关闭状态。再确定接收失败,返回。
ep
直接接收
当发现 channel 上有正在阻塞等待的发送方时,则直接进行接收:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
...
}
缓冲接收
当发现 channel 的缓冲区中有元素时:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
...
}
chanbufrecvxtypedmemmove
typedmemclr
阻塞接收
当发现 channel 上既没有待发送的 goroutine,缓冲区也没有数据时。将会进入到最后一个阶段阻塞接收:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
...
}
gopark
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 被唤醒后从此处开始
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
被唤醒后,将恢复现场,回到对应的执行点,完成最后的扫尾工作。
关闭 chan
close
close(ch)
closechan
func closechan(c *hchan)
前置处理
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
...
}
基本检查和关闭标志设置,保证 channel 不为 nil 和未关闭,保证边界。
释放接收方
在完成了异常边界判断和标志设置后,会将接受者的 sudog 等待队列(recvq)加入到待清除队列 glist 中:
func closechan(c *hchan) {
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
...
}
_Gwaiting
释放发送方
同样,与释放接收方一样。会将发送方也加入到到待清除队列 glist 中:
func closechan(c *hchan) {
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
...
}
协程调度
_Gwaiting_Grunnable
func closechan(c *hchan) {
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
后续所有的 goroutine 允许被重新调度后。若原本还在被动阻塞的发送方或接收方,将重获自由,后续该干嘛就去干嘛了,再跑回其所属的应用流程。
channel send/recv 分析
send
send
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
sendDirectmsg := <-chchmsgsg.ggoready_Gwaiting_Grunnable
recv
recv
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
该方法在接受上分为两种情况,分别是直接接收和缓冲接收:
recvDirectsendDirectchanbufrecvxsendxrecvx
goready
总结
在本文中我们针对 Go 语言的 channel 进行了基本概念的分析和讲解,同时还针对 channel 的设计原理和四大操作(创建、发送、接收、关闭)进行了源码分析和图示分析。
初步看过一遍后,再翻看。不难发现,Go 的 channel 设计并不复杂,记住他的数据结构就是带缓存的环形队列,再加上对称的 sendq、recvq 等双向链表的辅助属性,就能勾画出 channel 的基本逻辑流转模型。
在具体的数据传输上,都是围绕着 “边界上下限处理,上互斥锁,阻塞/非阻塞,缓冲/非缓冲,缓存出队列,拷贝数据,解互斥锁,协程调度” 在不断地流转处理。在基本逻辑上也是相对重合的,因为发送和接收,创建和关闭总是相对的。
如果更进一步深入探讨,还可以围绕着 CSP 模型、goroutine 调度等进一步的思考和理解。这一块会在后续的章节中再一步展开。