一、sync.Pool 基本使用

https://golang.org/pkg/sync/
sync.Pool 的使用非常简单,它具有以下几个特点:

  • sync.Pool 设计目的是存放已经分配但暂时不用的对象,供以后使用,以减轻 gc 的代价,提高效率
  • 存储在 Pool 中的对象会随时被 gc 自动回收,Pool 中对象的缓存期限为两次 gc 之间
  • 用户无法定义 sync.Pool 的大小,其大小仅仅受限于内存的大小
  • sync.Pool 支持多协程之间共享

sync.Pool 的使用非常简单,定义一个 Pool 对象池时,需要提供一个 New 函数,表示当池中没有对象时,如何生成对象。对象池 Pool 提供 Get 和 Put 函数从 Pool 中取和存放对象。

下面有一个简单的实例,直接运行是会打印两次 “new an object”,注释掉 runtime.GC(),发现只会调用一次 New 函数,表示实现了对象重用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"runtime"
"sync"
)

func main() {
p := &sync.Pool{
New: func() interface{} {
fmt.Println("new an object")
return 0
},
}

a := p.Get().(int)
a = 100
p.Put(a)
runtime.GC()
b := p.Get().(int)
fmt.Println(a, b)
}

二、sync.Pool 如何支持多协程共享?

sync.Pool 支持多协程共享,为了尽量减少竞争和加锁的操作,golang 在设计的时候为每个 P(核)都分配了一个子池,每个子池包含一个私有对象和共享列表。 私有对象只有对应的和核 P 能够访问,而共享列表是与其它 P 共享的。

在 golang 的 GMP 调度模型中,我们知道协程 G 最终会被调度到某个固定的核 P 上。当一个协程在执行 Pool 的 get 或者 put 方法时,首先对改核 P 上的子池进行操作,然后对其它核的子池进行操作。因为一个 P 同一时间只能执行一个 goroutine,所以对私有对象存取操作是不需要加锁的,而共享列表是和其他 P 分享的,因此需要加锁操作。

一个协程希望从某个 Pool 中获取对象,它包含以下几个步骤:

  1. 判断协程所在的核 P 中的私有对象是否为空,如果非常则返回,并将改核 P 的私有对象置为空
  2. 如果协程所在的核 P 中的私有对象为空,就去改核 P 的共享列表中获取对象(需要加锁)
  3. 如果协程所在的核 P 中的共享列表为空,就去其它核的共享列表中获取对象(需要加锁)
  4. 如果所有的核的共享列表都为空,就会通过 New 函数产生一个新的对象

在 sync.Pool 的源码中,每个核 P 的子池的结构如下所示:

// Local per-P Pool appendix.
type poolLocalInternal struct {
    private interface{}   // Can be used only by the respective P.
    shared  []interface{} // Can be used by any P.
    Mutex                 // Protects shared.
}

更加细致的 sync.Pool 源码分析,可参考http://jack-nie.github.io/go/golang-sync-pool.html

三、为什么不使用 sync.pool 实现连接池?

刚开始接触到 sync.pool 时,很容易让人联想到连接池的概念,但是经过仔细分析后发现 sync.pool 并不是适合作为连接池,主要有以下两个原因:

  • 连接池的大小通常是固定且受限制的,而 sync.Pool 是无法控制缓存对象的数量,只受限于内存大小,不符合连接池的目标
  • sync.Pool 对象缓存的期限在两次 gc 之间,这点也和连接池非常不符合

golang 中连接池通常利用 channel 的缓存特性实现。当需要连接时,从 channel 中获取,如果池中没有连接时,将阻塞或者新建连接,新建连接的数量不能超过某个限制。

https://github.com/goctx/generic-pool基于 channel 提供了一个通用连接池的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package pool

import (
"errors"
"io"
"sync"
"time"
)

var (
ErrInvalidConfig = errors.New("invalid pool config")
ErrPoolClosed = errors.New("pool closed")
)

type Poolable interface {
io.Closer
GetActiveTime() time.Time
}

type factory func() (Poolable, error)

type Pool interface {
Acquire() (Poolable, error) // 获取资源
Release(Poolable) error // 释放资源
Close(Poolable) error // 关闭资源
Shutdown() error // 关闭池
}

type GenericPool struct {
sync.Mutex
pool chan Poolable
maxOpen int // 池中最大资源数
numOpen int // 当前池中资源数
minOpen int // 池中最少资源数
closed bool // 池是否已关闭
maxLifetime time.Duration
factory factory // 创建连接的方法
}

func NewGenericPool(minOpen, maxOpen int, maxLifetime time.Duration, factory factory) (*GenericPool, error) {
if maxOpen <= 0 || minOpen > maxOpen {
return nil, ErrInvalidConfig
}
p := &GenericPool{
maxOpen: maxOpen,
minOpen: minOpen,
maxLifetime: maxLifetime,
factory: factory,
pool: make(chan Poolable, maxOpen),
}

for i := 0; i < minOpen; i++ {
closer, err := factory()
if err != nil {
continue
}
p.numOpen++
p.pool <- closer
}
return p, nil
}

func (p *GenericPool) Acquire() (Poolable, error) {
if p.closed {
return nil, ErrPoolClosed
}
for {
closer, err := p.getOrCreate()
if err != nil {
return nil, err
}
// 如果设置了超时且当前连接的活跃时间+超时时间早于现在,则当前连接已过期
if p.maxLifetime > 0 && closer.GetActiveTime().Add(time.Duration(p.maxLifetime)).Before(time.Now()) {
p.Close(closer)
continue
}
return closer, nil
}
}

func (p *GenericPool) getOrCreate() (Poolable, error) {
select {
case closer := <-p.pool:
return closer, nil
default:
}
p.Lock()
if p.numOpen >= p.maxOpen {
closer := <-p.pool
p.Unlock()
return closer, nil
}
// 新建连接
closer, err := p.factory()
if err != nil {
p.Unlock()
return nil, err
}
p.numOpen++
p.Unlock()
return closer, nil
}

// 释放单个资源到连接池
func (p *GenericPool) Release(closer Poolable) error {
if p.closed {
return ErrPoolClosed
}
p.Lock()
p.pool <- closer
p.Unlock()
return nil
}

// 关闭单个资源
func (p *GenericPool) Close(closer Poolable) error {
p.Lock()
closer.Close()
p.numOpen--
p.Unlock()
return nil
}

// 关闭连接池,释放所有资源
func (p *GenericPool) Shutdown() error {
if p.closed {
return ErrPoolClosed
}
p.Lock()
close(p.pool)
for closer := range p.pool {
closer.Close()
p.numOpen--
}
p.closed = true
p.Unlock()
return nil
}

参考:
[1].https://blog.csdn.net/yongjian_lian/article/details/42058893
[2].https://segmentfault.com/a/1190000013089363
[3].http://jack-nie.github.io/go/golang-sync-pool.html