定义

不要通过共享内存来通信,而是通过通信来实现内存共享

多个 goroutine 借助 channel 来传输数据,实现了跨 goroutine 间的数据传输,多者独立运行,不需要强关联,更不影响对方的 goroutine 状态。不存在 goroutine1 对 goroutine2 进行直传的情况。

channel 基本特性

分类

chan Tchan <- T<- chan T

channel 中还分为 “无缓冲 channel” 和 “缓冲 channel”。

等待队列

从channel读数据,如果channel缓冲区为空或者没有缓冲区,当前goroutine会被阻塞。
向channel写数据,如果channel缓冲区已满或者没有缓冲区,当前goroutine会被阻塞。

被阻塞的goroutine将会挂在channel的等待队列中:

  • 因读阻塞的goroutine会被向channel写入数据的goroutine唤醒;
  • 因写阻塞的goroutine会被从channel读数据的goroutine唤醒;
数据结构

channel 图解

hchan 结构体

// src/runtime/chan.go
type hchan struct {
 qcount   uint      
 dataqsiz uint     
 buf      unsafe.Pointer 
 elemsize uint16
 closed   uint32
 elemtype *_type 
 sendx    uint  
 recvx    uint  
 recvq    waitq  
 sendq    waitq  

 lock mutex
}

/*
- qcount:队列中的元素总数量。
- dataqsiz:循环队列的长度。
- buf:指向长度为 dataqsiz 的底层数组,仅有当 channel 为缓冲型的才有意义。
- elemsize:能够接受和发送的元素大小。
- closed:是否关闭。
- elemtype:能够接受和发送的元素类型。
- sendx:已发送元素在循环队列中的索引位置。
- recvx:已接收元素在循环队列中的索引位置。
- recvq:接受者的 sudog 等待队列(缓冲区不足时阻塞等待的 goroutine)。
- sendq:发送者的 sudog 等待队列。
*/
recvqsendqruntime.waitq
type waitq struct {
 first *sudog
 last  *sudog
}

// 无论是 first 属性又或是 last,其类型都为 runtime.sudog 结构体
type sudog struct {
 g *g

 next *sudog
 prev *sudog
 elem unsafe.Pointer
 ...
}

/*
g:指向当前的 goroutine。
next:指向下一个 g。
prev:指向上一个 g。
elem:数据元素,可能会指向堆栈。
*/
Chan使用

创建chan

ch := make(chan string)
ch := make(chan string, 1024)

创建 channel 的逻辑主要分为三大块:

mallocgchchan
mallocgcclose

向chan中写入数据

向一个channel中写数据过程

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
 ...
 if c.closed != 0 { // 会对 channel 进行一次状态判断(是否关闭
  unlock(&c.lock)
  panic(plainError("send on closed channel"))
 }

 if sg := c.recvq.dequeue(); sg != nil {
  send(c, sg, ep, func() { unlock(&c.lock) }, 3)
  return true
 }
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
 ...
 if c.qcount < c.dataqsiz {
  qp := chanbuf(c, c.sendx)
  typedmemmove(c.elemtype, qp, ep)
  c.sendx++
  if c.sendx == c.dataqsiz {
   c.sendx = 0
  }
  c.qcount++
  unlock(&c.lock)
  return true
 }

 if !block {
  unlock(&c.lock)
  return false
 }
}
/*
会对缓冲区进行判定(`qcount` 和 `dataqsiz` 字段),以此识别缓冲区的剩余空间。紧接进行如下操作:

- 调用 `chanbuf` 方法,以此获得底层缓冲数据中位于 sendx 索引的元素指针值。
- 调用 `typedmemmove` 方法,将所需发送的数据拷贝到缓冲区中。
- 数据拷贝后,对 sendx 索引自行自增 1。同时若 sendx 与 dataqsiz 大小一致,则归 0(环形队列)。
- 自增完成后,队列总数同时自增 1。解锁互斥锁,返回结果。
*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
 ...
 gp := getg()
 mysg := acquireSudog()
 mysg.releasetime = 0
 if t0 != 0 {
  mysg.releasetime = -1
 }

 mysg.elem = ep
 mysg.waitlink = nil
 mysg.g = gp
 mysg.isSelect = false
 mysg.c = c
 gp.waiting = mysg
 gp.param = nil
 c.sendq.enqueue(mysg)

 atomic.Store8(&gp.parkingOnChan, 1)
 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

 KeepAlive(ep)
}
/*
- 调用 getg 方法获取当前 goroutine 的指针,用于后续发送数据。
- 调用 acquireSudog 方法获取 sudog 结构体,并设置当前 sudog 具体的待发送数据信息和状态。
- 调用 c.sendq.enqueue 方法将刚刚所获取的 sudog 加入待发送的等待队列。
- 调用 gopark 方法挂起当前 goroutine(会记录执行位置),状态为 waitReasonChanSend,阻塞等待 channel。
- 调用 KeepAlive 方法保证待发送的数据值是活跃状态,也就是分配在堆上,避免被 GC 回收。
*/

流程图

从chan中读取数据

前置处理

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
 if c == nil {
  if !block {
   return
  }
  gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
  throw("unreachable")
 }
/*
一开始时 chanrecv 方法会判断其是否为 nil channel。

场景如下:

若 channel 是 nil channel,且为阻塞接收则调用 gopark 方法挂起当前 goroutine。
若 channel 是非阻塞模式,则直接返回。
而接下来对于非阻塞模式的 channel 会进行快速失败检查,检测 channel 是否已经准备好接收。
*/
    
 if !block && empty(c) {
  if atomic.Load(&c.closed) == 0 {
   return
  }

  if empty(c) {
   if ep != nil {
    typedmemclr(c.elemtype, ep)
   }
   return true, false
  }
 }
 ...
}
/*
其分以下几种情况:

无缓冲区:循环队列为 0 及等待队列 sendq 内没有 goroutine 正在等待。
有缓冲区:缓冲区数组为空。
随后会对 channel 的 closed 状态进行判断,因为 channel 是无法重复打开的,需要确定当前 channel 是否为未关闭状态。再确定接收失败,返回。

但若是 channel 已经关闭且不存在缓存数据了,则会清理 ep 指针中的数据并返回。
*/

