sync 包(二):条件变量 sync.Cond
sync.Cond
sync.Cond
sync.Cond
sync.Cond
type Cond struct {
  noCopy noCopy

  // L is held while observing or changing the condition
  L Locker

  notify  notifyList
  checker copyChecker
}

提供了三个方法:

// 等待通知
func (c *Cond) Wait() {
  c.checker.check()
  t := runtime_notifyListAdd(&c.notify)
  c.L.Unlock()
  runtime_notifyListWait(&c.notify, t)
  c.L.Lock()  
}

// 单发通知
func (c *Cond) Signal() {
  c.checker.check()
  runtime_notifyListNotifyOne(&c.notify)  
}

// 广播通知
func (c *Cond) Broadcast() {
  c.checker.check()
  runtime_notifyListNotifyAll(&c.notify)  
}
sync.NewCondsync.CondL
locker := &sync.Mutex{}
cond := sync.NewCond(locker)
sync.Cond

下面来看一个具体的示例。假设我们有一个读取器和一个写入器,读取器必须依赖写入器对缓冲区进行数据写入后,才可以从缓冲区中读取数据,写入器每次完成写入数据后,都需要通过某种通知机制通知处于阻塞状态的读取器,告诉它可以对数据进行访问,这种场景正好可以通过条件变量来实现:

package main

import (
  "bytes"
  "fmt"
  "io"
  "sync"
  "time"
)

// 数据 bucket
type DataBucket struct {
  buffer *bytes.Buffer  //缓冲区
  mutex *sync.RWMutex //互斥锁
  cond  *sync.Cond //条件变量
}

func NewDataBucket() *DataBucket {
  buf := make([]byte, 0)
  db := &DataBucket{
    buffer:     bytes.NewBuffer(buf),
    mutex: new(sync.RWMutex),
  }
  db.cond = sync.NewCond(db.mutex.RLocker())
  return db
}

// 读取器
func (db *DataBucket) Read(i int) {
  db.mutex.RLock()   // 打开读锁
  defer db.mutex.RUnlock()  // 结束后释放读锁
  var data []byte
  var d byte
  var err error
  for {
    //每次读取一个字节
    if d, err = db.buffer.ReadByte(); err != nil {
      if err == io.EOF { // 缓冲区数据为空时执行
        if string(data) != "" {  // data 不为空,则打印它
          fmt.Printf("reader-%d: %s\n", i, data)
        }
        db.cond.Wait() // 缓冲区为空,通过 Wait 方法等待通知,进入阻塞状态
        data = data[:0]  // 将 data 清空
        continue
      }
    }
    data = append(data, d) // 将读取到的数据添加到 data 中
  }
}

// 写入器
func (db *DataBucket) Put(d []byte) (int, error) {
  db.mutex.Lock()   // 打开写锁
  defer db.mutex.Unlock()  // 结束后释放写锁
  //写入一个数据块
  n, err := db.buffer.Write(d)
  db.cond.Signal()  // 写入数据后通过 Signal 通知处于阻塞状态的读取器
  return n, err
}

func main() {
  db := NewDataBucket()
  go db.Read(1) // 开启读取器协程
  go func(i int) {
    d := fmt.Sprintf("data-%d", i)
    db.Put([]byte(d))  // 写入数据到缓冲区
  }(1)  // 开启写入器协程
  time.Sleep(100 * time.Millisecond)
}
deferdb.cond.Wait()db.cond.Signal()

执行上述示例代码,结果如下:

reader-1: data-1
SignalBroadcast
package main

import (
  "bytes"
  "fmt"
  "io"
  "sync"
  "time"
)

// 数据 bucket
type DataBucket struct {
  buffer *bytes.Buffer  //缓冲区
  mutex *sync.RWMutex //互斥锁
  cond  *sync.Cond //条件变量
}

func NewDataBucket() *DataBucket {
  buf := make([]byte, 0)
  db := &DataBucket{
    buffer:     bytes.NewBuffer(buf),
    mutex: new(sync.RWMutex),
  }
  db.cond = sync.NewCond(db.mutex.RLocker())
  return db
}

// 读取器
func (db *DataBucket) Read(i int) {
  db.mutex.RLock()   // 打开读锁
  defer db.mutex.RUnlock()  // 结束后释放读锁
  var data []byte
  var d byte
  var err error
  for {
    //每次读取一个字节
    if d, err = db.buffer.ReadByte(); err != nil {
      if err == io.EOF { // 缓冲区数据为空时执行
        if string(data) != "" {  // data 不为空,则打印它
          fmt.Printf("reader-%d: %s\n", i, data)
        }
        db.cond.Wait() // 缓冲区为空,通过 Wait 方法等待通知,进入阻塞状态
        data = data[:0]  // 将 data 清空
        continue
      }
    }
    data = append(data, d) // 将读取到的数据添加到 data 中
  }
}

// 写入器
func (db *DataBucket) Put(d []byte) (int, error) {
  db.mutex.Lock()   // 打开写锁
  defer db.mutex.Unlock()  // 结束后释放写锁
  //写入一个数据块
  n, err := db.buffer.Write(d)
  db.cond.Broadcast()  // 写入数据后通过 Broadcast 通知处于阻塞状态的读取器
  return n, err
}

func main() {
  db := NewDataBucket()
  for i := 1; i < 3; i++ {  // 启动多个读取器
    go db.Read(i)
  }
  for j := 0; j < 10; j++  {  // 启动多个写入器
    go func(i int) {
      d := fmt.Sprintf("data-%d", i)
      db.Put([]byte(d))  // 写入数据到缓冲区
    }(j)
    time.Sleep(100 * time.Millisecond) // 每次启动一个写入器暂停100ms,让读取器阻塞
  }
}

执行上述代码,打印结果如下:

可以看到,通过互斥锁+条件变量,我们可以非常方便的实现多个 Go 协程之间的通信,但是这个还是比不上 channel,因为 channel 还可以实现数据传递,条件变量只是发送信号,唤醒被阻塞的协程继续执行,另外 channel 还有超时机制,不会出现协程等不到信号一直阻塞造成内存堆积问题,换句话说,channel 可以让程序更可控。