go http请求转发
1.说明
-
日常开发中会遇到需要将请求转发到其它服务器的需求:
- 1.如果是前端进行转发,需要解决跨域的问题;
- 2.后端转发到目标服务器,并返回数据到client;
我们只讨论后端如何处理转发。
2. 原理
-
转发在数据流层面需要解决以下问题:
- 1.向目标服务器发送请求,并获取数据
- 2.将数据返回到client
- 3.对client而言整个过程是透明的,client感知不到请求已被转发
3. 方案
package proxy
import (
	"errors"
	"io"
	"log"
	"net"
	"sync"
	"sync/atomic"
	"time"
)
// Wraps the proxy service. HTTP connection responses are subject to timeout
// handling, so pay attention to timeout behavior.
//
// pool is a buffered channel (capacity 100) of net.Conn values: getConn
// receives a connection from it when one is available, and release sends
// connections back into it.
var pool = make(chan net.Conn, 100)
// conn wraps a net.Conn and tracks, in state, the outcome of the most recent
// Read, Write, Close or release so that release can decide what to do with it.
type conn struct {
// conn is the underlying network connection that all I/O is delegated to.
conn net.Conn
// wg is used by Handle to wait for its two copy goroutines.
wg *sync.WaitGroup
// lock is held by release while it tries to hand conn back to pool.
lock sync.Mutex
// state holds one of the maybeValid..isClosed constants; it is read and
// written through sync/atomic.
state int32
}
// Connection states stored in conn.state.
const (
// maybeValid is the initial state assigned by getConn, before any I/O result
// has been observed.
maybeValid = iota
// isValid is set after a Read or Write returns without error.
isValid
// isInvalid is set after a Read or Write fails with a non-timeout error.
isInvalid
// isInPool is set by release once the connection has been sent to pool.
isInPool
// isClosed is set by Close on the wrapper.
isClosed
)
// timeoutErr is implemented by errors that can report whether they were
// caused by a timeout.
type timeoutErr interface {
	Timeout() bool
}

