由于发文章时涉及到一些 m g 词汇,所以反复删除了原有的一些理解,这才能发表成功。
其中,一些词也纯拼音表示了,比如 dai li
ReverseProxynet.http.httputil
功能点:
- 支持自定义修改响应内容
- 支持连接池
- 支持错误信息自定义处理
- 支持 websocket 服务
- 支持自定义负载均衡
- 支持 https dai li
- 支持 url 重写
源码分析
声明:go.14 版本
核心结构体
type ReverseProxy struct {
Director func(*http.Request)
Transport http.RoundTripper
FlushInterval time.Duration
ErrorLog *log.Logger
BufferPool BufferPool
ModifyResponse func(*http.Response) error
ErrorHandler func(http.ResponseWriter, *http.Request, error)
}
字段解释
字段 | 解释 |
---|---|
Director | 控制器 是一个函数,函数内容可以对请求进行修改 |
Transport | 连接池,如果为 nil,则使用 http.DefaultTransport |
FlushInterval | 刷新内容到客户端的时间间隔 |
ErrorLog | 错误记录器 |
BufferPool | 缓冲池,在复制 http 响应时使用,用以提高请求效率 |
ModifyResponse | 可自定义修改响应的函数 |
ErrorHandler | 错误处理回调函数,如果为 nil,则遇到错误会显示 502 |
核心方法
// 根据目的URL对象,返回一个新的 ReverseProxy
func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy {}
// 核心中的核心,实现了Handler接口
func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {}
ServeHTTP
声明;每个步骤的代码都是单独列出,是连续的,中间没有删除和增加任何代码,和源码保持一致,除了一些注释信息
step 1 设置连接池
如果没有配置,则使用 http 的默认连接池
transport := p.Transport
if transport == nil {
transport = http.DefaultTransport
}
step 2 验证请求是否终止
rw.(http.CloseNotifier)
cancel()
http.CloseNotifierCloseNotify() <-chan bool
ctx := req.Context()
if cn, ok := rw.(http.CloseNotifier); ok {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
notifyChan := cn.CloseNotify()
go func() {
select {
case <-notifyChan:
cancel()
case <-ctx.Done():
}
}()
}
step 3 拷贝上下文信息,并赋值给对外请求的 request
outreq := req.Clone(ctx)
if req.ContentLength == 0 {
outreq.Body = nil // Issue 16036: nil Body for http.Transport retries
}
step 4 如果上下文中 header 为 nil,则使用 http 的 header 给该 ctx
if outreq.Header == nil {
outreq.Header = make(http.Header) // Issue 33142: historical behavior was to always allocate
}
step 5 将拷贝后的 outreq 传递给控制器, 进行自定义修改。
DirectorReverseProxyNewSingleHostReverseProxy
outreq.Close = falseClosefalse
p.Director(outreq)
outreq.Close = false
step 6 Upgrade 头特殊处理
UpgradeConnectionhttp.header['Connection']Connection
具体 hop-by-hop 头信息 见 RFC 7230, section 6.1
reqUpType := upgradeType(outreq.Header)
removeConnectionHeaders(outreq.Header)
Tetrailers
逐段消息头是客户端和第一层 dai li 之间的消息头,与是否往下传递的 header 信息没有联系,往下游传递的信息里不应该包含这些逐段消息头。
注意:删除的是 对外请求 outreq 的 header。 step 11 也有删除,但是是删除响应的 header
hopHeaders
for _, h := range hopHeaders {
hv := outreq.Header.Get(h)
if hv == "" {
continue
}
if h == "Te" && hv == "trailers" {
continue
}
outreq.Header.Del(h)
}
// 删除上面所有的 hop-by-hop 头后,添加回协议升级所需的所有内容
if reqUpType != "" {
outreq.Header.Set("Connection", "Upgrade")
outreq.Header.Set("Upgrade", reqUpType)
}
step 8 追加 chientIP 信息
其实就是设置 X-Forwarded-For,以逗号+空格分隔
if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
if prior, ok := outreq.Header["X-Forwarded-For"]; ok {
clientIP = strings.Join(prior, ", ") + ", " + clientIP
}
outreq.Header.Set("X-Forwarded-For", clientIP)
}
step 9 向下游请求数据
transport.RoundTrip()
res, err := transport.RoundTrip(outreq)
if err != nil {
p.getErrorHandler()(rw, outreq, err)
return
}
step 10 处理升级协议的请求
响应的状态码为101才考虑升级
// Deal with 101 Switching Protocols responses: (WebSocket, h2c, etc)
if res.StatusCode == http.StatusSwitchingProtocols {
if !p.modifyResponse(rw, res, outreq) {
return
}
p.handleUpgradeResponse(rw, outreq, res)
return
}
step 11 删除下游响应数据中一些无用的头部字段
ConnectionhopHeaders
removeConnectionHeaders(res.Header)
for _, h := range hopHeaders {
res.Header.Del(h)
}
step 12 修改返回的内容
modifyResponseReverseProxyModifyResponse
if !p.modifyResponse(rw, res, outreq) {
return
}
step 13 拷贝头部数据
rw.Header()ResponseWriter
res.HeaderroundTrip.(outreq)Response
copyHeader(rw.Header(), res.Header)
step 14 处理 Trailer 头部数据
遍历,拼接,然后将内容加到上游的头部的 Trailer 字段中
// The "Trailer" header isn't included in the Transport's response,
// at least for *http.Transport. Build it up from Trailer.
announcedTrailers := len(res.Trailer)
if announcedTrailers > 0 {
trailerKeys := make([]string, 0, len(res.Trailer))
for k := range res.Trailer {
trailerKeys = append(trailerKeys, k)
}
rw.Header().Add("Trailer", strings.Join(trailerKeys, ", "))
}
step 15 写入状态码
将下游响应的状态码写入上游(客户端)的状态码中
rw.WriteHeader(res.StatusCode)
step 16 周期性刷新内容到上游 response 中
p.flushInterval(req, res)ReverseProxyFlushInterval
err = p.copyResponse(rw, res.Body, p.flushInterval(req, res))
if err != nil {
defer res.Body.Close()
// Since we're streaming the response, if we run into an error all we can do
// is abort the request. Issue 23643: ReverseProxy should use ErrAbortHandler
// on read error while copying body.
if !shouldPanicOnCopyError(req) {
p.logf("suppressing panic for copyResponse error in test; copy error: %v", err)
return
}
panic(http.ErrAbortHandler)
}
res.Body.Close() // close now, instead of defer, to populate res.Trailer
step 17 处理 Trailer
Trailerres.Trailer
if len(res.Trailer) > 0 {
// Force chunking if we saw a response trailer.
// This prevents net/http from calculating the length for short
// bodies and adding a Content-Length.
if fl, ok := rw.(http.Flusher); ok {
fl.Flush() // 刷新到上游的数据中心
}
}
// step 14 中的 announcedTrailers
if len(res.Trailer) == announcedTrailers {
copyHeader(rw.Header(), res.Trailer)
return
}
// 读取Trailer中的头部信息,并将其设置到上游
for k, vv := range res.Trailer {
k = http.TrailerPrefix + k
for _, v := range vv {
rw.Header().Add(k, v)
}
}