https://github.com/pion/ion是用go实现的会议系统,它基于go实现的消息队列nats 和redis,官网文档https://pionion.github.io/docs/awesome-ion/awesome-ion很简明扼要,本地搭建:
./scripts/deps_inst
./scripts/all start
./scripts/all status初始化所有的依赖,然后启动服务。它同时提供了web app端https://github.com/pion/ion-app-web,启动它,就可以看到可以进行视频会议了
cd ion-app-web
npm i
npm start它主要包括下面几个部分
- RTC: (Real-Time Communication) RTC system scenes: conference/live-broadcasting/voip..
- Signal :( Signal server ) support signal logic
- Room : ( Room server) support room logic
- ISLB:(Intelligent server load balancing server) support node-discovery/load-bancing..
- SFU: (Selective Forwarding Unit ) broadcasting media streams
- AVP:( Audio video process server) Video Recoder/Audio Video Mixer/ AI Processor
- MCU:( Multipoint Control Unit )Audio Video Mixer and broadcasting
下面首先分析下islb的源码,它的入口位置在cmd/islb/main.go,先看线main函数。
func main(){
parse()
node := islb.NewISLB()
node.Start(conf)
defer node.Close()
}解析配置,然后初始化islb模块,启动这个模块,在函数退出时关闭这个模块。它对应的配置文件在configs/islb.toml,主要配置依赖的队列系统nats和redis地址
nats
redisislb定义在pkg/node/islb/islb.go
type ISLB struct {
ion.Node
s *islbServer
registry *Registry
redis *db.Redis
}其中node定义在pkg/ion/node.go
type Node struct {
// Node ID
NID string
// Nats Client Conn
nc *nats.Conn
// gRPC Service Registrar
nrpc *nrpc.Server
// Service discovery client
ndc *ndc.Client
nodeLock sync.RWMutex
//neighbor nodes
neighborNodes map[string]discovery.Node
cliLock sync.RWMutex
clis map[string]*nrpc.Client
}islbServer 定义在pkg/node/islb/server.go
type islbServer struct {
islb.UnimplementedISLBServer
redis *db.Redis
islb *ISLB
conf Config
//watchers map[string]islb.ISLB_WatchISLBEventServer
}registry定义在pkg/node/islb/registry.go
type Registry struct {
dc string
redis *db.Redis
reg *registry.Registry
mutex sync.Mutex
nodes map[string]discovery.Node
}下面重点看下Start函数:
func (i *ISLB) Start(conf Config) error
err = i.Node.Start(conf.Nats.URL)
i.redis = db.NewRedis(conf.Redis)
i.registry, err = NewRegistry(conf.Global.Dc, i.Node.NatsConn(), i.redis)
i.s = newISLBServer(conf, i, i.redis)
pb.RegisterISLBServer(i.Node.ServiceRegistrar(), i.s)
go func() {
err := i.Node.KeepAlive(node)
return n.ndc.KeepAlive(node)
go func() {
err := i.Node.Watch(proto.ServiceALL)
resp, err := n.ndc.Get(service, map[string]interface{}{})
for _, node := range resp.Nodes {
n.handleNeighborNodes(discovery.NodeUp, &node)
}
return n.ndc.Watch(context.Background(), service, n.handleNeighborNodes)它实现了会议系统中服务发现的核心逻辑,首先初始化依赖的nats消息队列和redis,然后注册了ServiceRegistrar,最后启动了两个协程,分别发送保活信号和watch服务的变化,它获取所有的节点,根据节点的状态,来处理neighbor节点的增删。逻辑如下:
func (n *Node) handleNeighborNodes(state discovery.NodeState, node *discovery.Node)
state == discovery.NodeUp
n.neighborNodes[id] = *node
state == discovery.NodeDown
delete(n.neighborNodes, id)如果邻居启动,加入集合,如果邻居挂掉,从集合中删除。
保活协程里面是一个定时器,如果它退出,它会发送删除消息,否则发送更新消息,代码位于pkg/mod/github.com/cloudwebrtc/nats-discovery@v0.3.0/pkg/client/client.go
func (c *Client) KeepAlive(node discovery.Node) error {
t := time.NewTicker(discovery.DefaultLivecycle)
defer func() {
c.sendAction(node, discovery.Delete)
t.Stop()
}()
for {
case <-t.C:
c.sendAction(node, discovery.Update)接下来看下sendAction是如何包装的:
func (c *Client) sendAction(node discovery.Node, action discovery.Action) error
data, err := util.Marshal(&discovery.Request{
Action: action, Node: node,
})
msg, err := c.nc.Request(subj, data, time.Duration(time.Second*15))它调用了Request发布一个消息并获取结果。
最后,我们重点看下NewRegistry干了什么,它的代码位置在pkg/node/islb/registry.go
func NewRegistry(dc string, nc *nats.Conn, redis *db.Redis) (*Registry, error)
reg, err := registry.NewRegistry(nc, discovery.DefaultExpire)
err = reg.Listen(r.handleNodeAction, r.handleGetNodes)它初始化了一个NewRegistry 然后注册处理事件和获取节点两个handler
func (r *Registry) handleNodeAction(action discovery.Action, node discovery.Node) (bool, error)
r.nodes[node.ID()] = node func (r *Registry) handleGetNodes(service string, params map[string]interface{}) ([]discovery.Node, error)
if service == proto.ServiceRTC {
for _, key := range r.redis.Keys(mkey) {
value := r.redis.Get(key)从redis中获取所有的节点,然后存储到nodes对象中。其中的Listen函数是nats 客户端的一个入口,代码位于github.com/cloudwebrtc/nats-discovery@v0.3.0/pkg/registry/registry.go
func (s *Registry) Listen(
handleNodeAction func(action discovery.Action, node discovery.Node) (bool, error),
handleGetNodes func(service string, params map[string]interface{}) ([]discovery.Node, error)) error {
sub, err := s.nc.Subscribe(subj, func(msg *nats.Msg) {
msgCh <- msg
})
case discovery.Save:
handleNodeAction(req.Action, req.Node)
case discovery.Update:
ok, err := handleNodeAction(req.Action, req.Node);
case discovery.Delete:
case discovery.Get:
handleGetNodes(req.Service, req.Params)
s.nc.Publish(msg.Reply, data)
go func() error {
sub.Unsubscribe()
t := time.NewTicker(time.Second * 1)
if err := s.checkExpires(nodes, now, handleNodeAction); err != nil {
err := handleNatsMsg(msg)如果有消息到来就处理,处理函数对应了它的两个入参handler,并且检查过期的节点。