1. 简介
RPC在分布式系统中负责各组件之间的通信,比如Hadoop,Spark等分布式计算框架中都使用RPC作为通信模块的实现模式。本文主要介绍了如何用GOlang中的HTTP包实现分布式各个server之间的跨网络RPC和消息处理。关于这篇文章介绍的RPC实现已经被应用在MIT6.824 raft的实现中,具体的实现的代码托管在github上,主要的代码实现位于raft.go中。(注:为了通过测试用例采用了test case中给出的labrpc在单机上模拟不稳定的网络环境,但接下来的实现分析中均采用实际分布式系统中使用的HTTP协议)
2. 实现分析
一次完整的RPC调用是一个C/S的过程,即client端发起一次调用,server端接到请求之后对请求进行对应的处理,然后将处理的结果写回,此时完成一次完整的RPC调用。接下来的分析以这个流程的顺序进行。
以raft 协议中一个candidate server(候选者)向另外一个follower server(跟随者)发起投票请求为例,首先candidate调用SendVoteRequest()向follower发起投票请求。代码如下:
//--------------------------------------
// Outgoing
//--------------------------------------
// Sends an AppendEntries RPC to a peer.
func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
var b bytes.Buffer
if _, err := req.Encode(&b); err != nil {
traceln("transporter.rv.encoding.error:", err)
return nil
}
url := fmt.Sprintf("%s%s", peer.ConnectionString, t.RequestVotePath())
traceln(server.Name(), "POST", url)
httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
if httpResp == nil || err != nil {
traceln("transporter.rv.response.error:", err)
return nil
}
defer httpResp.Body.Close()
resp := &RequestVoteResponse{}
if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
traceln("transporter.rv.decoding.error:", err)
return nil
}
return resp
}
上面的代码逻辑比较简单,首先编码要发送的包体,然后拼接出一个url,这个url包括ConnectionString(类似IP+PORT,用来找到server端的服务)和t.RequestVotePath()(返回要调用的RPC方法名:“”requestVote”),接着通过Golang中http包的Post方法来调用远端follower server 上的requstVote方法,注意这个Post是一个同步请求,Post请求完成得到httpResp解码后即可返回。
接下来来到follower端,follower server端已经提前install了HTTP 的路由,这样candidate server POST过来的请求通过“”requestVote”标识找到对应的处理句柄,server端安装路由(为每种请求找到对应的处理句柄)的函数如下:
// Applies Raft routes to an HTTP router for a given server.
func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer) {
mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server))
mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server))
mux.HandleFunc(t.SnapshotPath(), t.snapshotHandler(server))
mux.HandleFunc(t.SnapshotRecoveryPath(), t.snapshotRecoveryHandler(server))
}
紧接着就会触发requestVoteHandler。
// Handles incoming RequestVote requests.
func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
traceln(server.Name(), "RECV /requestVote")
req := &RequestVoteRequest{}
if _, err := req.Decode(r.Body); err != nil {
http.Error(w, "", http.StatusBadRequest)
return
}
resp := server.RequestVote(req)
if resp == nil {
http.Error(w, "Failed creating response.", http.StatusInternalServerError)
return
}
if _, err := resp.Encode(w); err != nil {
http.Error(w, "", http.StatusInternalServerError)
return
}
}
}
requestVoteHandler只是起到一个中转的作用,首先将请求解码,然后调用RequestVote(req),然后将调用结果写回http请求。接下来继续看RequestVote()
func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
ret, _ := s.send(req)
resp, _ := ret.(*RequestVoteResponse)
return resp
}
RequestVote只有短短几行,但是包含了一个很重要的函数:send(),send同步的将请求发送到follower 的 Eventloop的消息队列里,等待follower处理完成后将resp返回。接下来重点分析send函数
// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *server) send(value interface{}) (interface{}, error) {
if !s.Running() {
return nil, StopError
}
event := &ev{target: value, c: make(chan error, 1)}
select {
case s.c <- event:
case <-s.stopped:
return nil, StopError
}
select {
case <-s.stopped:
return nil, StopError
case err := <-event.c:
return event.returnValue, err
}
}
send调用接受任何类型的参数,类似C语言中的void *,然后将value包裹成一个event发送到sever的event chan(s.c)中,然后在第二个select中等待eventloop处理完成(标识为event.c中有err返回,如果无错误,返回的是空err)后返回。
接下来就是实际处理event的eventloop了,这里截取followerloop中重要的部分进行分析:
func (s *server) followerLoop() {
.........
for s.State() == Follower {
...........
select {
..............
case e := <-s.c:
switch req := e.target.(type) {
case *AppendEntriesRequest:
e.returnValue, update = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, update = s.processRequestVoteRequest(req)
default:
...............
}
// Callback to event.
e.c <- err
}
}
}
eventloop持续的从消息队列拿到req然后根据req的类型进行不同的处理,处理后在e.c中写入一个err(有错误就写入具体报错,无错误就写入nil),此时前一步中的send调用就会返回,返回之后之前的各个阻塞步骤就会返回,client端收到调用结果,一次请求投票的RPC调用完整结束。
3. 结束语
以上就是使用HTTP实现RPC调用和Eventloop的实现方式,欢迎各位通过私信或者留言的方式交流分布式系统相关的东西。