canal是阿里巴巴开源的mysql数据库binlog的增量订阅&消费组件。项目github地址为:https://github.com/alibaba/canal。

本教程是从源码的视点来剖析canal,适用于对canal有必定根底的同学。本教程运用的宫崎骏版本是1.1.4,这也是笔者写这mysql面试题篇教程时的最新稳Go定版,关于canal的根底知识能够参看:https://github.com/alibaba/canal/wiki。

下载项目源码

git clone https://github.com/alibaba/canal.git

切换到canal-1.1.4json数据格局这个tag

git checkout canal-1.1.4

1. 源json解析码模块差异

canal是依据maven构建的,运用模块如下所示:

同步利器之Canal源码解析

模块虽多,但是每个模块的代码都很少。各mysql索引个模块的效果如下所示:

common模块:首要是供给了一些公共的东西类和接口。

client模块:canal的客户端。中心接口为CanalConnectmysql增修改查句子or

example模块:供给client模块运用案例。

protocol模块:client和server模块之间的通讯协议

deployer:安顿模块。经过该模块供给的CanalLauncher来建议canal ser源码本钱ver

server模块:canal服务器端。中心接口为CanalSejson数据格局rver

iappearancen源码编辑器stance模块:一个server有多个instance。每个insmysql数据库tance都会仿照成一个mysql实例的slave。instance模块json格局怎样翻开有四个中心组成部分:parser模块、sink模块、store模源码共享网块,meta模块。中心接口为CanalInstance

parJSONser工商银行模块:数据源接入,工商银行仿照slave协议和master进行交互,协议解析。parser模块依托于djsonpbsync、driver模块。

drivejsonobjectr模块和dbsync模块:从这两个模块的artifactId(cjsonobjectanal.parsmysql数据库命令大全e.dappstoreriver、canal.parse.dbsync),就能够看出来,这两个模块实践上是pa工商银行rser模块的组件。事实上parser 是经过driver模块与mysql树立联接,然后获取到binlog。由于原始的binloapp装置下载g都是二进制流,需求解析成对应的binlog作业,这些binlog作业政策都界说在dbsync模块中,dmysql数据库命令大全bsync 模块来自于淘宝的tddl。

sink模块:parser和store链接器,进行数据过滤,加工,分发的作业。中心接口为CanalEventSink

store模块:数据存储。中心接口为Canal源码共享网EventStore

met源码本钱a模块:增量订阅&消费信息处理器,中心接口为CanalMetaManager,首要用于记载canajsonl消费到的mysql binlog的方位,

下面再经过一张图来阐明json数据格局各个模块之间的依托联系:

同步利器之Canal源码解析
经过deplmysql装置装备教程oyer模块,发mysql装置起一个canal-server,一个cannal-server内部包括多个instanmysql怎样读ce,每个insta宫崎骏nce都会伪装成一个mysql实例的slave。client与server之间的通讯协议由protmysql怎样读ocol模块界说。client在订阅binlog信息时,json是什么意思需求传递一个desjson数组tination参数,server会依据这个destination供认由哪一个instance为其供给服源码务。源码之家

咱们先看一下deployer模块:

canal有两种运用办法:1、独立安顿 2、内嵌到运用中。 deployer模块首要json数据格局用于独立安顿canal server。关于这两种办法的差异,请拜见server模块源码剖析。deployer模块源码目录结构appearance如下所示:

同步利器之Canal源码解析

depjson格局loyer模块首要完毕以下功用:
  • 读取canal,properties装备文件
  • 建议canal server,监听canal client的央求
  • 建议canal instance,联接mysql源码编辑器数据库appstore,伪装成slave,解析binlog
  • 在canal的作业进程中,监听装备文件的改动

startup.jsonobjectsh脚本内,会调用com.alibaba.otter.canal.deployer.CanalLauncher类来进行建议,这是剖析Canal源码的进口类,如下图所示:
同步利器之Canal源码解析

所以咱们JSON来解析一下CanalLauncher类
  • 读取canal.properties文件中的装备
  • 运用读取的装备结构一个CanalController实例,将悉数的建议操作都委派给CgoogleanalController进行处理。
  • 毕竟注册一个钩子函数,在JVM间公积金断时同时也连续canal server。枸杞
public class CanalLauncher {
private static fina源码共享网l String CLASSPATH_URappstoreL_PREFIX = "classpath:";
private static final Logger logger               = LoggerFactory.getLogger(CanalLauncher.class);
public static void main(String[] args) throws Throwable {
try {
//1、读取canal.pjson格局怎样翻开roperties文件中装备,默许读取classpat狗狗币h下的canal.properties
String conf = SGoystem.getProperty("canal.conf"工商银行, "classpath:canal.properties");
Properties properties = nemysql增修改查句子w Properties();
if (conf.startsWith(CLASSPATH_URL_PREFIX)approach) {
conjson格局怎样翻开f = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
} else {
prope源码超市rties.load(new FileInputStream龚俊(conf));
}
//2、建议canal,首要将promysql数据库命令大全perties政策传递给CanalController,然后调用其start办法json数组建议
logger.info("## start the canal server.");
final CanalController controller = new CanalController(properties);
controller.start();
loggerGo.info("## the canal server is running now ......");
//3、封闭canal,经过增加JVM的钩子,JVM连续前会回调run办法,其内部调用controller.stop()办法进行连续
Runtime.getRuntime().addShutdownHook公积金(new Thread() {
public void run() {
tjson文件是干什么的ry {
logger.info("#JSON# stop the ca源码年代nal server");
controller.stop();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping canal Server:n{}",
ExceptionUtils.getFullStackTrace(e));
} finajsonlly {
logger.info("## cappearanal server is down.");
}
}
});
} catch (Throwable e) {
logger.error("## Something gojson解析es wrong whenmysql数据库 starting up the canal Server:n{app装置下载}",
ExceptionUtils.getFullStackT源码编辑器race(e));
Systemysql增修改查句子m.exit(0);
}
}
}

