前语

本章依据dubbo2.7.6版本,分析rpc调用流程。

依据上一章服务露出与引用(/post/721094…),全部简直瓜熟蒂落。

笔者将一次rpc同步调用拆分为三个阶段:

1)用户代码履行rpc办法,consumer发送rpc恳求给provider

2)provider处理rpc恳求呼应consumer

3)consumer收到呼应,回来用户代码

在总结部分整理了rpc调用流程。

发送rpc恳求

Dubbo源码(三)rpc调用流程

署理层

InvokerInvocationHandler#invoke:

1)把rpc办法的要害信息,都包装为一个RpcInvocation贯穿Invoker#invoke;

这个RpcInvocation就不细看了,目的无非是将长途调用需求的信息都封装为一个pojo,

相似于咱们往常写事务代码的时分aop用的MethodInvocation。

2)履行署理Invoker,一直通到DubboInvoker;

3)result.recreate:假如产生rpc反常,抛出,不然回来rpc办法回来值;

Dubbo源码(三)rpc调用流程

Cluster层

ClusterInterceptor

在上一章提到过,Cluster#join回来的Invoker会被ClusterInterceptor激活扩展点包一层,但是ClusterInterceptor并没有完结Invoker,所以要用适配器形式包一遍。

AbstractCluster内部类InterceptorInvokerNode,担任适配ClusterInterceptor完结到Invoker。

protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
    private AbstractClusterInvoker<T> clusterInvoker;
    private ClusterInterceptor interceptor;
    private AbstractClusterInvoker<T> next;
    public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult;
        try {
            // 前置阻拦
            interceptor.before(next, invocation);
            // 履行
            asyncResult = interceptor.intercept(next, invocation);
        } catch (Exception e) {
            // ...
            throw e;
        } finally {
            // 后置阻拦
            interceptor.after(next, invocation);
        }
        return asyncResult.whenCompleteWithContext((r, t) -> {
            // rpc恳求完结回调ClusterInterceptor.Listener
            if (interceptor instanceof ClusterInterceptor.Listener) {
                ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                if (t == null) {
                    listener.onMessage(r, clusterInvoker, invocation);
                } else {
                    listener.onError(t, clusterInvoker, invocation);
                }
            }
        });
    }
}

ConsumerContextClusterInterceptor:担任创立和整理rpc上下文,合作完结隐式传参特性。

  • before:发起rpc恳求前,创立rpc恳求上下文,铲除rpc呼应上下文
  • after:发起rpc恳求完结,还未收到rpc呼应,铲除rpc恳求上下文
  • onMessage:收到rpc呼应,且呼应成功,创立rpc呼应上下文
  • onError:rpc呼应失败,什么都不做;
@Activate
public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener {
    @Override
    public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) {
        // 创立rpc【恳求】上下文
        RpcContext.getContext()
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0);
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        // 铲除rpc【呼应】上下文
        RpcContext.removeServerContext();
    }
    @Override
    public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
        // 铲除rpc【恳求】上下文
        RpcContext.removeContext();
    }
    @Override
    public void onMessage(Result appResponse, AbstractClusterInvoker<?> invoker, Invocation invocation) {
        // 创立rpc【呼应】上下文
        RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
    }
    @Override
    public void onError(Throwable t, AbstractClusterInvoker<?> invoker, Invocation invocation) {
    }
}

其间rpc恳求上下文对应RpcContext.LOCAL,rpc呼应上下文对应RpcContext.SERVER_LOCAL。

关于服务调用方,

RpcContext.LOCAL的生命周期,是一次rpc恳求,用户也可以在rpc办法调用前,自动初始化RpcContext.LOCAL完结隐式传参

RpcContext.SERVER_LOCAL的生命周期很长,是一次rpc呼应到下一次rpc恳求之前,也便是说用户可以在rpc办法调用结束后一直使用本次rpc调用的呼应上下文,直至产生下次rpc调用

