前提搭建consul集群,至少需要一个consul agent

consul服务注册与发现相关文档见官网:

 

服务注册

 

服务注销

 

服务发现

 

 

代码实现

服务注册与发现

discovery/discovery_client.go

实现服务注册、服务注销和服务发现的通用功能

package discovery

import (
	"log"
	"fmt"
	"time"
	"bytes"
	"strconv"
	"context"
	"net/http"
	"encoding/json"
)

// 服务实例结构体
type InstanceInfo struct {
	ID                string                     `json:"ID"`                // 服务实例ID
	Service           string                     `json:"Service,omitempty"` // 服务发现时返回的服务名
	Name              string                     `json:"Name"`              // 服务名
	Tags              []string                   `json:"Tags,omitempty"`    // 标签,可用于进行服务过滤
	Address           string                     `json:"Address"`           // 服务实例HOST
	Port              int                        `json:"Port"`              // 服务实例端口
	Meta              map[string]string          `json:"Meta,omitempty"`    // 元数据
	EnableTagOverride bool                       `json:"EnableTagOverride"` // 是否允许标签覆盖
	Check             `json:"Check,omitempty"`   // 健康检查相关配置
	Weights           `json:"Weights,omitempty"` // 权重
	CurWeight         int                        `json:"CurWeights,omitempty"` // 当前权重
}

//健康检查相关的配置
type Check struct {
	DeregisterCriticalServiceAfter string   `json:"DeregisterCriticalServiceAfter"` // 多久之后注销服务
	Args                           []string `json:"Args,omitempty"`                 // 请求参数
	HTTP                           string   `json:"HTTP"`                           // 健康检查地址
	Interval                       string   `json:"Interval,omitempty"`             // Consul 主动检查间隔
	TTL                            string   `json:"TTL,omitempty"`                  // 服务实例主动维持心跳间隔,与Interval只存其一
}

//权重
type Weights struct {
	Passing int `json:"Passing"`
	Warning int `json:"Warning"`
}

//服务发现的客户端
type DiscoveryClient struct {
	host string // Consul 的 Host
	port int    // Consul 的 端口
}

func NewDiscoveryClient(host string, port int) *DiscoveryClient {
	return &DiscoveryClient{
		host: host,
		port: port,
	}
}

//服务注册
func (consulClient *DiscoveryClient) Register(ctx context.Context, serviceName, instanceId, healthCheckUrl string, instanceHost string, instancePort int, meta map[string]string, weights *Weights) error {

	instanceInfo := &InstanceInfo{
		ID:                instanceId,
		Name:              serviceName,
		Address:           instanceHost,
		Port:              instancePort,
		Meta:              meta,
		EnableTagOverride: false,
		Check: Check{
			DeregisterCriticalServiceAfter: "30s",
			HTTP:                           "http://" + instanceHost + ":" + strconv.Itoa(instancePort) + healthCheckUrl,
			Interval:                       "15s",
		},
	}

	if weights != nil {
		instanceInfo.Weights = *weights
	} else {
		instanceInfo.Weights = Weights{
			Passing: 10,
			Warning: 1,
		}
	}

	byteData, err := json.Marshal(instanceInfo)

	if err != nil {
		log.Printf("json format err: %s", err)
		return err
	}

	req, err := http.NewRequest("PUT",
		"http://"+consulClient.host+":"+strconv.Itoa(consulClient.port)+"/v1/agent/service/register",
		bytes.NewReader(byteData))

	if err != nil {
		return err
	}

	req.Header.Set("Content-Type", "application/json;charset=UTF-8")
	client := http.Client{}
	client.Timeout = time.Second * 2
	resp, err := client.Do(req)

	if err != nil {
		log.Printf("register service err : %s", err)
		return err
	}

	defer resp.Body.Close()
	if resp.StatusCode != 200 {
		log.Printf("register service http request errCode : %v", resp.StatusCode)
		return fmt.Errorf("register service http request errCode : %v", resp.StatusCode)
	}

	log.Println("register service success")
	return nil
}

//服务注销
func (consulClient *DiscoveryClient) Deregister(ctx context.Context, instanceId string) error {
	req, err := http.NewRequest("PUT",
		"http://"+consulClient.host+":"+strconv.Itoa(consulClient.port)+"/v1/agent/service/deregister/"+instanceId, nil)

	if err != nil {
		log.Printf("req format err: %s", err)
		return err
	}

	client := http.Client{}
	client.Timeout = time.Second * 2

	resp, err := client.Do(req)

	if err != nil {
		log.Printf("deregister service err : %s", err)
		return err
	}

	resp.Body.Close()

	if resp.StatusCode != 200 {
		log.Printf("deresigister service http request errCode : %v", resp.StatusCode)
		return fmt.Errorf("deresigister service http request errCode : %v", resp.StatusCode)
	}

	log.Println("deregister service success")

	return nil
}