能够看到,CanalLauncher实践上仅仅担任读取canal.properties装备文件,然后结构CanalCon源码怎样做成app软件troller政策,并经过其start和stop办法来敞开和连续canal。源码怎样做成app软件因而,假源码之家设说CanalLauncher是canaapplicationl源码剖析的进口类,那么CanalController便是canal源码源码怎样做成app软件剖析的中心类。

在CaAPPnalController的结构办法中,会json格局对装备文件内容解析,初始化相关成员变量,做好canal server的建议前的准备作mysql数据库业,之后在CanalLauncherjson中调用CanalController.start办法来建议。

public class CanalController {
private static final Logger  logger   = LoggerFactory.getLogger(CanalController.class);
private Loappstoreng                                     cid;
private Sappstoretring                                   ip;
private int                                  port;
// 默许运用spring的办法载入
private Map<String, InstanceConmysql增修改查句子fig>              instanceConfigs;
private InstanceConfig                           globalInstanceConfig;
private Map<String, CanalConfigClient>           managerClients;
// 监听insta宫颈癌nce config的改动
private boolean                             autoSjsoncan = true;
private InstanceAction                           defaultAction;
private MaJSONp<InstanceMode, InstanceConfigMonitor> instanceConfigMonitors;
private C源码编辑器编程猫下载analServerWithEmbedded                  embededCanalServer;
private CanalServerWithNetty                     canalServer;
private CanalInstanceGenerator                   instanceGenerator;
private ZkClientx                                zkclientx;
public CanalCo源码之家ntroller(){
this(Systmysql增修改查句子em.getPropertieappreciates());
}
public CanalController(final Prope工商银行rties proapplicationperties){
managerClients = Migratmysql增修改查句子eMap.makeComputingMap(new Fun工商银行ction<String, CanalappleConfigClient&gtjsonp;()源码网站 {
public CanalConfigClient apply(String managerAddress) {
return getManagemysql装置装备教程rClient(managerAddrappreciateess);
}
});
//1、装备解析
globalInstanceConfig = initGlobalConfig(properties);
instanceConfigs = new MapMaker().makeMap();
initInstanapplicationceConf源码买卖网站源码ig(properties);
// 2、准备canal server
cid = Long.valueOf(getProperty(properties, CanalConstaappleidnts.CANAL_ID));
ip = getProperty(properties, CanaappstorelConstagooglents.CANAL_IP);
port = Integeappearancer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
embededCanalServer工商银行 = CanalServerWithEmbedded.instance();
embededCanalServer.setCanalInstanceGenerator(instanceGenerator);//Go 设置自json是什么意思界说的instanceGenerator
canalServer =app装置下载 CanalServerWithNetty.instance();
canalServer.setIp(ip);
canalServer.setPort(port);
//3、初始化zk相关代码
// 处理下ip为空,默许运用hosjsonobjecttIp露出到zk中
if (StringUtils.isEmpty(ip))json解析 {
ip = AddressUtils.getHostIp();
}
finajsonpl String zkServers = getProperty(p源码本钱roperties, CanalappreciateConstants.CANAL_ZKSERVjson数组ERS);源码超市
if (StringUtmysql优化ils.isNotEmpty(zkServers)) {
zkappleclapp装置下载ientx = ZkClijson格局entx.getZk源码超市Client(zkServers);
// 初始json数据格局化体系目录
zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, tru宫颈癌e);
zkclientx.createPersistent(ZookeeperPathUtils.CapplicationANAappearanceLjson_CLUSjson是什么意思TER_ROOT_NODE,源码年代 true);
}
//4 CanalInstance作业状况监控
final ServerRunningData serverData = new ServerRunningData(cid, ipjson文件是干什么的 + ":" + port);
ServerRunningMonitors.setServerData(serveappearancerData);
ServerRunningMonitors.setRunningMonitors(//...)源码编辑器编程猫下载;
//5、autoScan机制相关代码
autoScaappearn = BooleanUtils.toBoolean(getProperty(proper源码编辑器编程猫下载ties, CanalConstants.CANAL_AUTO_SCAN));
if (autoScan) {
defaultAction =mysql面试题 new InstanceAction(apple) {//....};
instamysql面试题nceConfigMonitomysql是什么软件rs = //....
}
}
....
}

为了读者能够尽量简略的看作业总结出CanalController的结构办法中都做了什么,上面代码片段中省掉了部分代码。这样,咱们能够很显着的看出来, ,在CanalController结构办法中的代码分差异为了固定的几个处理进程,下面mysql是什么软件依照几个进程的差异,逐一进行解说,并具appstore体的介绍CajsonnalConjson数组troller中界说的各个字段的效果。

装备解析相关代码
// 初始化大局参数设置
globalInstanceConfig = initGlobalConfig(p作业总结roperties);
instanceConfigs = new MapMak源码共享网er().makeMap();
// 初始化instance config
initInstanceCmysql优化onfig(properties);

标明canal instance的大局装备,类型为InstanceConfig,经过initGlobalConfig办法进行初始化。首要用于解析can源码超市al.properties以下几个装备项:

