摘自<<Go语言高级编程>>书中
发布订阅模型(publish-and-subscribe)通常简写为pub/sub模型。在这个模型中消息生产者称为发布者(publisher),消息消费者称为订阅者(subscriber),生产者和消费者是多对多关系。在传统生产者和消费者模型中是将消息发送给一个队列,而发布订阅模型是将消息发布到一个主题。
下面是代码实现:
//发布订阅模型实现
package pubsub
import (
"sync"
"time"
)
type (
subscriber chan interface{} //订阅者,类型为管道
topicFunc func(v interface{}) bool //主题,是一个过滤器函数
)
//发布者对象
type publisher struct {
m sync.RWMutex //读写锁
buffer int //订阅队列缓存大小
timeout time.Duration //发布超时时间
subscribers map[subscriber]topicFunc //订阅者信息
}
//构建一个新的发布者对象
func NewPublisher(buffer int, publishTimeout time.Duration) *publisher {
return &publisher{
m: sync.RWMutex{},
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[subscriber]topicFunc),
}
}
//添加一个新的订阅者,订阅过滤器筛选后的主题
func (p *publisher) SubscriberTopic(topic topicFunc) chan interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
defer p.m.Unlock()
p.subscribers[ch] = topic
return ch
}
//添加一个订阅者,订阅所有主题
func (p *publisher) SubscriberAllTopic() chan interface{} {
return p.SubscriberTopic(nil)
}
//退出订阅
func (p *publisher) Exict(sub chan interface{}) {
p.m.Lock()
defer p.m.Unlock()
delete(p.subscribers, sub)
close(sub)
}
//关闭发布者对象,同时关闭所有订阅者管道
func (p *publisher) Close() {
p.m.Lock()
defer p.m.Unlock()
for sub := range p.subscribers {
close(sub)
delete(p.subscribers, sub)
}
}
//发布一个主题
func (p *publisher) Publish(v interface{}) {
p.m.RLock()
defer p.m.RUnlock()
wg := sync.WaitGroup{}
for sub, topic := range p.subscribers { //向所有的订阅者管道发送主题
wg.Add(1)
go p.SendTopic(sub, topic, v, &wg)
}
}
//向订阅者发送主题
func (p *publisher) SendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
defer wg.Done()
if topic != nil && !topic(v) { //订阅者未订阅这个主题,不发送
return
}
select {
case sub <- v:
case <-time.After(p.timeout): //超时后就不再发送
}
}
package main
import (
"fmt"
"pubsub/pubsub"
"strings"
"time"
)
func main() {
//初始化一个发布者对象
publisher := pubsub.NewPublisher(3, 5*time.Second)
//创建一个订阅所有主题的订阅者
all := publisher.SubscriberAllTopic()
//创建一个订阅golang主题的订阅者
golang := publisher.SubscriberTopic(func(v interface{}) bool {
if s, ok := v.(string); ok {
return strings.Contains(s, "golang")
}
return false
})
//发布2条主题
publisher.Publish("hello world")
publisher.Publish("hello golang")
go func() {
for i := range all {
fmt.Println("all:", i)
}
}()
go func() {
for i := range golang {
fmt.Println("golang:", i)
}
}()
time.Sleep(5 * time.Second)
publisher.Close()
fmt.Println(<-all, <-golang) //发布者对象关闭后读取的都是nil
}
发布订阅模型中,每条消息都会发送给多个订阅者,发布者不知道也不关心哪个个订阅者在接收主题消息。订阅者和发布者可以在运行时动态添加,是一种松耦合关系,系统负复杂性会随时间推移而增长。天气预报可以应用这个模式