Go channel结构剖析《二》

总是依赖别人的话,就永远长不大。——《哆啦A梦》

1 Channel的创建

创建channel的过程实际上是初始化hchan结构。其中类型信息和缓冲区长度由make语句传入,buf的大小则与元素大小和缓冲区长度共同决定。

创建channel的源码如下:

  
const (  
 maxAlign = 8  
 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))  
 debugChan = false  
)  
  
func makechan(t *chantype, size int) *hchan {  
 elem := t.elem  
  
 // 编译器决定size是否超过类型大小  
 if elem.size >= 1<<16 {  
 throw("makechan: invalid channel element type")  
 }  
   
 //对齐字节 与内存模型有关 内存模型可以看我前面的文章  
 if hchanSize%maxAlign != 0 || elem.align > maxAlign {  
 throw("makechan: bad alignment")  
 }  
  
 //分配的内存必须是在预留内存范围之内  
 mem, overflow := math.MulUintptr(elem.size, uintptr(size))  
 if overflow || mem > maxAlloc-hchanSize || size < 0 {  
 panic(plainError("makechan: size out of range"))  
 }  
  
 // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.  
 // buf points into the same allocation, elemtype is persistent.  
 // SudoG's are referenced from their owning thread so they can't be collected.  
 // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.  
 var c *hchan  
 switch {  
 case mem == 0:  
 // 队列 or 元素size is zero.  
 c = (*hchan)(mallocgc(hchanSize, nil, true))  
   
 case elem.ptrdata == 0:  
 // 元素不包含指针  
 // 分配 hchan and buf  
 c = (*hchan)(mallocgc(hchanSize+mem, nil, true))  
 c.buf = add(unsafe.Pointer(c), hchanSize)  
   
 default:  
 // 元素包含指针  
 c = new(hchan)  
 c.buf = mallocgc(mem, elem, true)  
 }  
  
 c.elemsize = uint16(elem.size) //元素大小  
 c.elemtype = elem //元素类型  
 c.dataqsiz = uint(size)  
 lockInit(&c.lock, lockRankHchan)  
  
 return c  
}

2 向channel写数据

向一个channel中写数据简单过程如下:

  1. 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq取出G,并把数据写入,最后把该G唤醒,结束发送过程;
  2. 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
  3. 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;

简单流程图如下:

picture.image

