为什么需要contex?

在任何语言编程过程中,都希望有一个上下文对象记录全局需要的变量信息以及控制信息,例如下面的例子:

func main() {
	messages := make(chan int, 10)
	done := make(chan bool)

	defer close(messages)
	// consumer
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		for _ = range ticker.C {
			select {
			case <-done:
				fmt.Println("child process interrupt...")
				return
			default:
				fmt.Printf("send message: %d\n", <-messages)
			}
		}
	}()

	// producer
	for i := 0; i < 10; i++ {
		messages <- i
	}
	time.Sleep(5 * time.Second)
	close(done)
	time.Sleep(1 * time.Second)
	fmt.Println("main process exit!")
}

此示例中主要还是需要一个控制信息,在主协程退出之前,发出信号关闭子协程.因为程序比较简单,还看不出context的必要性.如果协程数量增加或者全局变量也逐渐递增(实际应用中很常见),则是否可以使用统一的对象整合这些信息呢?

Context接口

type Context interface {

	Deadline() (deadline time.Time, ok bool)

	Done() <-chan struct{}

	Err() error

	Value(key interface{}) interface{}
}

符合此interface的对象context_instance, 通过它可以得到deadline(父协程希望自己--子协程--结束的时间); 通过Done()函数可以得到一个channel,监听它判断是否退出; Err()函数提供了退出原因; Value()则可以得到key的键值.

emptyCtx存在的意义

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}

Context接口定义了上下文需要包含的功能属性, 至于如何实现, 完全是灵活的. 但是试想, 某种情况下, 我只需要设置和获取key-value的功能怎么办呢? 或者我只需要控制功能不需要Value(xxx)功能? 因为接口是很霸道的, 定义的功能项必须都要实现, 才能转化为接口实例.

如此, 一个巧妙的设计产生了, 定义一个空的祖对象, 实现了空函数(看似实现却只是空函数)用来欺骗编译器和运行时. emptyCtx的意义就在此.

valueCtx

// WithValue returns a copy of parent in which the value associated with key is val.
func WithValue(parent Context, key, val interface{}) Context {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	if key == nil {
		panic("nil key")
	}
	if !reflectlite.TypeOf(key).Comparable() {
		panic("key is not comparable")
	}
	return &valueCtx{parent, key, val}
}

// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
	Context
	key, val interface{}
}

valueCtx通过包含内部类型Context, 得到了所有Context的功能属性. 类似继承的概念, valueCtx继承了父Context的功能. 另外我们知道, 外部类型如果实现了同样的接口,则会覆盖内部类型实现(类似重写). 如下:

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}

如此valueCtx相当于实现了自己的设置和获取key-value的功能. 往父Context直到祖Context是另一个巧妙设置, 但是原理比较简单, 这里不赘述.

示例

提供一个示例,但是没有实际意义,只是测试:

package main

import (
	"context"
	"fmt"
	"strconv"
	"time"
)

var context_root = context.Background()

func main() {
	context_t := context_root

	for i := 0; i < 10; i++ {
		context_t = context.WithValue(context_t, i, strconv.Itoa(i))
	}

	messages := make(chan string, 10)
	done := make(chan bool)

	defer close(messages)
	// consumer
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		for _ = range ticker.C {
			select {
			case <-done:
				fmt.Println("child process interrupt...")
				return
			default:
				fmt.Printf("send message: %s\n", <-messages)
			}
		}
	}()

	// producer
	for i := 0; i < 10; i++ {
		messages <- context_t.Value(i).(string)
	}
	time.Sleep(5 * time.Second)
	close(done)
	time.Sleep(1 * time.Second)
	fmt.Println("main process exit!")
}

cancelCtx

type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields
	done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}

同上,cancelCtx也是继承了Context功能, 但是着重实现了cancel功能, 可以控制子协程的中断.

父子协程示例

package main

import (
	"context"
	"fmt"
	"strconv"
	"time"
)

var context_root = context.Background()

