简介:Java 的天下里,大师普遍利用的一个高性能收集通讯框架 netty,很多 RPC 框架都是基于 netty 来实现的。在 golang 的天下里,getty 也是一个类似 netty 的高性能收集通讯库。getty 最初由 dubbogo 项目负责人于雨开辟,作为底层通讯库在 dubbo-go 中利用。随着 dubbo-go 捐献给 apache 基金会,在社区小伙伴的配合尽力下,getty 也终极进入到 apache 这个大师庭,并更名 dubbo-getty 。
一、简介
Java 的天下里,大师普遍利用的一个高性能收集通讯框架 netty,很多 RPC 框架都是基于 netty 来实现的。在 golang 的天下里,getty 也是一个类似 netty 的高性能收集通讯库。getty 最初由 dubbogo 项目负责人于雨开辟,作为底层通讯库在 dubbo-go 中利用。随着 dubbo-go 捐献给 apache 基金会,在社区小伙伴的配合尽力下,getty 也终极进入到 apache 这个大师庭,并更名 dubbo-getty 。
18 年的时辰,我在公司里理论微效力,那时碰到最大的题目就是散布式事务题目。同年,阿里在社区开源他们的散布式事务处置计划,我也很快关注到这个项目,开初还叫 fescar,后来更名 seata。由于我对开源技术很感爱好,加了很多社区群,那时也很关注 dubbo-go 这个项目,在里面冷静潜水。随着对 seata 的了解,慢慢萌生了做一个 go 版本的散布式事务框架的想法。
要做一个 golang 版的散布式事务框架,重要的一个题目就是若何实现 RPC 通讯。dubbo-go 就是很好的一个例子摆在眼前,遂初步研讨 dubbo-go 的底层 getty。
二、若何基于 getty 实现 RPC 通讯
getty 框架的整体模子图以下:
下面连系相关代码,详述 seata-golang 的 RPC 通讯进程。
1. 建立毗连
实现 RPC 通讯,首先要建立收集毗连吧,我们从 client.go 初步看起。
func (c *client) connect() {
var (
err error
ss Session
for {
// 建立一个 session 毗连
ss = c.dial()
if ss == nil {
// client has been closed
break
}
err = c.newSession(ss)
if err == nil {
// 收发报文
ss.(*session).run()
// 此处省略部分代码
break
}
// don't distinguish between tcp connection and websocket connection. Because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close()
ss.Conn().Close()
}
}
connect() 方式经过 dial() 方式获得了一个 session 毗连,进入 dial() 方式:
func (c *client) dial() Session {
switch c.endPointType {
case TCP_CLIENT:
return c.dialTCP()
case UDP_CLIENT:
return c.dialUDP()
case WS_CLIENT:
return c.dialWS()
case WSS_CLIENT:
return c.dialWSS()
}
return nil
}
我们关注的是 TCP 毗连,所以继续进入 c.dialTCP() 方式:
func (c *client) dialTCP() Session {
var (
err error
conn net.Conn
for {
if c.IsClosed() {
return nil
}
if c.sslEnabled {
if sslConfig, err := c.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil {
d := &net.Dialer{Timeout: connectTimeout}
// 建立加密毗连
conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig)
}
} else {
// 建立 tcp 毗连
conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)
}
if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close()
err = errSelfConnect
}
if err == nil {
// 返回一个 TCPSession
return newTCPSession(conn, c)
}
log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))
<-wheel.After(connectInterval)
}
}
至此,我们晓得了 getty 若何建立 TCP 毗连,并返回 TCPSession。
2. 收发报文
那它是怎样收发报文的呢,我们回到 connection 方式接着往下看,有这样一行 ss.(*session).run(),在这行代码以后代码都是很简单的操纵,我们猜测这行代码运转的逻辑里面必定包括收发报文的逻辑,接着进入 run() 方式:
func (s *session) run() {
// 省略部分代码
go s.handleLoop()
go s.handlePackage()
}
这里起了两个 goroutine,handleLoop 和 handlePackage,看字面意义合适我们的料想,进入 handleLoop() 方式:
func (s *session) handleLoop() {
// 省略部分代码
for {
// A select blocks until one of its cases is ready to run.
// It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready.
select {
// 省略部分代码
case outPkg, ok = <-s.wQ:
// 省略部分代码
iovec = iovec[:0]
for idx := 0; idx < maxIovecNum; idx++ {
// 经过 s.writer 将 interface{} 范例的 outPkg 编码成二进制的比特
pkgBytes, err = s.writer.Write(s, outPkg)
// 省略部分代码
iovec = append(iovec, pkgBytes)
//省略部分代码
}
// 将这些二进制比特发送进来
err = s.WriteBytesArray(iovec[:]...)
if err != nil {
log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v",
s.sessionToken(), len(iovec), perrors.WithStack(err))
s.stop()
// break LOOP
flag = false
}
case <-wheel.After(s.period):
if flag {
if wsFlag {
err := wsConn.writePing()
if err != nil {
log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
}
}
// 按时履行的逻辑,心跳等
s.listener.OnCron(s)
}
}
}
}
经过上面的代码,我们不难发现,handleLoop() 方式处置的是发送报文的逻辑,RPC 需要发送的消息首先由 s.writer 编码成二进制比特,然后经过建立的 TCP 毗连发送进来。这个 s.writer 对应的 Writer 接口是 RPC 框架必必要实现的一个接口。
继续看 handlePackage() 方式:
func (s *session) handlePackage() {
// 省略部分代码
if _, ok := s.Connection.(*gettyTCPConn); ok {
if s.reader == nil {
errStr := fmt.Sprintf("session{name:%s, conn:%#v, reader:%#v}", s.name, s.Connection, s.reader)
log.Error(errStr)
panic(errStr)
}
err = s.handleTCPPackage()
} else if _, ok := s.Connection.(*gettyWSConn); ok {
err = s.handleWSPackage()
} else if _, ok := s.Connection.(*gettyUDPConn); ok {
err = s.handleUDPPackage()
} else {
panic(fmt.Sprintf("unknown type session{%#v}", s))
}
}
进入 handleTCPPackage() 方式:
func (s *session) handleTCPPackage() error {
// 省略部分代码
conn = s.Connection.(*gettyTCPConn)
for {
// 省略部分代码
bufLen = 0
for {
// for clause for the network timeout condition check
// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
// 从 TCP 毗连中收到报文
bufLen, err = conn.recv(buf)
// 省略部分代码
break
}
// 省略部分代码
// 将收到的报文二进制比特写入 pkgBuf
pktBuf.Write(buf[:bufLen])
for {
if pktBuf.Len() <= 0 {
break
}
// 经过 s.reader 将收到的报文解码成 RPC 消息
pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
// 省略部分代码
s.UpdateActive()
// 将收到的消息放入 TaskQueue 供 RPC 消耗端消耗
s.addTask(pkg)
pktBuf.Next(pkgLen)
// continue to handle case 5
}
if exit {
break
}
}
return perrors.WithStack(err)
}
从上面的代码逻辑我们分析出,RPC 消耗端需要将从 TCP 毗连收到的二进制比特报文解码成 RPC 能消耗的消息,这个工作由 s.reader 实现,所以,我们要构建 RPC 通讯层也需要实现 s.reader 对应的 Reader 接口。
3. 底层处置收集报文的逻辑若何与营业逻辑解耦
我们都晓得,netty 经过 boss 线程和 worker 线程实现了底层收集逻辑和营业逻辑的解耦。那末,getty 是若何实现的呢?
在 handlePackage() 方式最初,我们看到,收到的消息被放入了 s.addTask(pkg) 这个方式,接着往下分析:
func (s *session) addTask(pkg interface{}) {
f := func() {
s.listener.OnMessage(s, pkg)
s.incReadPkgNum()
}
if taskPool := s.EndPoint().GetTaskPool(); taskPool != nil {
taskPool.AddTaskAlways(f)
return
}
f()
}
pkg 参数传递到了一个匿名方式,这个方式终极放入了 taskPool。这个方式很关键,在我后来写 seata-golang 代码的时辰,就碰到了一个坑,这个坑前面分析。
接着我们看一下 taskPool 的界说:
// NewTaskPoolSimple build a simple task pool
func NewTaskPoolSimple(size int) GenericTaskPool {
if size < 1 {
size = runtime.NumCPU() * 100
}
return &taskPoolSimple{
work: make(chan task),
sem: make(chan struct{}, size),
done: make(chan struct{}),
}
}
构建了一个缓冲巨细为 size (默以为 runtime.NumCPU() * 100) 的 channel sem。再看方式 AddTaskAlways(t task):
func (p *taskPoolSimple) AddTaskAlways(t task) {
select {
case <-p.done:
return
default:
}
select {
case p.work <- t:
return
default:
}
select {
case p.work <- t:
case p.sem <- struct{}{}:
p.wg.Add(1)
go p.worker(t)
default:
goSafely(t)
}
}
加入的使命,会先由 len(p.sem) 个 goroutine 去消耗,假定没有 goroutine 余暇,则会启动一个姑且的 goroutine 去运转 t()。相当于有 len(p.sem) 个 goroutine 组成了 goroutine pool,pool 中的 goroutine 去向置营业逻辑,而不是由处置收集报文的 goroutine 去运转营业逻辑,从而实现了解耦。写 seata-golang 时碰到的一个坑,就是忘记设备 taskPool 组成了处置营业逻辑和处置底层收集报文逻辑的 goroutine 是同一个,我在营业逻辑中阻塞期待一个使命完成时,阻塞了全部 goroutine,使得阻塞时代收不到任何报文。
4. 具体实现
下面的代码见 getty.go:
// Reader is used to unmarshal a complete pkg from buffer
type Reader interface {
Read(Session, []byte) (interface{}, int, error)
}
// Writer is used to marshal pkg and write to session
type Writer interface {
// if @Session is udpGettySession, the second parameter is UDPContext.
Write(Session, interface{}) ([]byte, error)
}
// ReadWriter interface use for handle application packages
type ReadWriter interface {
Reader
Writer
}
// EventListener is used to process pkg that received from remote session
type EventListener interface {
// invoked when session opened
// If the return error is not nil, @Session will be closed.
OnOpen(Session) error
// invoked when session closed.
OnClose(Session)
// invoked when got error.
(Session, error)
// invoked periodically, its period can be set by (Session)SetCronPeriod
OnCron(Session)
// invoked when getty received a package. Pls attention that do not handle long time
// logic processing in this func. You'd better set the package's maximum length.
// If the message's length is greater than it, u should should return err in
// Reader{Read} and getty will close this connection soon.
//
// If ur logic processing in this func will take a long time, u should start a goroutine
// pool(like working thread pool in cpp) to handle the processing asynchronously. Or u
// can do the logic processing in other asynchronous way.
// !!!In short, ur OnMessage callback func should return asap.
//
// If this is a udp event listener, the second parameter type is UDPContext.
OnMessage(Session, interface{})
}
经过对全部 getty 代码的分析,我们只要实现 ReadWriter 来对 RPC 消息编解码,再实现 EventListener 来处置 RPC 消息的对应的具体逻辑,将 ReadWriter 实现和 EventLister 实现注入到 RPC 的 Client 和 Server 端,则可实现 RPC 通讯。
4.1 编解码协议实现
下面是 seata 协议的界说:
在 ReadWriter 接口的实现 RpcPackageHandler 中,挪用 Codec 方式抵消息体按照上面的格式编解码:
// 消息编码为二进制比特
func MessageEncoder(codecType byte, in interface{}) []byte {
switch codecType {
case SEATA:
return SeataEncoder(in)
default:
log.Errorf("not support codecType, %s", codecType)
return nil
}
}
// 二进制比特解码为消息体
func MessageDecoder(codecType byte, in []byte) (interface{}, int) {
switch codecType {
case SEATA:
return SeataDecoder(in)
default:
log.Errorf("not support codecType, %s", codecType)
return nil, 0
}
}
4.2 Client 端实现
再来看 client 端 EventListener 的实现 RpcRemotingClient:
func (client *RpcRemoteClient) OnOpen(session getty.Session) error {
go func()
request := protocal.RegisterTMRequest{AbstractIdentifyRequest: protocal.AbstractIdentifyRequest{
ApplicationId: client.conf.ApplicationId,
TransactionServiceGroup: client.conf.TransactionServiceGroup,
}}
// 建立毗连后向 Transaction Coordinator 倡议注册 TransactionManager 的哀告
_, err := client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
if err == nil {
// 将与 Transaction Coordinator 建立的毗连保存在毗连池供后续利用
clientSessionManager.RegisterGettySession(session)
client.GettySessionOnOpenChannel <- session.RemoteAddr()
}
}()
return nil
}
// ...
func (client *RpcRemoteClient) (session getty.Session, err error) {
clientSessionManager.ReleaseGettySession(session)
}
// OnClose ...
func (client *RpcRemoteClient) OnClose(session getty.Session) {
clientSessionManager.ReleaseGettySession(session)
}
// OnMessage ...
func (client *RpcRemoteClient) OnMessage(session getty.Session, pkg interface{}) {
log.Info("received message:{%v}", pkg)
rpcMessage, ok := pkg.(protocal.RpcMessage)
if ok {
heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
if isHeartBeat && heartBeat == protocal.HeartBeatMessagePong {
log.Debugf("received PONG from %s", session.RemoteAddr())
}
}
if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {
log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)
// 处置事务消息,提交 or 回滚
client.onMessage(rpcMessage, session.RemoteAddr())
} else {
resp, loaded := client.futures.Load(rpcMessage.Id)
if loaded {
response := resp.(*getty2.MessageFuture)
response.Response = rpcMessage.Body
response.Done <- true
client.futures.Delete(rpcMessage.Id)
}
}
}
// OnCron ...
func (client *RpcRemoteClient) OnCron(session getty.Session) {
// 发送心跳
client.defaultSendRequest(session, protocal.HeartBeatMessagePing)
}
clientSessionManager.RegisterGettySession(session) 的逻辑将鄙人文平分析。
4.3 Server 端 Transaction Coordinator 实现
代码见 DefaultCoordinator:
func (coordinator *DefaultCoordinator) OnOpen(session getty.Session) error {
log.Infof("got getty_session:%s", session.Stat())
return nil
}
func (coordinator *DefaultCoordinator) (session getty.Session, err error) {
// 开释 TCP 毗连
SessionManager.ReleaseGettySession(session)
session.Close()
log.Errorf("getty_session{%s} got error{%v}, will be closed.", session.Stat(), err)
}
func (coordinator *DefaultCoordinator) OnClose(session getty.Session) {
log.Info("getty_session{%s} is closing......", session.Stat())
}
func (coordinator *DefaultCoordinator) OnMessage(session getty.Session, pkg interface{}) {
log.Debugf("received message:{%v}", pkg)
rpcMessage, ok := pkg.(protocal.RpcMessage)
if ok {
_, isRegTM := rpcMessage.Body.(protocal.RegisterTMRequest)
if isRegTM {
// 将 TransactionManager 信息和 TCP 毗连建立映照关系
coordinator.OnRegTmMessage(rpcMessage, session)
return
}
heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
if isHeartBeat && heartBeat == protocal.HeartBeatMessagePing {
coordinator.OnCheckMessage(rpcMessage, session)
return
}
if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {
log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)
_, isRegRM := rpcMessage.Body.(protocal.RegisterRMRequest)
if isRegRM {
// 将 ResourceManager 信息和 TCP 毗连建立映照关系
coordinator.OnRegRmMessage(rpcMessage, session)
} else {
if SessionManager.IsRegistered(session) {
defer func() {
if err := recover(); err != nil {
log.Errorf("Catch Exception while do RPC, request: %v,err: %w", rpcMessage, err)
}
}()
// 处置事务消息,全局事务注册、分支事务注册、分支事务提交、全局事务回滚等
coordinator.OnTrxMessage(rpcMessage, session)
} else {
session.Close()
log.Infof("close a unhandled connection! [%v]", session)
}
}
} else {
resp, loaded := coordinator.futures.Load(rpcMessage.Id)
if loaded {
response := resp.(*getty2.MessageFuture)
response.Response = rpcMessage.Body
response.Done <- true
coordinator.futures.Delete(rpcMessage.Id)
}
}
}
}
func (coordinator *DefaultCoordinator) OnCron(session getty.Session) {
}
coordinator.OnRegTmMessage(rpcMessage, session) 注册 Transaction Manager,coordinator.OnRegRmMessage(rpcMessage, session) 注册 Resource Manager。具体逻辑分析见下文。
消息进入 coordinator.OnTrxMessage(rpcMessage, session) 方式,将按照消息的范例码路由到具体的逻辑傍边:
switch msg.GetTypeCode() {
case protocal.TypeGlobalBegin:
req := msg.(protocal.GlobalBeginRequest)
resp := coordinator.doGlobalBegin(req, ctx)
return resp
case protocal.TypeGlobalStatus:
req := msg.(protocal.GlobalStatusRequest)
resp := coordinator.doGlobalStatus(req, ctx)
return resp
case protocal.TypeGlobalReport:
req := msg.(protocal.GlobalReportRequest)
resp := coordinator.doGlobalReport(req, ctx)
return resp
case protocal.TypeGlobalCommit:
req := msg.(protocal.GlobalCommitRequest)
resp := coordinator.doGlobalCommit(req, ctx)
return resp
case protocal.TypeGlobalRollback:
req := msg.(protocal.GlobalRollbackRequest)
resp := coordinator.doGlobalRollback(req, ctx)
return resp
case protocal.TypeBranchRegister:
req := msg.(protocal.BranchRegisterRequest)
resp := coordinator.doBranchRegister(req, ctx)
return resp
case protocal.TypeBranchStatusReport:
req := msg.(protocal.BranchReportRequest)
resp := coordinator.doBranchReport(req, ctx)
return resp
default:
return nil
}
4.4 session manager 分析
Client 端同 Transaction Coordinator 建立毗连起毗连后,经过 clientSessionManager.RegisterGettySession(session) 将毗连保存在 serverSessions = sync.Map{} 这个 map 中。map 的 key 为从 session 中获得的 RemoteAddress 即 Transaction Coordinator 的地址,value 为 session。这样,Client 端就可以经过 map 中的一个 session 来向 Transaction Coordinator 注册 Transaction Manager 和 Resource Manager 了。具体代码见 getty_client_session_manager.go。
Transaction Manager 和 Resource Manager 注册到 Transaction Coordinator 后,一个毗连既有能够用来发送 TM 消息也有能够用来发送 RM 消息。我们经过 RpcContext 来标识一个毗连信息:
type RpcContext struct {
Version string
TransactionServiceGroup string
ClientRole meta.TransactionRole
ApplicationId string
ClientId string
ResourceSets *model.Set
Session getty.Session
}
当收到事务消息时,我们需要机关这样一个 RpcContext 供后续事务处置逻辑利用。所以,我们会机关以下 map 来缓存映照关系:
var (
// session -> transactionRole
// TM will register before RM, if a session is not the TM registered,
// it will be the RM registered
session_transactionroles = sync.Map{}
// session -> applicationId
identified_sessions = sync.Map{}
// applicationId -> ip -> port -> session
client_sessions = sync.Map{}
// applicationId -> resourceIds
client_resources = sync.Map{}
这样,Transaction Manager 和 Resource Manager 分袂经过 coordinator.OnRegTmMessage(rpcMessage, session) 和 coordinator.OnRegRmMessage(rpcMessage, session) 注册到 Transaction Coordinator 时,会在上述 client_sessions map 中缓存 applicationId、ip、port 与 session 的关系,在 client_resources map 中缓存 applicationId 与 resourceIds(一个利用能够存在多个 Resource Manager) 的关系。在需要时,我们就可以经过上述映照关系机关一个 RpcContext。这部分的实现和 java 版 seata 有很大的分歧,感爱好的可以深化了解一下。具体代码见 getty_session_manager.go。
至此,我们就分析完了 seata-golang 全部 RPC 通讯模子的机制。
三、seata-golang 的未来
seata-golang 从今年 4 月份初步开辟,到 8 月份底子实现和 java 版 seata 1.2 协议的互通,对 mysql 数据库实现了 AT 形式(自动调和散布式事务的提交回滚),实现了 TCC 形式,TC 端利用 mysql 存储数据,使 TC 酿成一个无状态利用支撑高可用安插。下图展现了 AT 形式的道理:
后续,还有很多工作可以做,比如:对注册中心的支撑、对设置中心的支撑、和 java 版 seata 1.4 的协议互通、其他数据库的支撑、raft transaction coordinator 的实现等,希望对散布式事务题目感爱好的开辟者可以加入进来一路来打造一个完善的 golang 的散布式事务框架。
原文链接
本文为阿里云原创内容,未经答应不得转载