Gochannelgoroutinegoroutine
goroutine
Golang
GoroutineChannel Channel Channel
Channel
Channel Goroutine Channel Goroutine
FIFOChannel
基本操作
channel 声明
Channel
var ch1 chan int // 声明一个传递整型的通道
var ch2 chan bool // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道
channel 初始化
Channel
ch1 := make(chan bool)
ch2 := make(chan bool, 1)
channel 操作
channel 操作一般有三种: 发送、接收、关闭
发送语法: [channel对象] <- [发送数据]
接收语法: [变量] := <- [channel对象] 或者 <- [channel对象]
关闭语法: close([channel对象])
cha := make(chan int)
// 发送
cha <- 10 // 将10发送到cha中
// 接收
x := <- cha // 从cha中接收值并赋值给x变量
<- cha // 从cha中接收值,忽略结果
// 关闭
close(cha)
缓冲通道
ChannelbufferChannelChannelmake(chan T, N)bufferchannelN0ChannelNChannel
func main() {
ch := make(chan int) // make(chan int) 创建的就是无缓冲通道
ch <- 10
fmt.Println("发送成功")
}
func recv(c chan int) {
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch) // 创建一个 goroutine 从通道接收值
ch <- 10
fmt.Println("发送成功")
}
func main() {
ch := make(chan int, 5) // 创建一个容量为5的有缓冲区通道
ch <- 10
fmt.Println("发送成功")
}
单向通道
Gochannel
声明单向通道的语法如下:
发送通道(Send-only Channel)的声明:
var ch chan<- int
接收通道(Receive-only Channel)的声明:
var ch <-chan int
注意,单向通道的类型是基于普通的双向通道类型而来的。因此,你需要首先创建一个双向通道,然后使用类型转换将其转换为单向通道。
例如,如果你想声明一个只允许发送数据的通道,可以按照以下步骤进行声明和初始化:
var ch chan<- int
ch = make(chan int)
类似地,如果你想声明一个只允许接收数据的通道,可以按照以下方式进行声明和初始化:
var ch <-chan int
ch = make(chan int)
需要注意的是,单向通道的实际用途在于函数参数和返回值中,用于限制通道的使用范围,并提高代码的可读性和安全性。在一般情况下,我们使用双向通道来进行通信和同步操作。
多返回值模式
Gochannel
通常情况下,一个函数只能返回一个值,但是通过使用通道,可以将多个值封装在一个通道中,然后在调用方进行接收。这种方式使得函数可以返回多个值,而不需要显式地声明多个返回类型。
下面是一个示例,演示了如何在函数中使用通道实现多返回值的机制:
func computeSumAndProduct(a, b int) <-chan int {
resultChan := make(chan int)
go func() {
sum := a + b
product := a * b
resultChan <- sum
resultChan <- product
close(resultChan)
}()
return resultChan
}
func main() {
values := computeSumAndProduct(2, 3)
sum := <-values
product := <-values
fmt.Println("Sum:", sum)
fmt.Println("Product:", product)
}
computeSumAndProductab
需要注意的是:
closepanic0
底层数据结构
runtime.hchan
GoChannelruntime.hchanruntime/chan.goGoChannel
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq
lock mutex
}
make(chan T,N) NChannel Channel goroutinegoroutine
runtime.hchan
该示意图只是展示相关字段对应的数据关联结构,具体的各个字段的配合后续具体操作会详细讲解。
环形队列
channel
通道的环形队列主要由以下字段组成:
bufsendxrecvx
sendxsendxrecvxrecvx
sendxrecvx
下图列出环形队列的各种状态:
使用环形队列的好处是,可以避免频繁地进行内存分配和释放,提高了数据传输的效率。而且,环形队列的结构可以简单地通过索引运算来实现元素的读写,避免了复杂的指针操作。
Go
Go
waitq & sudog
Gochannelwaitqgoroutine
它的作用是在通道的发送和接收操作中提供阻塞和唤醒的机制。
goroutinegoroutine
runtime.hchanhchan.sendqhchan.recvqwaitq
type waitq struct {
first *sudog //指向等待队列中第一个等待的 sudog 结构体的指针
last *sudog //指向等待队列中最后一个等待的 sudog 结构体的指针
}
waitqchannelsudog
goroutinegoroutinesudogsudog
sudog
waitqfirstlastsudoggoroutine
通过等待队列的机制,通道能够实现发送和接收操作之间的同步,确保发送和接收的配对正确,并避免竞态条件和数据竞争的问题。
sudoggoroutinegoroutine
type sudog struct {
g *g // 关联的 goroutine
next *sudog // 下一个 sudog 结构体
prev *sudog // 上一个 sudog 结构体
elem unsafe.Pointer // 元素指针
acquiretime int64 // 获取时间
releasetime int64 // 释放时间
ticket uint32 // 票据(用于调度)
isSelect bool // 是否处于 select 操作
success bool // 操作是否成功
parent *sudog // 父 sudog(用于嵌套等待)
waitlink *sudog // 等待链接(用于等待队列)
waittail *sudog // 等待尾部(用于等待队列)
c *hchan // 关联的通道
}
sudogGosudog
waitq
创建 channel
GoChannelmakemake(chan int, 10)OMAKEOMAKEOMAKECHAN
// go1.20.3 path:/src/cmd/compile/internal/typecheck/func.go
func tcMake(n *ir.CallExpr) ir.Node {
args := n.Args
......
case types.TCHAN:
l = nil
if i < len(args) {
l = args[i]
i++
l = Expr(l)
l = DefaultLit(l, types.Types[types.TINT])
if l.Type() == nil {
n.SetType(nil)
return n
}
if !checkmake(t, "buffer", &l) {
n.SetType(nil)
return n
}
} else {
l = ir.NewInt(0)
}
nn = ir.NewMakeExpr(n.Pos(), ir.OMAKECHAN, l, nil)
}
if i < len(args) {
base.Errorf("too many arguments to make(%v)", t)
n.SetType(nil)
return n
}
nn.SetType(t)
return nn
}
makemake0Channel
OMAKECHANSSAruntime.makechanruntime.makechan64
// go1.20.3 path:/src/cmd/compile/internal/walk/builtin.go
func walkMakeChan(n *ir.MakeExpr, init *ir.Nodes) ir.Node {
size := n.Len
fnname := "makechan64"
argtype := types.Types[types.TINT64]
if size.Type().IsKind(types.TIDEAL) || size.Type().Size() <= types.Types[types.TUINT].Size() {
fnname = "makechan"
argtype = types.Types[types.TINT]
}
return mkcall1(chanfn(fnname, 1, n.Type()), n.Type(), init, reflectdata.MakeChanRType(base.Pos, n), typecheck.Conv(size, argtype))
}
runtime.makechanruntime.makechan64Channel Channel runtime.makechan
//go 1.20.3 path:/src/runtime/chan.go
func makechan(t *chantype, size int) *hchan {
//获取channel类型元数据所在的地址指针
elem := t.elem
/**
编译器会检查类型是否安全,主要检查下面内容:
1. 类型大小大与 1<<16 时会法生异常(即大与65536)
2. 内存对齐,当大与maxAlign(最大内存8字节数)时会发生异常
3. 传入的size大小大与堆可分配的最大内存时会发成异常
*/
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
//获取需要分配的内存
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
//chan的size或元素的size为0,就不必创建buf
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
// 竞争检测器使用此位置进行同步
c.buf = c.raceaddr()
// 元素不是指针,分配一块连续的内存给hchan数据结构和buf
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
//hchan数据结构后面紧接着就是buf
c.buf = add(unsafe.Pointer(c), hchanSize)
//元素包含指针,单独为hchan 和缓冲区分配内存
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
//更新chan的元素大小、类型、容量
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
Channel runtime.hchan
Channel runtime.hchanChannel Channelruntime.hchan
runtime.hchanelemsizeelemtypedataqsiz
makechannelheaphchanchchannel
发送数据
Channelch <- iOSENDruntime.chansend1
//go 1.20.3 path: /src/cmd/compile/internal/walk/expr.go
func walkSend(n *ir.SendStmt, init *ir.Nodes) ir.Node {
n1 := n.Value
n1 = typecheck.AssignConv(n1, n.Chan.Type().Elem(), "chan send")
n1 = walkExpr(n1, init)
n1 = typecheck.NodAddr(n1)
return mkcall1(chanfn("chansend1", 2, n.Chan.Type()), nil, init, n.Chan, n1)
}
runtime.chansend1runtime.chansend
//go 1.20.3 path: /src/runtime/chan.go
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
runtime.chansendChannelblocktrue
//go 1.20.3 path: /src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//如果channel为nil
if c == nil {
//如果非堵塞模式,则直接返回false
if !block {
return false
}
// nil channel 发送数据会永远阻塞下去
// 挂起当前 goroutine
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
//如果非堵塞模式,如果chan没有被close并且chan缓冲满了,直接返回false
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
//未启用阻塞分析,由于CPU分支预测
if blockprofilerate > 0 {
t0 = cputicks()
}
//上锁
lock(&c.lock)
//chan已经关闭,解锁,panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 如果在接收等待队列上存在正在等待的G,则直接将数据发送
// 不必将数据缓存到队列中
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
/**
如果当前chan的缓存区未满,将数据缓存到队列中;否则阻塞当前G
*/
//当前chan的缓存区未满
if c.qcount < c.dataqsiz {
//计算下一个缓存区位置指针
qp := chanbuf(c, c.sendx)
//将数据保存到缓冲区队列
typedmemmove(c.elemtype, qp, ep)
//sendx位置往后移动一位
c.sendx++
//如果c.sendx == c.dataqsiz,表示sendx索引已经达到缓冲队列最尾部了,则将sendx移动到0(第一个位置),这个是环形队列思维
if c.sendx == c.dataqsiz {
c.sendx = 0
}
//Chan中的元素个数+1
c.qcount++
//解锁,返回即可
unlock(&c.lock)
return true
}
//如果未堵塞模式,缓冲区满了则直接解锁,返回false
if !block {
unlock(&c.lock)
return false
}
//缓冲队列已满或者创建的不带缓冲的channel,则阻塞当前G
//获取当前goroutine
gp := getg()
// 获取一个sudog对象并设置其字段
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep //将指向发送数据的指针保存到 elem 中
mysg.waitlink = nil
mysg.g = gp //将g指向当前的goroutine
mysg.isSelect = false
mysg.c = c //当前阻塞的 channel
gp.waiting = mysg
gp.param = nil // param 可以用来传递数据,其他 goroutine 唤醒该 goroutine 时可以设置该字段,然后根据该字段做一些判断
c.sendq.enqueue(mysg)// 将sudog加入到channel的发送等待队列hchan.sendq中
atomic.Store8(&gp.parkingOnChan, 1)
// 当前 Goroutine 切换为等待状态并阻塞等待其他的Goroutine从 channel 接收数据并将其唤醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 在没有其他的接收队列将数据复制到队列中时候,需要保证当前需要被发送的的值一直是可用状态
KeepAlive(ep)
/**
协程被唤醒后
*/
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
//更新goroutine相关的对象信息
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
//释放sudog对象
releaseSudog(mysg)
//如果channel已经关闭
if closed {
// close标志位为0,则抛出假性唤醒异常
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
//直接panic
panic(plainError("send on closed channel"))
}
return true
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// src 是发送的数据源地址,dst 是接收数据的地址
// src 在当前的 goroutine 栈中,而 dst 在其他栈上
dst := sg.elem
// 使用 memove 直接进行内存 copy
// 因为 dst 指向其他 goroutine 的栈,如果它发生了栈收缩,那么就没有修改真正的 dst 位置
// 所以会加读写前加一个屏障
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
......
// sg.elem 是指向待接收 goroutine 中接收数据的指针
// ep是指当前发送数据所在的指针
// 如果待接收 goroutine 需要接收具体的数据,那么直接将数据 copy 到 sg.elem
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
//gp是指接收的goroutine
gp := sg.g
unlockf()
// 赋值 param,待接收者被唤醒后会根据 param 来判断是否是被发送者唤醒的
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//将gp唤醒,放入处理器P的本地运行队列,等待被调度
goready(gp, skip+1)
}
// 计算缓冲区下一个可以存储数据的位置
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
总结一下发送流程:
channelsudogchannelchannelchanbuf()c.qcount=c.dataqsizchannelsudogchannelGoroutine
runtime.chansend
除了流程以外,从代码中可以重点列出几个要点信息:
channelnilfalsenil channel gopark channel
接收数据
Channel GoChannel
i <- ch
i, ok <- ch
ORECVOAS2RECV
runtime.chanrecv1runtime.chanrecv2runtime.chanrecv
runtime.chanrecv
//go 1.20.3 path: /src/runtime/chan.go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
//如果chan为空且是非阻塞调用,那么直接返回 (false,false)
if !block {
return
}
// 阻塞调用直接等待
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
/**
快速检测,在非阻塞模式下,和发送一样有些条件不需要加锁就可以直接判断返回。
当前非阻塞并且chan未关闭,并符合下列条件之一:
1. 非缓冲channel且没有待发送者
2. 缓冲channel且是缓冲区为空
*/
if !block && empty(c) {
//chan未关闭,直接返回(false,false)
if atomic.Load(&c.closed) == 0 {
return
}
//channel 处于关闭,并且empty(c),返回(true,false)
if empty(c) {
if ep != nil {
//将接收的值置为空值
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
//未启用阻塞分析,由于CPU分支预测
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//加锁
lock(&c.lock)
//channel 处于关闭
if c.closed != 0 {
//如果channel元素为空
if c.qcount == 0 {
//如果竞态检测功能已启用(即 raceenabled 为 true),则调用 raceacquire() 函数检测
if raceenabled {
raceacquire(c.raceaddr())
}
//解锁
unlock(&c.lock)
if ep != nil {
//将接收的值置为空值
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
//待发送队列sendq中有 goroutine,说明是非缓冲channel或者缓冲已满的 channel,将数据从待发送者复制给接收者
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
//chan的缓存队列中还有数据
if c.qcount > 0 {
//获取一个缓存队列数据的指针地址
qp := chanbuf(c, c.recvx)
if ep != nil {
//将该数据复制到接收对象
typedmemmove(c.elemtype, ep, qp)
}
//清空该指针地址的数据
typedmemclr(c.elemtype, qp)
//recvx+1
c.recvx++
//如果接收游标 等于环形链表的值,则接收游标清零。
if c.recvx == c.dataqsiz {
c.recvx = 0
}
//循环数组buf元素数量-1
c.qcount--
unlock(&c.lock)
return true, true
}
//非阻塞接收,因为chan的缓存中没有数据,则解锁,selected 返回 false,因为没有接收到值
if !block {
unlock(&c.lock)
return false, false
}
// 缓冲区队列没有数据可以读取,则将当前G打包成Sudo结构并加入到接收等待队列
gp := getg()
/**
创建一个sudog结构体,并将其与当前的goroutine (gp) 关联。
sudog结构体用于在并发环境中进行同步操作和调度。其中的字段和赋值操作可能会在其他代码中使用
*/
//创建一个新的sudog结构体,并将其赋值给变量mysg
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) // 加入到接收等待队列recvq中
atomic.Store8(&gp.parkingOnChan, 1)
// 阻塞等待被唤醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
//唤醒后,设置goroutine的部分字段值,并释放该g的Sudo
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
func empty(c *hchan) bool {
if c.dataqsiz == 0 {
return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
}
return atomic.Loaduint(&c.qcount) == 0
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//无缓冲 channel
if c.dataqsiz == 0 {
//如果ep 不为 nil,那么直接从发送 goroutine 中将数据 copy 到接收位置
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
//从数据缓冲区队列中取出一个元素地址
qp := chanbuf(c, c.recvx)
if ep != nil {
// 将待接收数据复制到接收位置
typedmemmove(c.elemtype, ep, qp)
}
//将数据取出后,会腾出一个位置,此时将从sendq队列中的取出的数据sg.elem复制到该位置gp
typedmemmove(c.elemtype, qp, sg.elem)
//调整 recvx
c.recvx++
//如果recvx已经到达队列尾部,则将recvx移动到0位置
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 通过c.sendx = (c.sendx+1) % c.dataqsiz计算得出,环形队列方式
c.sendx = c.recvx
}
sg.elem = nil //清空发送者数据
gp := sg.g //获取发送者协程
unlockf()
gp.param = unsafe.Pointer(sg) //赋值发送者的 param,发送者被唤醒后会根据 param 来判断是否是关闭唤醒的
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//将G重新放入处理器P的本地运行队列,等待被调度处理
goready(gp, skip+1)
}
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
总结一下发送流程:
Channel epruntime.recvchannelruntime.recvDirectChannel Goroutine elemchannelGGgoroutinerecvqGoroutine Channel
除了流程以外,几个注意点:
channelnilgopark channelchannelchannelsendq
简单接收数据流程图如下:
关闭channel
channel closecloseOCLOSEruntime.closechanchannel
func closechan(c *hchan) {
//当chan为空的时候,close会panic
if c == nil {
panic(plainError("close of nil channel"))
}
//上锁
lock(&c.lock)
......
//当chan已经关闭状态,close会panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
//设置c.closed为1
c.closed = 1
//保存channel中所有等待队列的G的list
var glist gList
//将 channel所有等待接收队列的里 sudog 释放
for {
//接收队列中出一个sudog
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//取出goroutine
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
// 加入队列中
glist.push(gp)
}
//将channel中等待接收队列里的sudog释放,如果存在这些goroutine将会panic
for {
//从发送队列中出一个sudog
sg := c.sendq.dequeue()
if sg == nil {
break
}
//发送者panic
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//取出goroutine
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
// 加入队列中
glist.push(gp)
}
//解锁
unlock(&c.lock)
//唤醒所有的glist中的goroutine
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
channelrecvqGGnilsendqGGpanic
panic
nilchannelchannelchannel
参考资料:
https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/#64-channel
https://blog.csdn.net/y1391625461/article/details/124292119?spm=1001.2101.3001.6650.3&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7ERate-3-124292119-blog-124413145.pc_relevant_3mothn_strategy_recovery&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7ERate-3-124292119-blog-124413145.pc_relevant_3mothn_strategy_recovery&utm_relevant_index=6
「zhangkaixuan456」 https://blog.csdn.net/zhangkaixuan456/article/details/128577123
「IceberGu」 https://blog.csdn.net/DAGU131/article/details/108385060