研讨一款入门级其他 RPC 结构 — XXL-RPC

XXL-RPC 结构是 XXL-JOB 作者出品, 官网地址 分布式服务结构XXL-RPC

XXL-RPC 是一款轻量、简略,但五脏俱全的 RPC 结构,本地环境搭建十分简略, 对初学 RPC 的人来说,十分的友好,建议阅览学习,关于了解 RPC 大有裨益。 本文也是环绕 XXL-RPC 结构行文的。

官网文章现已写得很好,而本文是基于自己的学习和了解而成文的;内容会涉及源码,篇幅偏长。为了更好了解 XXL-PRC,需求有 Spring 初始化进程和 Netty 知识背景。

一、RPC 根底

1.1 入门了解

PRC 是为调用长途的服务像调用本地一样服务简略。

举例:在 node1 节点上调用 node2 的 serviceB,就像 node1 上调用 node1 上的 serviceB 一样。

研讨一款入门级其他 RPC 结构 -- XXL-RPC

要完成这一方针,由上图可见,通讯组件必不可少。假如不考虑集群,能够将 node2 的各种信息装备写入 node1,可是考虑集群的扩容和缩容,就必需要有一个组件能及时地更新各节点的信息,暂时称这个组件为服务注册与发现组件。

研讨一款入门级其他 RPC 结构 -- XXL-RPC

对 RPC 有了最根本的了解后,接着咱们再进一步看 RPC 的理论和规划。

1.2 根底原理和规划

假如 node1 要调用 node2 服务就需求规划一个简略的通讯;能够粗略概括如下:

研讨一款入门级其他 RPC 结构 -- XXL-RPC

为了更好的了解上图,补充一下术语:

术语 了解
stub Stub 是在客户端端点的署理程序,用于代表客户端应用程序与长途服务进行通讯。它躲藏了底层的网络通讯细节,当客户端应用程序调用 stub 中的函数时,stub 将担任将恳求打包并经过网络发送给长途服务端。
skeleton Skeleton 是在服务端端点的署理程序,用于接纳来自客户端的恳求,并将其解包后传递给实践的服务完成。Skeleton 躲藏了底层的网络通讯细节,并担任将恳求分派到相应的服务完成函数上。当服务完成函数履行完毕后,skeleton将担任将成果打包并发送给客户端

stub 和 skeleton 是 RPC 中用于署理客户端和服务端之间通讯的组件。

RPC 并没有具体规范,可是大致原理是一样的, 一次简略 RPC 调用进程如下:

研讨一款入门级其他 RPC 结构 -- XXL-RPC

图来源网络

有了这些根底概念以后,接下来咱们正式进入 XXL-RPC 这个结构的学习。

二、XXL-RPC 的规划和理论

2.1 XXL-RPC 的规划

XXL-RPC 中关于 RPC 的一次调用进程,原理分析如下,作者的规划完成,本文的代码分析也是环绕下面这张图具体展开了解的。

研讨一款入门级其他 RPC 结构 -- XXL-RPC

其间 TCP 的模型, XXL-RPC 选用 NIO 进行底层通讯。

研讨一款入门级其他 RPC 结构 -- XXL-RPC

XXL-RPC 底层通讯运用了 Netty。 假如要想学习了解 RPC 结构, Netty 是必需要掌握的。

客户端能够跟每一个服务供给者( ip + port )树立一个 socket 链接。 假如服务供给者是集群,那么每个客户端能够跟这些服务供给者逐个树立链接,形成连接池。客户端恳求的时分,依据负载均衡算法从中取一个进行运用即可。

// 地址由 ip 和 port 组成
this.registryAddress = IpUtil.getIpPort(this.ip, this.port);

2.2 XXL-RPC 工程

XXL-RPC 工程代码如下,有两个示例,接下来就从示例进行下手 (provider 和 consumer/invoker) 到这儿了,不妨拉一下源码看看。

研讨一款入门级其他 RPC 结构 -- XXL-RPC

