摘自<<Go语言高级编程>>书中

发布订阅模型(publish-and-subscribe)通常简写为pub/sub模型。在这个模型中消息生产者称为发布者(publisher),消息消费者称为订阅者(subscriber),生产者和消费者是多对多关系。在传统生产者和消费者模型中是将消息发送给一个队列,而发布订阅模型是将消息发布到一个主题。

下面是代码实现:

//发布订阅模型实现
package pubsub

import (
	"sync"
	"time"
)

type (
	subscriber chan interface{}         //订阅者,类型为管道
	topicFunc  func(v interface{}) bool //主题,是一个过滤器函数
)

//发布者对象
type publisher struct {
	m           sync.RWMutex             //读写锁
	buffer      int                      //订阅队列缓存大小
	timeout     time.Duration            //发布超时时间
	subscribers map[subscriber]topicFunc //订阅者信息
}

//构建一个新的发布者对象
func NewPublisher(buffer int, publishTimeout time.Duration) *publisher {
	return &publisher{
		m:           sync.RWMutex{},
		buffer:      buffer,
		timeout:     publishTimeout,
		subscribers: make(map[subscriber]topicFunc),
	}
}

//添加一个新的订阅者,订阅过滤器筛选后的主题
func (p *publisher) SubscriberTopic(topic topicFunc) chan interface{} {
	ch := make(chan interface{}, p.buffer)
	p.m.Lock()
	defer p.m.Unlock()
	p.subscribers[ch] = topic
	return ch
}

//添加一个订阅者,订阅所有主题
func (p *publisher) SubscriberAllTopic() chan interface{} {
	return p.SubscriberTopic(nil)
}

//退出订阅
func (p *publisher) Exict(sub chan interface{}) {
	p.m.Lock()
	defer p.m.Unlock()
	delete(p.subscribers, sub)
	close(sub)
}

//关闭发布者对象,同时关闭所有订阅者管道
func (p *publisher) Close() {
	p.m.Lock()
	defer p.m.Unlock()

	for sub := range p.subscribers {
		close(sub)
		delete(p.subscribers, sub)
	}
}

//发布一个主题
func (p *publisher) Publish(v interface{}) {
	p.m.RLock()
	defer p.m.RUnlock()
	wg := sync.WaitGroup{}
	for sub, topic := range p.subscribers { //向所有的订阅者管道发送主题
		wg.Add(1)
		go p.SendTopic(sub, topic, v, &wg)
	}
}

//向订阅者发送主题
func (p *publisher) SendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
	defer wg.Done()
	if topic != nil && !topic(v) { //订阅者未订阅这个主题,不发送
		return
	}
	select {
	case sub <- v:
	case <-time.After(p.timeout): //超时后就不再发送
	}
}

package main

import (
	"fmt"
	"pubsub/pubsub"
	"strings"
	"time"
)

func main() {
	//初始化一个发布者对象
	publisher := pubsub.NewPublisher(3, 5*time.Second)
	//创建一个订阅所有主题的订阅者
	all := publisher.SubscriberAllTopic()
	//创建一个订阅golang主题的订阅者
	golang := publisher.SubscriberTopic(func(v interface{}) bool {
		if s, ok := v.(string); ok {
			return strings.Contains(s, "golang")
		}
		return false
	})
	
	//发布2条主题
	publisher.Publish("hello world")
	publisher.Publish("hello golang")

	go func() {
		for i := range all {
			fmt.Println("all:", i)
		}
	}()

	go func() {
		for i := range golang {
			fmt.Println("golang:", i)
		}
	}()

	time.Sleep(5 * time.Second)
	publisher.Close()
	fmt.Println(<-all, <-golang) //发布者对象关闭后读取的都是nil
}

发布订阅模型中,每条消息都会发送给多个订阅者,发布者不知道也不关心哪个个订阅者在接收主题消息。订阅者和发布者可以在运行时动态添加,是一种松耦合关系,系统负复杂性会随时间推移而增长。天气预报可以应用这个模式