Dubbo源码(三)rpc调用流程

RpcContext中的InternalThreadLocal是学习了Netty的FastThreadLocal( /post/694493… ,用数组替代hashmap完结的threadlocal,里边javadoc都和FastThreadLocal如出一辙。

Dubbo源码(三)rpc调用流程

ClusterInvoker

AbstractClusterInvoker#invoke:本质上ClusterInvoker是依据一系列条件,从一切服务提供者Invoker中挑选其间一个Invoker履行。

Dubbo源码(三)rpc调用流程

1)Directory#list:列出Invocation对应Invokers(路由)

Dubbo源码(三)rpc调用流程

2)AbstractClusterInvoker#initLoadBalance:依据url参数找LoadBalance扩展,默许RandomLoadBalance

Dubbo源码(三)rpc调用流程

3)AbstractClusterInvoker#doInvoke:子类完结,会有不同的集群容错方式

路由

RegistryDirectory#doList:通过RouterChain路由过滤后回来Invoker集合。

需求留意的是,假如reference指定多个group,则不包含路由逻辑,直接回来一切Invoker。

Dubbo源码(三)rpc调用流程

在服务订阅之前,RegistryProtocol会用订阅url结构RouterChain注入RegistryDirectory。

Dubbo源码(三)rpc调用流程

RouterChain结构时会依据订阅url获取RouterFactory激活扩展点,通过RouterFactory#getRouter创立Router对象。

Dubbo源码(三)rpc调用流程

RouterChain#route:循环一切Router,过滤invokers。

留意:invokers可以认为是内存注册表(rpc服务级别),只要注册中心providers列表变更,这儿才会更新,rpc期间不强依赖注册中心的长途注册表。

Dubbo源码(三)rpc调用流程

默许情况下会有四个Router:

1)MockInvokersSelector:本地伪装特性,疏忽

2)TagRouter:依据tag过滤Invoker

支持三种形式装备tag路由,优先级从高到低:

a)装备中心路由规矩;b)RpcContext指定tag;c)reference指定tag

比方reference指定tag为gray。

ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("consumer-app"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface(DemoService.class);
reference.setTag("gray");

TagRouter优先会找tag=gray的provider,假如找不到tag=gray的provider,会取tag=null的provider。

Dubbo源码(三)rpc调用流程

3)ServiceRouter:依据rpc服务的路由,需求结合装备中心

Dubbo源码(三)rpc调用流程

4)AppRouter:依据应用的路由,需求结合装备中心

Dubbo源码(三)rpc调用流程

集群容错

FailoverClusterInvoker是默许集群容错完结。

假如产生故障,从头挑选Invoker进行长途调用,最多会调用3次(retry=2)。

Dubbo源码(三)rpc调用流程

AbstractClusterInvoker#select:在实践挑选invoker前,优先走了一圈粘滞衔接特性。

Dubbo源码(三)rpc调用流程

AbstractClusterInvoker#doSelect:

1)假如invokers只要一个,直接回来第一个

2)调用loadbalance履行一次挑选,回来invoker

2-1)invoker现已被选过或者invoker不可用,再次进行一次挑选

2-2)不然回来loadbalance的挑选,结束

3)从头挑选invoker,假如仍然为空,兜底回来上次invoker的下一个invoker

Dubbo源码(三)rpc调用流程

AbstractClusterInvoker#reselect逻辑如下:

1)获取未挑选过的可用invokers,再次履行loadbalance

2)假如第一类invokers为空,把已选过的可用invokers也考虑进去(关于failover来说,便是之前履行失败的invoker),再次履行loadbalance

Dubbo源码(三)rpc调用流程

负载均衡

当通过路由后还存在多个invokers,往往就需求loadbalance通过特定算法找到其间一个invoker。

默许完结是RandomLoadBalance,随机挑选invoker。

需求留意的是,大部分LoadBalance完结都有权重逻辑。

Dubbo源码(三)rpc调用流程