工程是以 spring 为根底的,从 @XxlRpcService@XxlRpcReference 两个注解下手比较合适。而两个注解都是经过 spring 加载进程的后置处理器进行处理,假如把这两个注解捋顺了,整个进程就根本明晰了。

/xxl-rpc-sample-springboot-client :服务消费方 consumer 调用示例
/xxl-rpc-sample-springboot-server :服务供给方 provider 示例

三、源码了解

@XxlRpcService@XxlRpcReference 结构是依靠 spring 容器。在发动进程中,后置处理器会进行各自的初始化处理,源码了解就环绕这两个注解进行展开。

3.1 @XxlRpcService

XxlRpcService 处理的入口是 XxlRpcSpringProviderFactory, 也是服务供给者的中心类; XxlRpcSpringProviderFactory 完成 ApplicationContextAware 和 InitializingBean 两个接口, 这个类在创立的时分,会扫描一切 XxlRpcService 的注解类。在 afterPropertiesSet 中发动 Netty 服务端。

研讨一款入门级其他 RPC 结构 -- XXL-RPC

接下来具体查看一下源码:

  1. 扫描一切 XxlRpcService 类,Spring 后置处理,比较简略。
// 将带有 XxlRpcService 注解的一切类都解析放入到 Map 中
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
    if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
        for (Object serviceBean : serviceBeanMap.values()) {
            // valid
            if (serviceBean.getClass().getInterfaces().length ==0) {
                throw new XxlRpcException("XXL-RPC, service(XxlRpcService) must inherit interface.");
            }
            // add service
            XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);
            String iface = serviceBean.getClass().getInterfaces()[0].getName();
            String version = xxlRpcService.version();
            // iface + version 作为 key,将其放入到 map 中
            super.addService(iface, version, serviceBean);
        }
    }
}
  1. 回调 afterPropertiesSet ,触发 Netty 服务端的创立。
public class XxlRpcSpringProviderFactory extends XxlRpcProviderFactory implements ApplicationContextAware, InitializingBean,DisposableBean {
    // 回调调用 start 创立服务    
    @Override
    public void afterPropertiesSet() throws Exception {
        super.start();
    }
}
  1. 创立 Netty 服务端

Netty 服务端发动代码; 事务中心处理类是 NettyServerHandler 类;这个类会处理具体恳求,不杂乱,模板式的代码。

public class NettyServer extends Server {
    private Thread thread;
    @Override
    public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception {
        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                // 设置线程数大小。默认 60、300 
                final ThreadPoolExecutor serverHandlerPool = ThreadPoolUtil.makeServerThreadPool(
                        NettyServer.class.getSimpleName(),
                        xxlRpcProviderFactory.getCorePoolSize(),
                        xxlRpcProviderFactory.getMaxPoolSize());
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup();
                try {
                    // start server
                    ServerBootstrap bootstrap = new ServerBootstrap();
                    bootstrap.group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel channel) throws Exception {
                                    channel.pipeline()
                                            // 心跳检活
                                            .addLast(new IdleStateHandler(0,0, Beat.BEAT_INTERVAL*3, TimeUnit.SECONDS))     // beat 3N, close if idle                     
                                            // 编解码
                                            .addLast(new NettyDecoder(XxlRpcRequest.class, xxlRpcProviderFactory.getSerializerInstance()))
                                            .addLast(new NettyEncoder(XxlRpcResponse.class, xxlRpcProviderFactory.getSerializerInstance()))
                                            // 处理恳求的具体类
                                            .addLast(new NettyServerHandler(xxlRpcProviderFactory, serverHandlerPool));
                                }
                            })
                            .childOption(ChannelOption.TCP_NODELAY, true)
                            .childOption(ChannelOption.SO_KEEPALIVE, true);
                    // bind
                    ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();
                    // 激活服务注册
                    onStarted();
                    // wait util stop
                    future.channel().closeFuture().sync();
                .......
    }

上面的 Netty 服务端发动代码是十分规范的模板式代码。具体调用进程如下:

研讨一款入门级其他 RPC 结构 -- XXL-RPC

  1. NettyServerHandler 服务恳求处理的 handler

