目录
0. 前言
前两周匆匆忙忙,焦虑万分,想要深入 Gin 框架源码去做一个解读,于是在细节上越陷越深,进度缓慢,最终止步不前,又悬空于放弃的边缘,在这里,感谢小徐先生的编程世界的知识分享,让我从最初的惊叹、丧气,到现在的平静,如果不是亲身去解读源码,如果没有这些许许多多的优秀技术大佬的分享,也许我永远体会不了:好的程序员掌控数据结构和他们之间的关系,烂程序员去翻译源码细节这样的话。接下来,我希望从 net/http 包的底层实现开始,开展我的源码学习之路,也是向各位技术大佬的学习和致敬。
1. 整体架构初探1.1. C-S 架构
http 协议下,交互框架是由客户端(Client)和服务端(Server)两个模块组成的 C-S 架构,由服务端在服务器上开启某个端口的监听,等待来自客户端的请求然后做出相应的响应。本文将由总体到局部,介绍 net/http 包的使用方式,并分别针对 Server 和 Client 两条线展开相关源码的走读。
1.2. 注册路由-开启监听
http.HandleFunc("/api/apps/v1/ping", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("pong"))
})
http.ListenAndServe(":8000", nil)
像上面这样,我们就完成了两件事:
- 调用 http.HandlerFunc 方法,注册了对应于请求路径 "/api/apps/v1/ping" 的 handler 函数
- 调用 http.ListenAndServe 方法,在服务器的 8000 端口上开启了服务监听。
这里的细节我们放到第二章再深入。
1.3. 构造请求-发送请求-接收响应
serverAddr := "127.0.0.1"
serverPort := ":8000"
resourceUrl := "/api/apps/v1/ping"
absoluteUrl := strings.Join([]string{serverAddr, serverPort, resourceUrl}, "")
contentType := "application/json"
reqBody, _ := json.Marshal(map[string]interface{}{"key1": "val1", "key2": "val2"})
resp, _ := http.Post(absoluteUrl, contentType, bytes.NewReader(reqBody))
respBody, _ := io.ReadAll(resp.Body)
fmt.Printf("response from server [%v]: %v", serverAddr, respBody)
像上面这样,我们完成了三件事:
- 构造请求 url,请求内容类型,请求体,也就是构造请求
- 调用 http.Post 方法发送请求,接收响应
- 调用 io.ReadAll 方法读取响应体内容并输出
这里的细节我们放到第三章再深入。
1.4. 源码位置,相关阅读推荐
| 模块 | 文件 |
|---|---|
| 服务端 | net/http/server.go |
| 客户端 -- 主流程 | net/http/client.go |
| 客户端 -- 构造请求 | net/http/request.go |
| 客户端 -- 网络交互 | net/http/transport.go |
| 推荐阅读 | io |
| 推荐阅读 | bytes |
2.1. 核心数据结构
1) Server
整个 http 服务端模块被封装在了 Server 结构体中,Handler 是其中最核心的成员字段,实现了从请求路径 path 到具体处理函数 handler 的注册和映射能力,在构造 Server 对象时,若其中的 Handler 字段未显式声明,则会取 net/http 包下的单例对象 DefaultServeMux 进行默认替代。
type Server struct {
// Addr optionally specifies the TCP address for the server to listen on,
// in the form "host:port". If empty, ":http" (port 80) is used.
Addr string
Handler Handler // handler to invoke, http.DefaultServeMux if nil
// ......
}
2)Handler
Handler 是一个接口,定义了一个方法:ServeHTTP(ResponseWriter, *Request),该方法的作用是根据 http 请求 Request 中的请求路径 path 映射到对应的 handler 处理函数,对请求进行处理和响应。
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
3)ServeMux
ServeMux 是对 Handler 的具体实现,内部通过 map 保存了从路径到处理函数的映射关系
type ServeMux struct {
mu sync.RWMutex
m map[string]muxEntry
es []muxEntry // slice of entries sorted from longest to shortest.
hosts bool // whether any patterns contain hostnames
}
4)muxEntry
muxEntry 作为一个路由单元,内部包含了请求路径 pattern 和处理函数 Handler 两部分
type muxEntry struct {
h Handler
pattern string
}
2.2. 注册路由-pattern&handler
当用户通过公开的 http.HandleFunc 注册路由时,实际上是将其注册到了 net/http 提供的默认单例 ServeMux 对象 -- DefaultServeMux 中。
func HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
DefaultServeMux.HandleFunc(pattern, handler)
}
var DefaultServeMux = &defaultServeMux
var defaultServeMux ServeMux
ServeMux.HandleFunc 内部,首先会将用户传入的 handler 函数转换为 HandlerFunc 函数类型,因为 HandlerFunc 函数类型实现了 Handler 接口,
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
// 典型的适配器模式
type HandlerFunc func(ResponseWriter, *Request)
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
f(w, r)
}
紧接着,就会将 pattern 和 HandlerFunc(handler) 当作入参,调用 ServeMux.Handle 方法,将路由注册到 DefaultServeMux 的 map 中。
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
if handler == nil {
panic("http: nil handler")
}
mux.Handle(pattern, HandlerFunc(handler))
}
实现路由注册的核心位于 ServeMux.Handle 方法中,两个核心逻辑必须要说一下:
- 将 pattern 和 handler 包装成 muxEntry 对象,注册到 DefaultServeMux 的路由 map -- m 中
- 对于以 "/" 结尾的路径,将其按照路径长度加入到 DefaultServeMux 的排序数组 -- es 中
func (mux *ServeMux) Handle(pattern string, handler Handler) {
mux.mu.Lock()
defer mux.mu.Unlock()
// ......
if mux.m == nil {
mux.m = make(map[string]muxEntry)
}
e := muxEntry{h: handler, pattern: pattern}
mux.m[pattern] = e
if pattern[len(pattern)-1] == '/' {
mux.es = appendSorted(mux.es, e)
}
// ......
}
func appendSorted(es []muxEntry, e muxEntry) []muxEntry {
n := len(es)
i := sort.Search(n, func(i int) bool {
return len(es[i].pattern) < len(e.pattern)
})
if i == n {
return append(es, e)
}
// we now know that i points at where we want to insert
es = append(es, muxEntry{}) // try to grow the slice in place, any entry works.
copy(es[i+1:], es[i:]) // Move shorter entries down
es[i] = e
return es
}
2.3. 启动服务监听
注册完服务的路由之后,就可以使用 http.ListenAndServe 方法在特定端口上开启服务监听,而当用户在浏览器上输入对应的请求路径之后,其实是经历了一系列调用链路,才最终拿到我们注册好的对应路径的处理函数来为用户服务,接下来,我们就一起来探索一下这个过程,坐上源码列车,与我一同出发吧~
1)http.ListenAndServe 方法
当我们使用 net/http 公开的 ListenAndServe 方法启动服务时,其实内部是用我们传入的端口和处理器入参生成了一个 Server 对象,然后调用 Server.ListenAndServe 方法。
func ListenAndServe(addr string, handler Handler) error {
server := &Server{Addr: addr, Handler: handler}
return server.ListenAndServe()
}
2)Server.ListenAndServe
Server.ListenAndServe 方法内部主要做了两件事:
- 调用 net.Listen 在特定端口上开启监听,返回 net.Listener 接口的实现 -- *net.TCPListener
- 调用 Server.Serve 开启服务
func (srv *Server) ListenAndServe() error {
// ......
addr := srv.Addr
if addr == "" {
addr = ":http"
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return srv.Serve(ln)
}
3)net.Listen & Server.Serve
net.Listen 方法根据传入的协议和端口,在服务器上开启监听,并返回 net.TCPListener 对象。
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}
Server.Serve 方法则是启动服务监听的核心,体现了 http 服务的运行精髓 “for + Accept()” 模式,onceCloseListener.Accept 方法负责监听用户请求,返回 http.Conn 接口的某种具体实现对象,在我们的用法中,返回的是 net.TCPConn 对象,该对象对应用户请求服务建立的 TCP 连接;然后,调用 Server.newConn 方法将 Server 对象和 Conn 接口的实现类封装进 http.conn 结构中;紧接着,就会为这个连接开启一个协程,调用 conn.serve 方法去为每一个请求对应的连接服务。
func (srv *Server) Serve(l net.Listener) error {
// ......
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, err := l.Accept()
if err != nil {
// ...... 网络错误重试
}
connCtx := ctx
// ......
c := srv.newConn(rw)
// ......
go c.serve(connCtx)
}
}
func (srv *Server) newConn(rwc net.Conn) *conn {
c := &conn{
server: srv,
rwc: rwc,
}
// ......
return c
}
conn.serve 方法中,会将 Server 对象封装进 serverHandler 结构中,然后调用 serverHandler 对象的 ServeHTTP 方法,在 serverHandler.ServeHTTP 方法中,会判断 Server 对象中的 Handler 是否为 nil,如果是,则会使用默认的 ServeMux 单例对象 DefaultServeMux 来代替,紧接着,调用 ServeMux.ServeHTTP 方法,准备到 DefaultServeMux 对象中去查找路由。
func (c *conn) serve(ctx context.Context) {
// ......
serverHandler{c.server}.ServeHTTP(w, w.req)
// ......
}
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
handler := sh.srv.Handler
if handler == nil {
handler = DefaultServeMux
}
// ......
handler.ServeHTTP(rw, req)
}
那么,ServeMux.ServeHTTP 查找路由的方式又是怎样的呢?首先,他会调用 ServeMux.Handler 方法传入请求内容,在 ServeMux.Handler 方法中,会从请求内容中解析出请求路径等内容,然后调用 ServeMux.handler 方法,传入主机名和请求路径。最终,ServeMux.Handler 方法经过一系列的方法调用,获取到我们注册好的与请求路径对应处理函数,并用来实际处理用户对应的请求,返回响应内容。
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
// ......
h, _ := mux.Handler(r)
h.ServeHTTP(w, r)
}
func (mux *ServeMux) Handler(r *Request) (h Handler, pattern string) {
// ......
// All other requests have any port stripped and path cleaned
// before passing to mux.handler.
host := stripHostPort(r.Host)
path := cleanPath(r.URL.Path)
// ......
if path != r.URL.Path {
_, pattern = mux.handler(host, path)
u := &url.URL{Path: path, RawQuery: r.URL.RawQuery}
return RedirectHandler(u.String(), StatusMovedPermanently), pattern
}
return mux.handler(host, r.URL.Path)
}
ServeMux.handler 方法中,会根据主机名和请求路径,调用 ServeMux.match 方法去查找对应的处理函数。
func (mux *ServeMux) handler(host, path string) (h Handler, pattern string) {
mux.mu.RLock()
defer mux.mu.RUnlock()
// Host-specific pattern takes precedence over generic ones
if mux.hosts {
h, pattern = mux.match(host + path)
}
if h == nil {
h, pattern = mux.match(path)
}
if h == nil {
h, pattern = NotFoundHandler(), ""
}
return
}
ServeMux.match 是 ServeMux 对象处理用户请求的核心,它会根据用户传入的请求路径,从 ServeMux 对象的 map 成员 m 中查找到对应 pattern 的处理函数,如果路径是以 "/" 结尾的模式,还支持模糊搜索,从 ServeMux 对象的 slice 成员 es 中取出具有公共前缀的已注册路径对应的处理函数来代替未找到处理函数的请求路径。
func (mux *ServeMux) match(path string) (h Handler, pattern string) {
// Check for exact match first.
v, ok := mux.m[path]
if ok {
return v.h, v.pattern
}
// Check for longest valid match. mux.es contains all patterns
// that end in / sorted from longest to shortest.
for _, e := range mux.es {
if strings.HasPrefix(path, e.pattern) {
return e.h, e.pattern
}
}
return nil, ""
}
3. 客户端初探
3.1. 核心数据结构
1)Client
和服务端一样,客户端模块也有一个对应的数据结构 Client,实现了对整个客户端模块的封装
- Tranceport: 负责 http 通信的核心部分
- Jar: cookie 管理
- Timeout: 超时设置
type Client struct {
Transport RoundTripper
// ......
Jar CookieJar
Timeout time.Duration
}
2)RoundTripper
RoundTripper 是通信模块的接口,定义了方法 RoundTrip,根据传入请求 Request 与服务端交互后获得响应 Response 。
type RoundTripper interface {
RoundTrip(*Request) (*Response, error)
}
3)Transport
Transport 是 RoundTripper 的实现类,核心字段有:
- idleConn: 空闲连接 map,实现复用
- DialContext: 新连接生成器
type Transport struct {
//......
idleConn map[connectMethodKey][]*persistConn // most recently used at end
// ......
DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
// ......
}
4)Request
http 请求参数结构体
type Request struct {
// ......
// HTTP 方法名,缺省为 GET
Method string
// 请求路径
URL *url.URL
// 请求头
Header Header
// 请求体内容
Body io.ReadCloser
// 返回请求体内容拷贝,用于重定向等多次读取请求体的场景
GetBody func() (io.ReadCloser, error)
// 请求体字节数
ContentLength int64
// 服务器主机名
Host string
// query 请求参数
Form url.Values
// 响应参数对象,重定向的时候用于记录上次请求响应内容(只在请求发生重定向时有用)
Response *Response
// 请求链路上下文
ctx context.Context
// ......
}
5)Response
http 响应参数结构体
type Response struct {
// ......
// 响应状态
Status string // e.g. "200 OK"
// 响应状态码
StatusCode int // e.g. 200
// 协议类型
Proto string // e.g. "HTTP/1.0"
ProtoMajor int // e.g. 1
ProtoMinor int // e.g. 0
// 响应头
Header Header
// 响应体
Body io.ReadCloser
// 响应体字节数
ContentLength int64
// 请求参数内容
Request *Request
// ......
}
希望大家认真阅读一下源码中关于这几个核心数据结构的注释,会有很大收获。
3.2. 方法链路总览
构造客户端发起 http 请求大致分为以下几个步骤:
- 构造请求参数
- 获取用于与服务端交互的 tcp 连接
- 通过 tcp 连接发送请求参数
- 通过 tcp 连接接收响应结果
3.3. 从构造请求到构造网络连接
调用 net/http 包下的公开方法 Post 时,需要传入服务端地址 url,请求参数格式 contentType 以及请求体 io.Reader,方法中会调用包下的客户端默认单例对象 DefaultClient 来处理这此请求。
// DefaultClient is the default Client and is used by Get, Head, and Post.
var DefaultClient = &Client{}
func Post(url, contentType string, body io.Reader) (resp *Response, err error) {
return DefaultClient.Post(url, contentType, body)
}
在 Client.Post 方法中,首先由 http.NewRequest 方法,调起 http.NewRequestWithContext 方法,根据 HTTP Method,url,body 入参,封装好用户的请求对象 req,然后再调用 Client.Do 方法,将请求发送出去。
func (c *Client) Post(url, contentType string, body io.Reader) (resp *Response, err error) {
req, err := NewRequest("POST", url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", contentType)
return c.Do(req)
}
// NewRequest wraps NewRequestWithContext using context.Background.
func NewRequest(method, url string, body io.Reader) (*Request, error) {
return NewRequestWithContext(context.Background(), method, url, body)
}
接下来,经过 Client.Do,Client.do,Client.send 方法的辗转,最终来到了 http.send 方法。
func (c *Client) Do(req *Request) (*Response, error) {
return c.do(req)
}
func (c *Client) do(req *Request) (retres *Response, reterr error) {
// ......
var (
deadline = c.deadline()
reqs []*Request
resp *Response
copyHeaders = c.makeHeadersCopier(req)
reqBodyClosed = false // have we closed the current req.Body?
// Redirect behavior:
redirectMethod string
includeBody bool
)
// ......
for {
// ......
reqs = append(reqs, req)
var err error
var didTimeout func() bool
if resp, didTimeout, err = c.send(req, deadline); err != nil {
// c.send() always closes req.Body
reqBodyClosed = true
if !deadline.IsZero() && didTimeout() {
err = &httpError{
err: err.Error() + " (Client.Timeout exceeded while awaiting headers)",
timeout: true,
}
}
return nil, uerr(err)
}
// ......
req.closeBody()
}
}
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
if c.Jar != nil {
for _, cookie := range c.Jar.Cookies(req.URL) {
req.AddCookie(cookie)
}
}
resp, didTimeout, err = send(req, c.transport(), deadline)
if err != nil {
return nil, didTimeout, err
}
if c.Jar != nil {
if rc := resp.Cookies(); len(rc) > 0 {
c.Jar.SetCookies(req.URL, rc)
}
}
return resp, nil, nil
}
在 Client.send 调用 http.send 方法时,会将 http.RoundTripper 接口的实现类 Transport 的默认对象 DefaultTransport 当作参数传入。
func (c *Client) transport() RoundTripper {
if c.Transport != nil {
return c.Transport
}
return DefaultTransport
}
var DefaultTransport RoundTripper = &Transport{
Proxy: ProxyFromEnvironment,
DialContext: defaultTransportDialContext(&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}),
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
在 http.send 方法中,则是调用 DefaultTransport.roundTrip 方法对请求进行处理。
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
// ......
resp, err = rt.RoundTrip(req)
if err != nil {
// ......
}
// ......
return resp, nil, nil
}
Transport.roundTrip 方法中,会调用 Transport.getConn 方法复用或创建新的网络连接,然后再使用连接的 pconn.roundTrip 方法为请求服务。
// roundTrip implements a RoundTripper over HTTP.
func (t *Transport) roundTrip(req *Request) (*Response, error) {
// ......
for {
// ......
// treq gets modified by roundTrip, so we need to recreate for each retry.
treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
cm, err := t.connectMethodForRequest(treq)
// ......
pconn, err := t.getConn(treq, cm)
// ......
var resp *Response
if pconn.alt != nil {
// HTTP/2 path.
t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req)
} else {
resp, err = pconn.roundTrip(treq)
}
if err == nil {
resp.Request = origReq
return resp, nil
}
// ......
}
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
// ......
var continueCh chan struct{}
if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
continueCh = make(chan struct{}, 1)
}
// ......
writeErrCh := make(chan error, 1)
pc.writech <- writeRequest{req, writeErrCh, continueCh}
resc := make(chan responseAndError)
pc.reqch <- requestAndChan{
req: req.Request,
cancelKey: req.cancelKey,
ch: resc,
// ......
}
// ......
for {
// ......
select {
// ......
case re := <-resc:
// ......
return re.res, nil
// ......
}
}
}
3.4. 构造网络连接的方式
构造网络连接 Transport.Transport 有两种方式:
- 复用连接
- 新建连接
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
// ......
// 获取连接的请求参数体
w := &wantConn{
cm: cm,
// 由 http 协议、服务端地址等信息组成的键
key: cm.key(),
ctx: ctx,
// 标识连接获取成功的信号通道
ready: make(chan struct{}, 1),
beforeDial: testHookPrePendingDial,
afterDial: testHookPostPendingDial,
}
// ......
// 尝试复用指向相同服务端地址的空闲网络连接
// Queue for idle connection.
if delivered := t.queueForIdleConn(w); delivered {
pc := w.pc
// ......
return pc, nil
}
// ......
// 异步构造新的网络连接
t.queueForDial(w)
// Wait for completion or cancellation.
select {
case <-w.ready:
// ......
return w.pc, w.err
// ......
}
}
1)复用连接
- 尝试从 Transport.idleConn 中获取指向统一服务端的空闲网络连接
- 获取到空闲连接后调用 wantConn.tryDeliver 方法将连接绑定到 wantConn 入参上,并关闭 wantConn.ready 管道,以唤醒阻塞读取该管道的 goroutine
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
// ......
if list, ok := t.idleConn[w.key]; ok {
stop := false
delivered := false
for len(list) > 0 && !stop {
pconn := list[len(list)-1]
// ......
delivered = w.tryDeliver(pconn, nil)
if delivered {
if pconn.alt != nil {
// HTTP/2: multiple clients can share pconn.
// Leave it in the list.
} else {
// HTTP/1: only one client can use pconn.
// Remove it from the list.
t.idleLRU.remove(pconn)
list = list[:len(list)-1]
}
}
// ......
}
}
// tryDeliver attempts to deliver pc, err to w and reports whether it succeeded.
func (w *wantConn) tryDeliver(pc *persistConn, err error) bool {
w.mu.Lock()
defer w.mu.Unlock()
if w.pc != nil || w.err != nil {
return false
}
w.pc = pc
w.err = err
if w.pc == nil && w.err == nil {
panic("net/http: internal error: misuse of tryDeliver")
}
close(w.ready)
return true
}
2)新建连接
在 Transport.queueForDial 方法中会异步调用 Transport.dialConnFor 方法,创建新的网络连接,由于是异步操作,所以在上游会通过读取 ready 管道的方式,等待创建操作的完成。
这里之所以采用异步操作进行连接创建,有两部分原因:
- 一个网络连接并不是一个静态的数据,它是有生命周期的,创建过程会为其创建负责读写的两个守护协程,伴随而生
- 在上游 Transport.getConn 方法中,当通过 select 多路复用的方式,接收到其它终止信号时,可以提前退出。相比串行化执行而言,这种异步交互的模式,具有更高的灵活性
func (t *Transport) queueForDial(w *wantConn) {
w.beforeDial()
if t.MaxConnsPerHost <= 0 {
go t.dialConnFor(w)
return
}
t.connsPerHostMu.Lock()
defer t.connsPerHostMu.Unlock()
if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
if t.connsPerHost == nil {
t.connsPerHost = make(map[connectMethodKey]int)
}
t.connsPerHost[w.key] = n + 1
go t.dialConnFor(w)
return
}
if t.connsPerHostWait == nil {
t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
}
q := t.connsPerHostWait[w.key]
q.cleanFront()
q.pushBack(w)
t.connsPerHostWait[w.key] = q
}
func (t *Transport) dialConnFor(w *wantConn) {
defer w.afterDial()
pc, err := t.dialConn(w.ctx, w.cm)
delivered := w.tryDeliver(pc, err)
if err == nil && (!delivered || pc.alt != nil) {
// pconn was not passed to w,
// or it is HTTP/2 and can be shared.
// Add to the idle connection pool.
t.putOrCloseIdleConn(pc)
}
if err != nil {
t.decConnsPerHost(w.key)
}
}
Transport.dialConnFor 方法包含了创建网路连接的核心逻辑:
- 调用 Transport.dial 方法,最终通过 Transport.DialContext 成员函数,创建好连接,封装到 persistConn 中
- 异步启动连接的伴生读写协程 readLoop 和 writeLoop,组成发送请求,接收响应的循环
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
pconn = &persistConn{
t: t,
cacheKey: cm.key(),
reqch: make(chan requestAndChan, 1),
writech: make(chan writeRequest, 1),
closech: make(chan struct{}),
writeErrCh: make(chan error, 1),
writeLoopDone: make(chan struct{}),
}
// ......
if cm.scheme() == "https" && t.hasCustomTLSDialer() {
// ......
} else {
conn, err := t.dial(ctx, "tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
pconn.conn = conn
if cm.scheme() == "https" {
// ......
}
}
// ......
pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
go pconn.readLoop()
go pconn.writeLoop()
return pconn, nil
}
在伴生读协程 persistConn.readLoop 中,会读取来自服务端的响应,并添加到 persistConn.reqch 管道中,供上游的 persistConn.roundTrip 方法读取
func (pc *persistConn) readLoop() {
// ......
alive := true
for alive {
pc.readLimit = pc.maxHeaderResponseSize()
_, err := pc.br.Peek(1)
// ......
rc := <-pc.reqch
trace := httptrace.ContextClientTrace(rc.req.Context())
var resp *Response
if err == nil {
resp, err = pc.readResponse(rc, trace)
} else {
err = transportReadFromServerError{err}
closeErr = err
}
// ......
select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
// ......
}
}
在伴生写协程 persistConn.writeLoop 中,会通过 persistConn.writech 读取到上游的 persistConn.roundTrip 方法提交的请求,将其发送到服务端
func (pc *persistConn) writeLoop() {
defer close(pc.writeLoopDone)
for {
select {
case wr := <-pc.writech:
startBytesWritten := pc.nwrite
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
if bre, ok := err.(requestBodyReadError); ok {
err = bre.error
// Errors reading from the user's
// Request.Body are high priority.
// Set it here before sending on the
// channels below or calling
// pc.close() which tears down
// connections and causes other
// errors.
wr.req.setError(err)
}
if err == nil {
err = pc.bw.Flush()
}
if err != nil {
if pc.nwrite == startBytesWritten {
err = nothingWrittenError{err}
}
}
pc.writeErrCh <- err // to the body reader, which might recycle us
wr.ch <- err // to the roundTrip function
if err != nil {
pc.close(err)
return
}
case <-pc.closech:
return
}
}
}
3)归还连接
有复用连接的能力,则必然存在归还连接的机制。
首先,在获取连接的途中,如果被打断,则可能将连接放回队列以供复用:
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
// ......
w := &wantConn{
cm: cm,
key: cm.key(),
ctx: ctx,
ready: make(chan struct{}, 1),
beforeDial: testHookPrePendingDial,
afterDial: testHookPostPendingDial,
}
defer func() {
if err != nil {
w.cancel(t, err)
}
}()
// ......
}
// cancel marks w as no longer wanting a result (for example, due to cancellation).
// If a connection has been delivered already, cancel returns it with t.putOrCloseIdleConn.
func (w *wantConn) cancel(t *Transport, err error) {
w.mu.Lock()
if w.pc == nil && w.err == nil {
close(w.ready) // catch misbehavior in future delivery
}
pc := w.pc
w.pc = nil
w.err = err
w.mu.Unlock()
if pc != nil {
t.putOrCloseIdleConn(pc)
}
}
其次,当与服务端的一轮交互结束,也会将连接放回队列以供复用:
func (pc *persistConn) readLoop() {
// ......
tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
if err := pc.t.tryPutIdleConn(pc); err != nil {
closeErr = err
if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
trace.PutIdleConn(err)
}
return false
}
if trace != nil && trace.PutIdleConn != nil {
trace.PutIdleConn(nil)
}
return true
}
// ......
alive := true
for alive {
// ......
// Before looping back to the top of this function and peeking on
// the bufio.Reader, wait for the caller goroutine to finish
// reading the response body. (or for cancellation or death)
select {
case bodyEOF := <-waitForBodyRead:
replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool
alive = alive &&
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
replaced && tryPutIdleConn(trace)
if bodyEOF {
eofc <- struct{}{}
}
case <-rc.req.Cancel:
alive = false
pc.t.CancelRequest(rc.req)
case <-rc.req.Context().Done():
alive = false
pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
case <-pc.closech:
alive = false
}
// ......
}
}
3.5. 展望
花了一周的时间,来阅读 net/http 包的部分源码,感觉得到收获,但是仍有大部分源码是模糊的,并非它不是核心逻辑,应该说,标准库没有哪一部分不是核心逻辑,但无奈短期内没有太多时间投入到这里,还是随着时间的推移,慢慢的掌握这些知识吧,源码使用了大量的设计模式,对于我这种对设计模式一点都不了解的人,阅读源码可谓是真的痛苦,有时间还是要认真学习设计模式这门学问,接下来的一周呢,会回到 gin 框架上,以类似的方式,将设计原理解析出来。下篇文章见~