RPC

RPC
RPCRemote Procedure Call
RPCHTTP
HTTPRPCHTTPSocket
RPCHTTPHTTPHTTPHTTPRPC
HTTPTCPHTTP(HTTP 1.x)

Thrift架構

Apache ThriftRPCThrift
TProtocol
Thriftbinary
TBinaryProtocolTCompactProtocolTJSONProtocolJSONTDebugProtocoldebug
TTransport
TTransportTTransportTTransportTProtocol
TSocketI/OTHttpTransportHTTPTFramedTransPortframeTFileTransPortTMemoryTransportI/OTZlibTransportzlibTBufferedTransporttransportbufferbufferbuffer
TServer
TServerthriftclientprocessorthriftTServerthriftserver
TSimpleServerI/OTTHreaadPoolServerI/OTNonblockingServerI/OTThreadedServerI/O

對於`golang`來講,只有`TSimpleServer`服務模式,而且是非阻塞的

TProcesser

TProcessorTServerinputProtocoloutputProtocolinputProtocolclientoutputProtocolTProcessorprocessclientrpc

ThriftClient

ThriftClientTProcessorinputProtocoloutputProtocolthriftClientrpcsendreceive
sendstructTProtocolTServersendthriftClientreceiveTServerTServerrpc

TSimpleServer服務模式

TSimpleServerTThreadedServergoroutinegolangconn-pool
type TSimpleServer struct {
	quit chan struct{}     // 採用阻塞channel進行判斷

	processorFactory       TProcessorFactory
	serverTransport        TServerTransport
	inputTransportFactory  TTransportFactory
	outputTransportFactory TTransportFactory
	inputProtocolFactory   TProtocolFactory
	outputProtocolFactory  TProtocolFactory
}

複製代碼
thrift-idl
namespace go echo

struct EchoReq {
    1: string msg;
}

struct EchoRes {
    1: string msg;
}

service Echo {
    EchoRes echo(1: EchoReq req);
}
複製代碼

服務端Server代碼

func (p *TSimpleServer) Serve() error {
	err := p.Listen()
	if err != nil {
		return err
	}
	p.AcceptLoop()
	return nil
}

func (p *TSimpleServer) AcceptLoop() error {
	for {
	    // 此處的Accept()是阻塞的,是調用listener.Accept()
		client, err := p.serverTransport.Accept()
		if err != nil {
			select {
			case <-p.quit:
				return nil
			default:
			}
			return err
		}
		if client != nil {
			go func() {
				if err := p.processRequests(client); err != nil {
					log.Println("error processing request:", err)
				}
			}()
		}
	}
}
複製代碼
serverthrift 1.0go thriftgolang waitgroup
func (p *TSimpleServer) processRequests(client TTransport) error {
	processor := p.processorFactory.GetProcessor(client)

	inputTransport := p.inputTransportFactory.GetTransport(client)
	outputTransport := p.outputTransportFactory.GetTransport(client)

	inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
	outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
	defer func() {
		if e := recover(); e != nil {
			log.Printf("panic in processor: %s: %s", e, debug.Stack())
		}
	}()
	if inputTransport != nil {
		defer inputTransport.Close()
	}
	if outputTransport != nil {
		defer outputTransport.Close()
	}
	for {
		ok, err := processor.Process(inputProtocol, outputProtocol)

		if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
			return nil
		} else if err != nil {
			log.Printf("error processing request: %s", err)
			return err
		}
		if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
			continue
		}
 		if !ok {
			break
		}
	}
	return nil
}
複製代碼
Process
func (p *EchoProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
	name, _, seqId, err := iprot.ReadMessageBegin()
	if err != nil {
		return false, err
	}
	// 獲取傳遞過來的name,若是存在則處理
	if processor, ok := p.GetProcessorFunction(name); ok {
		return processor.Process(seqId, iprot, oprot)
	}
	// 異常邏輯
	iprot.Skip(thrift.STRUCT)
	iprot.ReadMessageEnd()
	x3 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
	oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
	x3.Write(oprot)
	oprot.WriteMessageEnd()
	oprot.Flush()
	return false, x3

}
複製代碼
TServerrpcTProcessorprocessTProcessorprocessTTransport.readMessageBeginrpcrpc
rpcrpc callTProcessor.process_fnrpcTProcessor.process_fnrpcprocessMaprpcrpc
func (p *echoProcessorEcho) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
	args := EchoEchoArgs{}
	// 讀取入參的參數
	if err = args.Read(iprot); err != nil {
		iprot.ReadMessageEnd()
		x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
		oprot.WriteMessageBegin("echo", thrift.EXCEPTION, seqId)
		x.Write(oprot)
		oprot.WriteMessageEnd()
		oprot.Flush()
		return false, err
	}

	iprot.ReadMessageEnd()

	result := EchoEchoResult{}
	var retval *EchoRes
	var err2 error
	// 此處是thrift爲何err不能傳錯誤,若是傳業務錯誤會被阻塞
	if retval, err2 = p.handler.Echo(args.Req); err2 != nil {
		x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing echo: "+err2.Error())
		oprot.WriteMessageBegin("echo", thrift.EXCEPTION, seqId)
		x.Write(oprot)
		oprot.WriteMessageEnd()
		oprot.Flush()
		return true, err2
	} else {
		result.Success = retval
	}

	if err2 = oprot.WriteMessageBegin("echo", thrift.REPLY, seqId); err2 != nil {
		err = err2
	}

	if err2 = result.Write(oprot); err == nil && err2 != nil {
		err = err2
	}
	if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
		err = err2
	}

	if err2 = oprot.Flush(); err == nil && err2 != nil {
		err = err2
	}
	if err != nil {
		return
	}
	return true, err
}
複製代碼
stop
var once sync.Once
func (p *TSimpleServer) Stop() error {
	q := func() {
		p.quit <- struct{}{}
		p.serverTransport.Interrupt()
	}
	once.Do(q)
	return nil
}
複製代碼
stopserver

