继续上一篇《golang源码分析:grpc 链接池(1)》,本篇深入源码,从连接池的建立、发起请求时获取连接、以及最终关闭连接这三个流程进行分析。
1,创建连接的过程
源码入口位于google.golang.org/grpc@v1.46.0/clientconn.go
// Dial creates a client connection to the given target. It is equivalent to
// calling DialContext with context.Background(), i.e. the dial itself carries
// no deadline and cannot be cancelled through the context.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	ctx := context.Background()
	return DialContext(ctx, target, opts...)
}
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {cc := &ClientConn{target: target,csMgr: &connectivityStateManager{},conns: make(map[*addrConn]struct{}),dopts: defaultDialOptions(),blockingpicker: newPickerWrapper(),czData: new(channelzData),firstResolveEvent: grpcsync.NewEvent(),}resolverBuilder, err := cc.parseTargetAndFindResolver()cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{DialCreds: credsClone,CredsBundle: cc.dopts.copts.CredsBundle,Dialer: cc.dopts.copts.Dialer,Authority: cc.authority,CustomUserAgent: cc.dopts.copts.UserAgent,ChannelzParentID: cc.channelzID,Target: cc.parsedTarget,})rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)if cc.dopts.block {for {cc.Connect()}}
首先,通过cc.parseTargetAndFindResolver()获取名称解析器(resolver)的构造器,例如自定义的DNS解析器;然后创建负载均衡器的包装cc.balancerWrapper;最后通过newCCResolverWrapper用该构造器创建解析器。
google.golang.org/grpc@v1.46.0/resolver_conn_wrapper.go
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error)ccr := &ccResolverWrapper{cc: cc,done: grpcsync.NewEvent(),}ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
如果在Dial的时候指定了选项grpc.WithBlock(),就会在for循环里尝试连接;否则直接返回ClientConn,等到请求到来的时候才真正建立连接。Connect函数的定义如下,它让当前连接退出idle状态:
// Connect asks the ClientConn to leave the idle state by forwarding an
// exit-idle request to its balancer wrapper. DialContext calls it in a loop
// when grpc.WithBlock() is set; otherwise connecting happens lazily.
func (cc *ClientConn) Connect() {
	wrapper := cc.balancerWrapper
	wrapper.exitIdle()
}
连接的状态定义在google.golang.org/grpc@v1.46.0/connectivity/connectivity.go,可以看到共有5个状态(IDLE、CONNECTING、READY、TRANSIENT_FAILURE、SHUTDOWN),其余取值在String()中会被当作INVALID_STATE:
func (s State) String() string {switch s {case Idle:return "IDLE"case Connecting:return "CONNECTING"case Ready:return "READY"case TransientFailure:return "TRANSIENT_FAILURE"case Shutdown:return "SHUTDOWN"default:logger.Errorf("unknown connectivity state: %d", s)return "INVALID_STATE"}}
在balancer的包装器(ccBalancerWrapper)里会启动一个watcher协程,负责处理服务端连接状态变化等更新事件:
func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapperccb := &ccBalancerWrapper{cc: cc,updateCh: buffer.NewUnbounded(),resultCh: buffer.NewUnbounded(),closed: grpcsync.NewEvent(),done: grpcsync.NewEvent(),}go ccb.watcher()ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
watcher根据队列中更新事件的类型分发并执行相应的处理,代码位于google.golang.org/grpc@v1.46.0/balancer_conn_wrappers.go
type ccBalancerWrapper struct {cc *ClientConn// Since these fields are accessed only from handleXxx() methods which are// synchronized by the watcher goroutine, we do not need a mutex to protect// these fields.balancer *gracefulswitch.BalancercurBalancerName stringupdateCh *buffer.Unbounded // Updates written on this channel are processed by watcher().resultCh *buffer.Unbounded // Results of calls to UpdateClientConnState() are pushed here.closed *grpcsync.Event // Indicates if close has been called.done *grpcsync.Event // Indicates if close has completed its work.}
其中有两个无界队列:updateCh用于传递更新事件(由watcher()处理),resultCh用于传递UpdateClientConnState()的执行结果。
func (ccb *ccBalancerWrapper) watcher() {case u := <-ccb.updateCh.Get():ccb.updateCh.Load()if ccb.closed.HasFired() {break}switch update := u.(type) {case *ccStateUpdate:ccb.handleClientConnStateChange(update.ccs)
// handleSubConnStateChange forwards a queued SubConn state change to the
// wrapped balancer, packaging the new connectivity state and its error into a
// balancer.SubConnState.
func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) {
	newState := balancer.SubConnState{
		ConnectivityState: update.state,
		ConnectionError:   update.err,
	}
	ccb.balancer.UpdateSubConnState(update.sc, newState)
}
func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {ccb.updateCh.Put(&scStateUpdate{
回过头来,就能看清发起连接时exitIdle具体做了什么:它只是往updateCh里投递一个exitIdleUpdate更新事件:
// exitIdle requests that the balancer leave the idle state. The request is
// only enqueued on updateCh for the watcher goroutine to handle later, so the
// caller does not wait for any connection to be established.
func (ccb *ccBalancerWrapper) exitIdle() {
	req := &exitIdleUpdate{}
	ccb.updateCh.Put(req)
}
对应的事件处理函数如下:只有当ClientConn处于Idle状态时,才调用balancer的ExitIdle退出空闲状态:
func (ccb *ccBalancerWrapper) handleExitIdle() {if ccb.cc.GetState() != connectivity.Idle {return}ccb.balancer.ExitIdle()}
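updateCh的类型是buffer.Unbounded,一个没有长度限制的队列:Put时如果backlog为空且内部channel能立即接收,就直接写入channel,否则追加到backlog末尾;消费者从Get()返回的channel取出一个元素后调用Load,把backlog中最早的元素补充进channel,思路类似nginx的backlog队列: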
google.golang.org/grpc@v1.46.0/internal/buffer/unbounded.go
// Unbounded is a FIFO buffer with no capacity limit. Put hands a value to the
// channel c directly when the backlog is empty and c can accept it, otherwise
// it appends the value to backlog; Load moves the oldest backlog entry into c
// after a consumer has received from it.
type Unbounded struct {
	c       chan interface{} // channel consumers receive from (presumably what Get returns)
	mu      sync.Mutex       // presumably guards backlog — TODO confirm against Put/Load
	backlog []interface{}    // values waiting for room in c, oldest first
}
func (b *Unbounded) Put(t interface{}) {if len(b.backlog) == 0 {select {case b.c <- t:b.backlog = append(b.backlog, t)
func (b *Unbounded) Load() {if len(b.backlog) > 0 {select {case b.c <- b.backlog[0]:b.backlog[0] = nilb.backlog = b.backlog[1:]
func (b *Unbounded) Get() <-chan interface{} {
分析完队列后,我们看下ExitIdle的内容是什么,源码位于:
google.golang.org/grpc@v1.46.0/internal/balancer/gracefulswitch/gracefulswitch.go
func (gsb *Balancer) ExitIdle() {for sc := range balToUpdate.subconns {sc.Connect()}
对每个subConn都尝试着去进行连接,这里才是发起服务端连接的真正地方。subconns保存在balancerWrapper里。
type balancerWrapper struct {balancer.Balancergsb *BalancerlastState balancer.Statesubconns map[balancer.SubConn]bool // subconns created by this balancer}
当SubConn的状态发生变化时,balancerWrapper根据传入的状态进行处理:如果是Shutdown,就先把该SubConn从subconns中删除,再把状态转发给被包装的balancer:
func (bw *balancerWrapper) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {if state.ConnectivityState == connectivity.Shutdown {delete(bw.subconns, sc)bw.Balancer.UpdateSubConnState(sc, state)
真正执行状态更新的函数是:
func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {if gsb.balancerCurrent != nil && gsb.balancerCurrent.subconns[sc] {balToUpdate = gsb.balancerCurrent} else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {balToUpdate = gsb.balancerPending}balToUpdate.UpdateSubConnState(sc, state)
SubConn的定义位于google.golang.org/grpc@v1.46.0/balancer/balancer.go
type SubConn interface {// UpdateAddresses updates the addresses used in this SubConn.// gRPC checks if currently-connected address is still in the new list.// If it's in the list, the connection will be kept.// If it's not in the list, the connection will gracefully closed, and// a new connection will be created.//// This will trigger a state transition for the SubConn.//// Deprecated: This method is now part of the ClientConn interface and will// eventually be removed from here.UpdateAddresses([]resolver.Address)// Connect starts the connecting for this SubConn.Connect()}
这里定义了Picker接口,用来从连接池中选择一个可用连接
type Picker interface {// Pick returns the connection to use for this RPC and related information.//// Pick should not block. If the balancer needs to do I/O or any blocking// or time-consuming work to service this call, it should return// ErrNoSubConnAvailable, and the Pick call will be repeated by gRPC when// the Picker is updated (using ClientConn.UpdateState).//// If an error is returned://// - If the error is ErrNoSubConnAvailable, gRPC will block until a new// Picker is provided by the balancer (using ClientConn.UpdateState).//// - If the error is a status error (implemented by the grpc/status// package), gRPC will terminate the RPC with the code and message// provided.//// - For all other errors, wait for ready RPCs will wait, but non-wait for// ready RPCs will be terminated with this error's Error() string and// status code Unavailable.Pick(info PickInfo) (PickResult, error)}
此外还定义了Balancer接口,用户可以实现这个接口来自定义balancer,其方法包括UpdateClientConnState、ResolverError、UpdateSubConnState和Close:
type Balancer interface {// UpdateClientConnState is called by gRPC when the state of the ClientConn// changes. If the error returned is ErrBadResolverState, the ClientConn// will begin calling ResolveNow on the active name resolver with// exponential backoff until a subsequent call to UpdateClientConnState// returns a nil error. Any other errors are currently ignored.UpdateClientConnState(ClientConnState) error// ResolverError is called by gRPC when the name resolver reports an error.ResolverError(error)// UpdateSubConnState is called by gRPC when the state of a SubConn// changes.UpdateSubConnState(SubConn, SubConnState)// Close closes the balancer. The balancer is not required to call// ClientConn.RemoveSubConn for its existing SubConns.Close()}
官方包里提供了多种balancer的实现,比如ringhash:
google.golang.org/grpc@v1.46.0/xds/internal/balancer/ringhash/ringhash.go
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {b := &ringhashBalancer{cc: cc,subConns: make(map[resolver.Address]*subConn),scStates: make(map[balancer.SubConn]*subConn),csEvltr: &connectivityStateEvaluator{},}
2,用户发起客户端请求的时候的调用过程
源码入口位于:google.golang.org/grpc@v1.46.0/call.go
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {return invoke(ctx, method, args, reply, cc, opts...)
它会创建一个clientStream然后发送和接收消息:
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
google.golang.org/grpc@v1.46.0/stream.go
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)}
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {cs.finish(err)return nil, err}
func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
google.golang.org/grpc@v1.46.0/clientconn.go,通过blockingpicker从balancer中pick一个连接:
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error)t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{Ctx: ctx,FullMethodName: method,})
google.golang.org/grpc@v1.46.0/picker_wrapper.go,pickerWrapper调用balancer(可以是用户自定义的balancer)提供的Picker的Pick方法选出一个SubConn,再取出该连接上已就绪的transport:
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {pickResult, err := p.Pick(info)acw, ok := pickResult.SubConn.(*acBalancerWrapper)if t := acw.getAddrConn().getReadyTransport(); t != nil {
3,关闭连接的过程
源码入口位于:google.golang.org/grpc@v1.46.0/clientconn.go
func (cc *ClientConn) Close() error {cc.csMgr.updateState(connectivity.Shutdown)for ac := range conns {ac.tearDown(ErrClientConnClosing)}
依次关闭连接池中的所有连接。
func (ac *addrConn) tearDown(err error) {tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct// will leak. In most cases, call cc.removeAddrConn() instead.ac.updateConnectivityState(connectivity.Shutdown, nil)
func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {ac.state = sac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
它同样是通过更新状态的方式来驱动连接池状态机的:
// handleSubConnStateChange reports a state change of SubConn sc (for example
// Shutdown during addrConn.tearDown) to the balancer wrapper, which queues it
// for processing by its watcher goroutine.
func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
	wrapper := cc.balancerWrapper
	wrapper.updateSubConnState(sc, s, err)
}

