redigo连接池的源码分析
今天我们来看一看redigo(github.com/gomodule/re…)是如何实现连接池的。
概述
redis/pool.go
Pool
Dial func() (Conn, error)MaxIdle intMaxActive intIdleTimeout time.DurationGet() Conn
idleList*poolConnpushFront()popFront()popBack()
type idleList struct {
count int
front, back *poolConn
}
实现连接池时,需要考虑以下几个问题
-
何时新建连接?
- 若新建连接时发现已创建的连接数到达连接池的容量上限,该如何处理?
-
如何回收空闲时间过长的连接?
-
如何确保连接池中的连接依然存活?
func (p *Pool) Get() Connfunc (ac *activeConn) Close()
问题1:如何回收空闲时间过长的连接?
func (p *Pool) Get() Conn
func (p *Pool) Get() Conn {
// GetContext returns errorConn in the first argument when an error occurs.
c, _ := p.GetContext(context.Background())
return c
}
func (p *Pool) GetContext(ctx context.Context) (Conn, error) {
// Wait until there is a vacant connection in the pool.
waited, err := p.waitVacantConn(ctx)
if err != nil {
return errorConn{err}, err
}
// ...
Get()activeConnerrorConnConn
nilGet()Do()
Get()GetContext()waitVacantConn()waitVacantConn()p.Wait == falsep.Wait == true
func (p *Pool) waitVacantConn(ctx context.Context) (waited time.Duration, err error) {
if !p.Wait || p.MaxActive 0 {
// ...
}
// Prune stale connections at the back of the idle list.
if p.IdleTimeout > 0 {
n := p.idle.count
for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
pc := p.idle.back // ①
p.idle.popBack()
p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
p.active--
}
}
这部分代码回答了有关连接池的一个问题——如何回收空闲时间过长的连接?
p.idle.back*poolConnttp.IdleTimeoutnowFunc()p.idle
p.active--p.mu.Lock()
p.mu.Lock()
// for ...
p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
// ...
// }
问题2:如何确保连接池中的连接依然存活?
p.mu
p.mu.Lock()
for p.idle.front != nil {
pc := p.idle.front
p.idle.popFront()
p.mu.Unlock()
// return an `activeConn` or check next idle connection
// ...
}
activeConn
type activeConn struct {
p *Pool
pc *poolConn
state int
}
之所以要确保空闲连接依然存活,是因为空闲连接虽然存在,但可能已经是失效的连接了。那么什么时候会出现这种情况呢?
timeout0
# Close the connection after a client is idle for N seconds (0 to disable)
timeout 0
MaxIdle
$ fgrep timeout -B2 /usr/local/etc/redis.conf
# Close the connection after a client is idle for N seconds (0 to disable)
timeout 5
--
$ # 重启redis
$ # brew services restart redis
func main() {
pool := &redis.Pool{
MaxActive: 1,
MaxIdle: 1,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", "127.0.0.1:6379")
},
}
c := pool.Get()
reply, err := c.Do("PING")
if err != nil {
fmt.Println(reply, err)
}
c.Close() // return to pool
time.Sleep(20 * time.Second)
c = pool.Get()
reply, err = c.Do("PING")
if err != nil {
fmt.Println(reply, err) // EOF
}
}
c.Do("PING")
PINGtime.Sleep(20 * time.Second)PINGRST
这就是空闲连接虽然存在,但已经失效的情况。
timeoutIdleTimeout time.DurationTestOnBorrow
// pool := &redis.Pool{
// // Other pool configuration not shown in this example.
// TestOnBorrow: func(c redis.Conn, t time.Time) error {
// if time.Since(t) < time.Minute {
// return nil
// }
// _, err := c.Do("PING")
// return err
// },
// }
if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
// ...
return &activeConn{p: p, pc: pc}, nil
}
pc.c.Close() // ①
p.mu.Lock()
p.active--
p.TestOnBorrow
问题3:新建连接的问题
如果空闲连接的链表为空,或者链表中没有存活着的可用连接,就不得不新建连接了。
dial()
p.mu.Lock()
// ...
p.active++
p.mu.Unlock()
c, err := p.dial(ctx)
// ...
return &activeConn{p: p, pc: &poolConn{c: c, created: nowFunc()}}, nil
dial()Dial
func (p *Pool) dial(ctx context.Context) (Conn, error) {
// ...
if p.Dial != nil {
return p.Dial()
}
// ...
}
但新建时需要考虑,当已创建的连接数已达到连接池的容量上限时要如何处理。
我们先来看redigo中最简单的一种处理方法,
// Handle limit for p.Wait == false.
if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {
p.mu.Unlock()
return errorConn{ErrPoolExhausted}, ErrPoolExhausted
}
p.Wait == falsep.active >= p.MaxActivereturn errorConn{}
p.Wait == truePoolch
type Pool struct {
// ...
ch chan struct{} // limits open connections when p.Wait is true
让获取连接的goroutine进入等待状态。
select {
case
本文来自网络,不代表每日运维立场,如若转载,请注明出处:https://www.mryunwei.com/406895.html
题图来自Unsplash,基于CC0协议
内容观点仅代表作者本人,每日运维平台仅提供信息存储空间服务。
如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。