Go 语言中的管道 Channel 是一个非常有趣的数据结构,作为语言中一种核心的数据类型,多个 Goroutine 在进行通信时就会使用 Channel 作为中间的通信方式,我们在一节中要介绍的就是 Golang 中 Channel 的实现原理。

这一节中的内容总共包含四个部分,我们会先介绍 Channel 的设计原理以及它在 Go 语言中的数据结构,接下来我们会分析常见的 Channel 操作,例如创建、发送、接收和关闭的实现原理,由于在 Range[1] 和 Select 两节中我们会提到 Channel 在不同的控制结构中组合使用时的现象,所以这一节还是会将重点放到 Channel 的常见操作上。

概述

作为 Go 语言中核心的数据结构和 Goroutine 之间的通信方式,Channel 是支撑 Go 语言高性能并发编程模型的结构之一,我们首先需要了解 Channel 背后的设计原理以及它的底层数据结构。

设计原理

在 Go 语言中,一个最常见的也是经常被人提及的设计模式就是不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存,在很多主流的编程语言中,当我们想要并发执行一些代码时,我们往往都会在多个线程之间共享变量,同时为了解决线程冲突的问题,我们又需要在读写这些变量时加锁。

Go 语言对于并发编程的设计与上述这种共享内存的方式完全不同,虽然我们在 Golang 中也能使用共享内存加互斥锁来实现并发编程,但是与此同时,Go 语言也提供了一种不同的并发模型,也就是 CSP,即通信顺序进程(Communicating sequential processes),Goroutine 其实就是 CSP 中的实体,Channel 就是用于传递信息的通道,使用 CSP 并发模型的 Goroutine 就会通过 Channel 来传递消息。

上图中的两个 Goroutine,一个会负责向 Channel 中发送消息,另一个会负责从 Channel 中接收消息,它们两者并没有任何直接的关联,能够独立地工作和运行,但是间接地通过 Channel 完成了通信。

我们在这一节中不会展开介绍 Go 语言选择实现 CSP 模型的原因,也不会比较不同并发模型的优劣,而是会将重点放到与 Channel 本身更相关的内容上,有关并发模型和程序设计的其他问题,作者将在其他的文章中展开介绍。

数据结构

chan inthchan
type hchan struct {    qcount   uint    dataqsiz uint    buf      unsafe.Pointer    elemsize uint16    closed   uint32    elemtype *_type    sendx    uint      recvx    uint    recvq    waitq    sendq    waitq
   lock mutex}
hchanqcountdataqsizebufsendxrecvqcountdataqsizebufdataqsizsendxrecvx
elemsizeelemtypesendqrecvq
type waitq struct {    first *sudog    last  *sudog}
sudogsudog

基本操作

当我们在使用 Channel 时,能够执行的操作其实也就只有创建、发送、接收和关闭几种:

ch := make(chan int, 1)ch <- 1<-chclose(ch)

我们在这一节中要介绍的就是这四种不同的 Channel 操作,在 Go 语言中的实现原理,包括它们的 编译过程[2] 以及底层实际调用的方法和执行过程。

创建

makemakemake
make(chan int, 10)OMAKEmakeOMAKEOMAKECHAN
func typecheck1(n *Node, top int) (res *Node) {    switch n.Op {    case OMAKE:        // ...        switch t.Etype {        case TCHAN:            l = nil            if i < len(args) {                l = args[i]                i++                l = typecheck(l, ctxExpr)                l = defaultlit(l, types.Types[TINT])                if l.Type == nil {                    n.Type = nil                    return n                }                if !checkmake(t, "buffer", l) {                    n.Type = nil                    return n                }                n.Left = l            } else {                n.Left = nodintconst(0)            }            n.Op = OMAKECHAN        }    }
make
OMAKECHANmakechanmakechan64
func walkexpr(n *Node, init *Nodes) *Node {    switch n.Op {    case OMAKECHAN:        size := n.Left        fnname := "makechan64"        argtype := types.Types[TINT64]
       if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {            fnname = "makechan"            argtype = types.Types[TINT]        }
       n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))    }}
