ETCD实现简单服务注册与发现(Go语言)

前言

​ 服务发现要解决的也是分布式系统中最常见的问题之一,即在同一个分布式集群中的进程或服务,要如何才能找到对方并建立连接。本质上来说,服务发现就是想要了解集群中是否有进程在监听 udp 或 tcp 端口,并且通过名字就可以查找和连接。正常情况下当我们要访问服务时需要知道服务实例地址和端口,如果服务实例地址和端口都是固定的我们可以直接将其配置在文件中使用,但大多数线上生产环境尤其容器部署情况下服务实例地址都是动态分配的,只有当服务实例实际部署之后才能获得地址,服务调用者根本无法提取获取服务实例地址和端口,只能在运行时通过服务发现组件解析服务名来获取服务实例地址和端口。

服务发现简单来讲就是通过服务名找到提供服务的实例地址和端口,主要用于解决如何获取服务实例地址问题。近年来随着容器技术的兴起,大量服务分散在系统各处,服务彼此之间调用都需要通过服务发现来实现。

服务发现需要实现一下基本功能:

服务注册健康检查服务发现

注意:本文旨在实现简单的服务注册与发现,作为入门及熟悉etcdApi等,实际会比这复杂的多,考虑的更多。

服务注册

go语言实现

ServiceRegister
type ServiceRegister struct {
    // etcd client
	cli *clientv3.Client
	// service register key
	serviceKey string
	// service register prefix
	serviceKeyPrefix string
	// service register endpoint
	serviceEndpoint string
	// leaseID
	leaseID clientv3.LeaseID
}
NewServiceRegister
func NewServiceRegister(
	endpoints []string,
	serviceKeyPrefix string,
	serviceKey string,
	serviceEndpoint string) *ServiceRegister {

	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal("etcd connect faith...")
	}

	serviceReg := &ServiceRegister{
		cli:              cli,
		serviceKey:       serviceKey,
		serviceKeyPrefix: serviceKeyPrefix,
		serviceEndpoint:  serviceEndpoint,
	}

	return serviceReg
}
Register

例如:/web/node1 /web/node2 拥有同一个前缀/web 将其绑定在同一个租约上,当然你也可以给与绑定不同租约,设置不同的过期时间,道理都是相同的。

func (s *ServiceRegister) Register(ttl int64) error {

	serviceResp, err := s.cli.Get(context.Background(), s.serviceKeyPrefix, clientv3.WithPrefix())
	if err != nil {
		log.Println("etcd 操作出错")
		return err
	}
	// 首先判断当前前缀下是否有同一Lease
	for _, kv := range serviceResp.Kvs {
		// exist Grant
		if kv.Lease != 0 {
            // 设置租约ID
			s.leaseID = clientv3.LeaseID(kv.Lease)
			break
		}
	}
	
    // 没有租约 进行申请
	if s.leaseID == 0 {
		grant, err := s.cli.Grant(context.Background(), ttl)
		if err != nil {
			log.Println("申请租约失败...")
			return err
		}
		s.leaseID = grant.ID
	}
	
    // 进行注册 设置服务 并绑定租约
	_, err = s.cli.Put(context.Background(), s.serviceKey, s.serviceEndpoint, clientv3.WithLease(s.leaseID))
	if err != nil {
		log.Println("服务注册失败...")
		return err
	}
    
    // 续约操作
	go s.ListenKeepAliveChan()

	return nil
}
ListenKeepAliveChan

续约的功能作为健康检查

func (s *ServiceRegister) ListenKeepAliveChan() {
	lease := clientv3.NewLease(s.cli)
    // 进行持续的续约
	keepAlive, err := lease.KeepAlive(context.Background(), s.leaseID)
	if err != nil {
		log.Fatal("keepAlive faith...")
	}

	for resp := range keepAlive {
		println("租约 :: ", resp.ID, "续约成功!")
	}
}

现在可以开始测试

// test 测试register1

