Golang sync包提供了基础的异步操作方法,包括互斥锁Mutex,执行一次Once和并发等待组WaitGroup。
本文主要介绍sync包提供的这些功能的基本使用方法。
- Mutex: 互斥锁
- RWMutex:读写锁
- WaitGroup:并发等待组
- Once:执行一次
- Cond:信号量
- Pool:临时对象池
- Map:自带锁的map
二. sync.Mutex
sync.Mutex互斥锁
互斥锁的概念: 对共享数据进行锁定,保证同一时刻只有能一个线程或者协程去操作。
注意: 互斥锁是多个线程或者协程一起去抢,抢到锁的线程或者协程去先执行,没抢到的就等待。等互斥锁使用完释放后,其他等待的线程或者协程去抢这个锁。
sync.MutexLockUnLock
func (m *Mutex) Lock()
func (m *Mutex) UnLock()
sync.Mutex初始值为UnLock状态,并且sync.Mutex常做为其它结构体的匿名变量使用。
举个例子: 我们经常使用网上支付购物东西,就会出现同一个银行账户在某一个时间既有支出也有收入,那银行就得保证我们余额准确,保证数据无误。
我们可以简单的实现银行的支出和收入来说明Mutex的使用。
type Bank struct {
sync.Mutex
balance map[string]float64
}
// In 收入
func (b *Bank) In(account string, value float64) {
// 加锁 保证同一时间只有一个协程能访问这段代码
b.Lock()
defer b.Unlock()
v, ok := b.balance[account]
if !ok {
b.balance[account] = 0.0
}
b.balance[account] += v
}
// Out 支出
func (b *Bank) Out(account string, value float64) error {
// 加锁 保证同一时间只有一个协程能访问这段代码
b.Lock()
defer b.Unlock()
v, ok := b.balance[account]
if !ok || v < value {
return errors.New("account not enough balance")
}
b.balance[account] -= value
return nil
}
三. sync.RWMutex
sync.RWMutex称为读写锁是sync.Mutex的一种变种,RWMutex来自于计算机操作系统非常有名的读者写者问题。
sync.RWMutex目的是为了能够支持多个并发协程同时读取某一个资源,但只有一个并发协程能够更新资源。也就是说读和写是互斥的,写和写也是互斥的,读和读是不互斥的。
总结起来如下:
- 当有一个协程在读的时候,所有写的协程必须等到所有读的协程结束才可以获得锁进行写操作。
- 当有一个协程在读的时候,所有读的协程不受影响都可以进行读操作。
- 当有一个协程在写的时候,所有读、写的协程必须等到写的协程结束才可以获得锁进行读、写操作。
- RWMutex有5个函数,分别为读和写提供锁操作。
写操作
func (rw *RWMutex) Lock()
func (rw *RWMutex) Unlock()
读操作
func (rw *RWMutex) RLock()
func (rw *RWMutex) RUnlock()
RLocker()能获取读锁,然后传递给其他协程使用。
func (rw *RWMutex) RLocker() Locker
sync.Mutexsync.RWMutex
type Bank struct {
sync.RWMutex
balance map[string]float64
}
func (b *Bank) In(account string, value float64) {
b.Lock()
defer b.Unlock()
v, ok := b.balance[account]
if !ok {
b.balance[account] = 0.0
}
b.balance[account] += v
}
func (b *Bank) Out(account string, value float64) error {
b.Lock()
defer b.Unlock()
v, ok := b.balance[account]
if !ok || v < value {
return errors.New("account not enough balance")
}
b.balance[account] -= value
return nil
}
func (b *Bank) Query(account string) float64 {
b.RLock()
defer b.RUnlock()
v, ok := b.balance[account]
if !ok {
return 0.0
}
return v
}
sync.WaitGroup等待一组工作完成后,再进行下一组工作
sync.WaitGroup
func (wg *WaitGroup) Add(delta int) Add添加n个并发协程
func (wg *WaitGroup) Done() Done完成一个并发协程
func (wg *WaitGroup) Wait() Wait等待其它并发协程结束
sync.WaitGroup
func main() {
wg := &sync.WaitGroup{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer func() {
wg.Done()
}()
time.Sleep(1 * time.Second)
fmt.Println("hello world ~")
}()
}
// 等待所有协程结束
wg.Wait()
fmt.Println("WaitGroup all process done ~")
}
sync.WaitGroup没有办法指定最大并发协程数,在一些场景下会有问题。例如操作数据库场景下,我们不希望某一些时刻出现大量连接数据库导致数据库不可访问。所以,为了能够控制最大的并发数,推荐使用最下面的,用法和sync.WaitGroup非常类似。
下面这个例子最多只有10个并发协程,如果已经达到10个并发协程,只有某一个协程执行了Done才能启动一个新的协程。
import "github.com/remeh/sizedwaitgroup"
func main() {
# 最大10个并发
wg := sizedwaitgroup.New(10)
for i = 0; i < 1000; i++ {
wg.Add()
go func() {
defer func() {
wg.Done()
}()
time.Sleep(1 * time.Second)
fmt.Println("hello world ~")
}()
}
// 等待所有协程结束
wg.Wait()
fmt.Println("WaitGroup all process done ~")
}
sync.Once
sync.Once指的是只执行一次的对象实现,常用来控制某些函数只能被调用一次。sync.Once的使用场景例如单例模式、系统初始化。
例如并发情况下多次调用channel的close会导致panic,解决这个问题我们可以使用sync.Once来保证close只会被执行一次。
sync.Once的结构如下所示,只有一个函数。使用变量done来记录函数的执行状态,使用sync.Mutex和sync.atomic来保证线程安全的读取done。
type Once struct {
m Mutex #互斥锁
done uint32 #执行状态
}
func (o *Once) Do(f func())
举个例子,1000个并发协程情况下只有一个协程会执行到fmt.Printf,多次执行的情况下输出的内容还不一样,因为这取决于哪个协程先调用到该匿名函数。
func main() {
once := &sync.Once{}
for i := 0; i < 1000; i++ {
go func(idx int) {
once.Do(func() {
time.Sleep(1 * time.Second)
fmt.Printf("hello world index: %d", idx)
})
}(i)
}
time.Sleep(5 * time.Second)
}
sync.Cond
sync.Cond
// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
sync.CondWaitSignalBroadcast:
// Wait 等待通知
func (c *Cond) Wait()
// Signal 单播通知
func (c *Cond) Signal()
// Broadcast 广播通知
func (c *Cond) Broadcast()
sync.Cond
var sharedRsc = make(map[string]interface{})
func main() {
var wg sync.WaitGroup
wg.Add(2)
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc1"])
c.L.Unlock()
wg.Done()
}()
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc2"])
c.L.Unlock()
wg.Done()
}()
// this one writes changes to sharedRsc
c.L.Lock()
sharedRsc["rsc1"] = "foo"
sharedRsc["rsc2"] = "bar"
c.Broadcast()
c.L.Unlock()
wg.Wait()
}
sync.Pool
sync.Pool指的是临时对象池,Golang和Java具有GC机制,因此很多开发者基本上都不会考虑内存回收问题,不像C++很多时候开发需要自己回收对象。
Gc是一把双刃剑,带来了编程的方便但同时也增加了运行时开销,使用不当可能会严重影响程序的性能,因此性能要求高的场景不能任意产生太多的垃圾。
sync.Pool正是用来解决这类问题的,Pool可以作为临时对象池来使用,不再自己单独创建对象,而是从临时对象池中获取出一个对象。
sync.Pool有2个函数Get和Put,Get负责从临时对象池中取出一个对象,Put用于结束的时候把对象放回临时对象池中。
func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})
看一个官方例子:
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func timeNow() time.Time {
return time.Unix(1136214245, 0)
}
func Log(w io.Writer, key, val string) {
// 获取临时对象,没有的话会自动创建
b := bufPool.Get().(*bytes.Buffer)
b.Reset()
b.WriteString(timeNow().UTC().Format(time.RFC3339))
b.WriteByte(' ')
b.WriteString(key)
b.WriteByte('=')
b.WriteString(val)
w.Write(b.Bytes())
// 将临时对象放回到 Pool 中
bufPool.Put(b)
}
func main() {
Log(os.Stdout, "path", "/search?q=flowers")
}
从上面的例子我们可以看到创建一个Pool对象并不能指定大小,所以sync.Pool的缓存对象数量是没有限制的(只受限于内存),那sync.Pool是如何控制缓存临时对象数的呢?
sync.Pool在init的时候注册了一个poolCleanup函数,它会清除所有的pool里面的所有缓存的对象,该函数注册进去之后会在每次Gc之前都会调用,因此sync.Pool缓存的期限只是两次Gc之间这段时间。正因Gc的时候会清掉缓存对象,所以不用担心pool会无限增大的问题。
正因为如此sync.Pool适合用于缓存临时对象,而不适合用来做持久保存的对象池(连接池等)。
sync.Map
Go在1.9版本之前自带的map对象是不具有并发安全的,很多时候我们都得自己封装支持并发安全的Map结构,如下所示给map加个读写锁sync.RWMutex。
type MapWithLock struct {
sync.RWMutex
M map[string]Kline
}
sync.Map
// 查询一个key
func (m *Map) Load(key interface{}) (value interface{}, ok bool)
// 设置key value
func (m *Map) Store(key, value interface{})
// 如果key存在则返回key对应的value,否则设置key value
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
// 删除一个key
func (m *Map) Delete(key interface{})
// 遍历map,仍然是无序的
func (m *Map) Range(f func(key, value interface{}) bool)
为了能够控制最大的并发数:
package sizedwaitgroup
import (
"context"
"sync/atomic"
"testing"
)
func TestWait(t *testing.T) {
swg := New(10)
var c uint32
for i := 0; i < 10000; i++ {
swg.Add()
go func(c *uint32) {
defer swg.Done()
atomic.AddUint32(c, 1)
}(&c)
}
swg.Wait()
if c != 10000 {
t.Fatalf("%d, not all routines have been executed.", c)
}
}
func TestThrottling(t *testing.T) {
var c uint32
swg := New(4)
if len(swg.current) != 0 {
t.Fatalf("the SizedWaitGroup should start with zero.")
}
for i := 0; i < 10000; i++ {
swg.Add()
go func(c *uint32) {
defer swg.Done()
atomic.AddUint32(c, 1)
if len(swg.current) > 4 {
t.Fatalf("not the good amount of routines spawned.")
return
}
}(&c)
}
swg.Wait()
}
func TestNoThrottling(t *testing.T) {
var c uint32
swg := New(0)
if len(swg.current) != 0 {
t.Fatalf("the SizedWaitGroup should start with zero.")
}
for i := 0; i < 10000; i++ {
swg.Add()
go func(c *uint32) {
defer swg.Done()
atomic.AddUint32(c, 1)
}(&c)
}
swg.Wait()
if c != 10000 {
t.Fatalf("%d, not all routines have been executed.", c)
}
}
func TestAddWithContext(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.TODO())
swg := New(1)
if err := swg.AddWithContext(ctx); err != nil {
t.Fatalf("AddContext returned error: %v", err)
}
cancelFunc()
if err := swg.AddWithContext(ctx); err != context.Canceled {
t.Fatalf("AddContext returned non-context.Canceled error: %v", err)
}
}