https://github.com/pion/ion是用go实现的会议系统,它基于go实现的消息队列nats 和redis,官网文档https://pionion.github.io/docs/awesome-ion/awesome-ion很简明扼要,本地搭建:

./scripts/deps_inst
./scripts/all start
./scripts/all status

初始化所有的依赖,然后启动服务。它同时提供了web app端https://github.com/pion/ion-app-web,启动它,就可以看到可以进行视频会议了

cd ion-app-web
npm i
npm start

它主要包括下面几个部分

  1. RTC: (Real-Time Communication) RTC system scenes: conference/live-broadcasting/voip..
  2. Signal :( Signal server ) support signal logic
  3. Room : ( Room server) support room logic
  4. ISLB:(Intelligent server load balancing server) support node-discovery/load-bancing..
  5. SFU: (Selective Forwarding Unit ) broadcasting media streams
  6. AVP:( Audio video process server) Video Recoder/Audio Video Mixer/ AI Processor
  7. MCU:( Multipoint Control Unit )Audio Video Mixer and broadcasting

下面首先分析下islb的源码,它的入口位置在cmd/islb/main.go,先看线main函数。

func main(){
      parse()
      node := islb.NewISLB()
      node.Start(conf)
      defer node.Close()
}

解析配置,然后初始化islb模块,启动这个模块,在函数退出时关闭这个模块。它对应的配置文件在configs/islb.toml,主要配置依赖的队列系统nats和redis地址

nats
redis

islb定义在pkg/node/islb/islb.go

 type ISLB struct {
  ion.Node
  s        *islbServer
  registry *Registry
  redis    *db.Redis
}

其中node定义在pkg/ion/node.go

type Node struct {
  // Node ID
  NID string
  // Nats Client Conn
  nc *nats.Conn
  // gRPC Service Registrar
  nrpc *nrpc.Server
  // Service discovery client
  ndc *ndc.Client


  nodeLock sync.RWMutex
  //neighbor nodes
  neighborNodes map[string]discovery.Node


  cliLock sync.RWMutex
  clis    map[string]*nrpc.Client
}

islbServer 定义在pkg/node/islb/server.go

type islbServer struct {
  islb.UnimplementedISLBServer
  redis *db.Redis
  islb  *ISLB
  conf  Config
  //watchers map[string]islb.ISLB_WatchISLBEventServer
}

registry定义在pkg/node/islb/registry.go

type Registry struct {
  dc    string
  redis *db.Redis
  reg   *registry.Registry
  mutex sync.Mutex
  nodes map[string]discovery.Node
}

下面重点看下Start函数:

func (i *ISLB) Start(conf Config) error
          err = i.Node.Start(conf.Nats.URL)
          i.redis = db.NewRedis(conf.Redis)
          i.registry, err = NewRegistry(conf.Global.Dc, i.Node.NatsConn(), i.redis)
          i.s = newISLBServer(conf, i, i.redis)
  pb.RegisterISLBServer(i.Node.ServiceRegistrar(), i.s)
          go func() {
              err := i.Node.KeepAlive(node)
              return n.ndc.KeepAlive(node)
          go func() {
               err := i.Node.Watch(proto.ServiceALL)
              resp, err := n.ndc.Get(service, map[string]interface{}{})
       for _, node := range resp.Nodes {
           n.handleNeighborNodes(discovery.NodeUp, &node)
        }
   return n.ndc.Watch(context.Background(), service, n.handleNeighborNodes)

它实现了会议系统中服务发现的核心逻辑,首先初始化依赖的nats消息队列和redis,然后注册了ServiceRegistrar,最后启动了两个协程,分别发送保活信号和watch服务的变化,它获取所有的节点,根据节点的状态,来处理neighbor节点的增删。逻辑如下:

func (n *Node) handleNeighborNodes(state discovery.NodeState, node *discovery.Node) 
       state == discovery.NodeUp
        n.neighborNodes[id] = *node
      state == discovery.NodeDown
        delete(n.neighborNodes, id)

如果邻居启动,加入集合,如果邻居挂掉,从集合中删除。

保活协程里面是一个定时器,如果它退出,它会发送删除消息,否则发送更新消息,代码位于pkg/mod/github.com/cloudwebrtc/nats-discovery@v0.3.0/pkg/client/client.go

func (c *Client) KeepAlive(node discovery.Node) error {
   t := time.NewTicker(discovery.DefaultLivecycle)
        defer func() {
    c.sendAction(node, discovery.Delete)
    t.Stop()
  }()
      for {
            case <-t.C:
      c.sendAction(node, discovery.Update)

接下来看下sendAction是如何包装的:

func (c *Client) sendAction(node discovery.Node, action discovery.Action) error
        data, err := util.Marshal(&discovery.Request{
    Action: action, Node: node,
  })
      msg, err := c.nc.Request(subj, data, time.Duration(time.Second*15))

它调用了Request发布一个消息并获取结果。

最后,我们重点看下NewRegistry干了什么,它的代码位置在pkg/node/islb/registry.go

func NewRegistry(dc string, nc *nats.Conn, redis *db.Redis) (*Registry, error) 
      reg, err := registry.NewRegistry(nc, discovery.DefaultExpire)
      err = reg.Listen(r.handleNodeAction, r.handleGetNodes)

它初始化了一个NewRegistry 然后注册处理事件和获取节点两个handler

func (r *Registry) handleNodeAction(action discovery.Action, node discovery.Node) (bool, error) 
      r.nodes[node.ID()] = node  
func (r *Registry) handleGetNodes(service string, params map[string]interface{}) ([]discovery.Node, error) 
      if service == proto.ServiceRTC {
        for _, key := range r.redis.Keys(mkey) {
      value := r.redis.Get(key)

从redis中获取所有的节点,然后存储到nodes对象中。其中的Listen函数是nats 客户端的一个入口,代码位于github.com/cloudwebrtc/nats-discovery@v0.3.0/pkg/registry/registry.go

func (s *Registry) Listen(
  handleNodeAction func(action discovery.Action, node discovery.Node) (bool, error),
  handleGetNodes func(service string, params map[string]interface{}) ([]discovery.Node, error)) error {
      sub, err := s.nc.Subscribe(subj, func(msg *nats.Msg) {
    msgCh <- msg
  })
      case discovery.Save:
        handleNodeAction(req.Action, req.Node)
      case discovery.Update:
        ok, err := handleNodeAction(req.Action, req.Node);
      case discovery.Delete:
      case discovery.Get:
         handleGetNodes(req.Service, req.Params)
      s.nc.Publish(msg.Reply, data)
      go func() error {
        sub.Unsubscribe()
        t := time.NewTicker(time.Second * 1)
        if err := s.checkExpires(nodes, now, handleNodeAction); err != nil {
        err := handleNatsMsg(msg)

如果有消息到来就处理,处理函数对应了它的两个入参handler,并且检查过期的节点。