  • canal.instajson数据格局nce.global.mode:供认canal ijson是什么意思nstance装备加载办法,取值有manager|spring两种办法
  • canal.instance.global.lazy:供认canal instan宫崎骏ce是否推延初始化
  • canal.instance.global.manager.address:装备中心地址。假定canal.instance.global.mo龚俊de=manager,需求供给此装备项
  • canaljson文件是干什么的.instance.global.spring.xml:spring装备文件途径。假定canal.instance.global.mode=spring,需求供给此装备项

initGlobAPPalConf龚俊ig源码如下所示:

private InstanceConfig initG作业总结lobalConfig(Properties properties) {
InstanceConfig globalConfig = new InstanceCappstoreonfig();
//读取canal源码买卖网站源码.instanceapp装置下载.global.mode
String modeStrapple = getProperty(properties, CanalConstanjson解析ts.getInstanceModmysql数据库命令大全eKey(CanalConstants.GLOBA源码网站L_NAME));
if (StringUtiljson解析s.i源码本钱sNotEmpty(modeStr)) {
//将modelStr转成枚举InstanceMode,这是一个枚举类,只需2个取值,SPRINGMANAGER,对应两种装备办法
gl公积金obalConfig.setMode宫颈癌(InstanceMode.valueOf(json文件是干什么的StringUtils.upperCase(modeStr)));
}
//读取canal.mysql数据库instance.global.lazy
Stmysql索引ring lazyStr = getProperty(properties, CanalConstants.getInstancLazyKey(CanalConstants.GLOBALAPP_NappleAME));
if (StringUtils.isNotEmpty(lazyStr)) {
globalConfig.setLazy(Boolean.vajsonobjectlueOf(lazyStrjson));
}
//读狗狗币取ca源码编辑器编程猫下载nal.instance.global.manager.address
String managerAddress =mysql增修改查句子 getProperty(properties,
Can公积金alConstants.getInstanceManagerAddressKey(CanalConstants.GLOBAL_NAME));
if (StringUtils.isNotEmpty(m工商银行anagerAddress)) {
globalConfig.setManagerAddress(managerAdmysql增修改查句子dress);
}
//读取canal.instance.global.spring.xml
String springXml = getProperty(作业总结proper宫颈癌ties, CanalConstants.getInstancSpringXmlKey(CanalConstants.GLOBAL_NAME));
if (StringUtils.isNotEmpty(sprmysql是什么软件ingXml)) {
globalConfig.setSpr工商银行ingXml(springXml);
}
instanceGenerator = //...初始化insta狗狗币nceGenerator
return globalConfig;
}

其间canal.instance.global.mode用于供认canal instaAPPnce的大局装备加载办法,其取值规模有2个:spring、manager。咱们知道一个canal server中能够发宫颈癌起多个canal instance,每个instance都有各自的装备。instance的装备也能够放在本地,也能够放在长途装备中心里。咱们能够自界说每个canal instance装备文件存储的方位,假定源码超市悉数canal instance的装备都在本地或许长途,此刻咱们就能够经过canal.instance.global.mode这个装备mysql是什么软件项,来共同的指定配approach备文件的方位,防止为每个canal instance独自指定。

spring办法:

标明悉数的canalmysql装置 instance的装备文件坐落本地。此刻,咱们有必要供给装备项canal.instance.global.spring.xml指定spring装备文件的途径。canal供给了多个sjson解析pring装备源码超市文件:file-instance.xml、default-instance.xml、memory-instance.xml、local-instance.xmysql面试题ml、group-instance.xmjson是什么意思l。这么多装备文件首要是为了支撑canajson数组l instance不同appreciate的作业办法。咱们在稍后将会解说各个装备文件的差异。而在这些装备文件的开始,咱们无一例外的能够看到以下装备:

<bean class="co源码年代m.alibaba.otter.canal.in宫崎骏stance.spmysql装置ring.support.PropeJSONrtyPl龚俊aceh源码共享网olderConfigurer" lazy-init="false">
<papp装置下载roperty name="ignoreResourceNotFound" value="true" /json解析>
<property name宫崎骏="systemPropertiesModeName" value="SYSTEM_PROPERTIappearanceES_MODE_OVERRIDE"/><!-- 允许system掩盖 -->
<property name="locationNames"&gtjson文件是干什么的;
<list>
<value>classpath:canal.properties</vjson格局怎样翻开alue>
&ltJSON;value>classpath:${canal.instance.dmysql面试题estination:}/instance.properties</valmysql数据库命令大全ue>
</list>
</property>
</bean>

这儿咱们能够看到,所谓经过spring办法加载canal ins源码怎样做成app软件tance装备,无非便是经过spring供给的PropertyPlaceholderConfigurer来加载canal inst作业总结ance的装备文件instance.properties。

这儿instance.pmysql装置roperties的文件无缺途径是canal.instance.destination:mysql数据库/ins源码超市tance.properties,其间{canal.instance.destination:}/instance.properties,其间{canal.instance.destination}是一个变量。这是由于咱们能够在一个canal server中装备多个canal instanappstorece,每个canal instance装备文件的称号都是instance.pr源码之家operties,因而咱们需求通mysql装置过目录进行差异。例如咱们经过装备项cana作业细胞l.destinations指定多个canal instance工商银行的姓名

canal.destinatiojson数组ns= example1,example2

狗狗币时咱们就要conf目录下,新建两个子目录公积金exampleappearance1和eapp装置下载xample2,每个目录下各自放置一个instance.properties。

canal在初始化时就会别离运用example1和example2来替换${canal.instance.destination:},然后别离依据example1/instance.propertimysql增修改查句子es和example2/inst源码买卖网站源码ance.properties创立2个canal instance。

managemysql数据库r办法:

标明悉数的canal instance的装备文件坐落长途装备中心,此刻咱们有必要供给装备项 canal.instanjsonce.global.manager.address来指定长途装备中心的地址。现在alibabGoa内部装备运用这种办法。开发者能够自己完毕CanalConfigClient,联接各自的处理体系,完毕接入。

准备canal server相关代码

cid = Long.valueOf(ge源码tProperty(properties, CanalConstants.CANAL_ID));
iapplep = getPropert源码编辑器y(properties, CanalConstants.CANAL_IP);
port = Integer.v龚俊alueOf(getProperty(properties, CanalConstants.CANAL_PORT));
embededCanalServer = CanalServerWithEmbjsonedded.instance();
embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自界说的instanceGenerator
canalServer = CanalS源码erverWithNetty.instance();
canalServer.setIp(ip);
canalServer.setPort(port);

上述代码中,首要解析了cid、ip、port字段,其间:

cid:Long,对应canal.promysql数据库命令大全perties文件中的canal.id,现在无实践用处

ip:String,对应canamysql优化l.properties文件中的canal.ip,canal server监听的ip。

pojsonprt:int,对应canal.properties文件中的canal.port,canapplicational server监听的端口
之后别离为以MySQL下两个字段赋值:

embedapp装置下载edCanalServer:类型为Cana宫崎骏lServerWithEmbedded

canalServer:类型为Canaljson数组ServerWithNetty

CanalServerWithEmbedded 和 CanalServerWithNetty都完毕了CanalServer接口,且都完毕了单例办法,经过静态办法iappearnstance获取实例。

关于这两种类型的完毕,canal官方文档有以下描绘:
同步利器之Canal源码解析
说白了,便是咱们能够不用独立安顿canal server。在运用直接运用CanalServerWithEmbedded直连mysql作业总结数据库。假定觉得自己的技术hold不住相关代码,就独立安顿一个ca源码编辑器编程猫下载na源码编辑器l server,运用canal供给的客户端,联接mysql面试题canal server获取binmysql面试题log解析后数据。而CAPPanalServerWithNetty是在CanalServerWithEmbedded的json数组根底mysql数据库上做的一层封装,用appear于与客户端通讯。

在独立安顿canmysql装置al server时,Canal客户端发送的悉数央求都交给CanalServerWithN源码编辑器etty处理解析,解析完毕之后委派给了交给CanalServerWithEmbeapp装置下载dded进行处理。因而CanalServerWithNetty便是一个马甲罢了。CanalServerWithEmbedded才json是什么意思是中心。

因而,在上述代码中,咱们看到,用于生成CanalInstance实例的instanceGenerator被设置到了CanalServerWithEmbedded中,而ip和port被设置到CanalSAPPerverWithNetty中。

关于CanalServerWithNetty怎样将客户端的央求委派给CanalServerWithEmbeddedmysql增修改查句子进行处理,咱们将在server模块源码剖析中进行解说。

初始化zk相关狗狗币代码

