本地发布订阅模式样本:基于pubsub包
package main
import (
"fmt"
"github.com/moby/moby/pkg/pubsub"
"strings"
"time"
)
func main() {
// 创建发布者
p := pubsub.NewPublisher(100*time.Millisecond, 10)
// 订阅主题
golang := p.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok && strings.HasPrefix(key, "golang:") {
return true
}
return false
})
docker := p.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok && strings.HasPrefix(key, "docker:") {
return true
}
return false
})
go func() {
for {
// 发布
p.Publish("golang: https://golang.org")
}
}()
go func() {
for {
p.Publish("docker: https://www.docker.com/")
}
}()
go func() {
for {
// 获取
fmt.Println("golang topic:", <-golang)
time.Sleep(1 * time.Second)
}
}()
go func() {
for {
fmt.Println("docker topic:", <-docker)
time.Sleep(2 * time.Second)
}
}()
// 阻塞,避免主进程退出
<-make(chan bool)
}
一、创建发布者
// NewPublisher creates a new pub/sub publisher to broadcast messages.
// The duration is used as the send timeout as to not block the publisher publishing
// messages to other clients if one client is slow or unresponsive.
// The buffer is used when creating new channels for subscribers.
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
return &Publisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[subscriber]topicFunc),
}
}
type Publisher struct {
m sync.RWMutex
buffer int // subscriber chan 的大小,用于缓存数据
timeout time.Duration
subscribers map[subscriber]topicFunc // topicFunc 过滤函数,用于把消息发送到特定的 subscriber 通道
}
type subscriber chan interface{}
type topicFunc func(v interface{}) bool
二、订阅主题
// SubscribeTopic adds a new subscriber that filters messages sent by a topic.
// SubscribeTopic添加一个新的订户,其实就是一个 chan 类型,每一个用户,对应特定的过滤函数 topic。
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
// 学习点:要过滤处理信息,可以添加函数对象,要通信,可以添加通道对象
// 在这里,通道对象作为键,函数对象作为值
p.subscribers[ch] = topic
// 注意:在 map 中 key 为 chan 时,为同一个 make 生成的 key 是相等的,不同一个 make 生成的 chan 是不相等的;可以参考 http://lanlingzi.cn/post/technical/2016/0904_go_map/
p.m.Unlock()
return ch
}
三、发布
// Publish sends the data in v to all subscribers currently registered with the publisher.
func (p *Publisher) Publish(v interface{}) {
p.m.RLock()
if len(p.subscribers) == 0 {
p.m.RUnlock()
return
}
wg := wgPool.Get().(*sync.WaitGroup)
for sub, topic := range p.subscribers {
wg.Add(1)
// 尝试向每个订阅者(chan)发送消息
go p.sendTopic(sub, topic, v, wg)
}
wg.Wait()
wgPool.Put(wg)
p.m.RUnlock()
}
func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
defer wg.Done()
// 消息过滤处理,如果消息满足该订阅者,则该订阅者尝试接受数据,否则返回
if topic != nil && !topic(v) {
return
}
// send under a select as to not block if the receiver is unavailable
// 当超时设置不为0时,阻塞
if p.timeout > 0 {
timeout := time.NewTimer(p.timeout)
defer timeout.Stop()
select {
// 消息进入通道(当缓存没满时,订阅者收到信息)
case sub <- v:
// 当被阻塞时,超时处理
case <-timeout.C:
}
return
}
// 当超时设置为0时,默认不阻塞
select {
case sub <- v:
default:
}
}
四、获取
fmt.Println("golang topic:", <-golang)