RPC
RPC
RPC(Remote Procedure Call)
RPCHTTP
HTTPRPCHTTPSocket
RPCHTTPHTTPHTTPHTTPRPC
HTTPTCPHTTP(HTTP 1.x)
Thrift架構
Apache ThriftRPCThrift
TProtocol
Thriftbinary
TBinaryProtocol、TCompactProtocol、TJSONProtocol(JSON)、TDebugProtocol(debug)
TTransport
TTransportTTransportTTransportTProtocol
TSocket(I/O)、THttpTransport(HTTP)、TFramedTransport(frame)、TFileTransport、TMemoryTransport(I/O)、TZlibTransport(zlib)、TBufferedTransport(transport、buffer)
TServer
TServerthriftclientprocessorthriftTServerthriftserver
TSimpleServer(I/O)、TThreadPoolServer(I/O)、TNonblockingServer(I/O)、TThreadedServer(I/O)
對於`golang`來講,只有`TSimpleServer`服務模式,而且是非阻塞的
TProcessor
TProcessorTServerinputProtocoloutputProtocolinputProtocolclientoutputProtocolTProcessorprocessclientrpc
ThriftClient
ThriftClientTProcessorinputProtocoloutputProtocolthriftClientrpcsendreceive
sendstructTProtocolTServersendthriftClientreceiveTServerTServerrpc
TSimpleServer服務模式
TSimpleServerTThreadedServergoroutinegolangconn-pool
// TSimpleServer groups the server-side listener transport with the factories
// used to build, for every accepted client, a processor plus the input/output
// transports and protocols that the processor reads from and writes to.
type TSimpleServer struct {
quit chan struct{} // shutdown signal: Stop sends on it, AcceptLoop polls it after a failed Accept (the original note calls it a blocking channel; its creation is not shown here)
// processorFactory yields the processor that handles a client's requests.
processorFactory TProcessorFactory
// serverTransport is the listener that Accept and Interrupt are called on.
serverTransport TServerTransport
// inputTransportFactory / outputTransportFactory wrap the accepted client
// transport for the read side and the write side respectively.
inputTransportFactory TTransportFactory
outputTransportFactory TTransportFactory
// inputProtocolFactory / outputProtocolFactory build the protocols layered
// on top of the wrapped input and output transports.
inputProtocolFactory TProtocolFactory
outputProtocolFactory TProtocolFactory
}
複製代碼
thrift-idl
// Go code generated from this IDL is emitted under the "echo" namespace.
namespace go echo
// EchoReq is the argument type of Echo.echo; it carries a single string.
struct EchoReq {
1: string msg;
}
// EchoRes is the return type of Echo.echo; it carries a single string.
struct EchoRes {
1: string msg;
}
// Echo is the RPC service: one method, echo, taking an EchoReq (field id 1)
// and returning an EchoRes.
service Echo {
EchoRes echo(1: EchoReq req);
}
複製代碼
服務端Server代碼
// Serve opens the listener and then runs the accept loop until it exits.
//
// It returns the error from Listen when the listener cannot be opened.
// Otherwise it returns the result of AcceptLoop: nil when the loop ended
// because the quit signal was observed, or the Accept error that terminated
// the loop.
func (p *TSimpleServer) Serve() error {
	if err := p.Listen(); err != nil {
		return err
	}
	// Propagate the accept-loop result rather than discarding it, so callers
	// can tell a deliberate stop (nil) from a listener failure.
	return p.AcceptLoop()
}
// AcceptLoop accepts client connections one after another and hands each
// non-nil connection to processRequests on its own goroutine.
//
// When Accept fails, the loop ends: it returns nil if a quit signal is
// already pending on p.quit, and the Accept error otherwise. Errors returned
// by processRequests are only logged.
func (p *TSimpleServer) AcceptLoop() error {
	for {
		// Accept blocks here until a client connects or the transport fails
		// (per the original note it delegates to listener.Accept()).
		conn, acceptErr := p.serverTransport.Accept()
		if acceptErr != nil {
			// Non-blocking poll: a pending quit signal means the failure
			// is the expected result of a shutdown, not a real error.
			select {
			case <-p.quit:
				return nil
			default:
				return acceptErr
			}
		}
		if conn == nil {
			continue
		}
		// One goroutine per connection; it is not tracked or waited for.
		go func() {
			if procErr := p.processRequests(conn); procErr != nil {
				log.Println("error processing request:", procErr)
			}
		}()
	}
}
複製代碼
serverthrift 1.0go thriftgolang waitgroup
// processRequests serves every request that arrives on one client connection.
//
// It builds the processor, transports and protocols for client from the
// server's factories, then calls Process in a loop. The loop ends when:
//   - Process returns a TTransportException: END_OF_FILE yields nil, any
//     other transport exception is logged and returned;
//   - Process reports !ok with an error that is not an UNKNOWN_METHOD
//     application exception: the function returns nil.
//
// Errors that are not transport exceptions are never returned from here.
// Both transports are closed on exit, and a panic raised while processing is
// recovered and logged together with the stack trace.
func (p *TSimpleServer) processRequests(client TTransport) error {
	proc := p.processorFactory.GetProcessor(client)
	inTrans := p.inputTransportFactory.GetTransport(client)
	outTrans := p.outputTransportFactory.GetTransport(client)
	inProto := p.inputProtocolFactory.GetProtocol(inTrans)
	outProto := p.outputProtocolFactory.GetProtocol(outTrans)

	// Registered first so it runs last, after the transports are closed.
	defer func() {
		if r := recover(); r != nil {
			log.Printf("panic in processor: %s: %s", r, debug.Stack())
		}
	}()
	if inTrans != nil {
		defer inTrans.Close()
	}
	if outTrans != nil {
		defer outTrans.Close()
	}

	for {
		more, err := proc.Process(inProto, outProto)

		// Only transport-level failures terminate the connection with a
		// result decided here; everything else falls through.
		if transportErr, isTransport := err.(TTransportException); isTransport {
			if transportErr.TypeId() == END_OF_FILE {
				return nil
			}
			log.Printf("error processing request: %s", transportErr)
			return transportErr
		}

		// An unknown method has already been answered with an exception by
		// the processor, so keep serving the same connection.
		if appErr, isApp := err.(TApplicationException); isApp && appErr.TypeId() == UNKNOWN_METHOD {
			continue
		}
		if !more {
			return nil
		}
	}
}
複製代碼
Process
// Process reads one message header from iprot and dispatches the call to the
// processor function registered under the message name.
//
// If the header cannot be read, it returns (false, err). If no function is
// registered for the name, it skips the argument struct, writes an
// UNKNOWN_METHOD application exception to oprot and returns (false, that
// exception). Otherwise it returns whatever the registered function returns.
func (p *EchoProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
	name, _, seqId, err := iprot.ReadMessageBegin()
	if err != nil {
		return false, err
	}

	// Dispatch by the method name received on the wire, when it is known.
	fn, found := p.GetProcessorFunction(name)
	if found {
		return fn.Process(seqId, iprot, oprot)
	}

	// Unknown method: drain the unread arguments, then tell the caller.
	iprot.Skip(thrift.STRUCT)
	iprot.ReadMessageEnd()
	unknown := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
	oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
	unknown.Write(oprot)
	oprot.WriteMessageEnd()
	oprot.Flush()
	return false, unknown
}
複製代碼
TServerrpcTProcessorprocessTProcessorprocessTTransport.readMessageBeginrpcrpc
rpcrpc callTProcessor.process_fnrpcTProcessor.process_fnrpcprocessMaprpcrpc
// Process handles one "echo" call whose header has already been consumed:
// it decodes the arguments from iprot, invokes the handler, and writes either
// a REPLY or an EXCEPTION message tagged with seqId to oprot.
//
// Results:
//   - argument decoding fails: a PROTOCOL_ERROR exception is sent and
//     (false, decode error) is returned;
//   - the handler fails: an INTERNAL_ERROR exception is sent and
//     (true, handler error) is returned;
//   - otherwise the reply is written; the first write error, if any, is
//     returned with success=false, else (true, nil).
func (p *echoProcessorEcho) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
	var args EchoEchoArgs
	// Decode the call arguments.
	if err = args.Read(iprot); err != nil {
		iprot.ReadMessageEnd()
		protoErr := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
		oprot.WriteMessageBegin("echo", thrift.EXCEPTION, seqId)
		protoErr.Write(oprot)
		oprot.WriteMessageEnd()
		oprot.Flush()
		return false, err
	}
	iprot.ReadMessageEnd()

	var (
		reply      *EchoRes
		handlerErr error
	)
	// This is why a handler's returned error is a poor channel for business
	// errors: whatever it returns is intercepted here and reaches the client
	// only as a generic INTERNAL_ERROR application exception.
	reply, handlerErr = p.handler.Echo(args.Req)
	if handlerErr != nil {
		internal := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing echo: "+handlerErr.Error())
		oprot.WriteMessageBegin("echo", thrift.EXCEPTION, seqId)
		internal.Write(oprot)
		oprot.WriteMessageEnd()
		oprot.Flush()
		return true, handlerErr
	}
	result := EchoEchoResult{Success: reply}

	// Every write step is attempted even after a failure; only the first
	// error encountered is kept.
	keepFirst := func(stepErr error) {
		if err == nil && stepErr != nil {
			err = stepErr
		}
	}
	keepFirst(oprot.WriteMessageBegin("echo", thrift.REPLY, seqId))
	keepFirst(result.Write(oprot))
	keepFirst(oprot.WriteMessageEnd())
	keepFirst(oprot.Flush())

	if err != nil {
		return false, err
	}
	return true, nil
}
複製代碼
stop
// once guards the shutdown sequence in Stop.
// NOTE(review): it is declared at package level, so it is shared by every
// TSimpleServer in the process — only the first Stop call on any server runs
// the shutdown sequence. Making it per-server would need a struct field.
var once sync.Once

