一、为什么要使用Deadlines#
DEADLINE_EXCEEDED
默认情况下,如果你没有设置 deadline,请求可能要等到最大请求时间耗尽之后才会超时。这会给服务器资源带来风险:例如,长时间挂起的请求会让内存占用快速增长,最终耗尽资源。
为了避免这种情况,需要给你的客户端请求程序设置一个默认的超时时间,在这段时间内请求没有返回,那么就超时报错。
二、怎么使用?Deadlines使用步骤#
2.1 使用步骤:#
1.设置deadlines
Copy
// deadlineMs is the default request deadline in milliseconds, settable with -deadline_ms.
var deadlineMs = flag.Int("deadline_ms", 20*1000, "Default deadline in milliseconds.")

// Derive a context that expires deadlineMs milliseconds from now.
clientDeadline := time.Now().Add(time.Duration(*deadlineMs) * time.Millisecond)
ctx, cancel := context.WithDeadline(ctx, clientDeadline)
// Always call cancel so the deadline's resources are released as soon as the
// work is done instead of lingering until the deadline fires.
defer cancel()
2.检查deadlines
Copy
// Stop early when the context reports that the request was cancelled.
// NOTE(review): status.New returns a *status.Status; if the enclosing function
// returns error, status.Error may be what is wanted here — confirm.
if ctx.Err() == context.Canceled {
return status.New(codes.Canceled, "Client cancelled, abandoning.")
}
2.2 具体使用:#
1. 建立连接时超时控制:
客户端建立连接时,使用的Dial()函数,它位于
google.golang.org/grpc/clientconn.go 中,我们看看这个函数内容:
Copy
// Dial creates a client connection to target by calling DialContext with
// context.Background(), passing target and opts through unchanged.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
Dial() 内部调用的是同一文件 google.golang.org/grpc/clientconn.go 中的 DialContext() 函数。
使用的时候传入设置timeout的context,如下:
Copy
// Bound connection establishment to five seconds.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
// grpc.WithBlock makes DialContext wait for the connection to come up, so the
// context timeout above limits how long connecting may take.
conn, err := grpc.DialContext(ctx, address, grpc.WithBlock(), grpc.WithInsecure())
grpc.WithInsecure()
2. 调用时超时:
函数的调用超时控制
Copy
// Give the SayHello call at most five seconds before its context expires.
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5)
// Release the timeout's resources once the call has returned.
defer cancel()
result, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
三、实例#
用grpc官方的例子来练习下
目录结构:
Copy
grpc-tutorial
-- 04deadlines
--client
- main.go
--server
- main.go
--proto/echo
- echo.proto
- echo.pb.go
1.首先是定义服务 echo.proto#
Copy
syntax = "proto3";

package echo;

// EchoRequest is the request message for every Echo RPC.
message EchoRequest {
  string message = 1;
}

// EchoResponse is the response message for every Echo RPC.
message EchoResponse {
  string message = 1;
}

