select是Golang在语言层面提供的多路IO复用的机制,其可以检测多个channel是否ready(即是否可读或可写),使用起来非常方便。本文将尝试通过源码的方式,带大家了解Select的基本用法以及其实现原理。
1.基础用法:
提前总结一下select的几个特点:
select中各个case执行顺序是随机的;
如果某个case中的channel已经ready,则执行相应的语句并退出select流程;
如果所有的case的channel都没有ready,则有default会走default然后退出select,没有default,select将阻塞直至channel ready;
case后面不一定是读channel,也可以写channel,只要是对channel的操作就可以;
空的select语句将被阻塞,直至panic;
1.1 带default的语句:
首先通过以下的代码,想一想输出是什么:
package main
import (
"fmt"
"time"
)
func main() {
chan1 := make(chan int)
chan2 := make(chan int)
go func() {
chan1 <- 1
time.Sleep(5 * time.Second)
}()
go func() {
chan2 <- 1
time.Sleep(5 * time.Second)
}()
select {
case <-chan1:
fmt.Println("chan1 ready.")
case <-chan2:
fmt.Println("chan2 ready.")
default:
fmt.Println("default")
}
fmt.Println("main exit.")
}
这里需要了解一下这几个点:
- select中各个case执行顺序是随机的;
- 如果某个case中的channel已经ready,则执行相应的语句并退出select流程,如果所有case中的channel都未ready,则执行default中的语句然后退出select流程;
因为启动协程和select语句都不能保证顺序执行,因而该代码输出的结果可能是以下三种:
可能1:
chan1 ready.
main exit.
可能2:
chan2 ready.
main exit.
可能3:
default
main exit.
1.2 不带default的语句:
package main
import (
"fmt"
"time"
)
func main() {
chan1 := make(chan int)
chan2 := make(chan int)
writeFlag := false
go func() {
for {
if writeFlag {
chan1 <- 1
}
time.Sleep(time.Second)
}
}()
go func() {
for {
if writeFlag {
chan2 <- 1
}
time.Sleep(time.Second)
}
}()
select {
case <-chan1:
fmt.Println("chan1 ready.")
case <-chan2:
fmt.Println("chan2 ready.")
}
fmt.Println("main exit.")
}
这里有一个Select的知识点需要注意:
- 不带default语句的select,如果case中的channel都没准备好,则select所在协程将会阻塞知道有case中的channel处在ready状态;
如上代码,因为writeFlag一直是false,chan1和chan2永远不会被写入数据,所以select将会一直阻塞,没有内容被打印出
1.3 case后是被关闭的channel
观察如下代码:
package main
import (
"fmt"
)
func main() {
chan1 := make(chan int)
chan2 := make(chan int)
go func() {
close(chan1)
}()
go func() {
close(chan2)
}()
select {
case <-chan1:
fmt.Println("chan1 ready.")
case <-chan2:
fmt.Println("chan2 ready.")
}
fmt.Println("main exit.")
}
这里有一个知识点:
- 关闭的channel是可读的
基于以上,所以可以得出结论:由于case的执行是随机的,chan1和chan2的关闭无法保证执行顺序,所以,两个case都有可能被执行到,其中一个case执行以后,select也就退出了,所以,打印的有以下两种情况:
情况1:
chan1 ready.
main exit.
情况2:
chan2 ready.
main exit.
1.4 空的select语句将一直阻塞
package main
func main() {
select {
}
}
对于空的select语句,程序会被阻塞,准确的说是当前协程被阻塞,同时Golang自带死锁检测机制,当发现当前协程再也没有机会被唤醒时,则会panic。所以上述程序会panic。
2.使用场景:
2.1 超时控制
select-timer模式,例如等待tcp节点发送连接包,超时后则关闭连接。
func (n *node) waitForConnectPkt() {
select {
case <-n.connected:
log.Println("接收到连接包")
case <-time.After(time.Second * 5):
log.Println("接收连接包超时")
n.conn.Close()
}
}
2.2 无阻塞获取值
select-default模式,节选自fasthttp1.19/client.go#L1955-L1963。
// waiting reports whether w is still waiting for an answer (connection or error).
func (w *wantConn) waiting() bool {
select {
case <-w.ready:
return false
default:
return true
}
}
2.3 类事件驱动循环
for-select模式,例如监控tcp节点心跳是否正常。
func (n *node) heartbeatDetect() {
for {
select {
case <-n.heartbeat:
// 收到心跳信号则退出select等待下一次心跳
break
case <-time.After(time.Second*3):
// 心跳超时,关闭连接
n.conn.Close()
return
}
}
}
2.4 带优先级的任务队列
func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) {
defer done()
// When processing events we want to prioritize Node updates over Pod updates,
// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
// we don't want user (or system) to wait until PodUpdate queue is drained before it can
// start evicting Pods from tainted Nodes.
for {
select {
case <-stopCh:
return
case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
case podUpdate := <-tc.podUpdateChannels[worker]:
// If we found a Pod update we need to empty Node queue first.
priority:
for {
select {
case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
default:
break priority
}
}
// After Node queue is emptied we process podUpdate.
tc.handlePodUpdate(podUpdate)
tc.podUpdateQueue.Done(podUpdate)
}
}
}
3.源码分析:
敬请期待