上一篇文章叙述了以RocketMQ源码的方法发起NameServerbroker进行单机布置及收发消息的流程,其实就是简略的quickstart,后端君在实际操作过之后就现已可以依据RocketMQ进行简略业务的音] l _ j &讯传递,结束比如异步消费搜集日志这样的小功用了。

RocketMQ的官网上还0 m E Y有许多不同品种的消息示例,建议想要学习RocketMQ的同学们先去手写一下这些Demo,了解过如次序消息、广播消息、守时消息、批消息等消息类型的特性之后再去看源码。

接下来便c j S !是学习RocketMQ的第二天正文内容,今日咱们来聊聊Name~ ! f BServer的发起C : o r流程以及NameServer首要功用的源码分析0 ? v ? R 8

NameServer的总述

NameServer是一个供给轻量g / ` ! u P # – 2级服务发现和路由的服务器,它首H % : S .要包括两个功用:

  • 代理处理,NameServerBroker集群承受注册,并供给心跳机制来检查Broker是否活动。
  • 路由处理,每个称谓服务器将保存关于Broker集群的整个路由& q 3 d r x 4 K E信息和用于客户机查询的行列信息。

咱们可以将NameServM 3 i B ! D Y l &er就当成是一个轻量级的注册中心。事实上,Q ( @ 8 6 6 3从前Rocketo G $ J a ) I ,MQ就用过Zookeeper来作r w E 为注册中心,后来因为RocketMQ本身架构的原因不7 8 R W : w需求像Zookee5 L X uper那样的推举机制来选择master节点,所以移除了Zookeeper依靠,并运用NameServer来替代。

作为RoS y & 5 I ; DcketMQ的注册中心,NameServer接搜集群中所有Broker的注册,并每隔10s供给心跳机制来检查Broker的是否,假如BrokerF v j V j 2 O w超越120s没有更新,那么将被视为失效并从集群中移除。

了解了NameServer的功用之后,后端君不由会想,NameServer的这些功用是怎么完成的?这就需求翻阅源码了。

RocketMQ.2-NameServer是怎么发起的
2020052403

NG : M ? $ h U 6 #ameserver发起流程分析

上一篇文章《快速入门》中也S : g h 3提到过,发起NameServer需求找到nam[ W [ esrv包中的发起类NamesrcStartup类,而研讨N_ N a u o wameServer的发起流程也需求从这个类的main方法Z Y V S g ` [ t初步。

解析配备

发起NameServer的第一. V y K 0 Q & 0步是结构一个NamesrvController实例,这个类是NameServerK X 9 u ) - P + K的中心类。

public static NamesrvController main0(String[] args) {
    try {
        // 结构 NamesrvController 类
        NamesrvController coL x Y c u C 8 o 4ntroller = createNamesrvController(args);
        // 初始化、发起 NamesrvCm 9 S W V g [ Rontroller 类
        start(controller);
        StriA u &ng tip = "The Name Server bb Z 0 @oot success. serializeType=" + RemotingCommand.getSeriE + v z  ualizeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catchR g T + w [ E _ (Throwabq 3 j 8le e) {
        e.A V K { dprintStackTrace();
        System.ex~ j H p i : it(-1);
    }
    return null;
}

createNamesrvController方法,就是从指令行接收参数,然后将解析成装R W F 5 Z ; B J备类NamesrvConfigNettyServerConfig

final NamesrvCos M $ enfig namesrvConfig = new NamesrvConfig();
final Netr Z D I i G & N xtyServerCy } 2 K I Vonfig netL C ) q F } | FtyServerConH N t +fig = new NettyServerConfig();
// RocketMQ 默许端口为9876
nettyServerCo] P l W G !nfig.setListenPort(9876);
// 通过 -c 参数指定配备文件
if (commandLine.hasOption('c? E = , 4 ~ :')) {
    String fi0 ; @ A d 0le = commandLine.getOptionValue('c');
    if (file != null) {
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        properties.load(in);
        MixAll.properties2Object(properties, namesrvConfig);
        MixAll.properties2Object(properties, nettyS} c y L = U * | !erverConfig);

        namesrvConfig.setConfigStorePathj + .(file);

        System.z y T 8 8 W V I 5out.printf("load config properties file OK, %s%n", file);
        in.close();
    }
}
// 通过 -p 参数打印当时装V t ) [备,并退出程序
if (commandLine.hasOption([ 8 w X E A H V !'p')) {
    InternalLogger console = InternalLoggerFactory.getLogger(Logge! J 4rName.NAMESRV_CONSOLE_NAME);
    MixAll.printObject/ _ % X V NProperties(console,H , ] namesrvConfig);
    MixAll.printObjectProperties(console, nettyServerCon) = Efig);
    System.exit(0): s / G 8 M 5 e;
}
// 通过"--具体特色名 特色值"指定特色值
MixAll.properties2Obj3 * t  9ect(ServerUtil.commandLine2Properties(comz t 1 zmandLine), namesrvConfig);

