背景

近期在看开源项目CloudWeGo中看到目前GoLang微服务框架Hertz中支持通过Redis实现服务注册与服务发现功能。便想着阅读下源码

源码阅读

git clone了hertz-contrib后看到在一级目录下有目前各种主流的服务注册与发现的实现方案。为了便于学习选择阅读redis

图片.png

服务注册源码分析

看到redis/example/server/main.go中有服务注册的实现示例


func main() {
    r := redis.NewRedisRegistry("127.0.0.1:6379")
    addr := "127.0.0.1:8888"
    h := server.Default(
        server.WithHostPorts(addr),
        server.WithRegistry(r, &registry.Info{
            ServiceName: "hertz.test.demo",
            Addr:        utils.NewNetAddr("tcp", addr),
            Weight:      10,
            Tags:        nil,
        }),
    )
    h.GET("/ping", func(_ context.Context, ctx *app.RequestContext) {
        ctx.JSON(consts.StatusOK, utils.H{"ping": "pong"})
    })
    h.Spin()
}


复制代码

代码主要逻辑是实现一个简单的webservice,其中用到了服务注册机制。可以看到,在hertz中服务注册可以通过配置engine的形式在webservice初始化时定义,其中


r := redis.NewRedisRegistry("127.0.0.1:6379")

复制代码

定义了一个服务注册的地址,即要把这个微服务注册到哪个主机中。而server.WithRegistry()使得服务初始化时引入了这个服务注册。Info即是服务注册的相关信息

进入redis/registry.go查看服务注册的定义,可以看到redis服务注册是实现的registry.Registry接口

var _ registry.Registry = (*redisRegistry)(nil)

  


type redisRegistry struct {

    client *redis.Client

    rctx   *registryContext

    mu     sync.Mutex

    wg     sync.WaitGroup

}

  


type registryContext struct {

    ctx    context.Context

    cancel context.CancelFunc

}

  


// Registry is extension interface of service registry.

type Registry interface {

    Register(info *Info) error

    Deregister(info *Info) error

}

  


// Info is used for registry.

// The fields are just suggested, which is used depends on design.

type Info struct {

    // ServiceName will be set in hertz by default

    ServiceName string

    // Addr will be set in hertz by default

    Addr net.Addr

    // Weight will be set in hertz by default

    Weight int

    // extend other infos with Tags.

    Tags map[string]string

}
复制代码

registry.Registry通过Register(info *Info)和Deregister(info *Info)描述服务注册与服务发现

接下来看如何创建一个redis服务注册


// NewRedisRegistry creates a redis registry

func NewRedisRegistry(addr string, opts ...Option) registry.Registry {

    redisOpts := &redis.Options{

        Addr:     addr,

        Password: "",

        DB:       0,

    }

    for _, opt := range opts {

        opt(redisOpts)

    }

    rdb := redis.NewClient(redisOpts)

    return &redisRegistry{

        client: rdb,

    }

}

复制代码

我们已经可以猜到了,配置redis客户端连接User Server的redis,用redis来存储服务映射关系,实现服务注册中心,那么是不是这样呢,我们接着往下看服务注册的实现源码


func (r *redisRegistry) Register(info *registry.Info) error {

  // 校验配置信息

    if err := validateRegistryInfo(info); err != nil {

        return err

    }

    rctx := registryContext{}

    rctx.ctx, rctx.cancel = context.WithCancel(context.Background())

    m := newMentor()

    r.wg.Add(1)

    // 并发监控redis

    go m.subscribe(rctx.ctx, info, r)

    r.wg.Wait()

    rdb := r.client

    // 将注册信息hash化

    hash, err := prepareRegistryHash(info)

    if err != nil {

        return err

    }

    // 上锁

    r.mu.Lock()

    r.rctx = &rctx

    // 注册信息写入到redis,即我们的服务注册中心

    rdb.HSet(rctx.ctx, hash.key, hash.field, hash.value)

    rdb.Expire(rctx.ctx, hash.key, defaultExpireTime)

    // 生成服务相关信息和发送

    rdb.Publish(rctx.ctx, hash.key, generateMsg(register, info.ServiceName, info.Addr.String()))

    // 写完,解锁

    r.mu.Unlock()

    go m.monitorTTL(rctx.ctx, hash, info, r)

    // 保持长连接

    go keepAlive(rctx.ctx, hash, r)

    return nil

}

复制代码

Register方法已经对服务注册的主要流程进行了描述,下面来看一些细节


func validateRegistryInfo(info *registry.Info) error {

    if info == nil {

        return fmt.Errorf("registry.Info can not be empty")

    }

    if info.ServiceName == "" {

        return fmt.Errorf("registry.Info ServiceName can not be empty")

    }

    if info.Addr == nil {

        return fmt.Errorf("registry.Info Addr can not be empty")

    }

    return nil

}

复制代码

校验服务注册时并不会对客户端是否连接上进行校验,只会校验参数和结构体是否为空


func prepareRegistryHash(info *registry.Info) (*registryHash, error) {

    meta, err := json.Marshal(convertInfo(info))

    if err != nil {

        return nil, err

    }

    return &registryHash{

        key:   generateKey(info.ServiceName, server),

        field: info.Addr.String(),

        value: string(meta),

    }, nil

}

  


复制代码

服务注册信息hash即生成key-velue,方便写入到redis中


func keepAlive(ctx context.Context, hash *registryHash, r *redisRegistry) {

    ticker := time.NewTicker(defaultTickerTime)

    defer ticker.Stop()

    for {

        select {

        case <-ticker.C:

            r.client.Expire(ctx, hash.key, defaultKeepAliveTime)

        case <-ctx.Done():

            break

        }

    }

}

  


复制代码