 //读取canal.properties中的装备项canal.zkServers,假定没有这个配MySQL备,则标明项目不运用zk
final String zkServers = getmysql面试题Property(properties, CanalConstants.CANAL_ZKSERVERS);
if (StringUtils.isNotEmpty(zkServers)) {
//创立zk实例
zkclientx = ZkClientx.getZkC工商银行lient(zkServers);
// 初始化体系目录
/mysql数据库命令大全/destination源码怎样做成app软件列表,途径为mysql装置装备教程/otter/canal/destinations
zkclientx.createPer源码之家sistent(ZookeeperPathUtils.DESTINATION_RO源码本钱OT_NODE, true);
//整个canal server的集群列表,途径为/otter/canal/cluster
z源码怎样做成app软件kjson格局clientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
}

canal支撑运用了zk来完毕HA机制、以及将当时消费到到的mysql的binlog方枸杞位记载到zk中。ZkCljsonobjectientx是canal对ZkClient进行了一层简略的封装。

显着,源码网站当咱们没有装备canal.zkServers,那么zkclientx不会被初始化。

关于Canal怎样运用ZK做HA,咱们将在稍后的代码中进行分。而运用zmysql数据库k记载binlog的消费进展,将在之mysql索引后的章节进行剖析。

CanalInstance作业状况监控相关代码

由于这段代码比appear较长且恶心,这儿笔者暂时对部分代码进行省掉,以便读者看清楚整各条理

final ServerRunninJSONgData serverData = new ServerRunningData(cid, ip + ":" + port);
ServerRunningMonitors.setServerData(serverData);
ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunnmysql面试题ingMonitor>() {
public Server源码RunningMonitor apply(final String destination) {
ServerRunningMonitor runningMonitGoor = new ServerRunningMonitor(serjson文件是干什么的verData);
ru源码nningMonitor.setDestination(destination);
runningMonitor.setListener(new ServerRunningLappreciateistene公积金r() {....});//省掉ServerRunningListener的详细完毕
if (zkclientx != null) {
rumysql数据库命令大全nningMonitor.setZkClient(zkclientx)mysql装置;
}
// 触发创立一下cid节点
runningMonitor.init();
return runni源码之家ngMonitor;
}
}));

上述代码中,ServerRunningMonitors是ServerRunning源码编辑器Monitor政策的容器,而ServerRunningMonitor用于监控CanalInstance。