源码如下:

  
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {  
 if c == nil {  
 if !block {  
 return false  
 }  
 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)  
 throw("unreachable")  
 }  
  
 lock(&c.lock)  
  
 if c.closed != 0 {  
 unlock(&c.lock)  
 panic(plainError("send on closed channel"))  
 }  
  
 if sg := c.recvq.dequeue(); sg != nil {  
 send(c, sg, ep, func() { unlock(&c.lock) }, 3)  
 return true  
 }  
  
 if c.qcount < c.dataqsiz {  
 qp := chanbuf(c, c.sendx)  
 typedmemmove(c.elemtype, qp, ep)  
 c.sendx++  
 if c.sendx == c.dataqsiz {  
 c.sendx = 0  
 }  
 c.qcount++  
 unlock(&c.lock)  
 return true  
 }  
  
 if !block {  
 unlock(&c.lock)  
 return false  
 }  
  
 // Block on the channel. Some receiver will complete our operation for us.  
 gp := getg()  
 mysg := acquireSudog()  
 mysg.releasetime = 0  
 if t0 != 0 {  
 mysg.releasetime = -1  
 }  
  
 mysg.elem = ep  
 mysg.waitlink = nil  
 mysg.g = gp  
 mysg.isSelect = false  
 mysg.c = c  
 gp.waiting = mysg  
 gp.param = nil  
 c.sendq.enqueue(mysg)  
 goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)  
 // stack tracer.  
 KeepAlive(ep)  
  
 // someone woke us up.  
 if mysg != gp.waiting {  
 throw("G waiting list is corrupted")  
 }  
 gp.waiting = nil  
 if gp.param == nil {  
 if c.closed == 0 {  
 throw("chansend: spurious wakeup")  
 }  
 panic(plainError("send on closed channel"))  
 }  
 gp.param = nil  
 if mysg.releasetime > 0 {  
 blockevent(mysg.releasetime-t0, 2)  
 }  
 mysg.c = nil  
 releaseSudog(mysg)  
 return true  
}
  1. 第一个if判断 就是chan是nil的情况 如果是nil就调用gopark永远阻塞 所以throw("unreachable")就不会执行到。
  2. 因为我们block=true,所以目的就是让send阻塞,所以第二个if进不去
  3. 第三个if判断如果队列关闭了即closed=1 就会panic 即想关闭的chan发送数据会导致panic
  4. 第四部分就是如果接受队列中有等待着,那么就把它从队列中弹出,数据直接交给它(通过memmove(dst,src,size)实现),而不需要放入buf中,速度更快。
  5. 如果走到这里代表接受队列没有等待的receiver了,那么就判断如果队列没有满,就把数据存储到buf中,返回成功。(注意变量的变化,比如sendx索引指向下一个位置,qcount+1,以及sendx和队列容量相等,从头开始记。就是循环队列嘛)
  6. 能走到这里就代表buf满了,将要发送的数据和当前goroutine打包成sudog对象放入sendq中,并将当前goroutine的状态设置成waiting状态。
  7. goparkunclok函数 其实就是阻塞并且解锁传入的mutex,阻塞的goroutine没必要在持有锁,同时切出goroutine,并设置状态是waiting。gopark和goready互为逆操作,调用gopark在用户测看来就是向chan发送数据阻塞了。goready作用是唤醒对应的goroutine。
  8. 当唤醒的时候,看唤醒的是不是当前的goroutine,如果不是,直接抛异常,否则换新的就是当前goroutine 那么被唤醒之后params会被赋值sudog对象指针(recv函数中),所以这里判断是否是空,如果是空,就是params=nil,就代表这个chan已经close了,closechan的时候会清空params。那么如果closed==0就代表异常状态,不存在的,所以抛异常。否则就是chan关闭了你还发送数据,那么就panic。这里设计到recv函数 closchan函数对params的处理。
  9. 接下来就是一些资源的释放和环境的清理

3 从channel读数据

从一个channel读数据简单过程如下:

  1. 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,最后把G唤醒,结束读取过程;
  2. 如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,结束读取过程;
  3. 如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程;
  4. 将当前goroutine加入recvq,进入睡眠,等待被写goroutine唤醒;

简单流程图如下:

picture.image

源码如下

  
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {  
 if c == nil {  
 if !block {  
 return  
 }  
 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)  
 throw("unreachable")  
 }  
  
 lock(&c.lock)  
  
 if c.closed != 0 && c.qcount == 0 {  
 unlock(&c.lock)  
 if ep != nil {  
 typedmemclr(c.elemtype, ep)  
 }  
 return true, false  
 }  
  
 if sg := c.sendq.dequeue(); sg != nil {  
 recv(c, sg, ep, func() { unlock(&c.lock) }, 3)  
 return true, true  
 }  
  
 if c.qcount > 0 {  
 // Receive directly from queue  
 qp := chanbuf(c, c.recvx)  
 if raceenabled {  
 raceacquire(qp)  
 racerelease(qp)  
 }  
 if ep != nil {  
 typedmemmove(c.elemtype, ep, qp)  
 }  
 typedmemclr(c.elemtype, qp)  
 c.recvx++  
 if c.recvx == c.dataqsiz {  
 c.recvx = 0  
 }  
 c.qcount--  
 unlock(&c.lock)  
 return true, true  
 }  
  
 if !block {  
 unlock(&c.lock)  
 return false, false  
 }  
  
 // no sender available: block on this channel.  
 gp := getg()  
 mysg := acquireSudog()  
 mysg.releasetime = 0  
 if t0 != 0 {  
 mysg.releasetime = -1  
 }  
 // No stack splits between assigning elem and enqueuing mysg  
 // on gp.waiting where copystack can find it.  
 mysg.elem = ep  
 mysg.waitlink = nil  
 gp.waiting = mysg  
 mysg.g = gp  
 mysg.isSelect = false  
 mysg.c = c  
 gp.param = nil  
 c.recvq.enqueue(mysg)  
 goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)  
  
 // someone woke us up  
 if mysg != gp.waiting {  
 throw("G waiting list is corrupted")  
 }  
 gp.waiting = nil  
 if mysg.releasetime > 0 {  
 blockevent(mysg.releasetime-t0, 2)  
 }  
 closed := gp.param == nil  
 gp.param = nil  
 mysg.c = nil  
 releaseSudog(mysg)  
 return true, !closed  
}
  1. 第一个if判断就是chan是不是nil 是nil 那么代表从chan接受数据会被永远阻塞
  2. 第二个if判断就是如果队列关闭了且队列没有数据了,那么把接受到的这个数据ep内存清0 然后直接返回
  3. 第三个if判断就是 如果sendq有等待发送的sender,如果是unbufferd的chan直接将sender的数据复制给receiver,否则就从队首读取一个值,然后把sender的值加入到队列尾部。
  4. 第四个if判断就是没有等待的sender,且buf中有数据,那么就取出一个元素给receiver
  5. 第五个if进不去
  6. 第六部分是buf中没有元素,那么当前receiver就会阻塞,直到它从sender中接受了数据或者是chan被close,才返回。
  7. 第六部分如果被执行,那么receiver被唤醒了,唤醒之后就会判断chan关闭了吗 如果关闭params==nil成立 否则就是被send函数调用唤醒,params有数据。所以清理完资源之后,如果最终closed是false就代表接受到send发送的数据 是true代表chan关闭了。

