gochannelchannelCSPchannelchannel
channel
channel
// hchan is the runtime's in-memory representation of a channel. makechan
// allocates one and fills in elemsize, elemtype, dataqsiz and buf.
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16 // size of one element; makechan sets it from the element type's size
closed uint32 // NOTE(review): presumably non-zero once the channel has been closed — confirm against the close path
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
只看源代码可能不太容易理解,这里我画了一张图方便大家查看,图中用不同颜色做了标注,并注释了各部分的作用。
传送带队列FIFOgoroutine
buf
hchan 中与环形缓冲区相关的字段:buf、sendx、recvx、dataqsiz、qcount
执行 ch := make(chan int, 10) 时会为 buf 分配内存,buf 的大小在 make 时就已确定:元素大小*元素个数。
例如 ch = make(chan int, 6),其 buf 分配在堆(heap)上。
// makechan allocates and initializes the hchan for a channel whose element
// type is t.elem and whose buffer holds size elements, and returns it.
//
// It throws when the element size is 1<<16 or larger, or when the alignment
// checks fail. It panics with "makechan: size out of range" when size is
// negative, when elem.size*size overflows, or when the buffer would exceed
// maxAlloc-hchanSize.
//
// The allocation takes one of three shapes, chosen by the switch below:
// header only (zero-byte buffer), header and buffer in a single allocation
// (elements without pointers), or header and buffer allocated separately
// (elements with pointers).
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// mem is the buffer size in bytes (element size times element count);
// overflow reports whether that multiplication overflowed.
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// The buffer starts immediately after the hchan header in the same allocation.
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
// The buffer is allocated separately and is passed the element type.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// elem.size < 1<<16 was verified above, so this conversion does not truncate.
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
mallocgc
sendx and recvx
ring
// minQueueSize is the smallest size NewQueue accepts.
const minQueueSize = 10

// ErrQueueFull is returned by Push when every usable slot is occupied.
var ErrQueueFull = errors.New("circular queue full")

// ErrQueueUninitialized is returned by Push when the queue has no backing
// storage, i.e. it is a zero-value CycleQueue rather than one from NewQueue.
var ErrQueueUninitialized = errors.New("circular queue not initialized")

// CycleQueue is a fixed-size FIFO queue backed by a circular buffer.
//
// An empty queue is represented by frontIndex == rearIndex. To keep "full"
// distinguishable from "empty" one slot is always left unused, so a queue
// created with NewQueue(n) holds at most n-1 elements.
type CycleQueue struct {
	data                  []interface{} // backing storage for the elements (a slice)
	frontIndex, rearIndex int           // frontIndex: next slot to pop; rearIndex: next slot to push
	size                  int           // size requested when the queue was created
}

// NewQueue returns an empty CycleQueue with size slots.
//
// It returns an error when size is smaller than 10.
func NewQueue(size int) (*CycleQueue, error) {
	if size < minQueueSize {
		// The message text is kept verbatim; it states the minimum literally.
		return nil, fmt.Errorf("initialize circular queue size fail,%d not legal,size >= 10", size)
	}
	return &CycleQueue{
		data: make([]interface{}, size),
		size: size,
	}, nil
}

// Push appends value at the rear of the queue.
//
// It returns ErrQueueFull when no free slot remains and
// ErrQueueUninitialized when q has no backing storage.
func (q *CycleQueue) Push(value interface{}) error {
	n := cap(q.data)
	if n == 0 {
		// Without this guard the modulo below divides by zero and panics
		// on a zero-value CycleQueue.
		return ErrQueueUninitialized
	}
	next := (q.rearIndex + 1) % n
	if next == q.frontIndex {
		return ErrQueueFull
	}
	q.data[q.rearIndex] = value
	q.rearIndex = next
	return nil
}

// Pop removes and returns the element at the front of the queue.
//
// It returns nil when the queue is empty; a stored nil value is therefore
// indistinguishable from "empty".
func (q *CycleQueue) Pop() interface{} {
	if q.rearIndex == q.frontIndex {
		return nil
	}
	v := q.data[q.frontIndex]
	q.data[q.frontIndex] = nil // clear the slot so the queue no longer references the element
	q.frontIndex = (q.frontIndex + 1) % cap(q.data)
	return v
}
当 front == rear 时,上面的实现无法区分队列"空"和"满",只能空出一个位置来区分;golang 的 channel 则用 qcount 记录当前元素个数,len 返回的就是 qcount。
recvxsendx
sendq and recvq
当写入数据时缓冲区已满,或者读取数据时缓冲区已空,就会发生协程阻塞。
sendqrecvq
g-wsendqg-rg-wthread5gothread
gosendqsendqgopher