从channel读数据过程

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

 lock(&c.lock)

 if sg := c.sendq.dequeue(); sg != nil {
  recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
  return true, true
 }
 ...
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

 if c.qcount > 0 {
  qp := chanbuf(c, c.recvx)
  if ep != nil {
   typedmemmove(c.elemtype, ep, qp)
  }
  typedmemclr(c.elemtype, qp)
  c.recvx++
  if c.recvx == c.dataqsiz {
   c.recvx = 0
  }
  c.qcount--
  unlock(&c.lock)
  return true, true
 }

 if !block {
  unlock(&c.lock)
  return false, false
 }
 ...
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

 gp := getg()
 mysg := acquireSudog()
 mysg.releasetime = 0
 if t0 != 0 {
  mysg.releasetime = -1
 }

 mysg.elem = ep
 mysg.waitlink = nil
 gp.waiting = mysg
 mysg.g = gp
 mysg.isSelect = false
 mysg.c = c
 gp.param = nil
 c.recvq.enqueue(mysg)

 atomic.Store8(&gp.parkingOnChan, 1)
 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
 ...
}

流程图

特性

  1. 读队列Recvq, 写入数据时G被唤醒; 写队列Sendq, 读取数据时G被唤醒
  2. sendq有G有数据; recvq有G无数据

Chan关闭

panic出现的常见场景:

  1. 关闭值为nil的channel(未make)
  2. 关闭已经被关闭的channel
  3. 向已经关闭的channel写数据

sendq、recvq G释放

释放接收方

在完成了异常边界判断和标志设置后,会将接受者的 sudog 等待队列(recvq)加入到待清除队列 glist 中:

func closechan(c *hchan) {

 var glist gList
 for {
  sg := c.recvq.dequeue()
  if sg == nil {
   break
  }
  if sg.elem != nil {
   typedmemclr(c.elemtype, sg.elem)
   sg.elem = nil
  }
  if sg.releasetime != 0 {
   sg.releasetime = cputicks()
  }
  gp := sg.g
  gp.param = nil
  if raceenabled {
   raceacquireg(gp, c.raceaddr())
  }
  glist.push(gp)
 }
 ...
}
_Gwaiting

释放发送方

同样,与释放接收方一样。会将发送方也加入到到待清除队列 glist 中:

func closechan(c *hchan) {

 // release all writers (they will panic)
 for {
  sg := c.sendq.dequeue()
  if sg == nil {
   break
  }
  sg.elem = nil
  if sg.releasetime != 0 {
   sg.releasetime = cputicks()
  }
  gp := sg.g
  gp.param = nil
  if raceenabled {
   raceacquireg(gp, c.raceaddr())
  }
  glist.push(gp)
 }
 unlock(&c.lock)
 ...
}

协程调度

_Gwaiting_Grunnable
func closechan(c *hchan) {

 // Ready all Gs now that we've dropped the channel lock.
 for !glist.empty() {
  gp := glist.pop()
  gp.schedlink = 0
  goready(gp, 3)
 }
}

后续所有的 goroutine 允许被重新调度后。若原本还在被动阻塞的发送方或接收方,将重获自由,后续该干嘛就去干嘛了,再跑回其所属的应用流程。

channel send/recv 分析

send

send
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 if sg.elem != nil {
  sendDirect(c.elemtype, sg, ep)
  sg.elem = nil
 }
 gp := sg.g
 unlockf()
 gp.param = unsafe.Pointer(sg)
 if sg.releasetime != 0 {
  sg.releasetime = cputicks()
 }
 goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
 dst := sg.elem
 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
 memmove(dst, src, t.size)
}
sendDirectmsg := <-chchmsgsg.ggoready_Gwaiting_Grunnable

recv

recv
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 if c.dataqsiz == 0 {
  if ep != nil {
   recvDirect(c.elemtype, sg, ep)
  }
 } else {
  qp := chanbuf(c, c.recvx)
  if ep != nil {
   typedmemmove(c.elemtype, ep, qp)
  }
  typedmemmove(c.elemtype, qp, sg.elem)
  c.recvx++
  if c.recvx == c.dataqsiz {
   c.recvx = 0
  }
  c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
 }
 sg.elem = nil
 gp := sg.g
 unlockf()
 gp.param = unsafe.Pointer(sg)
 if sg.releasetime != 0 {
  sg.releasetime = cputicks()
 }
 goready(gp, skip+1)
}

该方法在接受上分为两种情况,分别是直接接收和缓冲接收:

直接接收(不存在缓冲区):

recvDirectsendDirect

缓冲接收(存在缓冲区):

chanbufrecvxsendxrecvx
goready
细节
  1. channel 都是创建在堆上的。因此 channel 是会被 GC 回收的;
  2. 调用 KeepAlive 方法保证待发送的数据值是活跃状态,也就是分配在堆上,避免被 GC 回收。
  3. 在具体的数据传输上,都是围绕着 “边界上下限处理,上互斥锁,阻塞/非阻塞,缓冲/非缓冲,缓存出队列,拷贝数据,解互斥锁,协程调度” 在不断地流转处理。在基本逻辑上也是相对重合的,因为发送和接收,创建和关闭总是相对的。
参考博客

Go语言基础之并发
一文带你解密 Go 语言之通道 channel
Go专家编程(书籍购买)