服务发现封装
package lib
import (
"context"
"log"
"sync"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/clientv3"
)
// ServiceDiscovery holds an etcd client together with an in-memory service
// list that its methods read and update under a mutex.
type ServiceDiscovery struct {
cli *clientv3.Client // etcd client used for the Get, Watch and Close calls
serverList map[string]string // service list: etcd key -> value stored under that key
lock sync.Mutex // guards serverList
}
// NewServiceDiscovery builds a ServiceDiscovery whose etcd client is
// configured with the given endpoints and a 5-second dial timeout. The service
// list starts out empty.
//
// If the etcd client cannot be created the error is passed to log.Fatal, which
// terminates the process.
func NewServiceDiscovery(endpoints []string) *ServiceDiscovery {
	cfg := clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	}

	client, err := clientv3.New(cfg)
	if err != nil {
		log.Fatal(err)
	}

	sd := new(ServiceDiscovery)
	sd.cli = client
	sd.serverList = map[string]string{}
	return sd
}
// WatchService loads every key currently stored under prefix into the service
// list and then starts a background goroutine that applies later changes to it.
//
// The watch is opened at the revision immediately after the one the initial
// snapshot was read at, so a put or delete that happens between the Get and
// the Watch is replayed instead of being lost.
//
// It returns the error of the initial Get; in that case no watcher is started.
func (s *ServiceDiscovery) WatchService(prefix string) error {
	// Fetch the keys that already exist under the prefix.
	resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
	if err != nil {
		return err
	}
	for _, kv := range resp.Kvs {
		s.SetServiceList(string(kv.Key), string(kv.Value))
	}

	// Watch the prefix starting at the first revision the snapshot does not cover.
	go s.watcher(prefix, clientv3.WithRev(resp.Header.Revision+1))
	return nil
}

// watcher watches every key under prefix and mirrors the events into the
// service list: a PUT adds or overwrites the entry, a DELETE removes it.
//
// opts are additional watch options applied after clientv3.WithPrefix (for
// example a start revision); calling watcher(prefix) without options watches
// from the current revision. The function blocks until the watch channel is
// closed, so it is meant to run in its own goroutine.
func (s *ServiceDiscovery) watcher(prefix string, opts ...clientv3.OpOption) {
	watchOpts := make([]clientv3.OpOption, 0, len(opts)+1)
	watchOpts = append(watchOpts, clientv3.WithPrefix())
	watchOpts = append(watchOpts, opts...)

	rch := s.cli.Watch(context.Background(), prefix, watchOpts...)
	log.Printf("watching prefix:%s now...", prefix)
	for wresp := range rch {
		// Surface watch failures (e.g. cancellation or compaction) instead of
		// silently ignoring them; such a response carries no events to apply.
		if err := wresp.Err(); err != nil {
			log.Printf("watch prefix:%s error: %v", prefix, err)
			continue
		}
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.PUT: // key created or modified
				s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
			case mvccpb.DELETE: // key removed
				s.DelServiceList(string(ev.Kv.Key))
			}
		}
	}
}
// SetServiceList stores val in the service list under key, adding a new entry
// or overwriting an existing one, and logs the change. The update and the log
// call both happen while the lock is held.
func (s *ServiceDiscovery) SetServiceList(key, val string) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.serverList[key] = val
	log.Println("put key :", key, "val:", val)
}
// DelServiceList removes the entry stored under key from the service list and
// logs the removal. Deleting a key that is not present leaves the list
// unchanged; the log line is written either way.
func (s *ServiceDiscovery) DelServiceList(key string) {
	s.lock.Lock()
	defer s.lock.Unlock()

	delete(s.serverList, key)
	log.Println("del key:", key)
}
// GetServices returns the values currently held in the service list.
//
// The result is a freshly allocated slice, so the caller may modify it freely.
// Its order follows map iteration and is therefore unspecified. When the list
// is empty the result is an empty, non-nil slice.
func (s *ServiceDiscovery) GetServices() []string {
	s.lock.Lock()
	defer s.lock.Unlock()

	// The final length is known up front, so allocate once instead of growing.
	addrs := make([]string, 0, len(s.serverList))
	for _, addr := range s.serverList {
		addrs = append(addrs, addr)
	}
	return addrs
}
// Close closes the underlying etcd client and returns whatever error that
// call reports.
func (s *ServiceDiscovery) Close() error {
	err := s.cli.Close()
	return err
}
测试代码:
package main
import (
"demo/lib"
"log"
"time"
)
// main is a small demo of lib.ServiceDiscovery: it connects to a local etcd,
// watches the "srv/goods" prefix and prints the known service list every five
// seconds until the process is killed.
func main() {
	endpoints := []string{"localhost:2379"}
	ser := lib.NewServiceDiscovery(endpoints)
	// Only runs if main ever returns; the reporting loop below never does.
	defer ser.Close()

	if err := ser.WatchService("srv/goods"); err != nil {
		// log.Fatalf exits without running deferred calls, so release the
		// client explicitly first. The close error is ignored on purpose:
		// the process is about to exit and the watch error is the one to report.
		_ = ser.Close()
		log.Fatalf("watch srv/goods: %v", err)
	}

	for {
		log.Println("service list:", ser.GetServices())
		time.Sleep(5 * time.Second)
	}
}
执行:
go run .\main.go
2022-05-08 14:37:38.436393 I | service list: []
2022-05-08 14:37:38.437991 I | watching prefix:srv/goods now...
2022-05-08 14:37:42.779572 I | put key : srv/goods/43.12.6.44:8899 val: 43.12.6.44:8899
2022-05-08 14:37:43.446450 I | service list: [43.12.6.44:8899]
2022-05-08 14:37:48.414564 I | put key : srv/goods/225.12.6.44:6699 val: 225.12.6.44:6699
2022-05-08 14:37:48.448809 I | service list: [43.12.6.44:8899 225.12.6.44:6699]
2022-05-08 14:37:53.449450 I | service list: [43.12.6.44:8899 225.12.6.44:6699]
2022-05-08 14:37:58.451543 I | service list: [43.12.6.44:8899 225.12.6.44:6699]
2022-05-08 14:38:02.787721 I | del key: srv/goods/43.12.6.44:8899
2022-05-08 14:38:03.471530 I | service list: [225.12.6.44:6699]
2022-05-08 14:38:08.429398 I | del key: srv/goods/225.12.6.44:6699
2022-05-08 14:38:08.471644 I | service list: []
2022-05-08 14:38:13.478285 I | service list: []