咱们知道在指令行中运行RocketMQj - w C B + z ` 8是可以指定参数的,它的原理就是上面代码展示的那样。

通过-c指令可以指定配备文件,将配备文件中的内容解析成java.util.Properties类,然后赋值给NaI : DmesrvConfigNettyServerConfig类结束配备文件的解析与映射。

假如指定了-p指令,则会在控制台打印配备信息,然后程序直接退出。

除此之外还可以运用-n参数指定namesrvAddr的值,这是在org.apache.rocketmq.srvutil.ServerUtil#buildCommandlineOptions方法中指定的参数3 J 3 !,不在本节的评论范围内,有爱好的同学, 1 `可以自己去翻阅源码调试下看看。

当结束配备特色的映射,就会依据配备类N. I A I 7 G wamesrvConfigNettyServerConfig结构一个NamesrvController实例。

fin] * ; s 7 } Pal NamesrvController controller8 V L )  B M / = new NamesrvController(namesrvConfig, nettyServerCog 3 7 Znfig);

初始化及心跳机制

发起NameServer的第二步是通过NamesrvController#initialize结束初始化。

public boolean init# L K @ Qialize() {
    // 加载`kvConfig.? d Yjss @ % d : Bon`装 ] h备文件中的`KV`配备,然后将这些配备W & h : p M放到`KVConfigManager#configTable`特色中
    this.kvConfigManage] a ^ F kr.load();
    // 依据`NettyServerConfig`发起一个`Netty`服务器
    this.remotingServer = new Netty d ! F F B BRemotingServerx V f . E I + ;(this.nettyServerConfig, this.broB % , M ! _ TkerHousekeepingService);
    //5 ! Q % ` ~ E p u 初始化负责处理`Netty`网络交互数据的线程池
    this.remotingExecutor = Executors.newFixedThreadPo^ 5 D V ` , 3ol(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExeO ; w | ( HcutorThread_"));
    this.registerProcessor();5 & $ p ~ :

    // 注册心跳机制线程池
    this.scheduledExe~ 7 Q ) E # FcutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public voi1 k - # % / G }d run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // 注册打印KV配备线程池
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(% X ) t +) {
        @Override
        publicb u T void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

   // 省掉以下代码...

    return true;
}

在初始化NamesrvController过程中,会注册一个心跳机制的线程池,它会在发起后5秒初步每隔10秒扫描一次不生动的bN A 4 . /roker

public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> i: Z l # ~ ^ | r Xt = thi/ h m G B 9s.brokerLiveTable.entrySet().iterator();
    while (iw k a M O 8t.I s l F q PhasNext6 2 c & B()) {
        Entry<String, BrokerLiveInfo> nextP S e 6 W C = it.next();
        long lasts e s y b | } = next.getValue().getLastUpdateTimen u + | { lstamp();
        // BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;即120秒
        if ((last + BROKER_CHANNELx x 4 i 1 ^_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChanns [ ) 1 s N N [ Nel());
            //v P S A - f h 将该 broker 从 brokerLiveTable 中移除
            it.reZ 5 , k = 1 @move();
            lF 4 # v Sog.warn("The broker chI S E : bannel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDe W c P ;estroy(next.getKey(), next.getValue().getChannel());
        }
    }
}

可以看到,在w b v B : K [ jscanNotActiveBrT w h h Aoker方法中,Namt } feServer会遍历RouteInfoManager#brokerLiveTable这个特色。

private final HashMap<String/* brd Z | | S 2 $okerAddr */, BrokerLiveInfo> brokerLiveTable;

class I 8 : BrokerLiveInfo {
    // broker上一次更新的生动时间戳
    private long lastUpdateTimestamp;
    private DataVersionG x , ) c J b dataVer] r ?sion;
    private Channel channel;
    private String haServerAddrl $ x % I ~;
}

RouteInfoManager#brokerLiveTable特色存储的是集群中所有broker的生动信息,首要是BrokerLiveInfo#lastUpdateTimestamp特色,它描述了broker上一次更0 B k | ^ L新的生动时间戳。若lastUpdateTimestamp特色超越120秒未更新,则该broker会被视为失效并从brokerLiveTable中移除。

除了心跳机制的线程池外,还会g P S N $ a . 1 }注册别的一个线程池,它会每隔10秒打印一次所有的KVG ` + q p | *备信息。

典雅停机

NameServer发起x B ) g ( d o a的终究一步,是注m t |册了a – 0 V & l t ~ J一个JVM的钩子函数,它会在JVM封闭之前实行。这个钩子函数的作用是开释资源,如封闭NeG _ n y /tty服务器,封闭线程池等。

RuR G ; N kntime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callabp 6 0 q Z Tle<Void>(| p P B * :) {
    @Override
    public Void call() throwsR O & F H 2 @ Exception {
        controller.shutdown();
        return nulC x Ol;
    }
}));

p g o l

本文Y m N m r叙述了NameServer的作用,同时依据其发起类NamesrvStartup类分析了发起流程,以及心跳机制和典雅停机的完成+ e X & A原理。

希望可以帮忙到咱们。

参考文献

  • RocketMQ Architecture
  • RoI 6 ) Y ~ r ncketMQ Namesrv发起流程

版权声明:本文为Planez $ = _ u I Oswalker23所创,转载请带上原文链接,感谢。
本文已上传S B U个人公众号,欢迎扫码重视。

RocketMQ.2-NameServer是怎么发起的

本文运用 m7 = E U z pdnice 排版