canal会为每一个destination创立一个CanalInstan作业总结ce,每个CanalInstance都会由一个S源码编辑器erve枸杞rRunningMonitor来进行监控。而ServerRunningMonitor共同由ServerRunningMonitors进行处理。
除了CanalInstance需求监控,CanalServer本身也mysql数据库命令大全需求监控。因而咱们在代码一开始,appear就看到往mysql是什么软件Servjson文件是干什么的erRunningMonitors设置了一个源码买卖网站源码ServerRunningData政策,封装了canal server监听的ip和端口等信息。

ServerRunn源码网站ingMonitors源码如下所示:

public class ServerRunningMonitors {
private static Servermysql装置RunningData serverData;
private static Map               run源码共享网ningMonitors; // <String,ServerRunningMonitor>
public static ServerRunningData getServerData() {
return serverData;
}
public static Map<String, ServerRunningMonitor> getRunni源码之家ngMonitors() {
return runningMonitappearors;
}
public static ServerRunningMonitor getRunningMonitor(String destination) {
return (ServerRunningMonitor) runningMonitors.get(destination);
}
public static void setServerData(ServerRunningData serverData) {
Serjson是什么意思verRunningMonitors.serverData = serverData;
}
public static void setRunningMonitors(Map runningMonitors) {
ServerRunningMonitors.runningMonitors = runningMomysql怎样读nitors;源码共享网
}
}

ServerRunningMonitors的setRunningMonitors办法接收的参数是一个Map,其间Map的key是destination,value是ServejsonprRunningMonitor,也便是说针对每一个destination都有一个ServerRunningMonitor来监控。

上述代码中,在往ServerRunninappearancegMonitors设置Map时,是经过MigrateMap.makeComputingMap办法来创立的,其接受一个Function类型的参数,这是guava中界说的接口,其声清楚apply笼统办法。其作业原理能够经过下面代码片段进行介源码之家绍:

Map<String, User> map = MigrateMap.makeComputingMap(new Functmysql增修改查句子ion<String, User>() {
@Override
public User apply(String name) {
return new User(name);
}
});
User user = map.get("tianshouzhi");//第一次获取时会创立
assert user != null;
assejson是什么意思rt user == map.get("tianshouzhi");//之后获取,总是回来之前现已创立的政策

这段代码中,咱们运用Migr作业总结ateMap.makeComputingMap创立了一个Map,其间key为String类型,value为User类型。当咱们调用map.get(“tianshouzhi”)办法,json文件是干什么的开始步这个Map中并没有任何key/value的,所以其就会回调Function的apply办法,运用参数”tianshouzhmysql增修改查句子i”创立一个User政策并回来。之后当咱们再以”tianshouapplicationzhi”为key从Map中获取User政策时,会直接将前面创立的政策回来。不会回调apply办法,也便是说,只需在第一次查验获取时,才会回调apply办法。

而在上述代码中,实践上就运用了这个特性,只不过是依据destination获取ServerRunningMonitor政策,假定不存在就创立。

在创立ServerRunningMonitor政策时,首要依据ServerRunningData创立ServerRunningMonitor实例,之后设置了destination和ServerRunningListener政策,接着,判别假定zmysql数据库kClientx字段假定不为json空,也设置到ServerRunningMonitor中,毕竟调用init办法进行初始化。

ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
runningMonitor.setjson格局Destination(destination);
runningMonitor.setListener(new ServerRunningListene源码编辑器编程猫下载r(){...})//省掉ServerRunningListener详细代码
if (zkclientxapp装置下载 != null) {
runningMonitor.setZkClie狗狗币nt(zkclientx);
}
// 触发创立一下cid节点
runningMonitor.init();
return runningMonitor;

ServerRunningListenAPPer的完毕如下:

new ServerRunningListener() {
/*内部调用了emAPPbededCanalServer的stmysql装置装备教程art(destination)办法。
此处需求划要点,阐明每个destination对应的Cana龚俊lIns宫颈癌tance是经过embejson数组dedCanalServer的stMySQLart办法建议的作业细胞,
这与咱们之前剖析将instanceGenerator设置到embeappearancededCanalServer中能够对应源码年代上。
embededCanalServer担任调用ijson数据格局nstanceGenejson是什么意思rator公积金生成CanalInstance实源码怎样做成app软件例,并担任其建议。*/
public void processActiveEntmysql数据库命令大全er() {
tapproachry {
MDC.put(CanalConstants.MDC_DESTINATappreciateION, String.valueOf(destination));
embededCanappstorealServer.start(destination);
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
//内部apple调用embededCanalServer的stop(dest源码ination)办法。与上start办法类似,只不过是连续CanalInJSONstance。
public void源码编辑器 processActiveExit() {
try {
MDC.put(CanaapproachlConstants.MDC_DESTINAmysql索引TION, String.valueOf(destination));
embededCanalServer.stop(destination);
} finallyAPP {
MDC源码编辑器编程猫下载.remove(CanalConstants.MD作业总结C_DESTINATION);
}
}
/*处理存在zk的状况下,在Canalinstance建议之前,在zk中创立节点。
途径为:/o源码编辑器tter/canal/destinations/{0}/cluster/{1},作业总结其0会被destination替换,1会被ip:port替换。
此办法会在processActiveEnter()之前被调用*/
public void processStart() {
try {
if (zkclientx != null) {
final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":" + port);
initCid(path);
zkcjson数据格局lientx.subscribeStatjson是什么意思eChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
}
pujson格局blic void handleNewSession() thrmysql面试题ows Except枸杞ion {
initCid(path);
}
});
}
} finaapprovelly {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
//处理存在zk的状况下,在Canalinstance连续前,开释zk源码共享网节点,途径为/otter/canal/json数组destinations/{0}/cluster/google{1},
//其0会被作业细胞destination替换,1会被ip:port替换。json格局怎样翻开此办法会在processActiveExit()之前被调用
public void processStop() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOfjson(destination));
if (zkclientx != null) {
final String path = ZookeeperPathUtils.getDestinationCluste源码买卖网站源码rNode(destjson数据格局inatiomysql装置装备教程n, ip + ":" + port)宫颈癌;
releaseCid(path);
}
} finally {
M源码买卖网站源码DC.remove(CanalConstants.MDC_DESTINATION);
}
}
}

