(*Transport).roundTript.nextProtoOnce.Do(t.onceSetNextProtoDefaults)TLSClientConfigh2transportTLSClientConfig: 初始化client支持的http协议, 并在tls握手时告知server。
h2transport: 如果本次请求是http2,那么h2transport会接管连接,请求和响应的处理逻辑。
下面看看源码:
func (t *Transport) onceSetNextProtoDefaults() {
// ...此处省略代码...
t2, err := http2configureTransport(t)
if err != nil {
log.Printf("Error enabling Transport HTTP/2 support: %v", err)
return
}
t.h2transport = t2
// ...此处省略代码...
}
func http2configureTransport(t1 *Transport) (*http2Transport, error) {
connPool := new(http2clientConnPool)
t2 := &http2Transport{
ConnPool: http2noDialClientConnPool{connPool},
t1: t1,
}
connPool.t = t2
if err := http2registerHTTPSProtocol(t1, http2noDialH2RoundTripper{t2}); err != nil {
return nil, err
}
if t1.TLSClientConfig == nil {
t1.TLSClientConfig = new(tls.Config)
}
if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "h2") {
t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...)
}
if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") {
t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")
}
upgradeFn := func(authority string, c *tls.Conn) RoundTripper {
addr := http2authorityAddr("https", authority)
if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {
go c.Close()
return http2erringRoundTripper{err}
} else if !used {
// Turns out we don't need this c.
// For example, two goroutines made requests to the same host
// at the same time, both kicking off TCP dials. (since protocol
// was unknown)
go c.Close()
}
return t2
}
if m := t1.TLSNextProto; len(m) == 0 {
t1.TLSNextProto = map[string]func(string, *tls.Conn) RoundTripper{
"h2": upgradeFn,
}
} else {
m["h2"] = upgradeFn
}
return t2, nil
}笔者将上述的源码简单拆解为以下几个步骤:
http2clientConnPoolTLSClientConfigh2http1.1TLSClientConfig.NextProtosh2upgradeFnt1.TLSNextProto(*Transport).dialConnfunc (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
// ...此处省略代码...
if cm.scheme() == "https" && t.hasCustomTLSDialer() {
// ...此处省略代码...
} else {
conn, err := t.dial(ctx, "tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
pconn.conn = conn
if cm.scheme() == "https" {
var firstTLSHost string
if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
return nil, wrapErr(err)
}
if err = pconn.addTLS(firstTLSHost, trace); err != nil {
return nil, wrapErr(err)
}
}
}
// Proxy setup.
// ...此处省略代码...
if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil
}
}
// ...此处省略代码...
}笔者对上述的源码描述如下:
t.dial(ctx, "tcp", cm.addr())NegotiatedProtocolt.TLSNextProtos.NegotiatedProtocolh2s.NegotiatedProtocolIsMutualtruehttp2configureTransportTLSNextProtoh2nextupgradeFnupgradeFnconnPool.addConnIfNeededt2http2Transportfunc (p *http2clientConnPool) addConnIfNeeded(key string, t *http2Transport, c *tls.Conn) (used bool, err error) {
p.mu.Lock()
// ...此处省略代码...
// 主要用于判断是否有必要像连接池添加新的连接
// 判断连接池中是否已有同host连接,如果有且该链接能够处理新的请求则直接返回
call, dup := p.addConnCalls[key]
if !dup {
// ...此处省略代码...
call = &http2addConnCall{
p: p,
done: make(chan struct{}),
}
p.addConnCalls[key] = call
go call.run(t, key, c)
}
p.mu.Unlock()
<-call.done
if call.err != nil {
return false, call.err
}
return !dup, nil
}
func (c *http2addConnCall) run(t *http2Transport, key string, tc *tls.Conn) {
cc, err := t.NewClientConn(tc)
p := c.p
p.mu.Lock()
if err != nil {
c.err = err
} else {
p.addConnLocked(key, cc)
}
delete(p.addConnCalls, key)
p.mu.Unlock()
close(c.done)
}分析上述的源码我们能够得到两点结论:
upgradeFnt.NewClientConn(tc)http2clientConnPool最后我们回到(*Transport).roundTrip方法并分析其中的关键源码:
func (t *Transport) roundTrip(req *Request) (*Response, error) {
t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
// ...此处省略代码...
for {
select {
case <-ctx.Done():
req.closeBody()
return nil, ctx.Err()
default:
}
// ...此处省略代码...
pconn, err := t.getConn(treq, cm)
if err != nil {
t.setReqCanceler(req, nil)
req.closeBody()
return nil, err
}
var resp *Response
if pconn.alt != nil {
// HTTP/2 path.
t.setReqCanceler(req, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req)
} else {
resp, err = pconn.roundTrip(treq)
}
if err == nil {
return resp, nil
}
// ...此处省略代码...
}
}pconn.altpconn.alt.RoundTrip(req)http2Transport(*http2Transport).NewClientConn
t.newClientConn(c, t.disableKeepAlives())因为本节内容较多,所以笔者不再一次性贴出源码,而是按关键步骤分析并分块儿贴出源码。
http2ClientConncc := &http2ClientConn{
t: t,
tconn: c,
readerDone: make(chan struct{}),
nextStreamID: 1,
maxFrameSize: 16 << 10, // spec default
initialWindowSize: 65535, // spec default
maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
streams: make(map[uint32]*http2clientStream),
singleUse: singleUse,
wantSettingsAck: true,
pings: make(map[[8]byte]chan struct{}),
}上面的源码新建了一个默认的http2ClientConn。
initialWindowSize:初始化窗口大小为65535,这个值之后会初始化每一个数据流可发送的数据窗口大小。
maxConcurrentStreams:表示每个连接上允许最多有多少个数据流同时传输数据。
streams:当前连接上的数据流。
t.disableKeepAlives()2、创建一个条件锁并且新建Writer&Reader。
cc.cond = sync.NewCond(&cc.mu)
cc.flow.add(int32(http2initialWindowSize))
cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr})
cc.br = bufio.NewReader(c)cc.flow.add(int32(http2initialWindowSize))cc.flow.addhttp2initialWindowSize3、新建一个读写数据帧的Framer。
cc.fr = http2NewFramer(cc.bw, cc.br) cc.fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil) cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
4、向server发送开场白,并发送一些初始化数据帧。
initialSettings := []http2Setting{
{ID: http2SettingEnablePush, Val: 0},
{ID: http2SettingInitialWindowSize, Val: http2transportDefaultStreamFlow},
}
if max := t.maxHeaderListSize(); max != 0 {
initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxHeaderListSize, Val: max})
}
cc.bw.Write(http2clientPreface)
cc.fr.WriteSettings(initialSettings...)
cc.fr.WriteWindowUpdate(0, http2transportDefaultConnFlow)
cc.inflow.add(http2transportDefaultConnFlow + http2initialWindowSize)
cc.bw.Flush()client向server发送的开场白内容如下:
const (
// client首先想server发送以PRI开头的一串字符串。
http2ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
)
var (
http2clientPreface = []byte(http2ClientPreface)
)SETTINGShttp2SettingEnablePush: 告知server客户端是否开启push功能。
http2transportDefaultStreamFlowhttp2transportDefaultConnFlowhttp2transportDefaultConnFlow + http2initialWindowSize5、开启读循环并返回
go cc.readLoop()
(*http2Transport).RoundTrip
(*http2Transport).RoundTrip只是一个入口函数,它会调用(*http2Transport). RoundTripOpt方法。
(*http2Transport). RoundTripOpt有两个步骤比较关键:
t.connPool().GetClientConn(req, addr)http2noDialClientConnPoolhttp2configureTransportcc.roundTrip(req)(http2noDialClientConnPool).GetClientConn
(*http2clientConnPool).getClientConn(req *Request, addr string, dialOnMiss bool)false在(*http2clientConnPool).getClientConn中会遍历同地址的连接,并判断连接的状态从而获取一个可以处理请求的连接。
for _, cc := range p.conns[addr] {
if st := cc.idleState(); st.canTakeNewRequest {
if p.shouldTraceGetConn(st) {
http2traceGetConn(req, addr)
}
p.mu.Unlock()
return cc, nil
}
}cc.idleState()1、当前连接是否能被多个请求共享,如果仅单个请求使用且已经有一个数据流,则当前连接不能处理新的请求。
if cc.singleUse && cc.nextStreamID > 1 {
return
}2、以下几点均为true时,才代表当前连接能够处理新的请求:
maxConcurrentStreamscc.tooIdleLocked()st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay && int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 && !cc.tooIdleLocked()
(*http2ClientConn).roundTrip(*http2ClientConn).roundTrip
1、在真正开始处理请求前,还要进行header检查,http2对http1.1的某些header是不支持的,笔者就不对这个逻辑进行分析了,直接上源码:
func http2checkConnHeaders(req *Request) error {
if v := req.Header.Get("Upgrade"); v != "" {
return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"])
}
if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") {
return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv)
}
if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !strings.EqualFold(vv[0], "close") && !strings.EqualFold(vv[0], "keep-alive")) {
return fmt.Errorf("http2: invalid Connection request header: %q", vv)
}
return nil
}
func http2commaSeparatedTrailers(req *Request) (string, error) {
keys := make([]string, 0, len(req.Trailer))
for k := range req.Trailer {
k = CanonicalHeaderKey(k)
switch k {
case "Transfer-Encoding", "Trailer", "Content-Length":
return "", &http2badStringError{"invalid Trailer key", k}
}
keys = append(keys, k)
}
if len(keys) > 0 {
sort.Strings(keys)
return strings.Join(keys, ","), nil
}
return "", nil
}(*http2ClientConn).awaitOpenSlotForRequestmaxConcurrentStreams2.1、double check当前连接可用。
if cc.closed || !cc.canTakeNewRequestLocked() {
if waitingForConn != nil {
close(waitingForConn)
}
return http2errClientConnUnusable
}maxConcurrentStreamsif int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
if waitingForConn != nil {
close(waitingForConn)
}
return nil
}2.3、如果当前连接处理的数据流确实已经达到上限,则开始进入等待流程。
if waitingForConn == nil {
waitingForConn = make(chan struct{})
go func() {
if err := http2awaitRequestCancel(req, waitingForConn); err != nil {
cc.mu.Lock()
waitingForConnErr = err
cc.cond.Broadcast()
cc.mu.Unlock()
}
}()
}
cc.pendingRequests++
cc.cond.Wait()
cc.pendingRequests--通过上面的逻辑知道,当前连接处理的数据流达到上限后有两种情况,一是等待请求被取消,二是等待其他请求结束。如果有其他数据流结束并唤醒当前等待的请求,则重复2.1、2.2和2.3的步骤。
cc.newStream()awaitOpenSlotForRequestfunc (cc *http2ClientConn) newStream() *http2clientStream {
cs := &http2clientStream{
cc: cc,
ID: cc.nextStreamID,
resc: make(chan http2resAndError, 1),
peerReset: make(chan struct{}),
done: make(chan struct{}),
}
cs.flow.add(int32(cc.initialWindowSize))
cs.flow.setConnFlow(&cc.flow)
cs.inflow.add(http2transportDefaultStreamFlow)
cs.inflow.setConnFlow(&cc.inflow)
cc.nextStreamID += 2
cc.streams[cs.ID] = cs
return cs
}笔者对上述代码简单描述如下:
http2clientStreamcc.nextStreamIDcc.nextStreamID +=2http2resAndErrorcc.initialWindowSizehttp2transportDefaultStreamFlowcc.t.getBodyWriterState(cs, body)http2bodyWriterStatefunc (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reader) (s http2bodyWriterState) {
s.cs = cs
if body == nil {
return
}
resc := make(chan error, 1)
s.resc = resc
s.fn = func() {
cs.cc.mu.Lock()
cs.startedWrite = true
cs.cc.mu.Unlock()
resc <- cs.writeRequestBody(body, cs.req.Body)
}
s.delay = t.expectContinueTimeout()
if s.delay == 0 ||
!httpguts.HeaderValuesContainsToken(
cs.req.Header["Expect"],
"100-continue") {
return
}
// 此处省略代码,因为绝大部分请求都不会设置100-continue的标头
return
}s.fns.rescwriteRequestBody5、因为是多个请求共享一个连接,那么向连接写入数据帧时需要加锁,比如加锁写入请求头。
cc.wmu.Lock() endStream := !hasBody && !hasTrailers werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs) cc.wmu.Unlock()
6、如果有请求body,则开始写入请求body,没有请求body则设置响应header的超时时间(有请求body时,响应header的超时时间需要在请求body写完之后设置)。
if hasBody {
bodyWriter.scheduleBodyWrite()
} else {
http2traceWroteRequest(cs.trace, nil)
if d := cc.responseHeaderTimeout(); d != 0 {
timer := time.NewTimer(d)
defer timer.Stop()
respHeaderTimer = timer.C
}
}scheduleBodyWritefunc (s http2bodyWriterState) scheduleBodyWrite() {
if s.timer == nil {
// We're not doing a delayed write (see
// getBodyWriterState), so just start the writing
// goroutine immediately.
go s.fn()
return
}
http2traceWait100Continue(s.cs.trace)
if s.timer.Stop() {
s.timer.Reset(s.delay)
}
}100-continuegetBodyWriterStatescheduleBodyWrite7、轮询管道获取响应结果。
在看轮询源码之前,先看一个简单的函数:
handleReadLoopResponse := func(re http2resAndError) (*Response, bool, error) {
res := re.res
if re.err != nil || res.StatusCode > 299 {
bodyWriter.cancel()
cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
}
if re.err != nil {
cc.forgetStreamID(cs.ID)
return nil, cs.getStartedWrite(), re.err
}
res.Request = req
res.TLS = cc.tlsState
return res, false, nil
}(*http2ClientConn).roundTriphandleReadLoopResponsefor {
select {
case re := <-readLoopResCh:
return handleReadLoopResponse(re)
// 此处省略代码(包含请求取消,请求超时等管道的轮询)
case err := <-bodyWriter.resc:
// Prefer the read loop's response, if available. Issue 16102.
select {
case re := <-readLoopResCh:
return handleReadLoopResponse(re)
default:
}
if err != nil {
cc.forgetStreamID(cs.ID)
return nil, cs.getStartedWrite(), err
}
bodyWritten = true
if d := cc.responseHeaderTimeout(); d != 0 {
timer := time.NewTimer(d)
defer timer.Stop()
respHeaderTimer = timer.C
}
}
}笔者仅对上面的第二种情况即请求body发送完成进行描述:
能否读到响应,如果能够读取响应则直接返回。
判断请求body是否发送成功,如果发送失败,直接返回。
如果请求body发送成功,则设置响应header的超时时间。
总结
本文主要描述了两个方面的内容:
确认client和server都支持http2协议,并构建一个http2的连接,同时开启该连接的读循环。
通过http2连接池获取一个http2连接,并发送请求和读取响应。