//服务发现
func (consulClient *DiscoveryClient) DiscoverServices(ctx context.Context, serviceName string) ([]*InstanceInfo, error) {

	req, err := http.NewRequest("GET",
		"http://"+consulClient.host+":"+strconv.Itoa(consulClient.port)+"/v1/health/service/"+serviceName, nil)

	if err != nil {
		log.Printf("req format err: %s", err)
		return nil, err
	}

	client := http.Client{}
	client.Timeout = time.Second * 2

	resp, err := client.Do(req)
	if err != nil {
		log.Printf("discover service err : %s", err)
		return nil, err
	}

	defer resp.Body.Close()
	if resp.StatusCode != 200 {
		log.Printf("discover service http request errCode : %v", resp.StatusCode)
		return nil, fmt.Errorf("discover service http request errCode : %v", resp.StatusCode)
	}

	var serviceList []struct {
		Service InstanceInfo `json:"Service"`
	}
	err = json.NewDecoder(resp.Body).Decode(&serviceList)
	if err != nil {
		log.Printf("format service info err : %s", err)
		return nil, err
	}

	instances := make([]*InstanceInfo, len(serviceList))
	for i := 0; i < len(instances); i++ {
		instances[i] = &serviceList[i].Service
	}
	return instances, nil

}


 

服务注册代码示例

register.go  (注意修改consul和服务器参数配置)

package main

import (
	"io"
	"os"
	"log"
	"fmt"
	"flag"
	"syscall"
	"strings"
	"context"
	"net/http"
	"os/signal"

	"consul_study /discovery"
	
	"github.com/google/uuid"
)

func main() {
	consulAddr := flag.String("consul.addr", "192.168.20.165", "consul address")  //consul addr
	consulPort := flag.Int("consul.port", 8500, "consul port") //consul端口
	servicePort := flag.Int("service.port", 12310, "service port")  //服务端口
	serviceName := flag.String("service.name", "instance", "service name")  //服务名称
	serviceAddr := flag.String("service.addr", "192.168.7.41", "service addr")  //服务地址


	flag.Parse()
	ctx,_ := context.WithCancel(context.Background())

	instanceId := *serviceName + "-" + strings.Replace(uuid.New().String(),"-","",-1)

	client := discovery.NewDiscoveryClient(*consulAddr, *consulPort) //获取consul客户端,用于服务注册和发现

	//将服务注册到consul中
	err := client.Register(ctx , *serviceName, instanceId, "/health", *serviceAddr, *servicePort, nil, nil)

	if err != nil {
		log.Fatal(err)
	}

	//开启http服务,并注册handle
	go func() {
		http.HandleFunc("/health",checkHealth)
		http.ListenAndServe(fmt.Sprintf("%s:%d",*serviceAddr,*servicePort),nil)
	}()

	// 监控系统信号,等待 ctrl + c 系统信号通知服务关闭
	c := make(chan os.Signal, 1)
	go func() {
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	}()
	log.Printf("exit %s", <-c)
	client.Deregister(ctx,instanceId)
	log.Printf("Deregister service %s",instanceId)

}

func checkHealth(w http.ResponseWriter, _ *http.Request) {
	io.WriteString(w,"SUCCESS")
}

 

服务发现代码示例

discovery.go  (注意修改consul和服务器参数配置)


package main

import (
	"os"
	"log"
	"fmt"
	"flag"
	"time"
	"syscall"
	"context"
	"os/signal"
	
	"consul_study/discovery"
)

func main() {
	consulAddr := flag.String("consul.addr", "192.168.20.165", "consul address")  //consul addr
	consulPort := flag.Int("consul.port", 8500, "consul port") //consul端口
	serviceName := flag.String("service.name", "instance", "service name")  //服务名称



	flag.Parse()
	ctx,_ := context.WithCancel(context.Background())

	client := discovery.NewDiscoveryClient(*consulAddr, *consulPort) //获取consul客户端,用于服务注册和发现


	c := make(chan os.Signal,1)
	go func() {
		signal.Notify(c,syscall.SIGINT,syscall.SIGTERM)
	}()

	//定时刷新示例
	ticker := time.NewTicker(time.Second*5)

	for {
		select {
		case <- ticker.C:
			instance,err := client.DiscoverServices(ctx,*serviceName)
			if err != nil {
				log.Fatal(err)
			}

			fmt.Println("get instance num",len(instance))
			for _,v := range instance {
				fmt.Printf("instanceID:%s,address:%s,port:%d\n",v.ID,v.Address,v.Port)
			}

		case <-c:
			log.Println("discovery service exit!")
		}
	}

}

测试

 

1.我们先启动服务发现模块

> go run discovery.go

此时没有可用的服务实例

 

 

2.启动一个服务注册模块(我们称为instance1)

> go run register.go -service.port=12310

启动成功后我们观察服务发现模块的日志,发现了一个实例

 

3. 再启动一个服务注册模块(我们称为instance2)

> go run register.go -service.port=12311

 

 

此时我们通过consul的控制台也可以发现注册到consul的instance1和instance2实例信息

 

 

以上部分代码摘自以下课程,如有兴趣可以扫码学习