func main() {
	context_t := context_root

	for i := 0; i < 10; i++ {
		context_t = context.WithValue(context_t, i, strconv.Itoa(i))
	}

	conext_t, cancel := context.WithCancel(context_t)

	messages := make(chan string, 10)
	//done := make(chan bool)

	defer close(messages)
	// consumer
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		for _ = range ticker.C {
			select {
			//case <-done:
			case <-conext_t.Done():
				fmt.Println("child process interrupt...")
				return
			default:
				fmt.Printf("send message: %s\n", <-messages)
			}
		}
	}()

	// producer
	for i := 0; i < 10; i++ {
		messages <- context_t.Value(i).(string)
	}
	time.Sleep(5 * time.Second)
	//close(done)
	cancel()
	time.Sleep(1 * time.Second)
	fmt.Println("main process exit!")
}

孙协程示例

package main

import (
	"context"
	"fmt"
	"strconv"
	"time"
)

var context_root = context.Background()

func main() {
	context_t := context_root

	for i := 0; i < 100; i++ {
		context_t = context.WithValue(context_t, i, strconv.Itoa(i))
	}

	conext_t, cancel := context.WithCancel(context_t)

	messages := make(chan string, 100)
	//done := make(chan bool)

	defer close(messages)
	// consumer
	go func() {
		conext_grandchild, cancel_g := context.WithCancel(context_t)
		defer cancel_g()
		go func() {
			ticker := time.NewTicker(1 * time.Second)
			for _ = range ticker.C {
				select {
				case <-conext_grandchild.Done():
					fmt.Println("grandchild process interrupt...")
					return
				default:
					fmt.Printf("grandchild got message: %s\n", <-messages)
				}
			}
		}()

		ticker := time.NewTicker(1 * time.Second)
		for _ = range ticker.C {
			select {
			//case <-done:
			case <-conext_t.Done():
				fmt.Println("child process interrupt...")
				return
			default:
				fmt.Printf("child got message: %s\n", <-messages)
			}
		}
	}()

	// producer
	for i := 0; i < 100; i++ {
		messages <- context_t.Value(i).(string)
	}
	time.Sleep(5 * time.Second)
	//close(done)
	cancel()
	time.Sleep(2 * time.Second)
	fmt.Println("main process exit!")
}

此处有个问题, 理论上祖协程关闭父协程时, 应当同时关闭孙协程, 但是没有?

timerCtx

type timerCtx struct {
	cancelCtx
	timer *time.Timer // Under cancelCtx.mu.

	deadline time.Time
}

timerCtx继承了cancelCtx, 另外实现了deadline功能

示例

package main

import (
	"context"
	"fmt"
	"strconv"
	"time"
)

var context_root = context.Background()

func main() {
	context_t := context_root

	for i := 0; i < 100; i++ {
		context_t = context.WithValue(context_t, i, strconv.Itoa(i))
	}

	conext_t, cancel := context.WithTimeout(context_t, 5*time.Second)

	messages := make(chan string, 100)
	//done := make(chan bool)

	defer close(messages)
	defer cancel()
	// consumer
	go func() {
		conext_grandchild, cancel_g := context.WithCancel(context_t)
		defer cancel_g()
		go func() {
			ticker := time.NewTicker(1 * time.Second)
			for _ = range ticker.C {
				select {
				case <-conext_grandchild.Done():
					fmt.Println("grandchild process interrupt...")
					return
				default:
					fmt.Printf("grandchild got message: %s\n", <-messages)
				}
			}
		}()

		ticker := time.NewTicker(1 * time.Second)
		for _ = range ticker.C {
			select {
			//case <-done:
			case <-conext_t.Done():
				fmt.Println("child process interrupt...")
				return
			default:
				fmt.Printf("child got message: %s\n", <-messages)
			}
		}
	}()

	// producer
	for i := 0; i < 100; i++ {
		messages <- context_t.Value(i).(string)
	}

	time.Sleep(10 * time.Second)
	fmt.Println("main process exit!")
}