在这个 handler 中,将任务提交到线程池;再经过反射处理恳求,再将成果写回 Netty 通道。

@Override
public void channelRead0(final ChannelHandlerContext ctx, final XxlRpcRequest xxlRpcRequest) throws Exception {
     ......
    // do invoke
    try {
        serverHandlerPool.execute(new Runnable() {
            @Override
            public void run() {
                // invoke + response
                // 真实做调用的当地
                XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);
                // 写回数据
                ctx.writeAndFlush(xxlRpcResponse);
            }
        });
    } catch (Exception e) {
        .......
        ctx.writeAndFlush(xxlRpcResponse);
    }
}
  1. 反射调用接口 xxlRpcProviderFactory.invokeService。 从 Map 中寻找 service,然后经过反射进行调用。
// 经过反射进行调用
Class<?> serviceClass = serviceBean.getClass();
String methodName = xxlRpcRequest.getMethodName();
Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
Object[] parameters = xxlRpcRequest.getParameters();
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
Object result = method.invoke(serviceBean, parameters);

服务端的代码仍是比较简略和明晰。大致有以下几个进程:

  1. XxlRpcSpringProviderFactory 完成了 ApplicationContextAware 接口,所以 spring 发动会回调; 在回调的时分,将一切 XxlRpcService 注解的类都标记成为服务供给的类,用一个 Map 容器承载。
  2. XxlRpcSpringProviderFactory 完成了 InitializingBean 接口,在一切类都初始化完成后,回调 afterPropertiesSet() 办法,这个时分会经过这个办法发动 Netty 服务端。
  3. Netty 服务端是十分规范的模板代码,其间 NettyServerHandler 为中心的事务 handler,用来处理服务恳求的中心。
  4. NettyServerHandler 中运用线程池来,这样能够处理许多恳求;进行具体事务办法的调用
  5. xxlRpcProviderFactory.invokeService 完成具体办法调用。

上面的代码便是服务端(provider) 发动和调用的进程,比较简略明晰的。

服务注册发动是在 Netty 发动的时分,回调 start 事件发动的

接下来讲解客户端(consumer/invoker) 的一个发动和调用进程

3.2 @XxlRpcReference

带有 XxlRpcReference 注解接口,客户端都没有显现的完成类, 因而需求依靠 XxlRpcReference 注解,署理生成一个具体的完成类,这个完成类在被调用具体办法的时分,能够映射成长途办法恳求,并将成果再回来

研讨一款入门级其他 RPC 结构 -- XXL-RPC

整个调用能够分为以下两个中心点:

  • 署理方针生成
  • 长途办法调用

整理整个进程,大致如下:

研讨一款入门级其他 RPC 结构 -- XXL-RPC

接下来要点分析生成可长途调用方针的代码。

  1. XxlRpcSpringInvokerFactory 为入口类。在这个类,需求触发生成 @XxlRpcReference 的具体署理方针。

下图是发动的时分去填充 @XxlRpcReference 描绘的字段代码

研讨一款入门级其他 RPC 结构 -- XXL-RPC

比如 DemoService 被 @XxlRpcReference 描绘, 就需求生成具体的署理方针

@Controller
public class IndexController {
   @XxlRpcReference
   private DemoService demoService;
   ......
}

代码如下:给一切 @XxlRpcReference 描绘的特点字段生成具体的署理方针

