go http请求转发

1.说明

  • 日常开发中会遇到需要将请求转发到其它服务器的需求:

    • 1.如果是前端进行转发,需要解决跨域的问题;
    • 2.后端转发到目标服务器,并返回数据到client;

我们只讨论后端如何处理转发。

2. 原理

  • 转发在数据流层面需要解决的问题:

    • 1.向目标服务器发送请求,并获取数据
    • 2.将数据返回到client
    • 3.整个过程对client透明,client感知不到请求被转发

3. 方案

 package proxy
 
 import (
     "errors"
     "io"
     "log"
     "net"
     "sync"
     "sync/atomic"
     "time"
 )
 
 /**
 Wraps the proxy service. Replies on the HTTP connections are subject to
 timeout handling, so pay attention to timeouts.
 */
 
 // pool is a buffered channel (capacity 100) of upstream connections: getConn
 // takes a connection from it when one is available, and release puts
 // connections back into it.
 var pool = make(chan net.Conn, 100)
 
 // conn wraps a net.Conn together with the bookkeeping the proxy keeps for it.
 type conn struct {
     // conn is the wrapped connection that Read, Write and Close delegate to.
     conn  net.Conn
     // wg is used by Handle to wait for its two copy goroutines.
     wg    *sync.WaitGroup
     // lock is held by release while the connection is handed back to the pool.
     lock  sync.Mutex
     // state holds one of the maybeValid..isClosed constants and is read and
     // written through sync/atomic.
     state int32
 }
 
 // Values stored in conn.state.
 const (
     // maybeValid is the initial state assigned by getConn.
     maybeValid = iota
     // isValid is stored after a Read or Write that returned no error.
     isValid
     // isInvalid is stored after a Read or Write failed with a non-timeout
     // error; release closes such connections instead of pooling them.
     isInvalid
     // isInPool is stored by release once the connection was put into the pool.
     isInPool
     // isClosed is stored by Close.
     isClosed
 )
 
 // timeoutErr is implemented by errors that can report whether they were
 // caused by a timeout.
 type timeoutErr interface {
     Timeout() bool
 }
 
 // isTimeoutError reports whether err, or an error it wraps, provides a
 // Timeout method that returns true. A nil err is never a timeout.
 func isTimeoutError(err error) bool {
     // errors.As walks the Unwrap chain, so a timeout that was wrapped with
     // extra context is still recognized; a plain type assertion would only
     // look at the outermost error.
     var te timeoutErr
     if !errors.As(err, &te) {
         return false
     }
     return te.Timeout()
 }
 
 // Read reads from the wrapped connection into b and records the outcome in
 // cn.state: isValid after a read without error, isInvalid after an error that
 // is not a timeout. A timeout error leaves the state untouched.
 func (cn *conn) Read(b []byte) (n int, err error) {
     n, err = cn.conn.Read(b)
     switch {
     case err == nil:
         atomic.StoreInt32(&cn.state, isValid)
     case !isTimeoutError(err):
         atomic.StoreInt32(&cn.state, isInvalid)
     }
     return n, err
 }
 
 // Write writes b to the wrapped connection and records the outcome in
 // cn.state: isValid after a write without error, isInvalid after an error
 // that is not a timeout. A timeout error leaves the state untouched.
 func (cn *conn) Write(b []byte) (n int, err error) {
     n, err = cn.conn.Write(b)
     switch {
     case err == nil:
         atomic.StoreInt32(&cn.state, isValid)
     case !isTimeoutError(err):
         atomic.StoreInt32(&cn.state, isInvalid)
     }
     return n, err
 }
 
 // Close marks the connection as isClosed and then closes the wrapped
 // connection, returning the error of that close.
 func (cn *conn) Close() error {
     atomic.StoreInt32(&cn.state, isClosed)
     closeErr := cn.conn.Close()
     return closeErr
 }
 
 // getConn returns a connection to the destination server wrapped in a conn
 // whose state is maybeValid. A pooled connection is used when the pool has
 // one ready; otherwise a new TCP connection is dialed. A dial failure is
 // logged and returned.
 func getConn() (*conn, error) {
     const dest = "127.0.0.1:8090"
 
     // wrap attaches fresh bookkeeping to a raw connection.
     wrap := func(raw net.Conn) *conn {
         return &conn{
             conn:  raw,
             wg:    &sync.WaitGroup{},
             state: maybeValid,
         }
     }
 
     // Non-blocking receive: fall through to dialing when the pool is empty.
     select {
     case pooled := <-pool:
         return wrap(pooled), nil
     default:
     }
 
     dialed, err := net.Dial("tcp", dest)
     if err != nil {
         service.Logger.Error().Err(err).Msgf("dial to dest %s failed ", dest)
         return nil, err
     }
     return wrap(dialed), nil
 }
 
 // release hands cn back after use. Connections that are already pooled or
 // closed are left alone, invalid ones are closed, and all others are offered
 // to the pool; if the pool is full the connection is closed instead. The
 // returned error is the result of closing the connection, or nil.
 func release(cn *conn) error {
     // Take the lock before looking at the state, so that the check and the
     // resulting transition happen as one step. Checking first and locking
     // afterwards would let two concurrent calls both pass the check and then
     // pool or close the same connection twice.
     cn.lock.Lock()
     defer cn.lock.Unlock()
 
     switch atomic.LoadInt32(&cn.state) {
     case isInPool, isClosed:
         return nil
     case isInvalid:
         // Close through the wrapper so the state becomes isClosed and a
         // repeated release does not close the socket a second time.
         return cn.Close()
     }
 
     // Non-blocking send: never wait for room in the pool.
     select {
     case pool <- cn.conn:
         atomic.StoreInt32(&cn.state, isInPool)
         return nil
     default:
         return cn.Close()
     }
 }
 
 // Handle proxies one accepted connection: it obtains an upstream connection
 // from getConn, copies data in both directions concurrently and returns once
 // both copies have finished. The accepted connection is closed and the
 // upstream connection is released on return. A nil conn is ignored.
 func Handle(conn net.Conn) {
     if conn == nil {
         return
     }
     defer conn.Close()
     // Read/write deadline for the accepted connection.
     conn.SetDeadline(time.Now().Add(100 * time.Millisecond))
 
     client, err := getConn()
     if err != nil {
         return
     }
     defer release(client)
     // Read/write deadline for the upstream connection.
     client.conn.SetDeadline(time.Now().Add(100 * time.Millisecond))
 
     // relay copies src into dst, logs a copy error under msg and then marks
     // one of the two copy goroutines as finished.
     relay := func(dst io.Writer, src io.Reader, msg string) {
         if _, copyErr := io.Copy(dst, src); copyErr != nil {
             service.Logger.Err(copyErr).Msg(msg)
         }
         client.wg.Done()
     }
 
     // Forward in both directions and wait for both to end.
     client.wg.Add(2)
     go relay(client, conn, "copy data to svr")
     go relay(conn, client, "copy data to conn")
     client.wg.Wait()
 }
 
 // StartProxySvr listens on TCP port 8889 and starts one Handle goroutine per
 // accepted connection. It blocks while the accept loop is running and returns
 // only after listening or accepting has failed; the returned channel then
 // already holds one value signalling that the server has stopped.
 func StartProxySvr() <-chan struct{} {
     exit := make(chan struct{}, 1)
     // net.Listen expects a "host:port" address. A bare "8889" is rejected
     // with "missing port in address", so listen on port 8889 of all
     // interfaces instead.
     listener, err := net.Listen("tcp", ":8889")
     if err != nil {
         log.Printf("proxy server listen error: %v\n", err)
         exit <- struct{}{}
         return exit
     }
     // Give the listening socket back once the accept loop ends.
     defer listener.Close()
 
     for {
         conn, err := listener.Accept()
         if err != nil {
             log.Printf("proxy server accept error: %v\n", err)
             exit <- struct{}{}
             return exit
         }
         go Handle(conn)
     }
 }
 // Client groups the settings used when making HTTP requests: the transport
 // that performs them, the redirect policy, the cookie jar and an overall
 // timeout.
 //
 // NOTE(review): this looks like a copy of the net/http Client declaration
 // quoted for reference; RoundTripper, Request and CookieJar are not declared
 // in the visible source - confirm before compiling it as part of this package.
 type Client struct {
     // Transport specifies the mechanism by which individual
     // HTTP requests are made.
     // If nil, DefaultTransport is used.
     Transport RoundTripper
 
     // CheckRedirect specifies the policy for handling redirects.
     // If CheckRedirect is not nil, the client calls it before
     // following an HTTP redirect. The arguments req and via are
     // the upcoming request and the requests made already, oldest
     // first. If CheckRedirect returns an error, the Client's Get
     // method returns both the previous Response (with its Body
     // closed) and CheckRedirect's error (wrapped in a url.Error)
     // instead of issuing the Request req.
     // As a special case, if CheckRedirect returns ErrUseLastResponse,
     // then the most recent response is returned with its body
     // unclosed, along with a nil error.
     //
     // If CheckRedirect is nil, the Client uses its default policy,
     // which is to stop after 10 consecutive requests.
     CheckRedirect func(req *Request, via []*Request) error
 
     // Jar specifies the cookie jar.
     //
     // The Jar is used to insert relevant cookies into every
     // outbound Request and is updated with the cookie values
     // of every inbound Response. The Jar is consulted for every
     // redirect that the Client follows.
     //
     // If Jar is nil, cookies are only sent if they are explicitly
     // set on the Request.
     Jar CookieJar
 
     // Timeout specifies a time limit for requests made by this
     // Client. The timeout includes connection time, any
     // redirects, and reading the response body. The timer remains
     // running after Get, Head, Post, or Do return and will
     // interrupt reading of the Response.Body.
     //
     // A Timeout of zero means no timeout.
     //
     // The Client cancels requests to the underlying Transport
     // as if the Request's Context ended.
     //
     // For compatibility, the Client will also use the deprecated
     // CancelRequest method on Transport if found. New
     // RoundTripper implementations should use the Request's Context
     // for cancelation instead of implementing CancelRequest.
     Timeout time.Duration
 }
 
 //中间件样例
     http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
             proxy := func(_ *http.Request) (*url.URL, error) {
                 return url.Parse("target ip:port")//127.0.0.1:8099
             }
             transport := &http.Transport{
                 Proxy: proxy,
                 DialContext: (&net.Dialer{
                     Timeout:   30 * time.Second,
                     KeepAlive: 30 * time.Second,
                     DualStack: true,
                 }).DialContext,
                 MaxIdleConns:          100,
                 IdleConnTimeout:       90 * time.Second,
                 TLSHandshakeTimeout:   10 * time.Second,
                 ExpectContinueTimeout: 1 * time.Second,
                 MaxIdleConnsPerHost:   100,
             }
 
             client := &http.Client{Transport: transport}
             url := "http://" + r.RemoteAddr + r.RequestURI
             req, err := http.NewRequest(r.Method, url, r.Body)
             //注: 设置Request头部信息
             for k, v := range r.Header {
                 for _, vv := range v {
                     req.Header.Add(k, vv)
                 }
             }
 
             resp, err := client.Do(req)
             if err != nil {
                 return
             }
             defer resp.Body.Close()
             //注: 设置Response头部信息
             for k, v := range resp.Header {
                 for _, vv := range v {
                     w.Header().Add(k, vv)
                 }
             }
             data, _ := ioutil.ReadAll(resp.Body)
             w.Write(data)
 
     })

结束

本文是个人对工作中所遇问题的总结,不够全面和深入,还请多多指教。谢谢!