导言
用户空间与内核空间
I/O 模型
Non-blocking I/O
I/O 多路复用
select & poll
epoll
Go netpoller 核心
数据结构
netFD
pollDesc
实现原理
net.Listen
Listener.Accept()
Conn.Read/Conn.Write
pollDesc.waitRead/pollDesc.waitWrite
netpoll
Go netpoller 的价值
Goroutine 的调度
Go netpoller 的问题
Reactor 网络模型
gnet
🚀 功能
参考&延伸阅读
导言
netpollergoroutine-per-connection
Go netpollergoroutine-per-connection
Go netpoller 在不同的操作系统,其底层使用的 I/O 多路复用技术也不一样,可以从 Go 源码目录结构和对应代码文件了解 Go 在不同平台下的网络 I/O 模式的实现。比如,在 Linux 系统下基于 epoll,freeBSD 系统下基于 kqueue,以及 Windows 系统下基于 iocp。
netpoller
用户空间与内核空间
现代操作系统都是采用虚拟存储器,那么对 32 位操作系统而言,它的寻址空间(虚拟存储空间)为 4G(2 的 32 次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对 Linux 操作系统而言,将最高的 1G 字节(从虚拟地址 0xC0000000 到 0xFFFFFFFF),供内核使用,称为内核空间,而将较低的 3G 字节(从虚拟地址 0x00000000 到 0xBFFFFFFF),供各个进程使用,称为用户空间。
现代的网络服务的主流已经完成从 CPU 密集型到 IO 密集型的转变,所以服务端程序对 I/O 的处理必不可少,而一旦操作 I/O 则必定要在用户态和内核态之间来回切换。
I/O 模型
在神作《UNIX 网络编程》里,总结归纳了 5 种 I/O 模型,包括同步和异步 I/O:
阻塞 I/O (Blocking I/O)
非阻塞 I/O (Nonblocking I/O)
I/O 多路复用 (I/O multiplexing)
信号驱动 I/O (Signal driven I/O)
异步 I/O (Asynchronous I/O)
操作系统上的 I/O 是用户空间和内核空间的数据交互,因此 I/O 操作通常包含以下两个步骤:
等待网络数据到达网卡(读就绪)/等待网卡可写(写就绪) –> 读取/写入到内核缓冲区
从内核缓冲区复制数据 –> 用户空间(读)/从用户空间复制数据 -> 内核缓冲区(写)
而判定一个 I/O 模型是同步还是异步,主要看第二步:数据在用户和内核空间之间复制的时候是不是会阻塞当前进程,如果会,则是同步 I/O,否则,就是异步 I/O。基于这个原则,这 5 种 I/O 模型中只有一种异步 I/O 模型:Asynchronous I/O,其余都是同步 I/O 模型。
这 5 种 I/O 模型的对比如下:
Non-blocking I/O
什么叫非阻塞 I/O,顾名思义就是:所有 I/O 操作都是立刻返回而不会阻塞当前用户进程。I/O 多路复用通常情况下需要和非阻塞 I/O 搭配使用,否则可能会产生意想不到的问题。比如,epoll 的 ET(边缘触发) 模式下,如果不使用非阻塞 I/O,有极大的概率会导致阻塞 event-loop 线程,从而降低吞吐量,甚至导致 bug。
fcntlO_NONBLOCK
当用户进程发出 read 操作时,如果 kernel 中的数据还没有准备好,那么它并不会 block 用户进程,而是立刻返回一个 EAGAIN error。从用户进程角度讲 ,它发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error 时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦 kernel 中的数据准备好了,并且又再次收到了用户进程的 system call,那么它马上就将数据拷贝到了用户内存,然后返回。
「所以,Non-blocking I/O 的特点是用户进程需要不断的主动询问 kernel 数据好了没有。下一节我们要讲的 I/O 多路复用需要和 Non-blocking I/O 配合才能发挥出最大的威力!」
I/O 多路复用
「所谓 I/O 多路复用指的就是 select/poll/epoll 这一系列的多路选择器:支持单一线程同时监听多个文件描述符(I/O 事件),阻塞等待,并在其中某个文件描述符可读写时收到通知。I/O 复用其实复用的不是 I/O 连接,而是复用线程,让一个 thread of control 能够处理多个连接(I/O 事件)。」
select & poll
#include <sys/select.h>
/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
// 和 select 紧密结合的四个宏:
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
select 是 epoll 之前 Linux 使用的 I/O 事件驱动技术。
理解 select 的关键在于理解 fd_set,为说明方便,取 fd_set 长度为 1 字节,fd_set 中的每一 bit 可以对应一个文件描述符 fd,则 1 字节长的 fd_set 最大可以对应 8 个 fd。select 的调用过程如下:
0000,00000001,00110000,0011
基于上面的调用过程,可以得出 select 的特点:
可监控的文件描述符个数取决于 sizeof(fd_set) 的值。假设服务器上 sizeof(fd_set)=512,每 bit 表示一个文件描述符,则服务器上支持的最大文件描述符是 512*8=4096。fd_set 的大小调整可参考 【原创】技术系列之 网络模型(二) 中的模型 2,可以有效突破 select 可监控的文件描述符上限
将 fd 加入 select 监控集的同时,还要再使用一个数据结构 array 保存放到 select 监控集中的 fd,一是用于在 select 返回后,array 作为源数据和 fd_set 进行 FD_ISSET 判断。二是 select 返回后会把以前加入的但并无事件发生的 fd 清空,则每次开始 select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO 最先),扫描 array 的同时取得 fd 最大值 maxfd,用于 select 的第一个参数
可见 select 模型必须在 select 前循环 array(加 fd,取 maxfd),select 返回后循环 array(FD_ISSET 判断是否有事件发生)
所以,select 有如下的缺点:
最大并发数限制:使用 32 个整数的 32 位,即 32*32=1024 来标识 fd,虽然可修改,但是有以下第 2, 3 点的瓶颈
每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大
性能衰减严重:每次 kernel 都需要线性扫描整个 fd_set,所以随着监控的描述符 fd 数量增长,其 I/O 性能会线性下降
poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 结构而不是 select 的 fd_set 结构,poll 解决了最大文件描述符数量限制的问题,但是同样需要从用户态拷贝所有的 fd 到内核态,也需要线性遍历所有的 fd 集合,所以它和 select 只是实现细节上的区分,并没有本质上的区别。
epoll
等待消息准备好返回的活跃连接 == select(全部待监控的连接)
高频
epoll 的 API 非常简洁,涉及到的只有 3 个系统调用:
#include <sys/epoll.h>
int epoll_create(int size); // int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
其中,epoll_create 创建一个 epoll 实例并返回 epollfd;epoll_ctl 注册 file descriptor 等待的 I/O 事件(比如 EPOLLIN、EPOLLOUT 等) 到 epoll 实例上;epoll_wait 则是阻塞监听 epoll 实例上所有的 file descriptor 的 I/O 事件,它接收一个用户空间上的一块内存地址 (events 数组),kernel 会在有 I/O 事件发生的时候把文件描述符列表复制到这块内存地址上,然后 epoll_wait 解除阻塞并返回,最后用户空间上的程序就可以对相应的 fd 进行读写了:
#include <unistd.h>
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);
epoll 的工作原理如下:
与 select&poll 相比,epoll 分清了高频调用和低频调用。例如,epoll_ctl 相对来说就是非频繁调用的,而 epoll_wait 则是会被高频调用的。所以 epoll 利用 epoll_ctl 来插入或者删除一个 fd,实现用户态到内核态的数据拷贝,这确保了每一个 fd 在其生命周期只需要被拷贝一次,而不是每次调用 epoll_wait 的时候都拷贝一次。epoll_wait 则被设计成几乎没有入参的调用,相比 select&poll 需要把全部监听的 fd 集合从用户态拷贝至内核态的做法,epoll 的效率就高出了一大截。
ep_poll_callback
相比于 select&poll 调用时会将全部监听的 fd 从用户态空间拷贝至内核态空间并线性扫描一遍找出就绪的 fd 再返回到用户态,epoll_wait 则是直接返回已就绪 fd,因此 epoll 的 I/O 性能不会像 select&poll 那样随着监听的 fd 数量增加而出现线性衰减,是一个非常高效的 I/O 事件驱动技术。
「由于使用 epoll 的 I/O 多路复用需要用户进程自己负责 I/O 读写,从用户进程的角度看,读写过程是阻塞的,所以 select&poll&epoll 本质上都是同步 I/O 模型,而像 Windows 的 IOCP 这一类的异步 I/O,只需要在调用 WSARecv 或 WSASend 方法读写数据的时候把用户空间的内存 buffer 提交给 kernel,kernel 负责数据在用户空间和内核空间拷贝,完成之后就会通知用户进程,整个过程不需要用户进程参与,所以是真正的异步 I/O。」
延伸
epoll_waitepoll_wait
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
epoll_wait
/*
* Implement the event wait interface for the eventpoll file. It is the kernel
* part of the user space epoll_wait(2).
*/
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, int timeout)
{
...
/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
}
// 如果 epoll_wait 入参时设定 timeout == 0, 那么直接通过 ep_events_available 判断当前是否有用户感兴趣的事件发生,如果有则通过 ep_send_events 进行处理
// 如果设置 timeout > 0,并且当前没有用户关注的事件发生,则进行休眠,并添加到 ep->wq 等待队列的头部;对等待事件描述符设置 WQ_FLAG_EXCLUSIVE 标志
// ep_poll 被事件唤醒后会重新检查是否有关注事件,如果对应的事件已经被抢走,那么 ep_poll 会继续休眠等待
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout)
{
...
send_events:
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/
// 如果一切正常, 有 event 发生, 就开始准备数据 copy 给用户空间了
// 如果有就绪的事件发生,那么就调用 ep_send_events 将就绪的事件 copy 到用户态内存中,
// 然后返回到用户态,否则判断是否超时,如果没有超时就继续等待就绪事件发生,如果超时就返回用户态。
// 从 ep_poll 函数的实现可以看到,如果有就绪事件发生,则调用 ep_send_events 函数做进一步处理
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;
...
}
// ep_send_events 函数是用来向用户空间拷贝就绪 fd 列表的,它将用户传入的就绪 fd 列表内存简单封装到
// ep_send_events_data 结构中,然后调用 ep_scan_ready_list 将就绪队列中的事件写入用户空间的内存;
// 用户进程就可以访问到这些数据进行处理
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
// 调用 ep_scan_ready_list 函数检查 epoll 实例 eventpoll 中的 rdllist 就绪链表,
// 并注册一个回调函数 ep_send_events_proc,如果有就绪 fd,则调用 ep_send_events_proc 进行处理
ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
return esed.res;
}
// 调用 ep_scan_ready_list 的时候会传递指向 ep_send_events_proc 函数的函数指针作为回调函数,
// 一旦有就绪 fd,就会调用 ep_send_events_proc 函数
static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
{
...
/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding ep->mtx, so no operations coming from userspace
* can change the item.
*/
revents = ep_item_poll(epi, &pt, 1);
// 如果 revents 为 0,说明没有就绪的事件,跳过,否则就将就绪事件拷贝到用户态内存中
if (!revents)
continue;
// 将当前就绪的事件和用户进程传入的数据都通过 __put_user 拷贝回用户空间,
// 也就是调用 epoll_wait 之时用户进程传入的 fd 列表的内存
if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
if (!esed->res)
esed->res = -EFAULT;
return 0;
}
...
}
do_epoll_wait__put_user__put_user
Go netpoller 核心
「Go netpoller 基本原理」
❝Go netpoller 通过在底层对 epoll/kqueue/iocp 的封装,从而实现了使用同步编程模式达到异步执行的效果。总结来说,所有的网络操作都以网络描述符 netFD 为中心实现。netFD 与底层 PollDesc 结构绑定,当在一个 netFD 上读写遇到 EAGAIN 错误时,就将当前 goroutine 存储到这个 netFD 对应的 PollDesc 中,同时调用 gopark 把当前 goroutine 给 park 住,直到这个 netFD 上再次发生读写事件,才将此 goroutine 给 ready 激活重新运行。显然,在底层通知 goroutine 再次发生读写等事件的方式就是 epoll/kqueue/iocp 等事件驱动机制。
❞
selectpollepollkqueueiocp
src/runtime/netpoll_epoll.gosrc/runtime/netpoll_kqueue.gosrc/runtime/netpoll_solaris.gosrc/runtime/netpoll_windows.gosrc/runtime/netpoll_aix.gosrc/runtime/netpoll_fake.go
epollepoll
接下来让我们通过分析最新的 Go 源码(v1.15.3),全面剖析一下整个 Go netpoller 的运行机制和流程。
数据结构
netFD
net.Listen("tcp", ":8888")net.Listenerlistener.Accept()net.Connnet.connnetFDnetFD
netFDpoll.FD
// Network file descriptor.
type netFD struct {
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc
// Writev cache.
iovecs *[]syscall.Iovec
// Semaphore signaled when file is closed.
csema uint32
// Non-zero if this file has been set to blocking mode.
isBlocking uint32
// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool
// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool
// Whether this is a file rather than a network socket.
isFile bool
}
pollDesc
前面提到了 pollDesc 是底层事件驱动的封装,netFD 通过它来完成各种 I/O 相关的操作,它的定义如下:
type pollDesc struct {
runtimeCtx uintptr
}
runtime.pollDesc
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}
// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
// in a lock-free way by all operations.
// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
// that will blow up when GC starts moving objects.
lock mutex // protects the following fields
fd uintptr
closing bool
everr bool // marks event scanning error happened
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wseq uintptr // protects from stale write timers
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
}
rgwgpdReadypdWaitgnil
runtime.pollDescruntime.pollDescruntime.pollDescruntime.pollCache
type pollCache struct {
lock mutex
first *pollDesc
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
}
runtime.pollCache
const pollBlockSize = 4 * 1024
func (c *pollCache) alloc() *pollDesc {
lock(&c.lock)
if c.first == nil {
const pdSize = unsafe.Sizeof(pollDesc{})
n := pollBlockSize / pdSize
if n == 0 {
n = 1
}
// Must be in non-GC memory because can be referenced
// only from epoll/kqueue internals.
mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
for i := uintptr(0); i < n; i++ {
pd := (*pollDesc)(add(mem, i*pdSize))
pd.link = c.first
c.first = pd
}
}
pd := c.first
c.first = pd.link
lockInit(&pd.lock, lockRankPollDesc)
unlock(&c.lock)
return pd
}
poll_runtime_pollOpenruntime.pollCache.allocruntime.pollDescruntime.persistentallocepollkqueue
pollDesc
pollDescruntime.pollCache.free
func (c *pollCache) free(pd *pollDesc) {
lock(&c.lock)
pd.link = c.first
c.first = pd
unlock(&c.lock)
}
实现原理
使用 Go 编写一个典型的 TCP echo server:
package main
import (
"log"
"net"
)
func main() {
listen, err := net.Listen("tcp", ":8888")
if err != nil {
log.Println("listen error: ", err)
return
}
for {
conn, err := listen.Accept()
if err != nil {
log.Println("accept error: ", err)
break
}
// start a new goroutine to handle the new connection.
go HandleConn(conn)
}
}
func HandleConn(conn net.Conn) {
defer conn.Close()
packet := make([]byte, 1024)
for {
// block here if socket is not available for reading data.
n, err := conn.Read(packet)
if err != nil {
log.Println("read socket error: ", err)
return
}
// same as above, block here if socket is not available for writing.
_, _ = conn.Write(packet[:n])
}
}
goroutine-per-connection
Go 的这种同步模式的网络服务器的基本架构通常如下:
上面的示例代码中相关的在源码里的几个数据结构和方法:
// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
fd *netFD
lc ListenConfig
}
// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}
// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
conn
}
// Conn
type conn struct {
fd *netFD
}
type conn struct {
fd *netFD
}
func (c *conn) ok() bool { return c != nil && c.fd != nil }
// Implementation of the Conn interface.
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Write(b)
if err != nil {
err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
net.Listen
net.ListensocketnetFDlistenStreamnetFDruntime.runtime_pollServerInitruntime.poll_runtime_pollServerInitruntime.netpollGenericInit
epollcreate1epfdruntime.nonblockingPipenetpollBreakRdepollevent
相关源码如下:
// 调用 linux 系统调用 socket 创建 listener fd 并设置为为阻塞 I/O
s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
// On Linux the SOCK_NONBLOCK and SOCK_CLOEXEC flags were
// introduced in 2.6.27 kernel and on FreeBSD both flags were
// introduced in 10 kernel. If we get an EINVAL error on Linux
// or EPROTONOSUPPORT error on FreeBSD, fall back to using
// socket without them.
socketFunc func(int, int, int) (int, error) = syscall.Socket
// 用上面创建的 listener fd 初始化 listener netFD
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
// 对 listener fd 进行 bind&listen 操作,并且调用 init 方法完成初始化
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
...
// 完成绑定操作
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
// 完成监听操作
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
// 调用 init,内部会调用 poll.FD.Init,最后调用 pollDesc.init
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
// 使用 sync.Once 来确保一个 listener 只持有一个 epoll 实例
var serverInit sync.Once
// netFD.init 会调用 poll.FD.Init 并最终调用到 pollDesc.init,
// 它会创建 epoll 实例并把 listener fd 加入监听队列
func (pd *pollDesc) init(fd *FD) error {
// runtime_pollServerInit 通过 `go:linkname` 链接到具体的实现函数 poll_runtime_pollServerInit,
// 接着再调用 netpollGenericInit,然后会根据不同的系统平台去调用特定的 netpollinit 来创建 epoll 实例
serverInit.Do(runtime_pollServerInit)
// runtime_pollOpen 内部调用了 netpollopen 来将 listener fd 注册到
// epoll 实例中,另外,它会初始化一个 pollDesc 并返回
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
// 把真正初始化完成的 pollDesc 实例赋值给当前的 pollDesc 代表自身的指针,
// 后续使用直接通过该指针操作
pd.runtimeCtx = ctx
return nil
}
var (
// 全局唯一的 epoll fd,只在 listener fd 初始化之时被指定一次
epfd int32 = -1 // epoll descriptor
)
// netpollinit 会创建一个 epoll 实例,然后把 epoll fd 赋值给 epfd,
// 后续 listener 以及它 accept 的所有 sockets 有关 epoll 的操作都是基于这个全局的 epfd
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd < 0 {
epfd = epollcreate(1024)
if epfd < 0 {
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
closeonexec(epfd)
}
r, w, errno := nonblockingPipe()
if errno != 0 {
println("runtime: pipe failed with", -errno)
throw("runtime: pipe failed")
}
ev := epollevent{
events: _EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
if errno != 0 {
println("runtime: epollctl failed with", -errno)
throw("runtime: epollctl failed")
}
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
// netpollopen 会被 runtime_pollOpen 调用,注册 fd 到 epoll 实例,
// 注意这里使用的是 epoll 的 ET 模式,同时会利用万能指针把 pollDesc 保存到 epollevent 的一个 8 位的字节数组 data 里
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
我们前面提到的 epoll 的三个基本调用,Go 在源码里实现了对那三个调用的封装:
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
// Go 对上面三个调用的封装
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList
netFD 就是通过这三个封装来对 epoll 进行创建实例、注册 fd 和等待事件操作的。
Listener.Accept()
netpoll
listenacceptsyscall.EAGAINwaitReadwaitRead
Listener.Accept()netFD.accept
// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}
func (fd *netFD) accept() (netfd *netFD, err error) {
// 调用 poll.FD 的 Accept 方法接受新的 socket 连接,返回 socket 的 fd
d, rsa, errcall, err := fd.pfd.Accept()
if err != nil {
if errcall != "" {
err = wrapSyscallError(errcall, err)
}
return nil, err
}
// 以 socket fd 构造一个新的 netFD,代表这个新的 socket
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
// 调用 netFD 的 init 方法完成初始化
if err = netfd.init(); err != nil {
fd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
netFD.acceptpoll.FD.Acceptaccept
// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
// 使用 linux 系统调用 accept 接收新连接,创建对应的 socket
s, rsa, errcall, err := accept(fd.Sysfd)
// 因为 listener fd 在创建的时候已经设置成非阻塞的了,
// 所以 accept 方法会直接返回,不管有没有新连接到来;如果 err == nil 则表示正常建立新连接,直接返回
if err == nil {
return s, rsa, "", err
}
// 如果 err != nil,则判断 err == syscall.EAGAIN,符合条件则进入 pollDesc.waitRead 方法
switch err {
case syscall.EAGAIN:
if fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}
// 使用 linux 的 accept 系统调用接收新连接并把这个 socket fd 设置成非阻塞 I/O
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
// On Linux the accept4 system call was introduced in 2.6.28
// kernel and on FreeBSD it was introduced in 10 kernel. If we
// get an ENOSYS error on both Linux and FreeBSD, or EINVAL
// error on Linux, fall back to using accept.
// Accept4Func is used to hook the accept4 call.
var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4
pollDesc.waitRead
poll.FD.Accept() 返回之后,会构造一个对应这个新 socket 的 netFD,然后调用 init() 方法完成初始化,这个 init 过程和前面 net.Listen() 是一样的,调用链:netFD.init() --> poll.FD.Init() --> poll.pollDesc.init(),最终又会走到这里:
var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}
然后把这个 socket fd 注册到 listener 的 epoll 实例的事件队列中去,等待 I/O 事件。
Conn.Read/Conn.Write
Conn.ReadListener.AcceptnetFD.Readpoll.FD.Readsyscall.Read
// Implementation of the Conn interface.
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError("read", err)
}
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
// 尝试从该 socket 读取数据,因为 socket 在被 listener accept 的时候设置成
// 了非阻塞 I/O,所以这里同样也是直接返回,不管有没有可读的数据
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
// err == syscall.EAGAIN 表示当前没有期待的 I/O 事件发生,也就是 socket 不可读
if err == syscall.EAGAIN && fd.pd.pollable() {
// 如果当前没有发生期待的 I/O 事件,那么 waitRead
// 会通过 park goroutine 让逻辑 block 在这里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
// On MacOS we can see EINTR here if the user
// pressed ^Z. See issue #22838.
if runtime.GOOS == "darwin" && err == syscall.EINTR {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}
conn.Writeconn.ReadpollDesc.waitReadpollDesc.waitWrite
pollDesc.waitRead/pollDesc.waitWrite
pollDesc.waitReadpoll.runtime_pollWaitruntime.poll_runtime_pollWait
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != pollNoError {
return err
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
// 进入 netpollblock 并且判断是否有期待的 I/O 事件发生,
// 这里的 for 循环是为了一直等到 io ready
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return 0
}
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// gpp 保存的是 goroutine 的数据结构 g,这里会根据 mode 的值决定是 rg 还是 wg,
// 前面提到过,rg 和 wg 是用来保存等待 I/O 就绪的 gorouine 的,后面调用 gopark 之后,
// 会把当前的 goroutine 的抽象数据结构 g 存入 gpp 这个指针,也就是 rg 或者 wg
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to WAIT
// 这个 for 循环是为了等待 io ready 或者 io wait
for {
old := *gpp
// gpp == pdReady 表示此时已有期待的 I/O 事件发生,
// 可以直接返回 unblock 当前 goroutine 并执行响应的 I/O 操作
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
// 如果没有期待的 I/O 事件发生,则通过原子操作把 gpp 的值置为 pdWait 并退出 for 循环
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
// need to recheck error states after setting gpp to WAIT
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
// waitio 此时是 false,netpollcheckerr 方法会检查当前 pollDesc 对应的 fd 是否是正常的,
// 通常来说 netpollcheckerr(pd, mode) == 0 是成立的,所以这里会执行 gopark
// 把当前 goroutine 给 park 住,直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,
// 然后 unpark 返回,在 gopark 内部会把当前 goroutine 的抽象数据结构 g 存入
// gpp(pollDesc.rg/pollDesc.wg) 指针里,以便在后面的 netpoll 函数取出 pollDesc 之后,
// 把 g 添加到链表里返回,接着重新调度 goroutine
if waitio || netpollcheckerr(pd, mode) == 0 {
// 注册 netpollblockcommit 回调给 gopark,在 gopark 内部会执行它,保存当前 goroutine 到 gpp
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent READY notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
// gopark 会停住当前的 goroutine 并且调用传递进来的回调函数 unlockf,从上面的源码我们可以知道这个函数是
// netpollblockcommit
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
// gopark 最终会调用 park_m,在这个函数内部会调用 unlockf,也就是 netpollblockcommit,
// 然后会把当前的 goroutine,也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
mcall(park_m)
}
// park continuation on g0.
func park_m(gp *g) {
_g_ := getg()
if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
if fn := _g_.m.waitunlockf; fn != nil {
// 调用 netpollblockcommit,把当前的 goroutine,
// 也就是 g 数据结构保存到 pollDesc 的 rg 或者 wg 指针里
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}
// netpollblockcommit 在 gopark 函数里被调用
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
// 通过原子操作把当前 goroutine 抽象的数据结构 g,也就是这里的参数 gp 存入 gpp 指针,
// 此时 gpp 的值是 pollDesc 的 rg 或者 wg 指针
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
atomic.Xadd(&netpollWaiters, 1)
}
return r
}
pollDesc.waitWritepollDesc.waitReadpoll.runtime_pollWaitruntime.poll_runtime_pollWait
netpoll
eventpoll.rbr_Grunning_Gwaitting
所以我们现在可以来从整体的层面来概括 Go 的网络业务 goroutine 是如何被规划调度的了:
conn.Readconn.Writegoparkruntime.schedule()runtime.nepollruntime.netpoll
runtime.netpoll
epollwaitepollwaitepollwait
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
if epfd == -1 {
return gList{}
}
// 根据特定的规则把 delay 值转换为 epollwait 的 timeout 值
var waitms int32
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
waitms = 1e9
}
var events [128]epollevent
retry:
// 超时等待就绪的 fd 读写事件
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}
}
goto retry
}
// toRun 是一个 g 的链表,存储要恢复的 goroutines,最后返回给调用方
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
// Go scheduler 在调用 findrunnable() 寻找 goroutine 去执行的时候,
// 在调用 netpoll 之时会检查当前是否有其他线程同步阻塞在 netpoll,
// 若是,则调用 netpollBreak 来唤醒那个线程,避免它长时间阻塞
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
if ev.events != _EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
atomic.Store(&netpollWakeSig, 0)
}
continue
}
// 判断发生的事件类型,读类型或者写类型等,然后给 mode 复制相应的值,
// mode 用来决定从 pollDesc 里的 rg 还是 wg 里取出 goroutine
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 取出保存在 epollevent 里的 pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
// 调用 netpollready,传入就绪 fd 的 pollDesc,
// 把 fd 对应的 goroutine 添加到链表 toRun 中
netpollready(&toRun, pd, mode)
}
}
return toRun
}
// netpollready 调用 netpollunblock 返回就绪 fd 对应的 goroutine 的抽象数据结构 g
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
// netpollunblock 会依据传入的 mode 决定从 pollDesc 的 rg 或者 wg 取出当时 gopark 之时存入的
// goroutine 抽象数据结构 g 并返回
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
// mode == 'r' 代表当时 gopark 是为了等待读事件,而 mode == 'w' 则代表是等待写事件
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
// 取出 gpp 存储的 g
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// 重置 pollDesc 的 rg 或者 wg
if atomic.Casuintptr(gpp, old, new) {
// 如果该 goroutine 还是必须等待,则返回 nil
if old == pdWait {
old = 0
}
// 通过万能指针还原成 g 并返回
return (*g)(unsafe.Pointer(old))
}
}
}
// netpollBreak 往通信管道里写入信号去唤醒 epollwait
func netpollBreak() {
// 通过 CAS 避免重复的唤醒信号被写入管道,
// 从而减少系统调用并节省一些系统资源
if atomic.Cas(&netpollWakeSig, 0, 1) {
for {
var b byte
n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
if n == 1 {
break
}
if n == -_EINTR {
continue
}
if n == -_EAGAIN {
return
}
println("runtime: netpollBreak write failed with", -n)
throw("runtime: netpollBreak write failed")
}
}
}
netpollnetpollepoll_waiteventpoll.rdllistepoll_ctlgnetpollinjectglist
netpollnetpoll
runtime.schedule()runtime.findrunable()runtime.findrunable()runtime.netpoll
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
...
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
...
}
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
...
// Poll network.
if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
atomic.Store64(&sched.pollUntil, uint64(pollUntil))
if _g_.m.p != 0 {
throw("findrunnable: netpoll with p")
}
if _g_.m.spinning {
throw("findrunnable: netpoll with spinning")
}
if faketime != 0 {
// When using fake time, just poll.
delta = 0
}
list := netpoll(delta) // 同步阻塞调用 netpoll,直至有可用的 goroutine
atomic.Store64(&sched.pollUntil, 0)
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
if faketime != 0 && list.empty() {
// Using fake time and nothing is ready; stop M.
// When all M's stop, checkdead will call timejump.
stopm()
goto top
}
lock(&sched.lock)
_p_ = pidleget() // 查找是否有空闲的 P 可以来就绪的 goroutine
unlock(&sched.lock)
if _p_ == nil {
injectglist(&list) // 如果当前没有空闲的 P,则把就绪的 goroutine 放入全局调度队列等待被执行
} else {
// 如果当前有空闲的 P,则 pop 出一个 g,返回给调度器去执行,
// 并通过调用 injectglist 把剩下的 g 放入全局调度队列或者当前 P 本地调度队列
acquirep(_p_)
if !list.empty() {
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
} else if pollUntil != 0 && netpollinited() {
pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
netpollBreak()
}
}
stopm()
goto top
}
sysmonruntime.netpoll
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
...
// poll network if not polled for more than 10ms
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(0) // non-blocking - returns list of goroutines
if !list.empty() {
// Need to decrement number of idle locked M's
// (pretending that one more is running) before injectglist.
// Otherwise it can lead to the following situation:
// injectglist grabs all P's but before it starts M's to run the P's,
// another M returns from syscall, finishes running its G,
// observes that there is no work to do and no other running M's
// and reports deadlock.
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
}
}
...
}
sysmonsysmonsysmon
runtime.netpollstartm_Pidleretake_Psyscall
Listener.Acceptconn.Readconn.Write
Go netpoller 的价值
通过前面对源码的分析,我们现在知道 Go netpoller 依托于 runtime scheduler,为开发者提供了一种强大的同步网络编程模式;然而,Go netpoller 存在的意义却远不止于此,Go netpoller I/O 多路复用搭配 Non-blocking I/O 而打造出来的这个原生网络模型,它最大的价值是把网络 I/O 的控制权牢牢掌握在 Go 自己的 runtime 里,关于这一点我们需要从 Go 的 runtime scheduler 说起,Go 的 G-P-M 调度模型如下:
_Grunnable
_Grunnable_Grunnableeventpoll.rdreventpoll.rdllistnetpollepoll_wait_Grunnable
Goroutine 的调度
这一小节主要是讲处理网络 I/O 的 goroutines 阻塞之后,Go scheduler 具体是如何像前面几个章节所说的那样,避免让操作网络 I/O 的 goroutine 陷入到系统调用从而进入内核态的,而是封存 goroutine 然后让出 CPU 的使用权从而令 P 可以去调度本地调度队列里的下一个 goroutine 的。
「温馨提示」:这一小节属于延伸阅读,涉及到的知识点更偏系统底层,需要有一定的汇编语言基础才能通读,另外,这一节对 Go scheduler 的讲解仅仅涉及核心的一部分,不会把整个调度器都讲一遍(事实上如果真要解析 Go scheduler 的话恐怕重开一篇几万字的文章才能基本讲清楚。。。),所以也要求读者对 Go 的并发调度器有足够的了解,因此这一节可能会稍显深奥。当然这一节也可选择不读,因为通过前面的整个解析,我相信读者应该已经能够基本掌握 Go netpoller 处理网络 I/O 的核心细节了,以及能从宏观层面了解 netpoller 对业务 goroutines 的基本调度了。而这一节主要是通过对 goroutines 调度细节的剖析,能够加深读者对整个 Go netpoller 的彻底理解,接上前面几个章节,形成一个完整的闭环。如果对调度的底层细节没兴趣的话这也可以直接跳过这一节,对理解 Go netpoller 的基本原理影响不大,不过还是建议有条件的读者可以看看。
从源码可知,Go scheduler 的调度 goroutine 过程中所调用的核心函数链如下:
runtime.schedule --> runtime.execute --> runtime.gogo --> goroutine code --> runtime.goexit --> runtime.goexit1 --> runtime.mcall --> runtime.goexit0 --> runtime.schedule
runtime.schedule()runtime.schedule()goruntime.newprocruntime.newproc1runtime.newproc1gfreeruntime.gogo
runtime.gogo
// gobuf 存储 goroutine 调度上下文信息的结构体
type gobuf struct {
// The offsets of sp, pc, and g are known to (hard-coded in) libmach.
//
// ctxt is unusual with respect to GC: it may be a
// heap-allocated funcval, so GC needs to track it, but it
// needs to be set and cleared from assembly, where it's
// difficult to have write barriers. However, ctxt is really a
// saved, live register, and we only ever exchange it between
// the real register and the gobuf. Hence, we treat it as a
// root during stack scanning, which means assembly that saves
// and restores it doesn't need write barriers. It's still
// typed as a pointer so that any other writes from Go get
// write barriers.
sp uintptr // Stack Pointer 栈指针
pc uintptr // Program Counter 程序计数器
g guintptr // 持有当前 gobuf 的 goroutine
ctxt unsafe.Pointer
ret sys.Uintreg
lr uintptr
bp uintptr // for GOEXPERIMENT=framepointer
}
runtime.execute()runtime.gogo
func execute(gp *g, inheritTime bool) {
_g_ := getg()
// Assign gp.m before entering _Grunning so running Gs have an
// M.
_g_.m.curg = gp
gp.m = _g_.m
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
_g_.m.p.ptr().schedtick++
}
// Check whether the profiler needs to be turned on or off.
hz := sched.profilehz
if _g_.m.profilehz != hz {
setThreadCPUProfiler(hz)
}
if trace.enabled {
// GoSysExit has to happen when we have a P, but before GoStart.
// So we emit it here.
if gp.syscallsp != 0 && gp.sysblocktraced {
traceGoSysExit(gp.sysexitticks)
}
traceGoStart()
}
// gp.sched 就是 gobuf
gogo(&gp.sched)
}
这里还需要了解一个概念:g0,Go G-P-M 调度模型中,g 代表 goroutine,而实际上一共有三种 g:
runtime.main
gosysmon
runtime.gogogogogp
runtime.gogo
// func gogo(buf *gobuf)
// restore state from Gobuf; longjmp
TEXT runtime·gogo(SB), NOSPLIT, $16-8
// 将第一个 FP 伪寄存器所指向的 gobuf 的第一个参数存入 BX 寄存器,
// gobuf 的一个参数即是 SP 指针
MOVQ buf+0(FP), BX
MOVQ gobuf_g(BX), DX // 将 gp.sched.g 保存到 DX 寄存器
MOVQ 0(DX), CX // make sure g != nil
// 将 tls (thread local storage) 保存到 CX 寄存器,然后把 gp.sched.g 放到 tls[0],
// 这样以后调用 getg() 之时就可以通过 TLS 直接获取到当前 goroutine 的 g 结构体实例,
// 进而可以得到 g 所在的 m 和 p,TLS 里一开始存储的是系统堆栈 g0 的地址
get_tls(CX)
MOVQ DX, g(CX)
// 下面的指令则是对函数栈的 BP/SP 寄存器(指针)的存取,
// 最后进入到指定的代码区域,执行函数栈帧
MOVQ gobuf_sp(BX), SP // restore SP
MOVQ gobuf_ret(BX), AX
MOVQ gobuf_ctxt(BX), DX
MOVQ gobuf_bp(BX), BP
// 这里是在清空 gp.sched,因为前面已经把 gobuf 里的字段值都存入了寄存器,
// 所以 gp.sched 就可以提前清空了,不需要等到后面 GC 来回收,减轻 GC 的负担
MOVQ $0, gobuf_sp(BX) // clear to help garbage collector
MOVQ $0, gobuf_ret(BX)
MOVQ $0, gobuf_ctxt(BX)
MOVQ $0, gobuf_bp(BX)
// 把 gp.sched.pc 值放入 BX 寄存器
// PC 指针指向 gogo 退出时需要执行的函数地址
MOVQ gobuf_pc(BX), BX
// 用 BX 寄存器里的值去修改 CPU 的 IP 寄存器,
// 这样就可以根据 CS:IP 寄存器的段地址+偏移量跳转到 BX 寄存器里的地址,也就是 gp.sched.pc
JMP BX
runtime.gogogp.schedgobufruntime.gogo
JMP BXgp.sched.pc
gocmd/compile/internal/gc.state.stmtcmd/compile/internal/gc.state.callgoruntime.newprocruntime.newprocruntime.newproc1
// Create a new g in state _Grunnable, starting at fn, with narg bytes
// of arguments starting at argp. callerpc is the address of the go
// statement that created this. The caller is responsible for adding
// the new g to the scheduler.
//
// This must run on the system stack because it's the continuation of
// newproc, which cannot split the stack.
//
//go:systemstack
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
...
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
// 把 goexit 函数地址存入 gobuf 的 PC 指针里
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
if _g_.m.curg != nil {
newg.labels = _g_.m.curg.labels
}
if isSystemGoroutine(newg, false) {
atomic.Xadd(&sched.ngsys, +1)
}
casgstatus(newg, _Gdead, _Grunnable)
...
}
newg.sched.pcruntime.goexitnewgruntime.gogoruntime.gogoJMP BXruntime.goexit
// The top-most function running on a goroutine
// returns to goexit+PCQuantum. Defined as ABIInternal
// so as to make it identifiable to traceback (this
// function it used as a sentinel; traceback wants to
// see the func PC, not a wrapper PC).
TEXT runtime·goexit<ABIInternal>(SB),NOSPLIT,$0-0
BYTE $0x90 // NOP
CALL runtime·goexit1(SB) // does not return
// traceback from goexit1 must hit code range of goexit
BYTE $0x90 // NOP
runtime·goexit1
// Finishes execution of the current goroutine.
func goexit1() {
if raceenabled {
racegoend()
}
if trace.enabled {
traceGoEnd()
}
mcall(goexit0)
}
runtime.mcall
// func mcall(fn func(*g))
// Switch to m->g0's stack, call fn(g).
// Fn must never return. It should gogo(&g->sched)
// to keep running g.
// 切换回 g0 的系统堆栈,执行 fn(g)
TEXT runtime·mcall(SB), NOSPLIT, $0-8
// 取入参 funcval 对象的指针存入 DI 寄存器,此时 fn.fn 是 goexit0 的地址
MOVQ fn+0(FP), DI
get_tls(CX)
MOVQ g(CX), AX // save state in g->sched
MOVQ 0(SP), BX // caller's PC
MOVQ BX, (g_sched+gobuf_pc)(AX)
LEAQ fn+0(FP), BX // caller's SP
MOVQ BX, (g_sched+gobuf_sp)(AX)
MOVQ AX, (g_sched+gobuf_g)(AX)
MOVQ BP, (g_sched+gobuf_bp)(AX)
// switch to m->g0 & its stack, call fn
MOVQ g(CX), BX
MOVQ g_m(BX), BX
// 把 g0 的栈指针存入 SI 寄存器,后面需要用到
MOVQ m_g0(BX), SI
CMPQ SI, AX // if g == m->g0 call badmcall
JNE 3(PC)
MOVQ $runtime·badmcall(SB), AX
JMP AX
// 这两个指令是把 g0 地址存入到 TLS 里,
// 然后从 SI 寄存器取出 g0 的栈指针,
// 替换掉 SP 寄存器里存的当前 g 的栈指针
MOVQ SI, g(CX) // g = m->g0
MOVQ (g_sched+gobuf_sp)(SI), SP // sp = m->g0->sched.sp
PUSHQ AX
MOVQ DI, DX
// 入口处的第一个指令已经把 funcval 实例对象的指针存入了 DI 寄存器,
// 0(DI) 表示取出 DI 的第一个成员,即 goexit0 函数地址,再存入 DI
MOVQ 0(DI), DI
CALL DI // 调用 DI 寄存器里的地址,即 goexit0
POPQ AX
MOVQ $runtime·badmcall2(SB), AX
JMP AX
RET
runtime.mcallruntime.goexit0
func goexit0(gp *g) {
_g_ := getg()
casgstatus(gp, _Grunning, _Gdead)
if isSystemGoroutine(gp, false) {
atomic.Xadd(&sched.ngsys, -1)
}
gp.m = nil
locked := gp.lockedm != 0
gp.lockedm = 0
_g_.m.lockedg = 0
gp.preemptStop = false
gp.paniconfault = false
gp._defer = nil // should be true already but just in case.
gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
gp.writebuf = nil
gp.waitreason = 0
gp.param = nil
gp.labels = nil
gp.timer = nil
if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
// Flush assist credit to the global pool. This gives
// better information to pacing if the application is
// rapidly creating an exiting goroutines.
scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))
atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
gp.gcAssistBytes = 0
}
dropg()
if GOARCH == "wasm" { // no threads yet on wasm
gfput(_g_.m.p.ptr(), gp)
schedule() // never returns
}
if _g_.m.lockedInt != 0 {
print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
throw("internal lockOSThread error")
}
gfput(_g_.m.p.ptr(), gp)
if locked {
// The goroutine may have locked this thread because
// it put it in an unusual kernel state. Kill it
// rather than returning it to the thread pool.
// Return to mstart, which will release the P and exit
// the thread.
if GOOS != "plan9" { // See golang.org/issue/22227.
gogo(&_g_.m.g0.sched)
} else {
// Clear lockedExt on plan9 since we may end up re-using
// this thread.
_g_.m.lockedExt = 0
}
}
schedule()
}
runtime.goexit0
_Grunning_Gdeadruntime.dropggfreeruntime.schedule()
runtime.goparkruntime.gogoruntime.mcallruntime.park_m
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
mcall(park_m)
}
func park_m(gp *g) {
_g_ := getg()
if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
if fn := _g_.m.waitunlockf; fn != nil {
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}
runtime.mcallruntime.mcallruntime.park_m_Grunning_Gwaitingruntime.schedule()
至此,我们完成了对 Go netpoller 原理剖析的整个闭环。
Go netpoller 的问题
goroutine-per-connectiongoroutine-per-connection
Reactor 网络模型
目前 Linux 平台上主流的高性能网络库/框架中,大都采用 Reactor 模式,比如 netty、libevent、libev、ACE,POE(Perl)、Twisted(Python)等。
I/O 多路复用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O)
通常设置一个主线程负责做 event-loop 事件循环和 I/O 读写,通过 select/poll/epoll_wait 等系统调用监听 I/O 事件,业务逻辑提交给其他工作线程去做。而所谓『非阻塞 I/O』的核心思想是指避免阻塞在 read() 或者 write() 或者其他的 I/O 系统调用上,这样可以最大限度的复用 event-loop 线程,让一个线程能服务于多个 sockets。在 Reactor 模式中,I/O 线程只能阻塞在 I/O multiplexing 函数上(select/poll/epoll_wait)。
Reactor 模式的基本工作流程如下:
bind&listenselect/poll/epoll_waitsocket.acceptselect/poll/epoll_waitreadwrite
accept 连接以及 conn 上的读写操作若是在主线程完成,则要求是非阻塞 I/O,因为 Reactor 模式一条最重要的原则就是:I/O 操作不能阻塞 event-loop 事件循环。「实际上 event loop 可能也可以是多线程的,只是一个线程里只有一个 select/poll/epoll_wait」。
上面提到了 Go netpoller 在某些场景下可能因为创建太多的 goroutine 而过多地消耗系统资源,而在现实世界的网络业务中,服务器持有的海量连接中在极短的时间窗口内只有极少数是 active 而大多数则是 idle,就像这样(非真实数据,仅仅是为了比喻):
那么为每一个连接指派一个 goroutine 就显得太过奢侈了,而 Reactor 模式这种利用 I/O 多路复用进而只需要使用少量线程即可管理海量连接的设计就可以在这样网络业务中大显身手了:
在绝大部分应用场景下,我推荐大家还是遵循 Go 的 best practices,使用原生的 Go 网络库来构建自己的网络应用。然而,在某些极度追求性能、压榨系统资源以及技术栈必须是原生 Go (不考虑 C/C++ 写中间层而 Go 写业务层)的业务场景下,我们可以考虑自己构建 Reactor 网络模型。
gnet
Github: https://github.com/panjf2000/gnet
gnet
gnet
gnetgnetgnet
gnet,在某些极端的网络业务场景,比如海量连接、高频短连接、网络小包等等场景,gnet 在性能和资源占用上都远超 Go 原生的 net 包(基于 netpoller)。
gnetMulti-ReactorsMulti-Reactors + Goroutine Poolgnet
🚀 功能
TCPUDPUnix Domain SocketRound-Robin(轮询)Source-Addr-Hash(源地址哈希)Least-Connections(最少连接数)epollkqueuegnet
参考&延伸阅读
selectpoll
- END -
看完一键三连在看,转发,点赞
是对文章最大的赞赏,极客重生感谢你
推荐阅读
求点赞,在看,分享三连