public class XxlRpcSpringInvokerFactory implements InitializingBean, DisposableBean, BeanFactoryAware, InstantiationAwareBeanPostProcessor {
    .......
    // spring 后置处理接口,填充特点字段
    @Override
    public boolean postProcessAfterInstantiation(final Object bean, final String beanName) throws BeansException {
        // collection
        final Set<String> serviceKeyList = new HashSet<>();
        // parse XxlRpcReferenceBean 
        // 处理特点字段
        ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
            @Override
            public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
                if (field.isAnnotationPresent(XxlRpcReference.class)) {
                    // valid
                    Class iface = field.getType();
                    if (!iface.isInterface()) {
                        throw new XxlRpcException("XXL-RPC, reference(XxlRpcReference) must be interface.");
                    }
                    XxlRpcReference rpcReference = field.getAnnotation(XxlRpcReference.class);
                    // init reference bean
                    XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean();
                   ......
                   referenceBean.setInvokerFactory(xxlRpcInvokerFactory);
                    // get proxyObj 生成署理方针
                    Object serviceProxy = null;
                    try {
                        // 中心,生成长途调用的署理方针 
                        serviceProxy = referenceBean.getObject();
                    } catch (Exception e) {
                        throw new XxlRpcException(e);
                    }
                    // set bean 反射设置特点
                    field.setAccessible(true);
                    field.set(bean, serviceProxy);
                    ......
                }
            }
        });
}
  1. serviceProxy = referenceBean.getObject(); 包装成长途调用的署理方针

研讨一款入门级其他 RPC 结构 -- XXL-RPC

整个客户端的中心代码,有必要认真阅览一下

阅览前需求先了解 callType 有四种方法,而 referenceBean.getObject() 完成了下面 4 种方法的调用:

callType 描绘
SYNC 同步等回来成果
FUTURE future 获取成果
CALLBACK 回来成果进行办法回调
ONEWAY 不关注回来成果

由于 NIO 是异步通讯模型,调用线程并不会阻塞获取调用成果,因而,XXL-RPC 完成了在异步通讯模型上的同步调用,即 “sync-over-async” 。即怎么恰当地回来异步成果,下图是作者规划的思路,即经过 wait 和 notify 来完成异步成果的获取。

研讨一款入门级其他 RPC 结构 -- XXL-RPC

有了上面的了解,咱们再阅览中心的生成署理方针的代码就比较简略了。

下面生成署理的代码也是环绕怎么进行长途调用(省掉参数封装、泛化调用), 逻辑并不杂乱

public Object getObject() throws Exception {
    ......
   // newProxyInstance 生成署理方针
   return Proxy.newProxyInstance(Thread.currentThread()
         .getContextClassLoader(), new Class[] { iface },
         new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    .......
                    // send  中心,发送长途调用,对应四种方法
               if (CallType.SYNC == callType) {
                  // future-response set
                  XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
                  try {
                     // do invoke 
                     clientInstance.asyncSend(finalAddress, xxlRpcRequest);
                     // future get
                     XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
                     if (xxlRpcResponse.getErrorMsg() != null) {
                        throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
                     }
                     return xxlRpcResponse.getResult();
                  } catch (Exception e) {
                     .....
                  } finally{
                     // future-response remove
                     futureResponse.removeInvokerFuture();
                  }
               } else if (CallType.FUTURE == callType) {
                  // future-response set
                  XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
                           try {
                     // invoke future set
                     XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
                     XxlRpcInvokeFuture.setFuture(invokeFuture);
                               // do invoke
                     clientInstance.asyncSend(finalAddress, xxlRpcRequest);
                               return null;
                           } catch (Exception e) {
                     logger.info(">>>>>>>>>>> XXL-RPC, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
                     // future-response remove
                     futureResponse.removeInvokerFuture();
                    ......
               } else if (CallType.CALLBACK == callType) {
                  // get callback
                  XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
                  XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
                  if (threadInvokeCallback != null) {
                     finalInvokeCallback = threadInvokeCallback;
                  }
                  if (finalInvokeCallback == null) {
                     throw new XxlRpcException("XXL-RPC XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null.");
                  }
                  // future-response set
                  XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);
                  try {
                     clientInstance.asyncSend(finalAddress, xxlRpcRequest);
                  } catch (Exception e) {
                    .......
                  }
                  return null;
               } else if (CallType.ONEWAY == callType) {
                  clientInstance.asyncSend(finalAddress, xxlRpcRequest);
                 return null;
                ....
               }
            }
         });
}

关于怎么完成 “sync-over-async” 细节参考 3.3 sync 和 future 形式。

