There are three classic ways to control concurrency in Go: signaling over a channel, sync.WaitGroup, and Context.

1. The most basic approach: signaling over a channel

Unbuffered channels

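An unbuffered channel has a capacity of zero, so it cannot hold a value before it is received. A send completes only when the sending goroutine and the receiving goroutine are both ready.
In other words, the sending goroutine and the receiving goroutine must synchronize: whichever side arrives first blocks until the other side is ready. For this reason an unbuffered channel is also called a synchronous channel.
That blocking behavior is what lets an unbuffered channel coordinate several goroutines: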
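package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan struct{}) // unbuffered channel used purely as a signal
    go func() {
        fmt.Println("do something..")
        time.Sleep(time.Second * 1)
        ch <- struct{}{} // tell main that the work is done
    }()

    <-ch // block until the goroutine signals

    fmt.Println("I am finished")
}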
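When the main goroutine reaches <-ch, it blocks until a value arrives on the channel. If the channel has no data, main keeps waiting, which gives a simple form of concurrency control.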
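
The same idea extends to several goroutines. The following is a minimal sketch, not part of the original example: waitForWorkers is a hypothetical helper that starts n goroutines and receives exactly n signals from one unbuffered channel before returning.

```go
package main

import "fmt"

// waitForWorkers is a hypothetical helper for illustration. It starts n
// goroutines and blocks until each one has signaled completion on an
// unbuffered channel. It returns the number of signals received.
func waitForWorkers(n int) int {
	done := make(chan struct{}) // unbuffered: every send waits for a receive

	for i := 0; i < n; i++ {
		go func(id int) {
			fmt.Println("worker", id, "finished")
			done <- struct{}{} // blocks until the caller receives
		}(i)
	}

	received := 0
	for i := 0; i < n; i++ {
		<-done // blocks until some worker sends
		received++
	}
	return received
}

func main() {
	fmt.Println("signals received:", waitForWorkers(3))
}
```

The caller has to know how many signals to expect, which is the bookkeeping that WaitGroup takes over in the next section.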

2. Concurrency control with WaitGroup from the sync package

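Goroutines run asynchronously, so main has to wait for them; otherwise the program exits as soon as main returns. The sync package provides WaitGroup for this purpose: it waits until every goroutine it is tracking has finished. WaitGroup has three main methods:
  • Add adds a delta (which may be negative) to the counter of goroutines to wait for
  • Done is equivalent to Add(-1)
  • Wait blocks the calling goroutine until the WaitGroup counter drops to 0
The main goroutine calls Add(delta int) to register how many goroutines it will wait for. Each goroutine calls Done() when it finishes. Once all of them have finished, Wait returns in the main goroutine.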
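package main

import (
    "net/http"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/",
    }
    for _, url := range urls {
        wg.Add(1) // register one goroutine before starting it
        go func(url string) {
            defer wg.Done() // always decrement, even on error
            resp, err := http.Get(url)
            if err != nil {
                return
            }
            resp.Body.Close() // release the connection
        }(url)
    }
    wg.Wait() // block until every fetch has finished
}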

However, the official Go documentation contains this sentence:

  • A WaitGroup must not be copied after first use.
That is, once a WaitGroup has been used, it must not be copied. Consider the following code:
func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(wg sync.WaitGroup, i int) {
            log.Printf("i:%d", i)
            wg.Done()
        }(wg, i)
    }
    wg.Wait()
    log.Println("exit")
}

Running it produces:

2009/11/10 23:00:00 i:4
2009/11/10 23:00:00 i:0
2009/11/10 23:00:00 i:1
2009/11/10 23:00:00 i:2
2009/11/10 23:00:00 i:3
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x1040a13c, 0x44bc)
    /usr/local/go/src/runtime/sema.go:47 +0x40
sync.(*WaitGroup).Wait(0x1040a130, 0x121460)
    /usr/local/go/src/sync/waitgroup.go:131 +0x80
main.main()
    /tmp/sandbox894380819/main.go:19 +0x120

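The runtime reports that all goroutines are asleep, that is, a deadlock. The cause is that wg is passed to each goroutine by value: Add runs on the original wg, but Done runs on a copy, so the counter that Wait observes never reaches zero.
There are two ways to fix this. The first is to change the parameter type of wg in the anonymous function to *sync.WaitGroup, so that it refers to the original WaitGroup. The second is to drop the wg parameter entirely: Go supports closures, so the anonymous function can use the outer wg variable directly.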
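
One way to avoid the deadlock is to pass the WaitGroup by pointer. The sketch below assumes a hypothetical helper, runWorkers, plus a mutex-protected counter added only to make the result observable; neither is part of the original program.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers is a hypothetical helper for illustration. It starts n
// goroutines that share one WaitGroup through a pointer, so every Done
// call decrements the same counter that Wait observes. It returns how
// many workers ran.
func runWorkers(n int) int {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		count int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(wg *sync.WaitGroup, i int) { // pointer, not a copy
			defer wg.Done()
			fmt.Printf("i:%d\n", i)
			mu.Lock()
			count++
			mu.Unlock()
		}(&wg, i)
	}
	wg.Wait() // returns once all n workers have called Done
	return count
}

func main() {
	fmt.Println("workers finished:", runWorkers(5))
	fmt.Println("exit")
}
```

Dropping the parameter and letting the closure capture wg would work just as well. Running go vet on the original program should also flag the copy through its copylocks check.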

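Go has five kinds of types that are commonly described as reference types: slice, channel, function, map, and interface. sync.WaitGroup is a struct, not one of these, so passing it by value copies it.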

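The interface is one of the most successful designs in Go. The empty interface can serve as a "duck" type, which gives a static language like Go a degree of dynamism without giving up the compile-time type checking that static languages offer. Depending on interfaces rather than implementations, and preferring composition over inheritance, are basic principles of program abstraction. Yet for a long time the "object-oriented" languages typified by C++ distorted these principles and led people astray. Why bind methods and data together? Why have a design as perverse as multiple inheritance? Object orientation should emphasize message passing between objects above all, so why was it turned into encapsulation, inheritance, and polymorphism? Is object orientation a sound route to program abstraction, or do we consider it sound only because it exists? For historical reasons, many mistakes were made along the way. Either way, Go's interface opens a new window for us.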

3. Concurrency control with Context, the powerful mechanism added to the standard library in Go 1.7

3.1 Introduction

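In simple scenarios, channel and WaitGroup are enough. In complex, changeable network scenarios they fall short. Take network requests: each Request starts a goroutine to do some work, and that goroutine may start further goroutines, for example to call a database or an RPC service. We therefore need a way to track these goroutines in order to control them. That is what Context provides. The name is apt: it is the context of a goroutine, carrying the state the goroutine needs to run, and it is handed to each goroutine that does the work.
The context package is mainly used to share data between multiple goroutines and to manage those goroutines.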

3.2 package context

The core of the context package is the Context interface, declared as follows:
// A Context carries a deadline, cancelation signal, and request-scoped values
// across API boundaries. Its methods are safe for simultaneous use by multiple
// goroutines.
type Context interface {
    // Done returns a channel that is closed when this `Context` is canceled
    // or times out.
    Done() <-chan struct{}

    // Err indicates why this Context was canceled, after the Done channel
    // is closed.
    Err() error

    // Deadline returns the time when this Context will be canceled, if any.
    Deadline() (deadline time.Time, ok bool)

    // Value returns the value associated with key or nil if none.
    Value(key interface{}) interface{}
}
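  • Done() returns a receive-only channel that is closed when the context is canceled or times out
  • Err() returns the reason the context was canceled, once the Done() channel is closed
  • Deadline() returns the time at which the context will be canceled, if one is set
  • Value() lets a Context carry request-scoped data; that data must be safe for concurrent use
A Context is safe for concurrent use. You can pass one Context to any number of goroutines, and when it is canceled, all of them receive the signal.
A Context has no Cancel method, and its Done channel is receive-only, for the same reason: the function that receives a cancelation signal is usually not the one that sends it. In the typical case, a parent operation starts goroutines for its sub-operations, and those sub-operations should not be able to cancel the parent.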
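
The split between the side that cancels and the side that listens can be sketched as follows. The worker and cancelAndWait functions are hypothetical, and the sleep durations are arbitrary stand-ins for real work.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// worker only sees the Context: it can observe cancelation through
// Done and Err, but it has no way to cancel the context itself.
func worker(ctx context.Context, stopped chan<- error) {
	for {
		select {
		case <-ctx.Done():
			stopped <- ctx.Err() // report why the context ended
			return
		default:
			time.Sleep(10 * time.Millisecond) // stand-in for real work
		}
	}
}

// cancelAndWait holds the CancelFunc. It starts a worker, cancels it,
// and reports whether the worker observed context.Canceled.
func cancelAndWait() bool {
	ctx, cancel := context.WithCancel(context.Background())
	stopped := make(chan error)
	go worker(ctx, stopped)

	time.Sleep(30 * time.Millisecond)
	cancel() // closes ctx.Done() for every goroutine holding ctx

	return errors.Is(<-stopped, context.Canceled)
}

func main() {
	fmt.Println("worker saw context.Canceled:", cancelAndWait())
}
```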

3.3 Deriving contexts

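The context package provides functions for deriving new Context values from existing ones. These values form a tree: when a Context is canceled, every Context derived from it is canceled too.
Background is the root of any Context tree and is never canceled. It is declared as follows: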
// Background returns an empty Context. It is never canceled, has no deadline,
// and has no values. Background is typically used in main, init, and tests,
// and as the top-level `Context` for incoming requests.
func Background() Context
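WithCancel and WithTimeout return derived Context values that can be canceled earlier than their parent.
The Context associated with an incoming request is typically canceled when the request handler returns. WithCancel is also useful for canceling redundant requests when a request is sent to multiple replicas, and WithTimeout is useful for setting a deadline on requests to backend servers. Their declarations are: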
// WithCancel returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed or cancel is called.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

// A CancelFunc cancels a Context.
type CancelFunc func()

// WithTimeout returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed, cancel is called, or timeout elapses. The new
// Context's Deadline is the sooner of now+timeout and the parent's deadline, if
// any. If the timer is still running, the cancel function releases its
// resources.
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
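
A minimal sketch of WithTimeout in use follows. slowOperation and timedOut are hypothetical names, and the millisecond values are arbitrary.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowOperation pretends to do work that takes d, but gives up early
// if ctx is canceled or its deadline passes.
func slowOperation(ctx context.Context, d time.Duration) error {
	select {
	case <-time.After(d):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// timedOut runs slowOperation under a timeout and reports whether the
// deadline was exceeded. Both arguments are in milliseconds.
func timedOut(timeoutMs, workMs int) bool {
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(timeoutMs)*time.Millisecond)
	defer cancel() // release the timer even if the work finishes first

	err := slowOperation(ctx, time.Duration(workMs)*time.Millisecond)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("short timeout, long work:", timedOut(20, 2000))
	fmt.Println("long timeout, short work:", timedOut(2000, 10))
}
```

Calling cancel with defer matters even on the success path, because it stops the timer that WithTimeout started.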

WithValue associates request-scoped data with a Context. It is declared as follows:
// WithValue returns a copy of parent whose Value method returns val for key.
func WithValue(parent Context, key interface{}, val interface{}) Context

3.4 A context example

The best way to see how the context package works is to look at an example.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Message struct {
    netId int
    Data  string
}

type ServerConn struct {
    sendCh   chan Message
    handleCh chan Message
    wg       *sync.WaitGroup
    ctx      context.Context
    cancel   context.CancelFunc
    netId    int
}

func main() {

    conn := &ServerConn{
        sendCh:   make(chan Message),
        handleCh: make(chan Message),
        wg:       &sync.WaitGroup{},
        netId:    100,
    }

    conn.ctx, conn.cancel = context.WithCancel(context.WithValue(context.Background(), "key", conn.netId))
    loopers := []func(*ServerConn, *sync.WaitGroup){readLoop, writeLoop, handleLoop}

    for _, looper := range loopers {
        conn.wg.Add(1)
        go looper(conn, conn.wg)
    }

    go func() {
        time.Sleep(time.Second * 3)
        conn.cancel()
    }()

    conn.wg.Wait()

}

func readLoop(c *ServerConn, wg *sync.WaitGroup) {

    netId, _ := c.ctx.Value("key").(int)
    handlerCh := c.handleCh
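    ctx, cancel := context.WithCancel(c.ctx) // child context, canceled together with its parent
    defer cancel()                           // release the child's resources when the loop exits
    cDone := ctx.Done()

    defer wg.Done()

    for {
        time.Sleep(time.Second * 1)
        select {
        case <-cDone:
            fmt.Println("readLoop close")
            return
        // Selecting on the send (instead of sending from a default branch)
        // keeps readLoop from blocking forever once handleLoop has exited.
        case handlerCh <- Message{netId, "Hello world"}:
        }
    }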
}

func handleLoop(c *ServerConn, wg *sync.WaitGroup) {
    handlerCh := c.handleCh
    sendCh := c.sendCh
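    ctx, cancel := context.WithCancel(c.ctx) // child context, canceled together with its parent
    defer cancel()                           // release the child's resources when the loop exits
    cDone := ctx.Done()

    defer wg.Done()

    for {
        select {
        case handleData, ok := <-handlerCh:
            if ok {
                handleData.netId++
                handleData.Data = "I am whole world"
                // Give up on the send if the context is canceled, so this
                // loop cannot block forever after writeLoop has exited.
                select {
                case sendCh <- handleData:
                case <-cDone:
                    fmt.Println("handleLoop close")
                    return
                }
            }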

        case <-cDone:
            fmt.Println("handleLoop close")
            return
        }

    }
}

func writeLoop(c *ServerConn, wg *sync.WaitGroup) {
    sendCh := c.sendCh
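    ctx, cancel := context.WithCancel(c.ctx) // child context, canceled together with its parent
    defer cancel()                           // release the child's resources when the loop exits
    cDone := ctx.Done()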

    defer wg.Done()

    for {
        select {
        case sendData, ok := <-sendCh:
            if ok {
                fmt.Println(sendData)
            }
        case <-cDone:
            fmt.Println("writeLoop close")
            return
        }
    }
}


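The example starts three goroutines for one connection: one reads, one handles, and one writes. The goroutines communicate over channels, main waits for them with sync.WaitGroup, and context controls when they exit.
Each goroutine derives its own ctx from the connection's Context and watches that Context's Done channel in a select. When the parent Context is canceled through its CancelFunc, every derived Context is canceled as well, so all three goroutines see the signal and return.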

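When run, the program should print {101 I am whole world} roughly once per second. After cancel is called at the three-second mark, each loop prints its close message, in no guaranteed order, and the program exits.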

3.5 Principles for using Context

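  • Do not store a Context inside a struct; pass it explicitly to each function that needs it. The Context should be the first parameter, conventionally named ctx
  • Do not pass a nil Context, even if a function permits it. If you are unsure which Context to use, pass context.TODO
  • Use the Value methods only for request-scoped data that crosses process and API boundaries, not for passing optional parameters to functions
  • The same Context may be passed to functions running in different goroutines; a Context is safe for simultaneous use by multiple goroutines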
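
These conventions can be sketched in a few lines. fetchGreeting, greetAll, and demo are hypothetical names chosen for illustration; the point is the shape of the signatures, not the work they do.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// fetchGreeting takes ctx as its first parameter, named ctx, and checks
// for cancelation before doing its (trivial) work.
func fetchGreeting(ctx context.Context, name string) (string, error) {
	select {
	case <-ctx.Done():
		return "", ctx.Err()
	default:
		return "hello, " + name, nil
	}
}

// greetAll passes the same ctx to several goroutines, which is safe
// because a Context may be used by multiple goroutines at once.
func greetAll(ctx context.Context, names []string) []string {
	out := make([]string, len(names))
	var wg sync.WaitGroup
	for i, name := range names {
		wg.Add(1)
		go func(i int, name string) {
			defer wg.Done()
			if s, err := fetchGreeting(ctx, name); err == nil {
				out[i] = s // each goroutine writes only its own slot
			}
		}(i, name)
	}
	wg.Wait()
	return out
}

// demo uses context.TODO because this sketch has no real request
// context to pass along; it never passes nil.
func demo() []string {
	return greetAll(context.TODO(), []string{"gopher", "world"})
}

func main() {
	fmt.Println(demo())
}
```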