// Echo is the echo service used by the deadline example.
service Echo {
  // UnaryEcho takes one request and returns one response.
  rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
  // ServerStreamingEcho takes one request and returns a stream of responses.
  rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
  // ClientStreamingEcho takes a stream of requests and returns one response.
  rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
  // BidirectionalStreamingEcho streams requests and responses in both directions.
  rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}
}
进入到proto/echo目录,生成go文件,命令如下:
protoc -I . --go_out=plugins=grpc:. ./echo.proto
2.客户端#
client/main.go
有2个主要的函数,2端都是stream和都不是stream,先看都不是stream的函数
都不是stream的函数
Copy
// unaryCall 不是stream的请求
func unaryCall(c pb.EchoClient, requestID int, message string, want codes.Code) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second) //超时设置
defer cancel()
req := &pb.EchoRequest{Message: message} //参数部分
_, err := c.UnaryEcho(ctx, req) //调用函数发送请求给服务端
got := status.Code(err) //
fmt.Printf("[%v] wanted = %v, got = %v
", requestID, want, got)
}
上面用到的 codes 定义在文件 grpc/codes/codes.go 中:
Copy
// Code is the numeric type used for status codes.
type Code uint32
// The first few status codes; the rest of the list is elided in this excerpt.
const (
OK Code = 0
Canceled Code = 1
Unknown Code = 2
InvalidArgument Code = 3
DeadlineExceeded Code = 4
... ...
)
2端都是stream的函数:
Copy
// streamingCall,2端都是stream
func streamingCall(c pb.EchoClient, requestID int, message string, want codes.Code) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)//超时设置
defer cancel()
stream, err := c.BidirectionalStreamingEcho(ctx)//双向stream
if err != nil {
log.Printf("Send error : %v", err)
return
}
err = stream.Send(&pb.EchoRequest{Message: message})//发送
if err != nil {
log.Printf("Send error : %v", err)
return
}
_, err = stream.Recv() //接收
got := status.Code(err)
fmt.Printf("[%v] wanted = %v, got = %v
", requestID, want, got)
}
main 执行函数
Copy
// main connects to the echo server at *addr and runs the six example calls:
// four unary calls followed by two bidirectional-streaming calls.
func main() {
	flag.Parse()

	cc, dialErr := grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithBlock())
	if dialErr != nil {
		log.Fatalf("did not connect : %v ", dialErr)
	}
	defer cc.Close()

	client := pb.NewEchoClient(cc)

	// echoCase describes one example call and the status code it should produce.
	type echoCase struct {
		id      int
		message string
		want    codes.Code
	}

	unaryCases := []echoCase{
		{1, "word", codes.OK},                 // a successful request
		{2, "delay", codes.DeadlineExceeded},  // exceeds the deadline
		{3, "[propagate me]world", codes.OK},  // a successful request with propagated deadline
		{4, "[propagate me][propagate me]world", codes.DeadlineExceeded}, // exceeds propagated deadline
	}
	for _, tc := range unaryCases {
		unaryCall(client, tc.id, tc.message, tc.want)
	}

	streamCases := []echoCase{
		{5, "[propagate me]world", codes.OK}, // receives a response from the stream successfully
		{6, "[propagate me][propagate me]world", codes.DeadlineExceeded}, // exceeds propagated deadline before receiving a response
	}
	for _, tc := range streamCases {
		streamingCall(client, tc.id, tc.message, tc.want)
	}
}
3.服务端#
定义一个struct
Copy
// server is the Echo service implementation used by the deadline example.
type server struct {
pb.UnimplementedEchoServer
// client is the Echo client the handlers call to forward "[propagate me]" requests.
client pb.EchoClient
// cc is a gRPC client connection.
// NOTE(review): presumably the connection backing client — confirm.
cc *grpc.ClientConn
}
2端不是stream的函数:
Copy
// UnaryEcho echoes the request message back, with two special cases:
//
//   - a message starting with "[propagate me]" sleeps 800ms, strips one prefix
//     and forwards the remainder through s.client using the same ctx, returning
//     whatever that call returns;
//   - the message "delay" sleeps 1500ms before echoing, which is longer than
//     the client's one-second timeout.
func (s *server) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
	const propagatePrefix = "[propagate me]"

	msg := req.Message
	switch {
	case strings.HasPrefix(msg, propagatePrefix):
		time.Sleep(800 * time.Millisecond)
		forwarded := &pb.EchoRequest{Message: strings.TrimPrefix(msg, propagatePrefix)}
		// Reusing ctx carries the caller's deadline into the forwarded call.
		return s.client.UnaryEcho(ctx, forwarded)
	case msg == "delay":
		time.Sleep(1500 * time.Millisecond)
	}
	return &pb.EchoResponse{Message: msg}, nil
}
func (s *server) UnaryEcho(ctx context.Context, req *pb.EchoRequest)
2端设置stream的函数:
Copy
// BidirectionalStreamingEcho reads requests from the stream in a loop and
// echoes each message back.
//
// A message starting with "[propagate me]" sleeps 800ms, strips one prefix and
// forwards the remainder through s.client using the stream's context; the
// forwarded response is sent, and then the stripped message is echoed as well.
// The message "delay" sleeps 1500ms before it is echoed.
//
// It returns an InvalidArgument status when Recv reports io.EOF, and otherwise
// returns the first error from Recv, the forwarded call, or Send.
func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			return status.Error(codes.InvalidArgument, "request message not received")
		}
		if err != nil {
			return err
		}

		message := req.Message
		if strings.HasPrefix(message, "[propagate me]") {
			time.Sleep(800 * time.Millisecond)
			message = strings.TrimPrefix(message, "[propagate me]")
			// The forwarded call shares the stream's context, so it may time out here.
			res, err := s.client.UnaryEcho(stream.Context(), &pb.EchoRequest{Message: message})
			if err != nil {
				return err
			}
			// A failed Send means the stream is unusable; stop instead of looping on.
			if err := stream.Send(res); err != nil {
				return err
			}
		}

		if message == "delay" {
			time.Sleep(1500 * time.Millisecond)
		}
		if err := stream.Send(&pb.EchoResponse{Message: message}); err != nil {
			return err
		}
	}
}
main函数
Copy
// main listens on the TCP port given by *port, registers the echo server on a
// new gRPC server and serves until Serve returns an error.
func main() {
	flag.Parse()

	listener, listenErr := net.Listen("tcp", fmt.Sprintf(":%v", *port))
	if listenErr != nil {
		log.Fatalf("failed to listen: %v ", listenErr)
	}

	svc := newEchoServer()
	defer svc.Close()

	srv := grpc.NewServer()
	pb.RegisterEchoServer(srv, svc)

	serveErr := srv.Serve(listener)
	if serveErr != nil {
		log.Fatalf("failed to serve: %v ", serveErr)
	}
}
4 执行#
先运行 /server/main.go , go run main.go
再运行 /client/main.go, go run main.go
执行结果:
Copy
go run main.go
[1] wanted = OK, got = OK
[2] wanted = DeadlineExceeded, got = DeadlineExceeded
[3] wanted = OK, got = Unavailable
[4] wanted = DeadlineExceeded, got = Unavailable
[5] wanted = OK, got = Unavailable
[6] wanted = DeadlineExceeded, got = Unavailable