AbstractLoadBalance#getWeight:获取运行时不同provider对应invoker的权重

默许每个invoker权重都是100,依据provider的启动时刻和预热时刻可能会削减权重。

Dubbo源码(三)rpc调用流程

Protocol层

通过Cluster+Directory挑选,终究剩余一个服务提供者的ProtocolInvoker。

以Dubbo协议为例,终究会进入DubboInvoker

在进入DubboInvoker之前会通过一系列group=consumer的Filter,这儿略过。

在AbstractProtocol#refer引用时,自动在DubboInvoker外部包装了AsyncToSyncInvoker。

Dubbo源码(三)rpc调用流程

AsyncToSyncInvoker#invoke调用协议Invoker,待底层协议Invoker回来,假如是同步调用,堵塞等待rpc呼应。

Dubbo源码(三)rpc调用流程

DubboInvoker承继自AbstractInvoker抽象类。

AbstractInvoker#invoke做一些通用控制,比方填充RpcInvocation、反常封装到Result等等,底层调用DubboInvoker#doInvoke。

Dubbo源码(三)rpc调用流程

DubboProtocol#doInvoke:

1)getCallbackExecutor:决议计划处理rpc呼应的线程池

2)ExchangeClient#request:将Invocation、超时时刻、处理rpc呼应线程池提交给通讯层,得到Future回来;

Dubbo源码(三)rpc调用流程

关于一般同步调用来说,getCallbackExecutor每次会回来一个新的ThreadlessExecutor

Dubbo源码(三)rpc调用流程

ThreadlessExecutor的javadoc如下:

Dubbo源码(三)rpc调用流程

ThreadlessExecutor不办理任何线程,假如使命被提交到这个executor,不会马上被调度。

直到某个线程调用waitAndDrain办法,该线程当即履行这个使命。

这部分咱们到第三阶段再分析。

通讯层

一般情况下,rpc框架事务线程和io线程都是分开的。

为了解决这个问题,一般规划方式都遵从如下规矩:

1)【事务线程】为request分配仅有id

2)【事务线程】将request-id和request-future缓存到大局map

3)【守时线程】关于大局map做守时扫描,假如超时,用指定线程池履行future的回调

4)【io线程】发送恳求到io线程

5)【io线程】io线程收到呼应,从大局map依据呼应里的request-id拿到request-future,提交到6线程

6)【rpc呼应线程(异步)/事务线程(同步)】履行future的回调

HeaderExchangeChannel#request:这儿做了1234步,第四步便是channel#send。

Dubbo源码(三)rpc调用流程

首要Request创立时分配了自增id,作为rpc恳求id(1)。

public class Request {
    private static final AtomicLong INVOKE_ID = new AtomicLong(0);
    private final long mId;
    public Request() {
        mId = newId();
    }
    private static long newId() {
        return INVOKE_ID.getAndIncrement();
    }
}

DefaultFuture#newFuture:创立DefaultFuture实例,提交超时检测使命(3)。

Dubbo源码(三)rpc调用流程

DefaultFuture结构时将request-id和自己放到大局map中(2)。

private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
private DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout : 
        channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
    FUTURES.put(id, this); // 核心
    CHANNELS.put(id, channel);
}

序列化

DubboProtocol#initClient:在创立客户端衔接时,Dubbo协议序列化方式为dubbo。

Dubbo源码(三)rpc调用流程

终究Request都会走DubboCountCodec进行序列化,序列化细节疏忽。

Dubbo源码(三)rpc调用流程

同步调用堵塞等待

DubboInvoker将rpc恳求提交到通讯层后,直接回来。

在AsyncToSyncInvoker中由于是同步调用,走AsyncRpcResult#get堵塞等待rpc呼应。

Dubbo源码(三)rpc调用流程

关于同步调用AsyncRpcResult#get会自动调用ThreadlessExecutor#waitAndDrain。