// isTimeoutError reports whether err is a timeout.
//
// It walks err's wrap chain and asks the first error that has a Timeout
// method; a timeout wrapped with fmt.Errorf("...: %w", err) is therefore still
// recognised. A nil err, or a chain with no Timeout method, yields false.
func isTimeoutError(err error) bool {
	var te timeoutErr
	if !errors.As(err, &te) {
		return false
	}
	return te.Timeout()
}
// Read reads from the underlying connection into b and records the outcome in
// cn.state: isValid on success, isInvalid on any error that is not a timeout.
// A timeout leaves the state untouched. The byte count and error from the
// underlying Read are returned unchanged.
func (cn *conn) Read(b []byte) (n int, err error) {
	n, err = cn.conn.Read(b)
	switch {
	case err == nil:
		atomic.StoreInt32(&cn.state, isValid)
	case !isTimeoutError(err):
		atomic.StoreInt32(&cn.state, isInvalid)
	}
	return n, err
}
// Write writes b to the underlying connection and records the outcome in
// cn.state: isValid on success, isInvalid on any error that is not a timeout.
// A timeout leaves the state untouched. The byte count and error from the
// underlying Write are returned unchanged.
func (cn *conn) Write(b []byte) (n int, err error) {
	n, err = cn.conn.Write(b)
	switch {
	case err == nil:
		atomic.StoreInt32(&cn.state, isValid)
	case !isTimeoutError(err):
		atomic.StoreInt32(&cn.state, isInvalid)
	}
	return n, err
}
// Close marks the wrapper as isClosed and then closes the underlying
// connection, returning whatever error that close produces.
func (cn *conn) Close() error {
	atomic.StoreInt32(&cn.state, isClosed)
	closeErr := cn.conn.Close()
	return closeErr
}
// getConn returns a wrapped connection to the destination server.
//
// An idle connection is taken from pool when one is immediately available;
// otherwise a new TCP connection is dialed. A dial failure is logged and
// returned. The wrapper starts in the maybeValid state with a fresh WaitGroup.
func getConn() (*conn, error) {
	const destAddr = "127.0.0.1:8090"

	var raw net.Conn
	select {
	case raw = <-pool:
		// Reuse an idle connection from the pool.
	default:
		dialed, err := net.Dial("tcp", destAddr)
		if err != nil {
			service.Logger.Error().Err(err).Msgf("dial to dest %s failed ", destAddr)
			return nil, err
		}
		raw = dialed
	}

	wrapped := &conn{
		conn:  raw,
		wg:    &sync.WaitGroup{},
		state: maybeValid,
	}
	return wrapped, nil
}
// release hands cn back after use.
//
//   - Already pooled or closed: nothing to do, returns nil.
//   - Invalid (a non-timeout I/O error was seen): the connection is closed.
//   - Otherwise the underlying connection is offered to pool without blocking;
//     if the pool is full the connection is closed instead.
//
// The state check and the hand-off happen under cn.lock, so concurrent calls
// for the same cn cannot both enqueue (or both close) the connection. Closing
// always goes through cn.Close so the state becomes isClosed and a later
// release is a no-op.
//
// It returns the error from closing the connection, or nil.
func release(cn *conn) error {
	cn.lock.Lock()
	defer cn.lock.Unlock()

	switch atomic.LoadInt32(&cn.state) {
	case isInPool, isClosed:
		return nil
	case isInvalid:
		return cn.Close()
	}

	select {
	case pool <- cn.conn:
		atomic.StoreInt32(&cn.state, isInPool)
		return nil
	default:
		// Pool is full: drop the connection rather than block.
		return cn.Close()
	}
}
// Handle relays traffic between an accepted connection and the destination
// server obtained from getConn.
//
// Both sides get a 100ms read/write deadline. Two goroutines copy data in
// opposite directions; Handle returns once both have finished, at which point
// the accepted connection is closed and the destination connection is passed
// to release. A nil conn, or a failure to obtain a destination connection,
// makes Handle return immediately.
func Handle(conn net.Conn) {
	if conn == nil {
		return
	}
	defer conn.Close()
	conn.SetDeadline(time.Now().Add(100 * time.Millisecond)) // read/write timeout

	upstream, err := getConn()
	if err != nil {
		return
	}
	defer release(upstream)
	upstream.conn.SetDeadline(time.Now().Add(100 * time.Millisecond)) // read/write timeout

	// relay copies src to dst, logs a copy failure with failMsg, and signals
	// completion on the wait group.
	relay := func(dst io.Writer, src io.Reader, failMsg string) {
		if _, copyErr := io.Copy(dst, src); copyErr != nil {
			service.Logger.Err(copyErr).Msg(failMsg)
		}
		upstream.wg.Done()
	}

	upstream.wg.Add(2)
	go relay(upstream, conn, "copy data to svr")
	go relay(conn, upstream, "copy data to conn")
	upstream.wg.Wait()
}
// StartProxySvr listens on TCP port 8889 and serves each accepted connection
// with Handle in its own goroutine.
//
// The call blocks in the accept loop for as long as the listener is healthy.
// When listening or accepting fails, the error is logged, one value is placed
// on the returned (buffered) channel, and the channel is returned so the caller
// can observe that the server has stopped.
func StartProxySvr() <-chan struct{} {
	exit := make(chan struct{}, 1)

	// The address needs the leading colon: a bare "8889" is rejected by
	// net.Listen with "missing port in address".
	listener, err := net.Listen("tcp", ":8889")
	if err != nil {
		log.Printf("proxy server listen error: %v\n", err)
		exit <- struct{}{}
		return exit
	}
	// Release the listening socket once the accept loop gives up.
	defer listener.Close()

	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Printf("proxy server accept error: %v\n", err)
			exit <- struct{}{}
			return exit
		}
		go Handle(conn)
	}
}
// Client groups the settings of an HTTP client: the transport used to perform
// requests, the redirect policy, the cookie jar and an overall timeout.
//
// NOTE(review): the field types RoundTripper, Request and CookieJar are not
// declared in the visible part of this file; this appears to be an excerpt of
// net/http's Client quoted for reference — confirm before compiling it as part
// of this package.
type Client struct {
// Transport specifies the mechanism by which individual
// HTTP requests are made.
// If nil, DefaultTransport is used.
Transport RoundTripper
// CheckRedirect specifies the policy for handling redirects.
// If CheckRedirect is not nil, the client calls it before
// following an HTTP redirect. The arguments req and via are
// the upcoming request and the requests made already, oldest
// first. If CheckRedirect returns an error, the Client's Get
// method returns both the previous Response (with its Body
// closed) and CheckRedirect's error (wrapped in a url.Error)
// instead of issuing the Request req.
// As a special case, if CheckRedirect returns ErrUseLastResponse,
// then the most recent response is returned with its body
// unclosed, along with a nil error.
//
// If CheckRedirect is nil, the Client uses its default policy,
// which is to stop after 10 consecutive requests.
CheckRedirect func(req *Request, via []*Request) error
// Jar specifies the cookie jar.
//
// The Jar is used to insert relevant cookies into every
// outbound Request and is updated with the cookie values
// of every inbound Response. The Jar is consulted for every
// redirect that the Client follows.
//
// If Jar is nil, cookies are only sent if they are explicitly
// set on the Request.
Jar CookieJar
// Timeout specifies a time limit for requests made by this
// Client. The timeout includes connection time, any
// redirects, and reading the response body. The timer remains
// running after Get, Head, Post, or Do return and will
// interrupt reading of the Response.Body.
//
// A Timeout of zero means no timeout.
//
// The Client cancels requests to the underlying Transport
// as if the Request's Context ended.
//
// For compatibility, the Client will also use the deprecated
// CancelRequest method on Transport if found. New
// RoundTripper implementations should use the Request's Context
// for cancelation instead of implementing CancelRequest.
Timeout time.Duration
}
//中间件样例
// Forwarding handler: replays the incoming request against the target server
// and streams the target's status, headers and body back to the caller.
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
	// Every outgoing request is routed through the target server.
	proxy := func(_ *http.Request) (*url.URL, error) {
		// Placeholder: replace with the target's URL including the scheme,
		// e.g. http://127.0.0.1:8099.
		return url.Parse("target ip:port")
	}
	transport := &http.Transport{
		Proxy: proxy,
		DialContext: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
			DualStack: true,
		}).DialContext,
		MaxIdleConns:          100,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
		MaxIdleConnsPerHost:   100,
	}
	// The transport lives only for this request, so its idle connections
	// would otherwise linger until IdleConnTimeout.
	defer transport.CloseIdleConnections()
	client := &http.Client{Transport: transport}

	// Build the outgoing URL from the Host the caller asked for; RemoteAddr
	// is the caller's own address and must not be used as the destination.
	target := "http://" + r.Host + r.RequestURI
	req, err := http.NewRequest(r.Method, target, r.Body)
	if err != nil {
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
		return
	}
	// Copy the request headers onto the outgoing request.
	for k, v := range r.Header {
		for _, vv := range v {
			req.Header.Add(k, vv)
		}
	}

	resp, err := client.Do(req)
	if err != nil {
		// The target could not be reached or did not answer.
		http.Error(w, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
		return
	}
	defer resp.Body.Close()

	// Copy the response headers, then the status code, then the body.
	for k, v := range resp.Header {
		for _, vv := range v {
			w.Header().Add(k, vv)
		}
	}
	w.WriteHeader(resp.StatusCode)
	// Stream instead of buffering the whole body. The status line is already
	// sent, so a copy failure cannot be reported to the caller any more.
	_, _ = io.Copy(w, resp.Body)
})
结束
本文是个人对工作中遇到的问题的总结,不够全面和深入,还请多多指教。谢谢!