由于发文章时涉及到一些 m g 词汇,所以反复删除了原有的一些理解,这才能发表成功。

其中,一些词也纯拼音表示了,比如 dai li

ReverseProxynet.http.httputil

功能点:

  • 支持自定义修改响应内容
  • 支持连接池
  • 支持错误信息自定义处理
  • 支持 websocket 服务
  • 支持自定义负载均衡
  • 支持 https dai li
  • 支持 url 重写

源码分析

声明:go.14 版本

核心结构体

type ReverseProxy struct {
	Director func(*http.Request)
	Transport http.RoundTripper
	FlushInterval time.Duration
	ErrorLog *log.Logger
	BufferPool BufferPool
	ModifyResponse func(*http.Response) error
	ErrorHandler func(http.ResponseWriter, *http.Request, error)
}

字段解释

字段 解释
Director 控制器 是一个函数,函数内容可以对请求进行修改
Transport 连接池,如果为 nil,则使用 http.DefaultTransport
FlushInterval 刷新内容到客户端的时间间隔
ErrorLog 错误记录器
BufferPool 缓冲池,在复制 http 响应时使用,用以提高请求效率
ModifyResponse 可自定义修改响应的函数
ErrorHandler 错误处理回调函数,如果为 nil,则遇到错误会显示 502

核心方法

// 根据目的URL对象,返回一个新的 ReverseProxy
func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy {}

// 核心中的核心,实现了Handler接口
func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {}
ServeHTTP

声明;每个步骤的代码都是单独列出,是连续的,中间没有删除和增加任何代码,和源码保持一致,除了一些注释信息

step 1 设置连接池

如果没有配置,则使用 http 的默认连接池

transport := p.Transport
if transport == nil {
	transport = http.DefaultTransport
}

step 2 验证请求是否终止

rw.(http.CloseNotifier)
cancel()
http.CloseNotifierCloseNotify() <-chan bool
ctx := req.Context()
if cn, ok := rw.(http.CloseNotifier); ok {
	var cancel context.CancelFunc
	ctx, cancel = context.WithCancel(ctx)
	defer cancel()
	notifyChan := cn.CloseNotify()
	go func() {
		select {
		case <-notifyChan:
			cancel()
		case <-ctx.Done():
		}
	}()
}

step 3 拷贝上下文信息,并赋值给对外请求的 request

outreq := req.Clone(ctx)
if req.ContentLength == 0 {
	outreq.Body = nil // Issue 16036: nil Body for http.Transport retries
}

step 4 如果上下文中 header 为 nil,则使用 http 的 header 给该 ctx

if outreq.Header == nil {
	outreq.Header = make(http.Header) // Issue 33142: historical behavior was to always allocate
}

step 5 将拷贝后的 outreq 传递给控制器, 进行自定义修改。

DirectorReverseProxyNewSingleHostReverseProxy
outreq.Close = falseClosefalse
p.Director(outreq)
outreq.Close = false

step 6 Upgrade 头特殊处理

UpgradeConnectionhttp.header['Connection']Connection

具体 hop-by-hop 头信息 见 RFC 7230, section 6.1

reqUpType := upgradeType(outreq.Header)
removeConnectionHeaders(outreq.Header)
Tetrailers

逐段消息头是客户端和第一层 dai li 之间的消息头,与是否往下传递的 header 信息没有联系,往下游传递的信息里不应该包含这些逐段消息头。

注意:删除的是 对外请求 outreq 的 header。 step 11 也有删除,但是是删除响应的 header

hopHeaders
for _, h := range hopHeaders {
	hv := outreq.Header.Get(h)
	if hv == "" {
		continue
	}
	if h == "Te" && hv == "trailers" {
		continue
	}
	outreq.Header.Del(h)
}

// 删除上面所有的 hop-by-hop 头后,添加回协议升级所需的所有内容
if reqUpType != "" {
	outreq.Header.Set("Connection", "Upgrade")
	outreq.Header.Set("Upgrade", reqUpType)
}

step 8 追加 chientIP 信息

其实就是设置 X-Forwarded-For,以逗号+空格分隔

if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
	if prior, ok := outreq.Header["X-Forwarded-For"]; ok {
		clientIP = strings.Join(prior, ", ") + ", " + clientIP
	}
	outreq.Header.Set("X-Forwarded-For", clientIP)
}

step 9 向下游请求数据

transport.RoundTrip()
res, err := transport.RoundTrip(outreq)
if err != nil {
	p.getErrorHandler()(rw, outreq, err)
	return
}

step 10 处理升级协议的请求

响应的状态码为101才考虑升级

// Deal with 101 Switching Protocols responses: (WebSocket, h2c, etc)
if res.StatusCode == http.StatusSwitchingProtocols {
	if !p.modifyResponse(rw, res, outreq) {
		return
	}
	p.handleUpgradeResponse(rw, outreq, res)
	return
}

step 11 删除下游响应数据中一些无用的头部字段

ConnectionhopHeaders
removeConnectionHeaders(res.Header)

for _, h := range hopHeaders {
	res.Header.Del(h)
}

step 12 修改返回的内容

modifyResponseReverseProxyModifyResponse
if !p.modifyResponse(rw, res, outreq) {
	return
}

step 13 拷贝头部数据

rw.Header()ResponseWriter
res.HeaderroundTrip.(outreq)Response
copyHeader(rw.Header(), res.Header)

step 14 处理 Trailer 头部数据

遍历,拼接,然后将内容加到上游的头部的 Trailer 字段中

// The "Trailer" header isn't included in the Transport's response,
// at least for *http.Transport. Build it up from Trailer.
announcedTrailers := len(res.Trailer)
if announcedTrailers > 0 {
	trailerKeys := make([]string, 0, len(res.Trailer))
	for k := range res.Trailer {
		trailerKeys = append(trailerKeys, k)
	}
	rw.Header().Add("Trailer", strings.Join(trailerKeys, ", "))
}

step 15 写入状态码

将下游响应的状态码写入上游(客户端)的状态码中

rw.WriteHeader(res.StatusCode)

step 16 周期性刷新内容到上游 response 中

p.flushInterval(req, res)ReverseProxyFlushInterval
err = p.copyResponse(rw, res.Body, p.flushInterval(req, res))
if err != nil {
	defer res.Body.Close()
	// Since we're streaming the response, if we run into an error all we can do
	// is abort the request. Issue 23643: ReverseProxy should use ErrAbortHandler
	// on read error while copying body.
	if !shouldPanicOnCopyError(req) {
		p.logf("suppressing panic for copyResponse error in test; copy error: %v", err)
		return
	}
	panic(http.ErrAbortHandler)
}
res.Body.Close() // close now, instead of defer, to populate res.Trailer

step 17 处理 Trailer

Trailerres.Trailer
if len(res.Trailer) > 0 {
	// Force chunking if we saw a response trailer.
	// This prevents net/http from calculating the length for short
	// bodies and adding a Content-Length.
	if fl, ok := rw.(http.Flusher); ok {
		fl.Flush() // 刷新到上游的数据中心
	}
}

// step 14 中的 announcedTrailers
if len(res.Trailer) == announcedTrailers {
	copyHeader(rw.Header(), res.Trailer)
	return
}
// 读取Trailer中的头部信息,并将其设置到上游
for k, vv := range res.Trailer {
	k = http.TrailerPrefix + k
	for _, v := range vv {
		rw.Header().Add(k, v)
	}
}