Dubbo源码(三)rpc调用流程

ThreadlessExecutor#waitAndDrain:堵塞等待rpc呼应使命投递,当呼应到来,当前线程履行rpc呼应使命。

Dubbo源码(三)rpc调用流程

之所以这个Executor的行为不同,是因为重写了execute办法。

当没有调用waitAndDrain履行使命前,使命会投递到queue中。

至于rpc呼应,放到第三阶段持续看。

Dubbo源码(三)rpc调用流程

处理rpc恳求

反序列化

同样,反序列化也是走DubboCountCodec,区别在于Request的data部分不是一个一般的RpcInvocation。

rpc恳求体会被反序列化为一个DecodeableRpcInvocation,承继自RpcInvocation。

Dubbo源码(三)rpc调用流程

DecodeableRpcInvocation是个特别的Invocation,特点在于可以自己依据输入流反序列化。

Dubbo源码(三)rpc调用流程

通讯层

AllChannelHandler#received:收到反序列化之后的Request,依据url找事务线程池履行rpc恳求。

Dubbo源码(三)rpc调用流程

在露出阶段,provider关于每个端口开启一个事务线程池,处理事务。

Dubbo源码(三)rpc调用流程

默许选用固定巨细200+无行列+日志打印拒绝策略的线程池。

Dubbo源码(三)rpc调用流程

HeaderExchangeHandler#handleRequest:履行ChannelEventRunnable

1)提交到ExchangeHandler完结,履行Invocation调用

2)注册future回调,当事务处理结束,再将rpc呼应提交到io线程,呼应客户端

Dubbo源码(三)rpc调用流程

接下来的流程,便是依据Request找到Invoker,通过层层包装,调用到rpc服务完结。

Dubbo源码(三)rpc调用流程

Protocol层

DubboProtocol#requestHandler:

1)getInvoker:依据Invocation找Invoker

2)履行Invoker

Dubbo源码(三)rpc调用流程

找Invoker

DubboProtocol#getInvoker:依据Invocation信息拼接serviceKey,定位到之前露出的DubboExporter,回来DubboExporter持有的Invoker。

Dubbo源码(三)rpc调用流程

Filter

接下来通过一系列group=provider的Filter。

介绍一个比较要害的ContextFilter

在进入下流invoker前,将Invocation中的要害信息注入RpcContext恳求rpc上下文中。

Dubbo源码(三)rpc调用流程

在rpc服务办法履行结束后,将RpcContext呼应rpc上下文中要害信息,注入rpc呼应。

Dubbo源码(三)rpc调用流程

署理层

接下来会来到Rpc服务署理层,露出阶段会通过ProxyFactory创立rpc服务完结署理,默许javassist完结。

Dubbo源码(三)rpc调用流程

AbstractProxyInvoker#invoke关于方针rpc办法履行包了一层,异步适配、反常处理等等。

Dubbo源码(三)rpc调用流程

关于javassist来说履行动态生成的Wrapper#invokeMethod,动态生成的Wrapper在上一章提到过,不再赘述。

留意:这儿proxy实践上是target,即rpc服务完结类实例。

Dubbo源码(三)rpc调用流程

HeaderExchangeHandler#handleRequest:再复述一次

当rpc服务办法履行结束,future完结后会将response提交到io线程,进入三阶段。

Dubbo源码(三)rpc调用流程

处理rpc呼应

反序列化

rpc呼应模型Response:

1)mId:对应request-id

2)mResult:DecodeableRpcResult实例

Dubbo源码(三)rpc调用流程

Response.mResult是DecodeableRpcResult实例。

DecodeableRpcResult完结和DecodeableRpcInvocation相似,自己支持反序列化,承继了AppResponse。

Dubbo源码(三)rpc调用流程

通讯层

AllChannelHandler#received:

当前是io线程(netty),依据入参message(Response)获取rpc呼应处理线程池,提交ChannelEventRunnable使命到对应线程池。