客戶端代碼

Client
func (p *EchoClient) Echo(req *EchoReq) (r *EchoRes, err error) {

	if err = p.sendEcho(req); err != nil {
		return
	}

	return p.recvEcho()
}
複製代碼
sendEcho()
func (p *EchoClient) sendEcho(req *EchoReq) (err error) {
	oprot := p.OutputProtocol
	if oprot == nil {
		oprot = p.ProtocolFactory.GetProtocol(p.Transport)
		p.OutputProtocol = oprot
	}
	// seqid + 1
	p.SeqId++

	if err = oprot.WriteMessageBegin("echo", thrift.CALL, p.SeqId); err != nil {
		return
	}

	// 構建參數
	args := EchoEchoArgs{
		Req: req,
	}

	if err = args.Write(oprot); err != nil {
		return
	}
	// 通知服務器發送完畢
	if err = oprot.WriteMessageEnd(); err != nil {
		return
	}
	return oprot.Flush()
}
複製代碼
recvEcho()
func (p *EchoClient) recvEcho() (value *EchoRes, err error) {
	iprot := p.InputProtocol
	if iprot == nil {
		iprot = p.ProtocolFactory.GetProtocol(p.Transport)
		p.InputProtocol = iprot
	}
	//
	method, mTypeId, seqId, err := iprot.ReadMessageBegin()
	if err != nil {
		return
	}
	if method != "echo" {
		err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "echo failed: wrong method name")
		return
	}
	if p.SeqId != seqId {
		err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "echo failed: out of sequence response")
		return
	}
	if mTypeId == thrift.EXCEPTION {
		error0 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception")
		var error1 error
		error1, err = error0.Read(iprot)
		if err != nil {
			return
		}
		if err = iprot.ReadMessageEnd(); err != nil {
			return
		}
		err = error1
		return
	}
	if mTypeId != thrift.REPLY {
		err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "echo failed: invalid message type")
		return
	}
	result := EchoEchoResult{}
	if err = result.Read(iprot); err != nil {
		return
	}
	if err = iprot.ReadMessageEnd(); err != nil {
		return
	}
	value = result.GetSuccess()
	return
}
複製代碼

thrift在mac機器安裝問題

go get git.apache.org/thrift.git/lib/go/thriftgithub.com
thrift -versionthriftthrift版本是0.10.0thrifthttps://github.com/apache/thrift/archive/0.10.0.zip
mkdir -p git.apache.org/thrift.git/lib/go/go

Reference