上述代码中,咱们能够看到建议一个CanalInstance实践上是在ServerRunningL源码超市istener的processActiveEnter办法中,经过调用embededCanJSONalServer的start(destiGonation)办法进行appleid的,关于连续也是类似。

那么ServerRunningListener中的相关办法到底是在哪里回调的呢?咱们能够在ServerRunningMoni源码tor的start和stop办法中找到答案,这儿只列出start办法。

public class ServerRunningMonitor extends AbstractCa作业细胞nalLifeCycle {
...mysql索引
public void源码编辑器 start() {
super.startAPP();
processStart(google);//其内部会调用ServerRunningListener的processStart()办法
if (zkClient != null) {//存在zk,以HA办法建议
// 假approach设需求尽或许开释instance资源,不需求监听running节点,否则即使stop了这台机器,另一台机器立马会start
String path = Zook宫颈癌eeperPathUtils.getDestinationServerRunning(destinat源码网站ion);
zkClient.subscrjson数组ibeDataChanges(path, dataListener);
initRunning();
} else {//没有zk,直接建议
papproachrocessActiveEnter();
}
}
//...smysql是什么软件top办法逻辑类似,相关json格局怎样翻开代码省掉
}

当ServerRunningMonitor的start办法被调用时,其首要会直接调用processStart办法,这个办法内部直接调了ServerRunningListener的processStart()办法,源码如下所示。经过前面的分源码本钱析,咱们现已知道在存在zkCl源码编辑器ient!=null的状况,会往zk中创立一个节点。

private void processStart() {
if (listener != null) {
try {
listener.processStart();
} catch (Exception e) {
logger.error("proc作业细胞essStart failed", e);
}
}
}

之后会判别是否存在zkClient,假作业细胞设不存在,则以本地办法建议,假定存在,则以HA办法建议。咱们知道,canal server能够安顿成两种办法:集群办法或许独立安顿。其间集群办法是运用zk来做HA,独立安顿则能够直接进行建议。咱们先来看比较简略的直接建议。

直接建议:

不存在zk的状况下,会进入elapplese代码块,调用processActiveEnter办法,其内部调用了listene作业细胞r的processActiveEnter,建议相应djson文件是干什么的estination对应的CanalInstance。

private void processActiveEnter(jsonobject) {
if (listener != null) {
try {
listenerapplication.procesapplicationsActiveEnter();
} catjson解析ch (Exception e) {
logger.error("proappreciatecessActiveEnter failed", e);
}
}
}

HA办法源码之家建议:

存在zk,说源码年代明canal server或许做了集群,由于canal便是运用zk来做HA的。首要依据destination结构一个zJSONk的节点途径,然后进行监听。

/*构建暂时节点的途径:/otter/canal/destinjson数组ations/{0}/running,其间占位符{0}会被destination替换。
在集群办法下,或许会有多个canal server一同处理同JSON一个destination,
在某一时间,只能由一个cajson是什么意思nal server进行处理,处理这个destination的can狗狗币al server进入running状况,其他canal server进入standby状况。*/
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
/*对destination对应的running节点进行监听,一旦产生了jsonp改动,则阐明或许其他处理相同destination的canal server或许呈现了失常龚俊,
此刻需求查验自己进入running状况。*/
zkClient.sujson是什么意思bscribeDataChanges(path, dataListener);

上述仅仅监听代码,之后查验调用initRunning办法经过HA的办法来建议CanalInstance。

private void initRunning() {
if (!isStart()) {
return;
}
//构建暂时节点的途径:/otter/canal/destinations/{0}/running,其间占位符{0}会被destination替换
String path = ZookeeperPathUtils.getDestinationServerRunning(dejson数组stination);
// 序列化
//构建暂时节点的数据,符号当时destination由哪一个canal server处理
byte[] bytes = JsonUtils.marshalToByte(serverData);
try {
mutmysql优化ex.set(false);
//查验创立暂时节点。假定节点现已存在,阐明是其他的canal server现已建议了这个canal instance。
//此刻会抛出ZkNodeExistsException,进入catch源码编辑器代码块。
zkCli龚俊ent.create(path, bytes, CreateMode.EPHEMERAL);
activeData = serverData;
processActiveEnter();//假定创立成功,触发一下作业,内部调用ServerRunningListener的procemysql装置ssActiveEntGoer办法
mutex.set(true);
} catch (ZkNodeExistsEx作业总结ception e) {
//创立节点失利,则依据patMySQLh从json解析zk中获公积金取当时是哪一个canal server创立了当时源码之家canal instance的相关MySQL信息。
//第二个参数true,标明的是,假定这个pathjson数据格局不存在,则龚俊回来null。
bytes = zkClient.readData(path, true);
if (bytes == null) {// 假定不存在节点,当即查验一次
initRunning();
} else {
//假定的确存在,则将创立该canal instance实例信息存入actimysql索引veData中。
activeData = JsonUtils.unmarshalFromByte(bytes, ServerRun源码编辑器ningData.class);
}
}json解析 catch (ZkNoNodeException e) {//假定/otter/canal/destinations/{0}/节点不存在,进行创立其间占位符{0}会被destination替换
zkClient.createPersistent(Zooapp装置下载keeperPathUtils.getDestinat龚俊ionPath(mysql面试题destination), true);
// 查验创立父节点
initRunning();
}源码编辑器
}

