前提:需要先搭建好 Consul 集群,且至少有一个可访问的 Consul agent。
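Consul 服务注册与发现的相关文档见官网,本文代码用到以下三个 HTTP 接口:
服务注册:PUT /v1/agent/service/register

服务注销:PUT /v1/agent/service/deregister/{服务实例ID}

服务发现:GET /v1/health/service/{服务名}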

代码实现
服务注册与发现
discovery/discovery_client.go
实现服务注册、服务注销和服务发现的通用功能
package discovery
import (
"log"
"fmt"
"time"
"bytes"
"strconv"
"context"
"net/http"
"encoding/json"
)
// InstanceInfo describes a single service instance. It is the JSON body sent
// when a service is registered, and it is also the shape decoded from the
// "Service" object of every entry returned by service discovery.
type InstanceInfo struct {
	ID                string            `json:"ID"`                // Service instance ID.
	Service           string            `json:"Service,omitempty"` // Service name as returned by service discovery.
	Name              string            `json:"Name"`              // Service name (set when registering).
	Tags              []string          `json:"Tags,omitempty"`    // Tags; can be used to filter services.
	Address           string            `json:"Address"`           // Host of the service instance.
	Port              int               `json:"Port"`              // Port of the service instance.
	Meta              map[string]string `json:"Meta,omitempty"`    // Metadata.
	EnableTagOverride bool              `json:"EnableTagOverride"` // Whether tag override is allowed.
	// Health-check configuration. The struct is embedded, but because the tag
	// names it, encoding/json treats it as a regular field under the "Check" key.
	// NOTE(review): omitempty never omits a struct value, so the key is always emitted.
	Check `json:"Check,omitempty"`
	// Weights of the instance; embedded and serialized under the "Weights" key
	// in the same way as Check.
	Weights   `json:"Weights,omitempty"`
	CurWeight int `json:"CurWeights,omitempty"` // Current weight (note the JSON key is "CurWeights").
}
// Check holds the health-check configuration of a service instance.
type Check struct {
	DeregisterCriticalServiceAfter string   `json:"DeregisterCriticalServiceAfter"` // How long to wait before the service is deregistered.
	Args                           []string `json:"Args,omitempty"`                 // Request arguments.
	HTTP                           string   `json:"HTTP"`                           // Health-check URL.
	Interval                       string   `json:"Interval,omitempty"`             // Interval at which Consul actively runs the check.
	TTL                            string   `json:"TTL,omitempty"`                  // Interval at which the instance itself keeps its heartbeat alive; only one of TTL and Interval is set.
}
// Weights holds the weights of a service instance, one value per field name
// (Passing and Warning).
type Weights struct {
	Passing int `json:"Passing"`
	Warning int `json:"Warning"`
}
// DiscoveryClient is the service-discovery client. It only stores the address
// of the Consul agent that its methods send HTTP requests to.
type DiscoveryClient struct {
	host string // Host of Consul.
	port int    // Port of Consul.
}
// NewDiscoveryClient returns a DiscoveryClient that targets the Consul agent
// at the given host and port. Nothing is contacted here; the two values are
// only stored for later requests.
func NewDiscoveryClient(host string, port int) *DiscoveryClient {
	client := new(DiscoveryClient)
	client.host = host
	client.port = port
	return client
}
// Register registers a service instance with the Consul agent by sending
// PUT /v1/agent/service/register with the instance description as JSON.
//
// The instance is registered with an HTTP health check whose URL is
// "http://" + instanceHost + ":" + instancePort + healthCheckUrl, with
// Interval "15s" and DeregisterCriticalServiceAfter "30s". When weights is
// nil, Passing=10 and Warning=1 are used.
//
// ctx is attached to the outgoing request, so cancelling it aborts the call.
// A nil ctx is treated as context.Background(). Independently of ctx, the
// request is limited by a 2-second client timeout. Any response status other
// than 200 is returned as an error.
func (consulClient *DiscoveryClient) Register(ctx context.Context, serviceName, instanceId, healthCheckUrl string, instanceHost string, instancePort int, meta map[string]string, weights *Weights) error {
	// Older callers could pass a nil context because it used to be ignored;
	// keep accepting it instead of failing in NewRequestWithContext.
	if ctx == nil {
		ctx = context.Background()
	}

	instanceInfo := &InstanceInfo{
		ID:                instanceId,
		Name:              serviceName,
		Address:           instanceHost,
		Port:              instancePort,
		Meta:              meta,
		EnableTagOverride: false,
		Check: Check{
			DeregisterCriticalServiceAfter: "30s",
			HTTP:                           "http://" + instanceHost + ":" + strconv.Itoa(instancePort) + healthCheckUrl,
			Interval:                       "15s",
		},
		// Default weights, replaced below when the caller supplies its own.
		Weights: Weights{
			Passing: 10,
			Warning: 1,
		},
	}
	if weights != nil {
		instanceInfo.Weights = *weights
	}

	byteData, err := json.Marshal(instanceInfo)
	if err != nil {
		log.Printf("json format err: %s", err)
		return err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPut,
		"http://"+consulClient.host+":"+strconv.Itoa(consulClient.port)+"/v1/agent/service/register",
		bytes.NewReader(byteData))
	if err != nil {
		log.Printf("req format err: %s", err)
		return err
	}
	req.Header.Set("Content-Type", "application/json;charset=UTF-8")

	client := http.Client{Timeout: 2 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		log.Printf("register service err : %s", err)
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		log.Printf("register service http request errCode : %v", resp.StatusCode)
		return fmt.Errorf("register service http request errCode : %v", resp.StatusCode)
	}
	log.Println("register service success")
	return nil
}
// Deregister removes a service instance from the Consul agent by sending
// PUT /v1/agent/service/deregister/<instanceId>.
//
// instanceId is appended to the URL path verbatim, so it must already be safe
// to use as a path segment.
//
// ctx is attached to the outgoing request, so cancelling it aborts the call.
// A nil ctx is treated as context.Background(). Independently of ctx, the
// request is limited by a 2-second client timeout. Any response status other
// than 200 is returned as an error.
func (consulClient *DiscoveryClient) Deregister(ctx context.Context, instanceId string) error {
	// Older callers could pass a nil context because it used to be ignored;
	// keep accepting it instead of failing in NewRequestWithContext.
	if ctx == nil {
		ctx = context.Background()
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPut,
		"http://"+consulClient.host+":"+strconv.Itoa(consulClient.port)+"/v1/agent/service/deregister/"+instanceId, nil)
	if err != nil {
		log.Printf("req format err: %s", err)
		return err
	}

	client := http.Client{Timeout: 2 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		log.Printf("deregister service err : %s", err)
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		log.Printf("deregister service http request errCode : %v", resp.StatusCode)
		return fmt.Errorf("deregister service http request errCode : %v", resp.StatusCode)
	}
	log.Println("deregister service success")
	return nil
}
// DiscoverServices looks up the instances of serviceName by sending
// GET /v1/health/service/<serviceName> to the Consul agent and returns the
// "Service" object of every entry in the response, in response order.
//
// No filtering is applied here: every entry the agent returns is included.
// serviceName is appended to the URL path verbatim, so it must already be safe
// to use as a path segment.
//
// ctx is attached to the outgoing request, so cancelling it aborts the call.
// A nil ctx is treated as context.Background(). Independently of ctx, the
// request is limited by a 2-second client timeout. A response status other
// than 200, or a body that is not the expected JSON array, is returned as an
// error together with a nil slice.
func (consulClient *DiscoveryClient) DiscoverServices(ctx context.Context, serviceName string) ([]*InstanceInfo, error) {
	// Older callers could pass a nil context because it used to be ignored;
	// keep accepting it instead of failing in NewRequestWithContext.
	if ctx == nil {
		ctx = context.Background()
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
		"http://"+consulClient.host+":"+strconv.Itoa(consulClient.port)+"/v1/health/service/"+serviceName, nil)
	if err != nil {
		log.Printf("req format err: %s", err)
		return nil, err
	}

	client := http.Client{Timeout: 2 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		log.Printf("discover service err : %s", err)
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		log.Printf("discover service http request errCode : %v", resp.StatusCode)
		return nil, fmt.Errorf("discover service http request errCode : %v", resp.StatusCode)
	}

	// Only the "Service" member of each entry is decoded; other members of
	// the response entries are ignored.
	var serviceList []struct {
		Service InstanceInfo `json:"Service"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&serviceList); err != nil {
		log.Printf("format service info err : %s", err)
		return nil, err
	}

	instances := make([]*InstanceInfo, len(serviceList))
	for i := range serviceList {
		instances[i] = &serviceList[i].Service
	}
	return instances, nil
}
服务注册代码示例
register.go (注意修改consul和服务器参数配置)
package main
import (
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"syscall"

	"consul_study/discovery"
	"github.com/google/uuid"
)
// main registers this process as a service instance in Consul, serves the
// /health endpoint used by the registered health check, and deregisters the
// instance again when SIGINT/SIGTERM arrives or the HTTP server stops.
//
// The process exits with a non-zero status when the HTTP server fails or the
// deregistration fails.
func main() {
	consulAddr := flag.String("consul.addr", "192.168.20.165", "consul address") // Consul agent host
	consulPort := flag.Int("consul.port", 8500, "consul port")                   // Consul agent port
	servicePort := flag.Int("service.port", 12310, "service port")               // port this service listens on
	serviceName := flag.String("service.name", "instance", "service name")       // service name
	serviceAddr := flag.String("service.addr", "192.168.7.41", "service addr")   // address this service listens on
	flag.Parse()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Instance ID is "<service name>-<uuid without dashes>".
	instanceId := *serviceName + "-" + strings.ReplaceAll(uuid.New().String(), "-", "")
	// Consul client used for registration and deregistration.
	client := discovery.NewDiscoveryClient(*consulAddr, *consulPort)

	// Subscribe to signals synchronously and before registering, so that a
	// signal arriving right after registration still leads to deregistration.
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)

	// Register the service in Consul.
	if err := client.Register(ctx, *serviceName, instanceId, "/health", *serviceAddr, *servicePort, nil, nil); err != nil {
		log.Fatal(err)
	}

	// Start the HTTP server with the health handler. ListenAndServe only
	// returns on failure; report that instead of dropping it so the instance
	// does not stay registered without a listener behind it.
	serveErr := make(chan error, 1)
	go func() {
		http.HandleFunc("/health", checkHealth)
		serveErr <- http.ListenAndServe(fmt.Sprintf("%s:%d", *serviceAddr, *servicePort), nil)
	}()

	// Wait for ctrl+c / termination, or for the HTTP server to stop.
	failed := false
	select {
	case sig := <-c:
		log.Printf("exit %s", sig)
	case err := <-serveErr:
		log.Printf("http server stopped: %s", err)
		failed = true
	}

	if err := client.Deregister(ctx, instanceId); err != nil {
		log.Printf("Deregister service %s failed: %s", instanceId, err)
		failed = true
	} else {
		log.Printf("Deregister service %s", instanceId)
	}

	if failed {
		// os.Exit skips deferred calls, so release the context explicitly.
		cancel()
		os.Exit(1)
	}
}
// checkHealth is the HTTP handler for the health-check endpoint. It ignores
// the request and writes the fixed body "SUCCESS"; the result of the write is
// deliberately discarded.
func checkHealth(rw http.ResponseWriter, _ *http.Request) {
	_, _ = io.WriteString(rw, "SUCCESS")
}
服务发现代码示例
discovery.go (注意修改consul和服务器参数配置)
package main
import (
"os"
"log"
"fmt"
"flag"
"time"
"syscall"
"context"
"os/signal"
"consul_study/discovery"
)
// main polls Consul every 5 seconds for the instances of the configured
// service and prints them, until SIGINT or SIGTERM is received.
//
// A failed poll is logged and retried on the next tick rather than ending the
// process, so a temporarily unreachable agent does not stop the refresh loop.
func main() {
	consulAddr := flag.String("consul.addr", "192.168.20.165", "consul address") // Consul agent host
	consulPort := flag.Int("consul.port", 8500, "consul port")                   // Consul agent port
	serviceName := flag.String("service.name", "instance", "service name")       // service name to look up
	flag.Parse()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Consul client used for service discovery.
	client := discovery.NewDiscoveryClient(*consulAddr, *consulPort)

	// Subscribe synchronously so no signal can arrive before the
	// subscription is in place.
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)

	// Periodic refresh example.
	ticker := time.NewTicker(time.Second * 5)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			instance, err := client.DiscoverServices(ctx, *serviceName)
			if err != nil {
				log.Printf("discover services failed, retrying on next tick: %s", err)
				continue
			}
			fmt.Println("get instance num", len(instance))
			for _, v := range instance {
				fmt.Printf("instanceID:%s,address:%s,port:%d\n", v.ID, v.Address, v.Port)
			}
		case <-c:
			log.Println("discovery service exit!")
			// Leave the loop; without this the signal is consumed and the
			// process keeps running.
			return
		}
	}
}
测试
1.我们先启动服务发现模块
> go run discovery.go

此时没有可用的服务实例
2.启动一个服务注册模块(我们称为instance1)
> go run register.go -service.port=12310
启动成功后我们观察服务发现模块的日志,发现了一个实例

3. 再启动一个服务注册模块(我们称为instance2)
> go run register.go -service.port=12311

此时我们通过consul的控制台也可以发现注册到consul的instance1和instance2实例信息

以上部分代码摘自以下课程,如有兴趣可以扫码学习
