https://github.com/alibaba/nacos 是阿里开源的服务发现和配置同步组件,上手非常容易,我们介绍下如何部署,然后看下nacos提供的golang sdk:https://github.com/nacos-group/nacos-sdk-go如何使用,分析下具体的源码实现。
docker run --name nacos-quick -e MODE=standalone -p 8848:8848 -p 9848:9848 -d nacos/nacos-server:2.0.2Unable to find image 'nacos/nacos-server:2.0.2' locally2.0.2: Pulling from nacos/nacos-server9a03b1668b6d: Pull completeDigest: sha256:ac66d2fbc1ba432beff88beb165e5012686863d72a5e0f25da06e23c5e7b329dStatus: Downloaded newer image for nacos/nacos-server:2.0.2db9558d41223b12bd58f2c120ead7d506a50bd40327a3fc6518178b27e50dd99
在nacos 1.X的版本中使用http方式来做服务注册和发现,配置主端口(默认8848);在2.0版本支持了grpc 服务发现:9848 是客户端gRPC请求服务端端口,用于客户端向服务端发起连接和请求9849是服务端gRPC请求服务端端口,用于服务间同步等。
我们实现一个服务注册
curl -X POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?serviceName=nacos.naming.serviceName&ip=20.18.7.10&port=8080'ok
拉取注册结果
curl -X GET 'http://127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=nacos.naming.serviceName'{"name":"DEFAULT_GROUP@@nacos.naming.serviceName","groupName":"DEFAULT_GROUP","clusters":"","cacheMillis":10000,"hosts":[],"lastRefTime":1667400684240,"checksum":"","allIPs":false,"reachProtectionThreshold":false,"valid":true}{"name":"DEFAULT_GROUP@@nacos.naming.serviceName","groupName":"DEFAULT_GROUP","clusters":"","cacheMillis":10000,"hosts":[{"instanceId":"20.18.7.10#8080#DEFAULT#DEFAULT_GROUP@@nacos.naming.serviceName","ip":"20.18.7.10","port":8080,"weight":1.0,"healthy":true,"enabled":true,"ephemeral":true,"clusterName":"DEFAULT","serviceName":"DEFAULT_GROUP@@nacos.naming.serviceName","metadata":{},"instanceHeartBeatInterval":5000,"instanceIdGenerator":"simple","ipDeleteTimeout":30000,"instanceHeartBeatTimeOut":15000}],"lastRefTime":1667400719947,"checksum":"","allIPs":false,"reachProtectionThreshold":false,"valid":true}
同样我们也可以使用nacos的配置中心功能,发布配置
curl -X POST "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=nacos.cfg.dataId&group=test&content=helloWorld"true
获取配置
curl -X GET "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=nacos.cfg.dataId&group=test"helloWorld
其实golang的sdk就是基于上述api做的封装来实现服务注册与发现的。
我们定义一个服务
syntax = "proto3";import "google/protobuf/empty.proto";package grpcnacos;option go_package = ".;grpcnacos";service Test{rpc Test(google.protobuf.Empty) returns( TestResponse) {};}message TestResponse{string msg = 1;}
生成对应的golang代码
mkdir -p ../pkg/protocol/grpcnacosprotoc --go_out=../pkg/protocol/grpcnacos --go_opt=paths=source_relative --go-grpc_out=../pkg/protocol/grpcnacos --go-grpc_opt=paths=source_relative grpcnacos.proto
定义grpc服务的实现逻辑
package serviceimport ("context""learn/learn/Nacos/pkg/protocol/grpcnacos""log"emptypb "google.golang.org/protobuf/types/known/emptypb")type Service struct {grpcnacos.UnimplementedTestServer}func (s Service) Test(ctx context.Context, empty *emptypb.Empty) (*grpcnacos.TestResponse, error) {log.Println("收到一个请求")return &grpcnacos.TestResponse{Msg: "test"}, nil}
注册我们的服务
package mainimport ("fmt""learn/learn/Nacos/exp1/service""learn/learn/Nacos/pkg/protocol/grpcnacos""log""net""github.com/nacos-group/nacos-sdk-go/clients""github.com/nacos-group/nacos-sdk-go/common/constant""github.com/nacos-group/nacos-sdk-go/vo""google.golang.org/grpc")func main() {server := grpc.NewServer()service := service.Service{}grpcnacos.RegisterTestServer(server, service)port := GenFreePort()listen, err := net.Listen("tcp", fmt.Sprintf(":%d", port))if err != nil {log.Fatalf("监听端口:%d失败: %s", port, err.Error())}// 创建serverConfig// 支持多个;至少一个ServerConfigserverConfig := []constant.ServerConfig{{IpAddr: "127.0.0.1",Port: 8848,},}// 创建clientConfigclientConfig := constant.ClientConfig{NamespaceId: "", // 如果需要支持多namespace,我们可以场景多个client,它们有不同的NamespaceId。当namespace是public时,此处填空字符串。TimeoutMs: 50000,NotLoadCacheAtStart: true,LogLevel: "debug",}// 创建服务发现客户端的另一种方式 (推荐)namingClient, err := clients.NewNamingClient(vo.NacosClientParam{ClientConfig: &clientConfig,ServerConfigs: serverConfig,},)if err != nil {log.Fatalf("初始化nacos失败: %s", err.Error())}success, err := namingClient.RegisterInstance(vo.RegisterInstanceParam{Ip: "127.0.0.1",Port: uint64(port),ServiceName: "test-server",Weight: 10,Enable: true,Healthy: true,Ephemeral: true,Metadata: map[string]string{"name": "test"},ClusterName: "DEFAULT", // 默认值DEFAULTGroupName: "DEFAULT_GROUP", // 默认值DEFAULT_GROUP})if err != nil {log.Fatalf("注册服务失败: %s", err.Error())}log.Println("success: ", success)log.Printf("服务启动成功;PORT:%d\n", port)_ = server.Serve(listen)}// GenFreePort 获取一个空闲的端口;端口避免写死,因为要启动多个实例,测试负载均衡func GenFreePort() int {addr, err := net.ResolveTCPAddr("tcp", "localhost:0")if err != nil {panic(err)}listen, err := net.ListenTCP("tcp", addr)if err != nil {panic(err)}defer listen.Close()return listen.Addr().(*net.TCPAddr).Port}
通过名字获取服务的实力,请求获取结果
package mainimport ("context""fmt""learn/learn/Nacos/pkg/protocol/grpcnacos""log""github.com/nacos-group/nacos-sdk-go/clients""github.com/nacos-group/nacos-sdk-go/common/constant""github.com/nacos-group/nacos-sdk-go/vo""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure""google.golang.org/protobuf/types/known/emptypb")func main() {// 创建serverConfig// 支持多个;至少一个ServerConfigserverConfig := []constant.ServerConfig{{IpAddr: "127.0.0.1",Port: 8848,},}// 创建clientConfigclientConfig := constant.ClientConfig{// 如果需要支持多namespace,我们可以场景多个client,它们有不同的NamespaceId。当namespace是public时,此处填空字符串。NamespaceId: "",TimeoutMs: 5000,NotLoadCacheAtStart: true,LogLevel: "debug",}// 创建服务发现客户端的另一种方式 (推荐)namingClient, err := clients.NewNamingClient(vo.NacosClientParam{ClientConfig: &clientConfig,ServerConfigs: serverConfig,},)if err != nil {log.Fatalf("初始化nacos失败: %s", err.Error())}// SelectOneHealthyInstance将会按加权随机轮询的负载均衡策略返回一个健康的实例// 实例必须满足的条件:health=true,enable=true and weight>0instance, err := namingClient.SelectOneHealthyInstance(vo.SelectOneHealthInstanceParam{ServiceName: "test-server",GroupName: "DEFAULT_GROUP", // 默认值DEFAULT_GROUPClusters: []string{"DEFAULT"}, // 默认值DEFAULT})log.Printf("获取到的实例IP:%s;端口:%d", instance.Ip, instance.Port)conn, err := grpc.Dial(fmt.Sprintf("%s:%d", instance.Ip, instance.Port), grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {log.Fatalf("监听%s:%d失败:%s", instance.Ip, instance.Port, err.Error())}client := grpcnacos.NewTestClient(conn)res, err := client.Test(context.Background(), &emptypb.Empty{})if err != nil {log.Fatalf("调用TestClient失败: %s", err.Error())}log.Println(res.Msg)}
至此我们完成了简单的服务注册和服务发现功能。测试下
% go run learn/Nacos/exp1/server/main.go2022/11/04 00:04:38 success: true2022/11/04 00:04:38 服务启动成功;PORT:563582022/11/04 00:04:51 收到一个请求
% go run learn/Nacos/exp1/client/main.go2022/11/04 00:04:51 获取到的实例IP:127.0.0.1;端口:563582022/11/04 00:04:51 test
我们可以在页面上看下我们服务的注册情况http://127.0.0.1:8848/nacos/#/login用户名密码都是nacos


可以看到,不论是服务端注册还是客户端拉取,我们首先都需要初始化namingService的客户端,它需要两组参数
namingClient, err := clients.NewNamingClient(vo.NacosClientParam{ClientConfig: &clientConfig,ServerConfigs: serverConfig,},)
其中clientConfig配置了客户端,也就是我们的应用允许的超时时间等配置,serverConfigs是一组服务端的地址和端口后,也就是我们的nacos服务的地址,可以配置多个实例实现多活。
对于server端来说是通过RegisterInstance来实现服务的注册的
success, err := namingClient.RegisterInstance(vo.RegisterInstanceParam{Ip: "127.0.0.1",Port: uint64(port),ServiceName: "test-server",Weight: 10,Enable: true,Healthy: true,Ephemeral: true,Metadata: map[string]string{"name": "test"},ClusterName: "DEFAULT", // 默认值DEFAULTGroupName: "DEFAULT_GROUP", // 默认值DEFAULT_GROUP})
客户端是通过SelectOneHealthyInstance将会按加权随机轮询的负载均衡策略返回一个健康的实例
// 实例必须满足的条件:health=true,enable=true and weight>0instance, err := namingClient.SelectOneHealthyInstance(vo.SelectOneHealthInstanceParam{ServiceName: "test-server",GroupName: "DEFAULT_GROUP", // 默认值DEFAULT_GROUPClusters: []string{"DEFAULT"}, // 默认值DEFAULT})log.Printf("获取到的实例IP:%s;端口:%d", instance.Ip, instance.Port)
使用起来很简单方便又没有。下面分析下源码实现,注册参数定义如下
type RegisterInstanceParam struct {Ip string `param:"ip"` //requiredPort uint64 `param:"port"` //requiredWeight float64 `param:"weight"` //required,it must be lager than 0Enable bool `param:"enabled"` //required,the instance can be access or notHealthy bool `param:"healthy"` //required,the instance is health or notMetadata map[string]string `param:"metadata"` //optionalClusterName string `param:"clusterName"` //optional,default:DEFAULTServiceName string `param:"serviceName"` //requiredGroupName string `param:"groupName"` //optional,default:DEFAULT_GROUPEphemeral bool `param:"ephemeral"` //optional}
注册的时候先生成了服务的实例信息和心跳信息,然后请求nacos服务进行注册
type Instance struct {Valid bool `json:"valid"`Marked bool `json:"marked"`InstanceId string `json:"instanceId"`Port uint64 `json:"port"`Ip string `json:"ip"`Weight float64 `json:"weight"`Metadata map[string]string `json:"metadata"`ClusterName string `json:"clusterName"`ServiceName string `json:"serviceName"`Enable bool `json:"enabled"`Healthy bool `json:"healthy"`Ephemeral bool `json:"ephemeral"`}
type BeatInfo struct {Ip string `json:"ip"`Port uint64 `json:"port"`Weight float64 `json:"weight"`ServiceName string `json:"serviceName"`Cluster string `json:"cluster"`Metadata map[string]string `json:"metadata"`Scheduled bool `json:"scheduled"`Period time.Duration `json:"-"`State int32 `json:"-"`}
具体动作的执行是通过我们初始化naming客户端的时候指定的proxy agent执行的,默认的agent是一个httpagent
func (sc *NamingClient) RegisterInstance(param vo.RegisterInstanceParam) (bool, error)_, err := sc.serviceProxy.RegisterInstance(util.GetGroupName(param.ServiceName, param.GroupName), param.GroupName, instance)sc.beatReactor.AddBeatInfo(util.GetGroupName(param.ServiceName, param.GroupName), beatInfo)
其中注册实例是直接调用的我们前面提到的服务注册的http接口
return proxy.nacosServer.ReqApi(constant.SERVICE_PATH, params, http.MethodPost)SERVICE_BASE_PATH = "/v1/ns"SERVICE_PATH = SERVICE_BASE_PATH + "/instance"
发送心跳是单独启用了一个协程
go br.sendInstanceBeat(k, beatInfo)
如果当前实例注销,则进行停止心跳,否则进行心跳通信
beatInterval, err := br.serviceProxy.SendBeat(beatInfo)api := constant.SERVICE_BASE_PATH + "/instance/beat"result, err := proxy.nacosServer.ReqApi(api, params, http.MethodPut)
具体调用的是
SERVICE_BASE_PATH = "/v1/ns"result, err = server.callServer(api, params, method, getAddress(curServer), curServer.ContextPath)
最终调用的agent实现位于 github.com/nacos-group/nacos-sdk-go@v1.1.2/common/http_agent/http_agent.go
type HttpAgent struct {}
func (agent *HttpAgent) Getget(path, header, timeoutMs, params)
func (agent *HttpAgent) RequestOnlyResultagent.Getagent.Postagent.Putagent.Deletebytes, errRead := ioutil.ReadAll(response.Body)
func (agent *HttpAgent) Requestagent.Getagent.Postagent.Putagent.Delete
其中get实现如下
func get(path string, header http.Header, timeoutMs uint64, params map[string]string) (response *http.Response, err error)client := http.Client{}resp, errDo := client.Do(request)
客户端采用随机策略选取一个实例
func (sc *NamingClient) SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error)service, err := sc.hostReactor.GetServiceInfo(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))return serviceName + constant.SERVICE_INFO_SPLITER + clusters
其中
SERVICE_INFO_SPLITER = "@@"
通过list方法获取服务列表
cacheService, ok := hr.serviceInfoMap.Get(key)hr.updateServiceNow(serviceName, clusters)result, err := hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false)api := constant.SERVICE_PATH + "/list"return proxy.nacosServer.ReqApi(api, param, http.MethodGet)
然后解析json
SERVICE_BASE_PATH = "/v1/ns"SERVICE_PATH = SERVICE_BASE_PATH + "/instance"hr.ProcessServiceJson(result)
获取到实例列表后,就通过随机算法选取一个活着的节点
return sc.selectOneHealthyInstances(service)for _, host := range hosts {if host.Healthy && host.Enable && host.Weight > 0 {cw := int(math.Ceil(host.Weight))if cw > mw {mw = cw}result = append(result, host)chooser := newChooser(result)instance := chooser.pick()
其中选择器定义:
sort.Sort(instance(instances))return Chooser{data: instances, totals: totals, max: runningTotal}
选择算法实现:
instance := chooser.pick()r := rand.Intn(chs.max) + 1i := sort.SearchInts(chs.totals, r)return chs.data[i]