makechanmakechan64makechan
func makechan(t *chantype, size int) *hchan {    elem := t.elem    mem, _ := math.MulUintptr(elem.size, uintptr(size))
   var c *hchan    switch {    case mem == 0:        c = (*hchan)(mallocgc(hchanSize, nil, true))        c.buf = c.raceaddr()    case elem.kind&kindNoPointers != 0:        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))        c.buf = add(unsafe.Pointer(c), hchanSize)    default:        c = new(hchan)        c.buf = mallocgc(mem, elem, true)    }
   c.elemsize = uint16(elem.size)    c.elemtype = elem    c.dataqsiz = uint(size)
   return c}
hchan
hchanhchan
hchanelemsizeelemtypedataqsiz

发送

ch <- iOSENDOSENDchansend1
func walkexpr(n *Node, init *Nodes) *Node {    switch n.Op {    case OSEND:        n1 := n.Right        n1 = assignconv(n1, n.Left.Type.Elem(), "chan send")        n1 = walkexpr(n1, init)        n1 = nod(OADDR, n1, nil)        n = mkcall1(chanfn("chansend1", 2, n.Left.Type), nil, init, n.Left, n1)    }}
chansend1chansendchansendblock=truechansend
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    lock(&c.lock)
   if c.closed != 0 {        unlock(&c.lock)        panic(plainError("send on closed channel"))    }
closedpanic"send on closed channel"

直接发送

chansenddequeuerecvq
    if sg := c.recvq.dequeue(); sg != nil {        send(c, sg, ep, func() { unlock(&c.lock) }, 3)        return true    }

我们可以从下面图中简单了解一下如果 Channel 中存在等待消息的 Goroutine 时,发送消息的处理过程:

sendsendDirectx = <-cx
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {    if sg.elem != nil {        sendDirect(c.elemtype, sg, ep)        sg.elem = nil    }    gp := sg.g    unlockf()    gp.param = unsafe.Pointer(sg)    goready(gp, skip+1)}
goreadyGrunnable

我们在这里可以简单总结一下执行的过程,当我们向 Channel 发送消息并且 Channel 中存在处于等待状态的 Goroutine 协程时,就会执行以下的过程:

sendDirectGrunnablerunnextrunnext
recvqsudogrunnext
package main
func main() { // breakpoint 1    ch := make(chan int)    go func() {        for i := range ch {            println(i) // breakpoint 2        }    }()
   ch <- 1
   wait := make(chan int) // breakpoint 3    <-wait}

我们在上述代码的三个地方打了断点,当使用 delve[7] 进行调试时其实就会发现执行的顺序其实是 1 -> 3 -> 2,具体的调试过程作者就不详细介绍了,感兴趣的读者可以学习一下 delve 的使用并亲自试验一下;总而言之,在这种情况下,向管道 Channel 中发送数据的过程并不是阻塞的。

缓冲区

向 Channel 中发送数据时遇到的第二种情况就是创建的 Channel 包含缓冲区并且 Channel 中的数据没有装满,在这时就会执行下面的这段代码:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    // ...    if c.qcount < c.dataqsiz {        qp := chanbuf(c, c.sendx)        typedmemmove(c.elemtype, qp, ep)        c.sendx++        if c.sendx == c.dataqsiz {            c.sendx = 0        }        c.qcount++        unlock(&c.lock)        return true    }
chanbuftypedmemmovesendxqcount
sendxsendxbufsendxdataqsiz

阻塞发送

block=falsefalse

在常见的场景中,向 Channel 发送消息的操作基本上都是阻塞的,在这时就会执行下面的代码,我们可以简单梳理一下这段代码的逻辑:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    // ...    if !block {        unlock(&c.lock)        return false    }
   gp := getg()    mysg := acquireSudog()    mysg.releasetime = 0    mysg.elem = ep    mysg.waitlink = nil    mysg.g = gp    mysg.isSelect = false    mysg.c = c    gp.waiting = mysg    gp.param = nil    c.sendq.enqueue(mysg)    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
   gp.waiting = nil    gp.param = nil    mysg.c = nil    releaseSudog(mysg)    return true}
getgacquireSudogsudogsendqsudoggoparkunlockGwaitinggoreadysudog
true

小结

ch <- i
recvqsendxsudogsendq
runnextsendqgoparkunlock

接收