终究的长途通讯仍是运用 Netty。

  1. clientInstance.asyncSend(finalAddress, xxlRpcRequest);
@Override
public void send(XxlRpcRequest xxlRpcRequest) throws Exception {
    this.channel.writeAndFlush(xxlRpcRequest).sync();
}

做一个简略的总结:

  1. XxlRpcSpringInvokerFactory 在发动的时分,完成了 postProcessAfterInstantiation 接口,在方针填充特点前调用,这个时分对具有 XxlRpcReference 特点字段的方针做该字段的填充,即生成具体的署理方针
  2. XxlRpcReferenceBean 署理中心类,最中心的作用便是包装,长途调用。
  3. 长途调用供给了四种类型。中心是完成 “sync-over-async”

根本上咱们现已摸清楚 provider/服务端 和 consumer/Invoker/客户端的完成和调用进程。

下面再追加 sync 和 future 形式的了解。

3.3 sync 和 future 形式

经过 wait() 和 notifyAll() 来处理异步成果,作者的解释,能够结合图进行了解

  • 1、consumer建议恳求:consumer 会依据长途服务的 stub 实例化长途服务的署理服务,在建议恳求时,署理服务会封装本次恳求相关底层数据,如服务iface、methods、params等等,然后将数据经过 serialization 之后发送给provider;
  • 2、provider 接纳恳求:provider 接纳到恳求数据,首先会deserialization获取原始恳求数据,然后依据 stub 匹配方针服务并调用;
  • 3、provider 响应恳求:provider 在调用方针服务后,封装服务回来数据并进行serialization,然后把数据传输给 consumer;
  • 4、consumer 接纳响应:consumer 接纳到相应数据后,首先会deserialization 获取原始数据,然后依据 stub 生成调用回来成果,回来给恳求调用处。结束。

研讨一款入门级其他 RPC 结构 -- XXL-RPC

get() 等候获取成果方针

@Override
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
   if (!done) {
     // lock.wait() 等候调集
      synchronized (lock) {
         try {
            if (timeout < 0) {
               lock.wait();
            } else {
               long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);
               lock.wait(timeoutMillis);
            }
         } catch (InterruptedException e) {
            throw e;
         }
      }
   }
   ......
   return response;
}

调用成功后,在 notifyAll()


// ---------------------- for invoke back ----------------------
public void setResponse(XxlRpcResponse response) {
   this.response = response;
   // 触摸等候
   synchronized (lock) {
      done = true;
      lock.notifyAll();
   }
}

规划仍是比较精彩。

下面是 future 方法获取成果方针的方法。

// invoke future set
XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
XxlRpcInvokeFuture.setFuture(invokeFuture);
// do invoke
clientInstance.asyncSend(finalAddress, xxlRpcRequest);

重写了 future 。 其间 get 也是在复用上面才能。

public class XxlRpcInvokeFuture implements Future {
    .......
    @Override
    public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            // 也是运用 await 和 notifyAll
            XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, unit);
            if (xxlRpcResponse.getErrorMsg() != null) {
                throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
            }
            return xxlRpcResponse.getResult();
        } finally {
            stop();
        }
    }
    .......
}

经过上面两个注解进程的讲解,RPC 的调用进程就现已讲完了。

接下来再略微分析一下服务注册与发现(上面的内容是中心要点)

四、服务中心体系规划(粗讲)

内部经过播送机制,集群节点实时同步服务注册信息,保证共同。客户端凭借 long pollong 实时感知服务注册信息,简练、高效;

研讨一款入门级其他 RPC 结构 -- XXL-RPC

4.1 服务注册

服务端的调用是在 Netty 发动后注册的回调事件进行服务注册的。

