本地发布订阅模式样本:基于pubsub包

package main

import (
	"fmt"
	"github.com/moby/moby/pkg/pubsub"
	"strings"
	"time"
)

func main() {
	// 创建发布者
	p := pubsub.NewPublisher(100*time.Millisecond, 10)

	// 订阅主题
	golang := p.SubscribeTopic(func(v interface{}) bool {
		if key, ok := v.(string); ok && strings.HasPrefix(key, "golang:") {
			return true
		}
		return false
	})
	docker := p.SubscribeTopic(func(v interface{}) bool {
		if key, ok := v.(string); ok && strings.HasPrefix(key, "docker:") {
			return true
		}
		return false
	})


	go func() {
		for {
			// 发布
			p.Publish("golang: https://golang.org")
		}
	}()
	go func() {
		for {
			p.Publish("docker: https://www.docker.com/")
		}
	}()

	go func() {
		for {
			// 获取
			fmt.Println("golang topic:", <-golang)
			time.Sleep(1 * time.Second)
		}
	}()

	go func() {
		for {
			fmt.Println("docker topic:", <-docker)
			time.Sleep(2 * time.Second)
		}
	}()

	// 阻塞,避免主进程退出
	<-make(chan bool)
}

一、创建发布者

// NewPublisher creates a new pub/sub publisher to broadcast messages.
// The duration is used as the send timeout as to not block the publisher publishing
// messages to other clients if one client is slow or unresponsive.
// The buffer is used when creating new channels for subscribers.
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
	return &Publisher{
		buffer:      buffer,
		timeout:     publishTimeout,
		subscribers: make(map[subscriber]topicFunc),
	}
}

type Publisher struct {
	m           sync.RWMutex  
	buffer      int  // subscriber chan 的大小,用于缓存数据
	timeout     time.Duration
	subscribers map[subscriber]topicFunc  // topicFunc 过滤函数,用于把消息发送到特定的 subscriber 通道
}

type subscriber chan interface{}
type topicFunc func(v interface{}) bool

二、订阅主题

// SubscribeTopic adds a new subscriber that filters messages sent by a topic.
// SubscribeTopic添加一个新的订户,其实就是一个 chan 类型,每一个用户,对应特定的过滤函数 topic。
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
	ch := make(chan interface{}, p.buffer)
	p.m.Lock()
	// 学习点:要过滤处理信息,可以添加函数对象,要通信,可以添加通道对象
	// 在这里,通道对象作为键,函数对象作为值
	p.subscribers[ch] = topic
	// 注意:在 map 中 key 为 chan 时,为同一个 make 生成的 key 是相等的,不同一个 make 生成的 chan 是不相等的;可以参考 http://lanlingzi.cn/post/technical/2016/0904_go_map/
	p.m.Unlock()
	return ch
}

三、发布

// Publish sends the data in v to all subscribers currently registered with the publisher.
func (p *Publisher) Publish(v interface{}) {
	p.m.RLock()
	if len(p.subscribers) == 0 {
		p.m.RUnlock()
		return
	}
	wg := wgPool.Get().(*sync.WaitGroup)
	for sub, topic := range p.subscribers {
		wg.Add(1)
		// 尝试向每个订阅者(chan)发送消息
		go p.sendTopic(sub, topic, v, wg)
	}
	wg.Wait()
	wgPool.Put(wg)
	p.m.RUnlock()
}

func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
	defer wg.Done()

	// 消息过滤处理,如果消息满足该订阅者,则该订阅者尝试接受数据,否则返回
	if topic != nil && !topic(v) {
		return
	}
	// send under a select as to not block if the receiver is unavailable
	// 当超时设置不为0时,阻塞
	if p.timeout > 0 {
		timeout := time.NewTimer(p.timeout)
		defer timeout.Stop()
		select {
		// 消息进入通道(当缓存没满时,订阅者收到信息)
		case sub <- v:
		// 当被阻塞时,超时处理
		case <-timeout.C:
		}
		return
	}

	// 当超时设置为0时,默认不阻塞
	select {
	case sub <- v:
	default:
	}
}

四、获取

fmt.Println("golang topic:", <-golang)