Dubbo源码(三)rpc调用流程

WrappedChannelHandler#getPreferredExecutorService:

依据恳求id找到挂起的DefaultFuture,再依据DefaultFuture找到绑定的线程池。

关于同步调用,这儿便是ThreadlessExecutor

Dubbo源码(三)rpc调用流程

io线程将ChannelEventRunnable,提交到ThreadlessExecutor之后,就能唤醒用户线程,完结同步调用

Dubbo源码(三)rpc调用流程

Protocol层

HeaderExchangeHandler持续处理,调用DefaultFuture#received。

Dubbo源码(三)rpc调用流程

DefaultFuture#received:从大局futures中移除挂起恳求

1)取消超时检测

2)完结future

Dubbo源码(三)rpc调用流程

DefaultFuture#doReceived:future以正常或反常完结。

Dubbo源码(三)rpc调用流程

AsyncToSyncInvoker调用Result#get从堵塞中回来。

Dubbo源码(三)rpc调用流程

署理层

最后回到rpc服务署理。

InvokerInvocationHandler#invoke:AsyncRpcResult#recreate获取成果。

Dubbo源码(三)rpc调用流程

AsyncRpcResult#recreate:关于同步调用,走底层AppResponse的recreate。

Dubbo源码(三)rpc调用流程

AppResponse#recreate:调用正常,回来反序列化后的result。

Dubbo源码(三)rpc调用流程

总结

rpc流程

一次同步rpc调用流程大致如下:

Dubbo源码(三)rpc调用流程

阶段一(consumer):

1)署理层:用户代码进入rpc服务署理,搜集rpc调用必要参数,封装RpcInvocation

2)Cluster层:履行Interceptor,通过路由、容错、负载均衡终究选定一个provider

3)Protocol层:

  • 履行Filter
  • 构建Request并缓存到内存map,后台线程扫描map校验Request是否超时
  • 提交Request到io线程,堵塞等待Response

4)通讯层:Request序列化,发送Request给对端

阶段二(provider):

1)通讯层:Request反序列化,得到RpcInvocation,提交到事务线程池并注册future回调,回调时将rpc回来值封装为Response并序列化提交到io线程

2)Protocol层:通过RpcInvocation找到之前露出的Exporter,找到Exporter对应的Invoker,履行Filter

3)署理层:调用rpc办法完结

阶段三(consumer):

1)通讯层:将Response反序列化,提交使命到事务线程池

2)Protocol层:依据恳求id反向找到挂起的future,取消超时检测,设置future履行成果

3)署理层:解析future回来成果,回来用户代码

一些细节

完结同步rpc调用

rpc调用事务线程和io线程一般是独立的,所以有如下规划:

1)【事务线程】为request分配仅有id

2)【事务线程】将request-id和request-future缓存到大局map

3)【守时线程】关于大局map做守时扫描,假如超时,用指定线程池履行future的回调

4)【io线程】发送恳求到io线程

5)【io线程】io线程收到呼应,从大局map依据呼应里的request-id拿到request-future,提交到6线程

6)【rpc呼应线程(异步)/事务线程(同步)】履行future的回调

一般框架底层通讯都会规划成异步,然后同步去适配异步。

不例外,dubbo规划了ThreadlessExecutor。

本质上ThreadlessExecutor是一个堵塞行列适配了ExecutorService。

Dubbo源码(三)rpc调用流程

事务线程会堵塞等待行列非空(queue.take),io线程收到response投递恳求到堵塞行列,唤醒事务线程,从而完结同步调用。

负载均衡算法

之前对默许负载均衡算法有一点误解,认为是纯粹的随机算法,实践上包含权重逻辑。

而权重逻辑中会包含warmup减权,默许warmup时长是10分钟,权重是100。

Dubbo源码(三)rpc调用流程

欢迎大家评论或私信评论问题。

本文原创,未经许可不得转载。

欢迎重视公众号【程序猿阿越】。