// Stop sends the quit signal and then interrupts the server transport so a
// pending Accept can return. Calls after the first one are no-ops. It always
// returns nil.
func (p *TSimpleServer) Stop() error {
	once.Do(func() {
		// Signal first, interrupt second: AcceptLoop polls p.quit when
		// Accept fails to recognise the failure as a shutdown.
		p.quit <- struct{}{}
		p.serverTransport.Interrupt()
	})
	return nil
}
複製代碼
stopserver
客戶端代碼
Client
// Echo performs one synchronous "echo" RPC: it sends req and then waits for
// the matching response.
//
// It returns (nil, err) if sending fails; otherwise it returns the result of
// recvEcho.
func (p *EchoClient) Echo(req *EchoReq) (r *EchoRes, err error) {
	if sendErr := p.sendEcho(req); sendErr != nil {
		return nil, sendErr
	}
	return p.recvEcho()
}
複製代碼
sendEcho()
// sendEcho writes one "echo" CALL message carrying req to the output protocol
// and flushes it.
//
// The output protocol is created from ProtocolFactory on first use and cached
// on the client. SeqId is incremented before the header is written, so it
// advances even when a later write step fails. The first error encountered is
// returned.
func (p *EchoClient) sendEcho(req *EchoReq) (err error) {
	out := p.OutputProtocol
	if out == nil {
		out = p.ProtocolFactory.GetProtocol(p.Transport)
		p.OutputProtocol = out
	}

	// Each call gets the next sequence id; recvEcho checks it on the reply.
	p.SeqId++
	if err = out.WriteMessageBegin("echo", thrift.CALL, p.SeqId); err != nil {
		return err
	}

	// Wrap the request in the generated argument struct and serialise it.
	call := EchoEchoArgs{Req: req}
	if err = call.Write(out); err != nil {
		return err
	}

	// Close the message, then flush so it actually reaches the server.
	if err = out.WriteMessageEnd(); err != nil {
		return err
	}
	return out.Flush()
}
複製代碼
recvEcho()
// recvEcho reads and validates the response to the most recent "echo" call.
//
// The input protocol is created from ProtocolFactory on first use and cached
// on the client. The response is rejected with an application exception when
// the method name is not "echo", the sequence id differs from p.SeqId, or the
// message type is neither EXCEPTION nor REPLY. An EXCEPTION message is decoded
// and returned as the error. On a REPLY the decoded result's success value is
// returned.
func (p *EchoClient) recvEcho() (value *EchoRes, err error) {
	iprot := p.InputProtocol
	if iprot == nil {
		iprot = p.ProtocolFactory.GetProtocol(p.Transport)
		p.InputProtocol = iprot
	}

	// Read the response header.
	method, mTypeId, seqId, err := iprot.ReadMessageBegin()
	if err != nil {
		return nil, err
	}

	// Checks run in this order; the first one that matches decides the result.
	switch {
	case method != "echo":
		return nil, thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "echo failed: wrong method name")
	case p.SeqId != seqId:
		return nil, thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "echo failed: out of sequence response")
	case mTypeId == thrift.EXCEPTION:
		// The server answered with an exception: decode it and surface it.
		placeholder := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception")
		var remoteErr error
		if remoteErr, err = placeholder.Read(iprot); err != nil {
			return nil, err
		}
		if err = iprot.ReadMessageEnd(); err != nil {
			return nil, err
		}
		return nil, remoteErr
	case mTypeId != thrift.REPLY:
		return nil, thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "echo failed: invalid message type")
	}

	// Normal reply: decode the result struct and hand back its payload.
	result := EchoEchoResult{}
	if err = result.Read(iprot); err != nil {
		return nil, err
	}
	if err = iprot.ReadMessageEnd(); err != nil {
		return nil, err
	}
	return result.GetSuccess(), nil
}
複製代碼
thrift在mac機器安裝問題
go get git.apache.org/thrift.git/lib/go/thriftgithub.com
thrift -versionthriftthrift版本是0.10.0thrifthttps://github.com/apache/thrift/archive/0.10.0.zip
mkdir -p git.apache.org/thrift.git/lib/go/go