最后再起一个协程在生命期内监听保持长连接,这里用到的是多路复用


func keepAlive(ctx context.Context, hash *registryHash, r *redisRegistry) {

    ticker := time.NewTicker(defaultTickerTime)

    defer ticker.Stop()

    for {

        select {

        case <-ticker.C:

            r.client.Expire(ctx, hash.key, defaultKeepAliveTime)

        case <-ctx.Done():

            break

        }

    }

}

  


复制代码

再来看服务注册退出:


func (r *redisRegistry) Deregister(info *registry.Info) error {

    if err := validateRegistryInfo(info); err != nil {

        return err

    }

    rctx := r.rctx

    rdb := r.client

    hash, err := prepareRegistryHash(info)

    if err != nil {

        return err

    }

    r.mu.Lock()

    // 删除redis中的注册信息

    rdb.HDel(rctx.ctx, hash.key, hash.field)

    rdb.Publish(rctx.ctx, hash.key, generateMsg(deregister, info.ServiceName, info.Addr.String()))

    rctx.cancel()

    r.mu.Unlock()

    return nil

}

  


复制代码

整体逻辑和服务注册相似,只是最后把注册信息删掉

服务发现源码分析

看到redis/example/client/main.go中有服务注册的实现示例


func main() {

    cli, err := client.NewClient()

    if err != nil {

        panic(err)

    }

    r := redis.NewRedisResolver("127.0.0.1:6379")

    cli.Use(sd.Discovery(r))

    for i := 0; i < 10; i++ {

        status, body, err := cli.Get(context.Background(), nil, "http://hertz.test.demo/ping", config.WithSD(true))

        if err != nil {

            hlog.Fatal(err)

        }

        hlog.Infof("HERTZ: code=%d,body=%s", status, string(body))

    }

}

  


复制代码

config.WithSD(true)即通过中间件形式,使得客户端发送请求时,并非直接请求服务器,而是请求注册中心,通过服务发现再进一步转到服务器上

接前文中在redis里进行了服务注册,这里客户端想要进行服务发现找到自己请求的微服务。这里服务发现还是通过复用接口实现的


var _ discovery.Resolver = (*redisResolver)(nil)

  


type redisResolver struct {

    client *redis.Client

}

  


// NewRedisResolver creates a redis resolver

func NewRedisResolver(addr string, opts ...Option) discovery.Resolver {

    redisOpts := &redis.Options{Addr: addr}

    for _, opt := range opts {

        opt(redisOpts)

    }

    rdb := redis.NewClient(redisOpts)

    return &redisResolver{

        client: rdb,

    }

}

复制代码

服务发现开始和服务注册一样,需要先连接上redis


func (r *redisResolver) Target(_ context.Context, target *discovery.TargetInfo) string {

    return target.Host

}

  


func (r *redisResolver) Resolve(ctx context.Context, desc string) (discovery.Result, error) {

    rdb := r.client

    // 查询服务列表

    fvs := rdb.HGetAll(ctx, generateKey(desc, server)).Val()

    var its []discovery.Instance

    for f, v := range fvs {

      // 反序列化获取服务信息

        var ri registryInfo

        err := json.Unmarshal([]byte(v), &ri)

        if err != nil {

            hlog.Warnf("HERTZ: fail to unmarshal with err: %v, ignore instance Addr: %v", err, f)

            continue

        }

        // 负载均衡参数

        weight := ri.Weight

        if weight <= 0 {

            weight = defaultWeight

        }

        its = append(its, discovery.NewInstance(tcp, ri.Addr, weight, ri.Tags))

    }

    return discovery.Result{

      // 服务发现的结果

        CacheKey:  desc,//redis表中的key

        Instances: its,//服务表

    }, nil

}

  


func (r *redisResolver) Name() string {

    return Redis

}

  


复制代码

Target、Name、Resolve即为实现自方法的接口,其中target和Name分别解出redis的地址和Name,Resolve方法用来在Redis中发现服务

我们还可以细扣一下,服务发现中间件进一步是怎么实现的?

/pkg/mod/github.com/cloudwego/hertz@v0.6.0/pkg/common/config/request_option.go:58中WithSD如下:


// WithSD set isSD in RequestOptions.

func WithSD(b bool) RequestOption {

    return RequestOption{F: func(o *RequestOptions) {

        o.isSD = b

    }}

}

复制代码

可见这里是用来高速请求,这个请求是有服务发现机制的。循着client.Get()方法一路往下找,这项配置写入到了req中:


func GetURL(ctx context.Context, dst []byte, url string, c Doer, requestOptions ...config.RequestOption) (statusCode int, body []byte, err error) {

    req := protocol.AcquireRequest()

    req.SetOptions(requestOptions...)

  


    statusCode, body, err = doRequestFollowRedirectsBuffer(ctx, req, dst, url, c)

  


    protocol.ReleaseRequest(req)

    return statusCode, body, err

}

复制代码

在hertz中的Request定义中其实是包含有config定义,里面就有sd的flag


type Request struct {

    noCopy nocopy.NoCopy //lint:ignore U1000 until noCopy is used

  


    Header RequestHeader

  


    uri      URI

    postArgs Args

  


    bodyStream      io.Reader

    w               requestBodyWriter

    body            *bytebufferpool.ByteBuffer

    bodyRaw         []byte

    maxKeepBodySize int

  


    multipartForm         *multipart.Form

    multipartFormBoundary string

  


    // Group bool members in order to reduce Request object size.

    parsedURI      bool

    parsedPostArgs bool

  


    isTLS bool

  


    multipartFiles  []*File

    multipartFields []*MultipartField

  


    // Request level options, service discovery options etc.

    options *config.RequestOptions

}

  


复制代码

也就是会从req中解析出服务地址