能够看到,initR作业总结unning办法内部只需在查验在zk中创立节点成功后,才json数据格局会去调用listener的pjson是什么意思rocessActiveEnter办法来google实在建议源码超市destination对应的canal inst作业总结ancejson,这是canal HA办法建议的中心。canal官方文档中介绍了CanalServer HA机制建议的流程,如下:

同步利器之Canal源码解析

事实上,这个阐明的前两步,都是在initRunning办法中完毕的。从上面的代码中,咱们能够看出,在HA机建议的状况下,ijsonobjectnitRunning办法不用定能走到processActiveEnter()办法,由于创立暂时节点或许会犯错。

源码怎样做成app软件外,依据官方文档阐明,假定犯错,mysql是什么软件那么当时canal instance则进入standBy状况。也便是别的一个canal instance呈现失常时,当时canal instance顶上去。那么相关源码在什么地方呢?在HA办法建议开始步的2行代码的监听逻辑中:

String path = ZookeeperPathUtils.getDejsonobjectstinationServerRunn龚俊ing(desti源码超市nation);
zkCappleidlient.subscribeDataChanges(path, dataListene源码共享网r);

其间dataListener类型是IZkDjson格局ataListener,这是zkclient客户端供给的接口,界说如下:

public interface IZkDataListener {
public void handleDataChange(String dAPPataPath, Object data) throws Exception;
public void handleDa龚俊taDeleted(String dataPath) throws Exception;
}

json格局zk节点中的数据产生改动时,会主动回源码年代调这两个办法,很显着,一个是源码年代用于处理节点数据产生改动,一个是用于处理节点数据被删去。

而dataListener是在ServerRunningMonit源码超市or的结构办法中初始化的,如下:

