Golang中channel的优雅设计

1. 前言

这篇文章将会介绍Golang并发编程CSP模型中最核心的数据结构及其实现,即channel。若你对协程/线程以及多线程编程没有任何了解,则不建议阅读本篇文章。

2. 简介

Channel在Golang中常被用为Goroutine间的通信工具,当然Golang也能通过共享内存加互斥锁的方式来通信。但是在Go中常提及的设计模式是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。这句话有点绕,让我们来看一个例子,这个例子也经常会被用来考量一个Golang工程师对协程间通信的数量度。

如何使用两个协程有序打印0-9,即“0123456789”?想一想,然后我们直接看代码。

const (
    LOOPS = 10
)

func main() {
    chan1 := make(chan int, 1)
    chan2 := make(chan int, 1)
    chan3 := make(chan struct{})
	chan1 <- 1
    //打印协程A
    go func() {
        for i := 0; i < LOOPS; i += 2 {
            <- chan1
            fmt.Println(i)
            chan2 <- 1
        }
    }()
    //打印协程B
    go func() {
        for i := 1; i < LOOPS; i += 2 {
            <-chan2
            fmt.Println(i)
            chan1 <- 1
        }
        chan3 <- struct{}{}
    }()

    <-chan3
}

这段代码使用了三个channel:

  1. chan1和chan2,用来控制交替打印的顺序
  2. chan3,阻塞主协程,等待两个打印协程完成工作

以上就是channel的一个用法,其中设计了多协程以及协程中的通信,接下来我们看看channel的数据结构是怎样的。

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    lock mutex
}
  • qcount —— channel中存在的元素个数
  • dataqsiz —— 循环队列的大小
  • buf —— 实际存放元素的数组指针
  • closed —— channel关闭标志,1为关闭
  • sendx —— channel发送处理到的位置
  • recvx —— channel接受处理到的位置
  • recvq,sendq —— 阻塞在此channel的goroutine

3. 创建Channel

在Go中通常使用make来创建一个管道,如代码所示:

chan1 := make(chan int) //无缓冲
chan2 := make(chan int, 10) //有缓冲

在编译期会被转化为runtime.makechan, 缓冲大小将作为参数传入,若是无缓冲,则参数中的size为0 :

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }


    var c *hchan
    switch {
    case mem == 0:
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}

makechan的逻辑也比较直白: 1. 计算所需的缓冲区内存空间mem,判断是否溢出 2. 如果缓冲区大小为0,则只为hchan分配大小为hchanSize的空间,hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1));如果缓冲区不为0,且元素类型不为指针,则分配一段连续的空间;否则分别为缓冲区和hchan分配空间。 3. 设置hchan的其他参数。

4. 向channel发送数据

chan <-
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

chansend1中调用了chansend函数,我们分成几段来看:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

}

chansend参数中c为待操作channel,ep为待插入元素,block用来表示此操作是否允许阻塞,若不允许阻塞,则一旦不满足插入条件则立马返回false,例如缓冲区已满。这段代码中,对于关闭的channel,插入时会直接panic退出。

if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

若在recvq队列中有等待接收的goroutine,则使用send函数将元素直接发送至等待goroutine中,例如我们在开头给出的示例,则是通过这种方式。send函数如下:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

在send中,会使用sendDirect将元素ep直接拷贝至等待队列sq的插入位置,并且使用goready将goroutine标记为runnable可运行状态,加入调度。回到chansend:

if c.qcount < c.dataqsiz {
        qp := chanbuf(c, c.sendx)
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

当recvq中没有等待接收的goroutine且缓冲区未填满时,通过chanbuf计算出下一个接收位置qp,并将元素拷贝至qp,同时调整sendx以及qcount。

gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)

    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

    KeepAlive(ep)

    // 被唤醒
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    if closed {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    return true

当recvq没有goroutine等待接收且缓冲区已填满时,send操作进入阻塞过程: 1. 使用getg()获得当前goroutine 2. 使用acquireSudog()获得sudog结构,并且填充相关字段,例如插入元素,当前goroutine,channel等。 3. 将2中的sudog插入等待接收队列,并使用gopark()将此goroutine改为沉睡,等待唤醒。 4. 当goroutine被唤醒后,清除之前阻塞设置的状态。

到此,channel就完成了一次插入操作,让我们总结一下send操作可能进入分支: 1. 由于channel关闭,或者block字段导致send操作直接失败。 2. 当recvq存在等待接收的goroutine,则直接发送至此goroutine并且唤醒之。 3. recvq不存在值且缓冲区未填满,则插入缓冲区buf。 4. recvq不存在且缓冲区被填满,则插入sendq并且阻塞当前goroutine。

5. 从channel接收数据

在Go中可以通过两种方式接收数据:

value <- ch
value, ok <- ch

这两种方法分别被转换成runtime.chanrecv1和runtime.chanrecv2,而chanrecv1和chanrecv2最终都是调用runtime.chanrecv。在实现上,chanrecv与chansend非常相似,先判断有无等待中的goroutine,再判断缓存是否被填满,由此决定chanrecv的操作: 1. 从等待发送的Goroutine上接收数据; 2. 从缓冲区接收数据; 3. 等待其他Goroutine向channel发送数据;

从等待发送的Goroutine上接收数据:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }
value, ok <- ch_, ok <- ch
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        if raceenabled {
            racesync(c, sg)
        }
        if ep != nil {
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
            racenotify(c, c.recvx, sg)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx 
    }
    ...
    goready(gp, skip+1)
}

根据缓冲区的情况做不同的处理: - 缓冲区大小为0: 通过recvDirect直接将元素拷贝至目的地址。 - 缓冲区大小不为0:将发送队列头的数据拷贝至发送方,并且将发送方加入调度。

从缓冲区接收数据:

if c.qcount > 0 {
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

计算出接收位置qp,若ep不为空则拷贝一份至ep。紧接着清空qp位置内容,修改recvx指向位置。

等待其他Goroutine向channel发送数据:

gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

若以上条件都不满足,则将此goroutine加入等待接收队列并且阻塞,等待唤醒。

可以看到,chanrecv的逻辑与chansend有一定的相似性,理解了chansend之后,chanrecv就变得非常容易阅读。

6. 关闭channel

在Go中可以通过close来关闭channel,虽然通常我们不建议这么做。在close channel之后,会唤醒并释放所有的等待接收者和等待发送者。