分析了 Channel 发送数据的过程之后,我们就可以继续介绍数据处理的另一端,也就是数据的接收了,我们在 Go 语言中其实有两种不同的方式去接收管道中的数据:

i <- chi, ok <- ch
ORECVOAS2RECV
chanrecv1chanrecv2chanrecv
chanrecvgopark
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {    if c == nil {        if !block {            return        }        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)        throw("unreachable")    }
   lock(&c.lock)
   if c.closed != 0 && c.qcount == 0 {        unlock(&c.lock)        if ep != nil {            typedmemclr(c.elemtype, ep)        }        return true, false    }
ep

直接接收

sendqsendrecv
    if sg := c.sendq.dequeue(); sg != nil {        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)        return true, true    }
recvsend
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {    if c.dataqsiz == 0 {        if ep != nil {            recvDirect(c.elemtype, sg, ep)        }    } else {        qp := chanbuf(c, c.recvx)        if ep != nil {            typedmemmove(c.elemtype, ep, qp)        }        typedmemmove(c.elemtype, qp, sg.elem)        c.recvx++        if c.recvx == c.dataqsiz {            c.recvx = 0        }        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz    }    sg.elem = nil    gp := sg.g    unlockf()    gp.param = unsafe.Pointer(sg)    goready(gp, skip+1)}
recvDirectsendqelemtypedmemmovegoreadyrunnext<-ch
sendq<-chsendqsudogsendx/recvx<-ch

缓冲区

<-chrecvx
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {    // ...    if c.qcount > 0 {        qp := chanbuf(c, c.recvx)        if ep != nil {            typedmemmove(c.elemtype, ep, qp)        }        typedmemclr(c.elemtype, qp)        c.recvx++        if c.recvx == c.dataqsiz {            c.recvx = 0        }        c.qcount--        unlock(&c.lock)        return true, true    }
typedmemmove
recvxqcount

阻塞接收

sendq
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {    // ...    if !block {        unlock(&c.lock)        return false, false    }
   gp := getg()    mysg := acquireSudog()    mysg.releasetime = 0    mysg.elem = ep    mysg.waitlink = nil    gp.waiting = mysg    mysg.g = gp    mysg.isSelect = false    mysg.c = c    gp.param = nil    c.recvq.enqueue(mysg)    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
   gp.waiting = nil    closed := gp.param == nil    gp.param = nil    mysg.c = nil    releaseSudog(mysg)    return true, !closed}
selectblock=falsesudogrecvq
goparkunlockGwaitinggp.waiting = nil

小结

我们简单梳理一下从 Channel 中接收数据时的几种情况:

goparkchanrecvsendqrecvxsendqrecvxsudogrecvq

在从管道中接收数据的过程中,其实会在两个时间点触发 Goroutine 的调度,首先空的 Channel 意味着永远接收不到消息,那么就会直接挂起当前 Goroutine,第二个时间点是缓冲区中不存在数据,在这时也会直接挂起当前的 Goroutine 等待发送方发送数据。

关闭

closeOCLOSEclosechanpanic
func closechan(c *hchan) {    if c == nil {        panic(plainError("close of nil channel"))    }
   lock(&c.lock)    if c.closed != 0 {        unlock(&c.lock)        panic(plainError("close of closed channel"))    }
recvqsendqgListsudog
    c.closed = 1
   var glist gList    for {        sg := c.recvq.dequeue()        if sg == nil {            break        }        if sg.elem != nil {            typedmemclr(c.elemtype, sg.elem)            sg.elem = nil        }        gp := sg.g        gp.param = nil        glist.push(gp)    }
   for {        sg := c.sendq.dequeue()        if sg == nil {            break        }        sg.elem = nil        gp := sg.g        gp.param = nil        glist.push(gp)    }    unlock(&c.lock)
   for !glist.empty() {        gp := glist.pop()        gp.schedlink = 0        goready(gp, 3)    }}
goreadygoready

总结

Channel 作为 Go 语言中的基础数据结构,是 Go 语言能够提供强大并发能力的原因之一,我们在这一节中其实只介绍了 Channel 的数据结构以及相关的基本操作,在后面的章节中会提到各种关键字是如何与 Channel 结合并且发挥作用的,除此之外在谈到并发编程和协程调度等内容时,我们还是会重新提及 Channel。