总是依赖别人的话,就永远长不大。——《哆啦A梦》
1 Channel的创建
创建channel的过程实际上是初始化hchan结构。其中类型信息和缓冲区长度由make语句传入,buf的大小则与元素大小和缓冲区长度共同决定。
创建channel的源码如下:
const (
maxAlign = 8
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
debugChan = false
)
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 编译器决定size是否超过类型大小
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
//对齐字节 与内存模型有关 内存模型可以看我前面的文章
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
//分配的内存必须是在预留内存范围之内
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// 队列 or 元素size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
case elem.ptrdata == 0:
// 元素不包含指针
// 分配 hchan and buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素包含指针
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size) //元素大小
c.elemtype = elem //元素类型
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
return c
}
2 向channel写数据
向一个channel中写数据简单过程如下:
- 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;
- 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
- 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;
简单流程图如下:
源码如下:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// stack tracer.
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
- 第一个if判断 就是chan是nil的情况 如果是nil就调用gopark永远阻塞 所以throw("unreachable")就不会执行到。
- 因为我们block=true,所以目的就是让send阻塞,所以第二个if进不去
- 第三个if判断如果队列关闭了即closed=1 就会panic 即想关闭的chan发送数据会导致panic
- 第四部分就是如果接受队列中有等待着,那么就把它从队列中弹出,数据直接交给它(通过memmove(dst,src,size)实现),而不需要放入buf中,速度更快。
- 如果走到这里代表接受队列没有等待的receiver了,那么就判断如果队列没有满,就把数据存储到buf中,返回成功。(注意变量的变化,比如sendx索引指向下一个位置,qcount+1,以及sendx和队列容量相等,从头开始记。就是循环队列嘛)
- 能走到这里就代表buf满了,将要发送的数据和当前goroutine打包成sudog对象放入sendq中,并将当前goroutine的状态设置成waiting状态。
- goparkunclok函数 其实就是阻塞并且解锁传入的mutex,阻塞的goroutine没必要在持有锁,同时切出goroutine,并设置状态是waiting。gopark和goready互为逆操作,调用gopark在用户测看来就是向chan发送数据阻塞了。goready作用是唤醒对应的goroutine。
- 当唤醒的时候,看唤醒的是不是当前的goroutine,如果不是,直接抛异常,否则换新的就是当前goroutine 那么被唤醒之后params会被赋值sudog对象指针(recv函数中),所以这里判断是否是空,如果是空,就是params=nil,就代表这个chan已经close了,closechan的时候会清空params。那么如果closed==0就代表异常状态,不存在的,所以抛异常。否则就是chan关闭了你还发送数据,那么就panic。这里设计到recv函数 closchan函数对params的处理。
- 接下来就是一些资源的释放和环境的清理
3 从channel读数据
从一个channel读数据简单过程如下:
- 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,最后把G唤醒,结束读取过程;
- 如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;
- 如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程;
- 将当前goroutine加入recvq,进入睡眠,等待被写goroutine唤醒;
简单流程图如下:
源码如下
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
- 第一个if判断就是chan是不是nil 是nil 那么代表从chan接受数据会被永远阻塞
- 第二个if判断就是如果队列关闭了且队列没有数据了,那么把接受到的这个数据ep内存清0 然后直接返回
- 第三个if判断就是 如果sendq有等待发送的sender,如果是unbufferd的chan直接将sender的数据复制给receiver,否则就从队首读取一个值,然后把sender的值加入到队列尾部。
- 第四个if判断就是没有等待的sender,且buf中有数据,那么就取出一个元素给receiver
- 第五个if进不去
- 第六部分是buf中没有元素,那么当前receiver就会阻塞,直到它从sender中接受了数据或者是chan被close,才返回。
- 第六部分如果被执行,那么receiver被唤醒了,唤醒之后就会判断chan关闭了吗 如果关闭params==nil成立 否则就是被send函数调用唤醒,params有数据。所以清理完资源之后,如果最终closed是false就代表接受到send发送的数据 是true代表chan关闭了。
4 关闭Channel
关闭channel时会把recvq中的G全部唤醒,本该写入G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic。
除此之外,panic出现的常见场景还有:
- 关闭值为nil的channel
- 关闭已经被关闭的channel
- 向已经关闭的channel写数据
源码如下:
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
- 如果chan是nil,你close会panic
- chan已经closed了,你再次close就会panic
- 否则就是chan没有close,且chan不为nil
- 就把等待队列中sender和receiver从队列中全部移除并且唤醒 清除的时候将params设置为nil。
- 为啥还要唤醒呢,因为他们还在阻塞,你只是清理了数据而已,close chan 唤醒他们 让他们继续工作 否则永远阻塞了。
5 关注公众号
微信公众号: 堆栈future
扫我关注
