1. 简介

RPC在分布式系统中负责各组件之间的通信,比如Hadoop,Spark等分布式计算框架中都使用RPC作为通信模块的实现模式。本文主要介绍了如何用GOlang中的HTTP包实现分布式各个server之间的跨网络RPC和消息处理。关于这篇文章介绍的RPC实现已经被应用在MIT6.824 raft的实现中,具体的实现的代码托管在github上,主要的代码实现位于raft.go中。(注:为了通过测试用例采用了test case中给出的labrpc在单机上模拟不稳定的网络环境,但接下来的实现分析中均采用实际分布式系统中使用的HTTP协议)

2. 实现分析

一次完整的RPC调用是一个C/S的过程,即client端发起一次调用,server端接到请求之后对请求进行对应的处理,然后将处理的结果写回,此时完成一次完整的RPC调用。接下来的分析以这个流程的顺序进行。

以raft 协议中一个candidate server(候选者)向另外一个follower server(跟随者)发起投票请求为例,首先candidate调用SendVoteRequest()向follower发起投票请求。代码如下:

//--------------------------------------
// Outgoing
//--------------------------------------

// Sends an AppendEntries RPC to a peer.
func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
	var b bytes.Buffer
	if _, err := req.Encode(&b); err != nil {
		traceln("transporter.rv.encoding.error:", err)
		return nil
	}

	url := fmt.Sprintf("%s%s", peer.ConnectionString, t.RequestVotePath())
	traceln(server.Name(), "POST", url)

	httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
	if httpResp == nil || err != nil {
		traceln("transporter.rv.response.error:", err)
		return nil
	}
	defer httpResp.Body.Close()

	resp := &RequestVoteResponse{}
	if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
		traceln("transporter.rv.decoding.error:", err)
		return nil
	}

	return resp
}

上面的代码逻辑比较简单,首先编码要发送的包体,然后拼接出一个url,这个url包括ConnectionString(类似IP+PORT,用来找到server端的服务)和t.RequestVotePath()(返回要调用的RPC方法名:“”requestVote”),接着通过Golang中http包的Post方法来调用远端follower server 上的requstVote方法,注意这个Post是一个同步请求,Post请求完成得到httpResp解码后即可返回。

接下来来到follower端,follower server端已经提前install了HTTP 的路由,这样candidate server POST过来的请求通过“”requestVote”标识找到对应的处理句柄,server端安装路由(为每种请求找到对应的处理句柄)的函数如下:

// Applies Raft routes to an HTTP router for a given server.
func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer) {
	mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server))
	mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server))
	mux.HandleFunc(t.SnapshotPath(), t.snapshotHandler(server))
	mux.HandleFunc(t.SnapshotRecoveryPath(), t.snapshotRecoveryHandler(server))
}

紧接着就会触发requestVoteHandler。

// Handles incoming RequestVote requests.
func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		traceln(server.Name(), "RECV /requestVote")

		req := &RequestVoteRequest{}
		if _, err := req.Decode(r.Body); err != nil {
			http.Error(w, "", http.StatusBadRequest)
			return
		}

		resp := server.RequestVote(req)
		if resp == nil {
			http.Error(w, "Failed creating response.", http.StatusInternalServerError)
			return
		}
		if _, err := resp.Encode(w); err != nil {
			http.Error(w, "", http.StatusInternalServerError)
			return
		}
	}
}

requestVoteHandler只是起到一个中转的作用,首先将请求解码,然后调用RequestVote(req),然后将调用结果写回http请求。接下来继续看RequestVote()

func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*RequestVoteResponse)
	return resp
}

RequestVote只有短短几行,但是包含了一个很重要的函数:send(),send同步的将请求发送到follower 的 Eventloop的消息队列里,等待follower处理完成后将resp返回。接下来重点分析send函数

// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *server) send(value interface{}) (interface{}, error) {
	if !s.Running() {
		return nil, StopError
	}

	event := &ev{target: value, c: make(chan error, 1)}
	select {
	case s.c <- event:
	case <-s.stopped:
		return nil, StopError
	}
	select {
	case <-s.stopped:
		return nil, StopError
	case err := <-event.c:
		return event.returnValue, err
	}
}

send调用接受任何类型的参数,类似C语言中的void *,然后将value包裹成一个event发送到sever的event chan(s.c)中,然后在第二个select中等待eventloop处理完成(标识为event.c中有err返回,如果无错误,返回的是空err)后返回。

接下来就是实际处理event的eventloop了,这里截取followerloop中重要的部分进行分析:

func (s *server) followerLoop() {
	.........
	for s.State() == Follower {
		...........
		select {
		..............
		case e := <-s.c:
			switch req := e.target.(type) {
			case *AppendEntriesRequest:
				e.returnValue, update = s.processAppendEntriesRequest(req)
			case *RequestVoteRequest:
				e.returnValue, update = s.processRequestVoteRequest(req)
			default:
				...............
			}
			// Callback to event.
			e.c <- err
		}
	}
}

eventloop持续的从消息队列拿到req然后根据req的类型进行不同的处理,处理后在e.c中写入一个err(有错误就写入具体报错,无错误就写入nil),此时前一步中的send调用就会返回,返回之后之前的各个阻塞步骤就会返回,client端收到调用结果,一次请求投票的RPC调用完整结束。


3. 结束语

以上就是使用HTTP实现RPC调用和Eventloop的实现方式,欢迎各位通过私信或者留言的方式交流分布式系统相关的东西。