package nanomsg import ( "sync" "git.cdsdjx.net/sdjx/gdserver/common/setting" "git.cdsdjx.net/sdjx/gdserver/logs" "nanomsg.org/go/mangos/v2" "nanomsg.org/go/mangos/v2/protocol/sub" ) // 连接的接口 type ConnRes interface { Close() error Recv() ([]byte, error) } type Factory func() ConnRes // 连接池 type Pool struct { conns chan ConnRes factory Factory maxConn int } var oncePool sync.Once var pool *Pool func NewPool(factory Factory, maxConn int) *Pool { return &Pool{ conns: make(chan ConnRes, maxConn), factory: factory, maxConn: maxConn, } } func (p *Pool) new() ConnRes { for i := 0; i < p.maxConn; i++ { p.conns <- p.factory() } return p.factory() } func (p *Pool) Get() (conn ConnRes) { select { case conn = <-p.conns: p.Put(conn) default: conn = p.new() } return } func (p *Pool) Put(conn ConnRes) { select { case p.conns <- conn: { } default: conn.Close() } } func GetSub() (conn ConnRes) { oncePool.Do(func() { setting.Setting() pool = NewPool(func() ConnRes { var err error if SockSub, err = sub.NewSocket(); err != nil { logs.Error("创建socket失败: %s", err.Error()) return nil } if err = SockSub.Dial(setting.NanomsgPubsubUrl); err != nil { logs.Error("连接socket失败: %s", err.Error()) return nil } err = SockSub.SetOption(mangos.OptionSubscribe, []byte("")) if err != nil { logs.Error("订阅失败: %s", err.Error()) return nil } return SockSub }, setting.NanomsgMixConn) }) return pool.Get() }