serverInstance.setStartedCallback(new BaseCallback() {     // serviceRegistry started
   @Override
   public void run() throws Exception {
      // start registry 注册信息
      if (serviceRegistry != null) {
         registerInstance = serviceRegistry.newInstance();
         registerInstance.start(serviceRegistryParam);
         if (serviceData.size() > 0) {
            // XxlRpcService 一切的接口。进行注册
            registerInstance.registry(serviceData.keySet(), registryAddress);
         }
      }
   }
});
serverInstance.setStopedCallback(new BaseCallback() {     // serviceRegistry stoped
   @Override
   public void run() {
      // stop registry 
      if (registerInstance != null) {
         if (serviceData.size() > 0) {        
            registerInstance.remove(serviceData.keySet(), registryAddress);
         }
         registerInstance.stop();
         registerInstance = null;
      }
   }
});

然后在内部发动一个独立的线程,每隔 10 秒进行,上报注册信息

// registry thread
registryThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while (!registryThreadStop) {
            try {
                if (registryData.size() > 0) {
                    // 运用 http 上报元数据信息
                    boolean ret = registryBaseClient.registry(new ArrayList<XxlRpcAdminRegistryDataItem>(registryData));
                    logger.debug(">>>>>>>>>>> xxl-rpc, refresh registry data {}, registryData = {}", ret?"success":"fail",registryData);
                }
            } catch (Exception e) {
                 ......
            try {
                // 歇息 10 s
                TimeUnit.SECONDS.sleep(10);
            } catch (Exception e) {
             ......
            }
        }
    }
});

服务供给者在发动 Netty 后,上报数据到服务注册中心。

研讨一款入门级其他 RPC 结构 -- XXL-RPC

运用的是 http 往注册中心进行上报(注册进程)。

研讨一款入门级其他 RPC 结构 -- XXL-RPC

研讨一款入门级其他 RPC 结构 -- XXL-RPC

4.2 服务发现

客户端调用,便是拉取注册中心的数据,选择一个合适的地址进行调用(负载均衡算法)

if (finalAddress==null || finalAddress.trim().length()==0) {
   if (invokerFactory!=null && invokerFactory.getRegister()!=null) {
      // 拉取数据
      String serviceKey = XxlRpcProviderFactory.makeServiceKey(className, varsion_);
      TreeSet<String> addressSet = invokerFactory.getRegister().discovery(serviceKey);
      // 负载均衡调用
      if (addressSet==null || addressSet.size()==0) {
         // pass
      } else if (addressSet.size()==1) {
         finalAddress = addressSet.first();
      } else {
         finalAddress = loadBalance.xxlRpcInvokerRouter.route(serviceKey, addressSet);
      }
   }
}

另外在程序中有一个独立的线程在不断地进行数据的守时改写 com.xxl.rpc.core.registry.impl.xxlrpcadmin.XxlRpcAdminRegistryClient#XxlRpcAdminRegistryClient。 中心代码 refreshDiscoveryData

discoveryThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while (!registryThreadStop) {
           ......
            } else {
                try {
                     .......
                    // 守时更新数据。(10s) 
                    refreshDiscoveryData(discoveryData.keySet());
                } catch (Exception e) {
                 .......
        }
    }
});

在 xxl-rpc-admin 模块中 com.xxl.rpc.admin.service.impl.XxlRpcRegistryServiceImpl#afterPropertiesSet 有几个线程在不断改写数据和播送音讯,完成服务治理,限于篇幅就不再深入。

负载均衡、泛化调用、服务监控就不逐个展开了,感兴趣的能够阅览代码进行研讨。

到这儿,对 RPC 的整体完成现已有了一个明晰的认识了。再来看看作者这张架构图就不难了解了。

研讨一款入门级其他 RPC 结构 -- XXL-RPC

✒️五、总结

XXL-RPC 是比较小而美的 RPC 结构,很轻量、简略; 关于 RPC 初学者十分的友好。 经过学习这个结构再去了解 Dubbo 就会比较轻松。

XXL-RPC 代码结构比较明晰, 底层 Netty 也是模板式的代码,XXL-RPC 作者对线程的了解也十分到位,值得推荐。

由于轻量,所以许多细节并没有考虑周全,比如线程池的高雅关闭等,这些问题,咱们学习 Dubbo 的时分再细说,本文到此结束。

具体地址 XXL-RPC。