4 关闭Channel

关闭channel时会把recvq中的G全部唤醒,本该写入G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic。

除此之外,panic出现的常见场景还有:

  1. 关闭值为nil的channel
  2. 关闭已经被关闭的channel
  3. 向已经关闭的channel写数据

源码如下:

  
func closechan(c *hchan) {  
 if c == nil {  
 panic(plainError("close of nil channel"))  
 }  
  
 lock(&c.lock)  
 if c.closed != 0 {  
 unlock(&c.lock)  
 panic(plainError("close of closed channel"))  
 }  
  
 if raceenabled {  
 callerpc := getcallerpc()  
 racewritepc(c.raceaddr(), callerpc, funcPC(closechan))  
 racerelease(c.raceaddr())  
 }  
  
 c.closed = 1  
  
 var glist gList  
  
 // release all readers  
 for {  
 sg := c.recvq.dequeue()  
 if sg == nil {  
 break  
 }  
 if sg.elem != nil {  
 typedmemclr(c.elemtype, sg.elem)  
 sg.elem = nil  
 }  
 if sg.releasetime != 0 {  
 sg.releasetime = cputicks()  
 }  
 gp := sg.g  
 gp.param = nil  
 if raceenabled {  
 raceacquireg(gp, c.raceaddr())  
 }  
 glist.push(gp)  
 }  
  
 // release all writers (they will panic)  
 for {  
 sg := c.sendq.dequeue()  
 if sg == nil {  
 break  
 }  
 sg.elem = nil  
 if sg.releasetime != 0 {  
 sg.releasetime = cputicks()  
 }  
 gp := sg.g  
 gp.param = nil  
 if raceenabled {  
 raceacquireg(gp, c.raceaddr())  
 }  
 glist.push(gp)  
 }  
 unlock(&c.lock)  
  
 // Ready all Gs now that we've dropped the channel lock.  
 for !glist.empty() {  
 gp := glist.pop()  
 gp.schedlink = 0  
 goready(gp, 3)  
 }  
}
  1. 如果chan是nil,你close会panic
  2. chan已经closed了,你再次close就会panic
  3. 否则就是chan没有close,且chan不为nil
  4. 就把等待队列中sender和receiver从队列中全部移除并且唤醒 清除的时候将params设置为nil。
  5. 为啥还要唤醒呢,因为他们还在阻塞,你只是清理了数据而已,close chan 唤醒他们 让他们继续工作 否则永远阻塞了。

5 关注公众号

微信公众号: 堆栈future

picture.image

扫我关注

0
0
0
0
评论
未登录
暂无评论