public ServerRunningMonitor(){
// 创立父节点
dataListener = new IZkDataListener() {
//!!!现在看来,好像并没有存在修正running节点数据的代码,为什么这个办法不是空完毕?
public void handleDatajson解析Change(String dataPath, Object data) throws Exception {
MjsonDC.put("destination", destination);
ServerRunningData runningData = JsonUtils.unmarshalF源码编辑器romByte((byte[]) dat作业细胞a, ServerRunni源码网站ngData.class)mysql索引;
if工商银行 (!isMine(runningData.getAddress())) {
mutex.set(false);
}
if (!runningData.isActive() && isMinejsonp(rujson文件是干什么的nningData.getAddress()))app装置下载 { // 阐明呈现了主动开释的操作,并且本机之前是active
release = true;
releaseRunning();// 完全开释mainstem            }
activeData = (ServerRunningData) runningjson数据格局Data;
}
//当其他canal instancegoogle呈现失常,暂时节点数据被删去时,会主动回调这个办法,此刻当时canal instance要顶上去
public void handleDataDeleted(String dataPath) throws Exception {
MDC.put("destination", destination);
mutex.set(false);
if (!release &源码怎样做成app软件amp;& ac源码超市tijson格局怎样翻开veData != null && isMine(activapplicationeData.getAddress())) {
// 假定上一次active的状况便是本机,则即时触发一下active抢占
imysql数据库nitRunning();
} else {
// 否则便是等候demysql数据库layTime,防止因网络瞬端或许zk失常,导致呈现一再的切换宫颈癌操作
delayExector.schedule(new Runnable()龚俊 {
public void run() {
initRunning();//查验自枸杞己进入running状况
}
}, delayTime, TimeUnit.SECONDS);
}
}
};
}

那么现在问题json来了?ServerRunningMonitor的start办法又是在application哪里被调用的, 这个办法被调用了,才华实在的mysql是什么软件建议canal instance。这部分代码咱们放到后边的CanalController中的start办法进行解说。

autoScan机制相关代码mysql增修改查句子

同步利器之Canal源码解析

//
autoScan = Boomysql数据库leanUtils.toBoolean(getProperty(propeMySQLrties,作业总结 CanalConstants.CANAL_JSONAUTO_SCAjson格局N));
if (autoSc枸杞an) {
defaultAction = new InstanceAction() {//....};
instjson文件是干什么的anceConfigMonitors = //....
}

能够看到,autoScan是否需求主动扫描的开关,只需当autoScan为true时,才会初始化defaultAction字段和instanceConfigMonitors字段。其间:

其间:
defaultAction:其效果是假定装备产生了改动,默mysql装置装备教程许应该选用什么样的操作。其完毕了InstanceAction接口界说的三个笼统办法:start、stop和reload。当新增一个destination装备mysql装置装备教程时,需求调用start办法来建议;当移除一个destination装备时,需求调用stop办法来连续;appstore当某个desappletination装备产生枸杞改动时,需求调用reload办法来进行重启。

ins源码共享网tanceConfigMonitors:类型为Map&mysql数据库命令大全lt;InstanceMode, InstanceConfigMonitomysql装置r>。defaultAction字段仅仅定源码网站义了装备产生改动默许应该选用的操作,那么总该有一个类来监听装备是否产生了改动,这便是InstanceConfigMonitor的效果。官方文档中,只提到了对canal.conf.dirmysql装置装备教程装备项指定的目录的监听,这指的是经过spring办法加载装备。显着的,经过manager办法加载装备,装备中心的内容也是或许产生改动的,也需求进行监听。此刻能够理解为什么instanceConfigMonitors的类型是一个Map,key为InstanceMode,便是为了对这两种办法的装备加载办法都进行监听。

defaultAction字段初始化源码如下所示:

defaultAction = new InstanceAction() {
public vmysql增修改查句子oid start(String destination) {
InstanceCo源码怎样做成app软件nfig config = instanceConfigs.get(destination);
if (config == null) {
// 从头读取一json解析下instance config
config = parseInstanceConfig(properties, destination);
instanceConfigs.put(destination, config);
}
if作业总结 (!embededCanalServer.isStart(destination)) {
// HA机制建议
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(de宫崎骏stination);
if (mysql装置装备教程!confijson是什么意思g.getLazy() && !runningMonitor.isStart()) {
runappleningMonitor.start();
}
}
}
public void stop(String destination) {
// 此处的stop,代表强制退出,非HA机制appear,所以需求退出HA的mojson是什么意思nitor和装备信息
InstanceConfig confapplicationig = instanceConfigs.remove(dmysql怎样读estination);
if (co源码本钱nfig != null) {
embededCanalServer.stop(destination);
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMo枸杞nitor(destination);
if (runningMonitor.isStart()源码共享网) {
runningMonitor.源码之家stoapp装置下载p();
}
}
}
public void reloa源码怎样做成app软件d(String destination) {
// 现在任何装备改动,宫颈癌直接重启,简略处理
stop(destinationmysql装置);
start(destination);
}
};

instanc源码本钱e源码超市ConfigMonitors字段初始化源码如下所示:

injsonstanceConfigMonitors = MigrateMap.makeComputingMmysql怎样读ap(new Function<InstanceMode, InstanceConfigMonitor>() {
public InstanceConfigMonitor apply(InstanceMode mode) {
int scanInterval = Integer.valueOf(getProperty(properties, Camysql装置nalConst源码年代ants.CjsonpANAL_AUTO_SCAN_INTERVAL));
if (mode.isSpring()) {//假定加载办法是spring,回来SpringInstanceCo源码nfigMonitor
SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
monitor.setScanIntervalInSeconmysql数据库d(scanInterval狗狗币);
monitor.setDefaultAction(defaultAction);
// 设置conf目录,默许是user.dir + conf目录组成
String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
if (StringUtils.isEappearmpty(ro源码之家otjsonobjectDir)) {
rootDir = "../conf"approach;
}
if (StringUtils源码年代.equals("otter-canal", System.getProperty("appName"))) {
monitor.setRootConf(rootDir)作业细胞;
} else {源码之家
// eclipse debug办法
monitjsonor.setmysql装置装备教程RootConf("src/mainmysql装置装备教程/resourcemysql是什么软件s/");
}
return monitor;
} else if (mode.isManager()) {//假定加载办法是manager,回来ManagerInstancjsonpeConfigMonitor
retumysql数据库命令大全rn new ManagerInstanceConfigMonitor();
} else {
throw new Unsu源码编辑器编程猫下载pportedOperationException("unknow mode :" + mode + " for monitor");
}
}
});

能够看到instanceConfigMonitors也是依据mode特点,来选用不同的监控完毕类SpringInstanceConfigMonitor 或许ManappearagerInstanceConfigMonitor,二者都结json数据格局束了InstanceConfigMonitor接口。

papp装置下载ublic interface InstanceCAPPonfigMonitor extends CanalLifeCycle {
void register(String destination, InstanceAction action);
void unregister(String destination);
}

当需求对一个destination进行监听时,调用register办法

当撤销对一个destination监听时,调用unregister办法。

事实上,unregister办法在canal 内部并没有有任何地方被调用,也便是说,某个destination假定敞开了autoScan=true,那么你是无法在作业时连续对其进行监控的mysql优化。假定要连续,你能够选择将对应的目录删去。

Insta工商银行nceConfigMonitor本身并不知道哪些app装置下载canal instance需求进行监控,由于不同的canal instance,有的或许设置autoScan为true,别的一些或许设置为false。

在CanalConroller的start办法中,关于autoScan为true的destination,会调用InstanceConfapproveigMoni龚俊to源码年代r的register办法进行注册,此刻InstanceConfigMo枸杞nitor才会实在的对这个destination装备进行扫描监听。关于那些autoScan为false的destination,则不会进行mysql怎样读监听。

现在SpringInstancjson格局eConfigMonitor对这两个办法都进行了完毕,而ManagerInstanceConfigMonitor现在对这两个办法完毕的都是空,需求开发者自己来完毕。

在完毕ManagerInstanceConfig源码本钱Monitor时,能够参看SpringInstanceConfigMonitor。

此处不计划再继续进行剖析Sprin枸杞gInstanceConfigMonitor的源码json是什么意思,由于逻辑很简略,感兴趣的读者能够自行查看SpringInstanceConfigMonitor 的scan办法,内部源码年代在什么状况下会回调defualtAction的start、stop、reload办法 。

总结

deployer模块的首要效果:

  • 读取canal.properties,供认canal instance的配mysql数据库备加json是什么意思载办法
  • 供认canal in源码stance的建议办法:独立建议或许集群办法建议
  • 监听canal instance的装备的改动json数组,动态连续、建议或新增
  • 建议camysql数据库命令大全nal s龚俊erv源码买卖网站源码er,监听客户端央求