func TestRegister1(t *testing.T) {
	endpoint := []string{"http://localhost:2379"}
	servicePrefix := "/web/userService/"
	serviceKey := servicePrefix + "node1"
	port := "port:8888"

	register := NewServiceRegister(endpoint, servicePrefix, serviceKey, port)
	err := register.Register(60)
	if err != nil {
		fmt.Printf("err : %+v\n", err)
	}
	t.Run("test 2", TestRegister2)

}

func TestRegister2(t *testing.T) {
	endpoint := []string{"http://localhost:2379"}
	servicePrefix := "/web/userService/"
	serviceKey := servicePrefix + "node2"
	port := "port:1111"

	register := NewServiceRegister(endpoint, servicePrefix, serviceKey, port)
	err := register.Register(60)
	if err != nil {
		fmt.Printf("err : %+v\n", err)
	}
	time.Sleep(time.Minute)
}

程序绿了 这个时候通过etcd客户端查看

可以看到对应的值已经设置进去 也绑定在同一个租约下面

服务发现

go语言实现

通过ETCD的watch可以动态的感知kv的变化,作为服务发现的核心

ServiceDiscover
type ServiceDiscover struct {
	//etcd client
	cli *clientv3.Client
	// service list
	serviceList map[string]string
	// lock
	lock        sync.Mutex
}
NewServiceDiscover
func NewServiceDiscover(endpoint []string) *ServiceDiscover {

	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoint,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal("etcd client connect faith...")
	}

	return &ServiceDiscover{
		cli:         cli,
		serviceList: map[string]string{},
		lock:        sync.Mutex{},
	}

}
DiscoverService
// 服务发现
func (s *ServiceDiscover) DiscoverService(servicePrefix string) (err error) {
	kvCli := clientv3.NewKV(s.cli)
    // 先通过前缀查询所有val
	valResp, err := kvCli.Get(context.Background(), servicePrefix, clientv3.WithPrefix())
	if err != nil {
		log.Fatal("etcd error")
		return err
	}
	
    // 服务未找到
	if len(valResp.Kvs) == 0 {
		return errors.New("service not found")
	}
	
    // 将所有服务遍历进list
	for _, kv := range valResp.Kvs {
		s.PutServiceInList(string(kv.Key), string(kv.Value))
	}
	
    // 获取etcd的watch
	watcher := clientv3.NewWatcher(s.cli)
    
    // 通过watch住服务的前缀 对应上文的 "/web/userService/"
    // 会检测该前缀下的所有变化
	watchChan := watcher.Watch(context.Background(), servicePrefix, clientv3.WithPrefix())
	
    // 启一个协程进行监视
	go func() {
		for watchResponse := range watchChan {
			for _, e := range watchResponse.Events {
				switch e.Type {
                    // [新增 | 修改]
				case clientv3.EventTypePut:
					s.PutServiceInList(string(e.Kv.Key), string(e.Kv.Value))
                    // 删除
				case clientv3.EventTypeDelete:
					s.RemoveServiceInList(string(e.Kv.Key))
				}
			}
		}
	}()
	return nil
}
PutServiceInListRemoveServiceInList
func (s *ServiceDiscover) PutServiceInList(k, v string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.serviceList[k] = v
    
    // 作为调试使用
	log.Println("svc :: ", k, " endpoint :: ", v)
	for k, v := range s.serviceList {
		println(k, " -- ", v)
	}
}

func (s *ServiceDiscover) RemoveServiceInList(k string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	delete(s.serviceList, k)
    
    // 作为调试使用
	log.Println("remove svc :: ", k)
}

现在可以开始测试

func TestDiscover(t *testing.T) {
	endpoint := []string{"http://localhost:2379"}
	discover := NewServiceDiscover(endpoint)
	servicePrefix := "/web/userService/"
	err := discover.DiscoverService(servicePrefix)
	if err != nil {
		fmt.Printf("err : %+v\n", err)
		return
	}

	for k, v := range discover.serviceList {
		println(k, " :: ", v)
	}

	time.Sleep(time.Minute)
}
TestRegister1
TestDiscover

观察控制台 即可打印出信息

/web/userService/node1 :: port:8888
/web/userService/node2 :: port:1111