RPC 远程方法调用
优点:
提升系统可扩展性,
提升可维护性,和吃持续交付能力
实现系统的高可用等
缺点
rpc受限于网络
实现一个rcp远程调用关键在于带里层的实现
还是贴代码吧
package client
import (
"bytes"
"fmt"
"github.com/gorilla/rpc/json"
"net/http"
"time"
)
//声明clent 链接客户端地址
type Client struct {
Address string
}
//将client 地址赋值
func New(addr string) *Client {
return &Client{
Address: addr,
}
}
//jrp实现
func (c *Client) jrpc(method string, in interface{}, out interface{}) error {
message, err := json.EncodeClientRequest(method, in)
if err != nil {
return err
}
//封装Http请求作物rpc 的载体
fmt.Println("c.address", c.Address)
req, err := http.NewRequest(http.MethodPost, c.Address, bytes.NewBuffer(message))
req.Header.Set("Content-Type", "application/json")
client := &http.Client{
Timeout: 10 * time.Second,
}
resp, err := client.Do(req)
if err != nil {
fmt.Println("请求执行失败")
return err
}
defer resp.Body.Close()
return json.DecodeClientResponse(resp.Body, out)
}
type PingMessage struct {
Payload string
}
//测试远程服务是否启动
func (c *Client) Ping(message string) (string, error) {
in := PingMessage{Payload: message}
var out PingMessage
err := c.jrpc("TEST.Ping", in, &out)
if err != nil {
fmt.Println("接口调用失败", err)
return "", err
}
return out.Payload, nil
}
//其他方法加入开箱加入即可(代理层代码)
A调用B
A层实现
package main
import (
"fmt"
"github.com/gin-gonic/gin"
"jrpc_test/client"
"log"
)
func main() {
router := gin.Default()
//router := gin.New()
router.GET("/v1/registry/sign", func(context *gin.Context) {
log.Println(">>>> hello jrpc <<<<")
_decoderClient := client.New("http://127.0.0.1:9999/rpc")
resq, err := _decoderClient.Ping("我的接口通了")
if err != nil {
fmt.Println("接口调用失败")
} else {
fmt.Println("远程调用成功", resq)
}
context.JSON(200, gin.H{
"code": 200,
"success": true,
})
})
// 指定地址和端口号
router.Run("localhost:8888")
}
B层代码实现
package main
import (
"context"
"flag"
"github.com/gorilla/rpc"
"github.com/gorilla/rpc/json"
"jrpc_test/service"
"log"
"net/http"
"os"
"os/signal"
"time"
)
var bind = flag.String("bind", ":9999", "server port")
var port = 9999
func main() {
s := rpc.NewServer()
//可传递参数配置文件参数等
w, _ := service.W.New()
s.RegisterCodec(json.NewCodec(), "application/json")
err := s.RegisterService(&service.ControlService{Work: w}, "TEST")
if err != nil {
panic(err)
}
http.Handle("/rpc", s)
srv := &http.Server{
Addr: ":9999",
}
//保持心跳,
service.W.Sign(port)
go func() {
//监听注册rpc服务
if err = srv.ListenAndServe(); err != nil {
log.Fatal(err)
}
}()
//make channel 终止程序
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit
log.Println("程序终止")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Fatalf("服务终止: %v", err)
}
}
B层方法实现
package service
import (
"bytes"
"database/sql"
"encoding/json"
"fmt"
"jrpc_test/client"
"net/http"
"sync"
"time"
)
type Work struct {
sync.Mutex
db *sql.DB
//可多个handler
}
type (
// SignPayload represent a request for vip-processor
SignPayload struct {
Name string `json:"server_name"`
Address string `json:"server_addr"`
Port int `json:"server_port"`
}
)
var W Work
//work可实现具体任务
func (w *Work) AddTask() error {
fmt.Println("远程调用此方法")
return nil
}
//在new 里初始化一些中间建如db等
func (w *Work) New() (*Work, error) {
worker := &Work{}
return worker, nil
}
type ControlService struct {
Work *Work
}
func (c *ControlService) Ping(r *http.Request, in *client.PingMessage, out *client.PingMessage) error {
out.Payload = in.Payload
fmt.Println("我是远程服务我已启动,谢谢")
return nil
}
func (c *Work) Sign(port int) {
//获取心跳地址
var addr string = "127.0.0.1:8888"
go func(_addr string) {
c := http.Client{
Timeout: 250 * time.Millisecond,
}
for {
_payload := &SignPayload{
Name: "sign",
Address: "127.0.0.1",
Port: port,
}
_reqURL := "http://" + _addr + "/v1/registry/sign"
blob, err := json.Marshal(_payload)
if err != nil {
return
}
req, err := http.NewRequest(http.MethodGet, _reqURL, bytes.NewBuffer(blob))
if err != nil {
fmt.Println("error", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Connection", "close")
resp, err := c.Do(req)
if err != nil {
fmt.Println("心跳链接中断", err)
}
if resp != nil {
if err := resp.Body.Close(); err != nil {
panic("Close response body error")
}
}
time.Sleep(time.Duration(3 * time.Second))
}
}(addr)
}
至此简单的rpc服务就启动了,但是要想实现复杂的逻辑需补充方法,结构体中需要添加其必要的初始化信息
github地址:https://github.com/tsxylhs/golang_jrpc_test.git