channel 与 Select

channel 与 Select

make 操作返回 chan 引用,实际的结构体被分配在堆中

缓冲 chan

数据结构:

  • 基于数组的循环队列 ring buffer 用来暂存数据
  • 基于链表的单向队列sudog,用于保存阻塞在此 channel 上的 goroutine

理想情况下的收发过程:

  1. 发送方拿到互斥锁

  2. 将数据拷贝并入队

  3. 释放锁

  4. 接收方拿到锁

  5. 出队将数据取出

  6. 释放锁

  7. 如果发送方的内容占满了缓冲区 发送方会被阻塞直到接收方接收,发送方会在满的时候通过 gopark() 告知运行时将它休眠,新的元素和发送方的指针会保存在 channel 的结构体内部,当接收方出队取走元素的时候就可以自行将下一个元素入队(避免恢复运行后再多抢一次锁),并在此时通过 goready() 通知运行时发送方可以从阻塞队列放入就绪队列了,然后运行时再进行调度

  8. 如果接收方尝试从空的 channel 中接收 同样,会转变成阻塞状态,但不同的是,接收方即将接收元素的内存指针也被存入了channel,当发送方发送的时候,它可以无锁地往接收方的栈内存中直接写入元素

试图从未被初始化(值为nil)的通道接收元素值会造成永久阻塞!

无缓冲 channel

接收者优先,发送者总是可以无锁将元素直接写入接收者的栈空间中 发送者优先,发送内容和休眠goroutine指针总是会存放在channel结构体中

值得注意的是:发送方向通道发送的值会被复制,接收方接收的总是该值得副本,而不是该值本身。传递的值至少会被复制一次,至多会被复制两次

Select 实现

根据代码编译信息创建 select 对象,预分配好存放case的内存,运行时存放所有的 channel 到 scase 集合中,然后获得所有case 中 channel 的锁

// 打乱遍历的顺序
for i := 1; i < ncases; i++ {
	j := fastrandn(uint32(i + 1))
	pollorder[i] = pollorder[j]
	pollorder[j] = uint16(i)
}
// 对 ncases 中的 hchan 进行堆排序
for i := 0; i < ncases; i++ {
	j := i
	// Start with the pollorder to permute cases on the same channel.
	c := scases[pollorder[i]].c
	for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
		k := (j - 1) / 2
		lockorder[j] = lockorder[k]
		j = k
	}
	lockorder[j] = pollorder[i]
}
...

随机遍历 channel ,查看是否有 case 已经准备好接收或者发送,如果都没有,则尝试运行 default 分支,如果没有 default 分支,则将自身G封装好并分别挂入每个 channel 的读写者队列中,G进入阻塞队列,等待被 channel 中的任意一个唤醒,被唤醒后找到对应的 case (CAS 操作确保只选择其中一个case),在执行之前放弃在其他 channel 中的等待,最后进入某个分支运行,然后 select 对象就可以等待释放了

实际使用中,常常将 select 放在单独的 G 中运行防止阻塞程序

定时器与 chan

对接收操作进行超时设定

timeout := time.Millisecond * 500
var timer *time.Timer
for {
	// 首次使用进行初始化,否则使用 reset 对定时器对象进行复用
	if timer == nil {
		timer = time.NewTimer(timeout)
	} else {
		timer.Reset(timeout)
	}

	select {
	case e, ok := <-intChan:
		if !ok {
			return
		}
	case <-timer.C:
		fmt.Println("Timeout!")
	}
}

如果不想使用通道来查看到期,可以使用 time.AfterFunc 传入相对到期时间以及到期时需要执行的函数

断续器与 chan

通知方式与定时器类似,但是会在到期之后立即进入下一个周期等待再次到期直到被停止,常被作为定时任务的触发器来使用

如果不想停止,可以使用 time.Tick(..) 直接通过时间间隔获取只读到期信号通道