Source Code Analysis
The Canal version here is 1.0.3. First, find the program's entry point.
/**
 * Entry class for launching Canal in standalone mode
 */
public class CanalLauncher {
    ...
    public static void main(String[] args) {
        try {
            ...
            if (remoteConfigLoader != null) {
                remoteConfigLoader.startMonitor(new RemoteCanalConfigMonitor() {

                    @Override
                    public void onChange(Properties properties) {
                        try {
                            // A remote change to canal.properties reloads the whole application
                            canalStater.destroy();
                            // From here we enter the class below
                            canalStater.start(properties);
                        } catch (Throwable throwable) {
                            logger.error(throwable.getMessage(), throwable);
                        }
                    }
                });
            }
            ....
        } catch (Throwable e) {
            logger.error("## Something goes wrong when starting up the canal Server:", e);
        }
    }
}
As you can see, at startup the configuration file decides whether the captured binlog data should be written to an MQ.
/**
 * Canal server starter
 */
public class CanalStater {

    private CanalController controller      = null;
    private CanalMQProducer canalMQProducer = null;
    private Thread          shutdownThread  = null;
    private CanalMQStarter  canalMQStarter  = null;

    /**
     * Start method
     *
     * @param properties canal.properties configuration
     * @throws Throwable
     */
    synchronized void start(Properties properties) throws Throwable {
        String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
        // Initialize the MQ producer according to the configured server mode
        if (serverMode.equalsIgnoreCase("kafka")) {
            canalMQProducer = new CanalKafkaProducer();
        } else if (serverMode.equalsIgnoreCase("rocketmq")) {
            canalMQProducer = new CanalRocketMQProducer();
        }

        if (canalMQProducer != null) {
            ...
            if ("true".equals(autoScan)) {
                String rootDir = CanalController.getProperty(properties, CanalConstants.CANAL_CONF_DIR);
                ...
            } else {
                String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
                System.setProperty(CanalConstants.CANAL_DESTINATIONS, destinations);
            }
        }

        logger.info("## start the canal server.");
        controller = new CanalController(properties);
        // From here we enter the next class
        controller.start();
        ...
        if (canalMQProducer != null) {
            canalMQStarter = new CanalMQStarter(canalMQProducer);
            MQProperties mqProperties = buildMQProperties(properties);
            // Start the MQ producer
            canalMQStarter.start(mqProperties);
            controller.setCanalMQStarter(canalMQStarter);
        }
    }
    ...
}
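For the MQ branch above to be taken, canal.properties must set the server mode. Assuming the standard property name behind CanalConstants.CANAL_SERVER_MODE, that is:

canal.serverMode = kafka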
Now let's look at the CanalController class.
After a canal server successfully creates the ZooKeeper node, it starts the corresponding canal instance; the instances whose servers failed to create the node stay in standby.
/**
 * Canal scheduling controller
 */
public class CanalController {
    ....
    public void start() throws Throwable {
        ...
        for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
            final String destination = entry.getKey();
            InstanceConfig config = entry.getValue();
            // Create the working node for this destination.
            // The canal server that successfully creates the ZooKeeper node starts the
            // corresponding canal instance; the ones that fail stay in standby
            if (!embededCanalServer.isStart(destination)) {
                // HA startup
                ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                if (!config.getLazy() && !runningMonitor.isStart()) {
                    // From here we enter the next class
                    runningMonitor.start();
                }
            }

            if (autoScan) {
                instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
            }
        }

        // Start the network interface
        if (canalServer != null) {
            canalServer.start();
        }
    }
    ...
}
The above shows canal's startup path. Next we focus on the ServerRunningMonitor class to see how canal implements master/standby switching.
Once ZooKeeper detects that the node created by canal server A has disappeared, it immediately notifies the other canal servers to try creating a node with the same name again, and a new canal server is elected to start the instance.
/**
 * Controls the running node for a server
 */
public class ServerRunningMonitor extends AbstractCanalLifeCycle {

    private BooleanMutex mutex = new BooleanMutex(false);
    // State of this server node
    private ServerRunningData serverData;
    // State of the node that is actually running (active)
    private volatile ServerRunningData activeData;
    ...

    public ServerRunningMonitor(){
        // Create the parent node
        dataListener = new IZkDataListener() {

            public void handleDataChange(String dataPath, Object data) throws Exception {
                MDC.put("destination", destination);
                ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
                if (!isMine(runningData.getAddress())) {
                    mutex.set(false);
                }

                if (!runningData.isActive() && isMine(runningData.getAddress())) {
                    // The active role was released voluntarily, and this server was the active one
                    release = true;
                    releaseRunning(); // fully release the mainstem
                }

                activeData = (ServerRunningData) runningData;
            }

            public void handleDataDeleted(String dataPath) throws Exception {
                MDC.put("destination", destination);
                // BooleanMutex is built on AQS and provides lock-free blocking across threads
                mutex.set(false);
                if (!release && activeData != null && isMine(activeData.getAddress())) {
                    // If this server was the active one last time, try to grab the active role right away
                    initRunning();
                } else {
                    // Otherwise wait delayTime, to avoid frequent switchovers caused by a network
                    // blip or a ZooKeeper hiccup.
                    // Concrete scenario: the canal server's network flaps, ZooKeeper considers the
                    // session expired and releases the running node, but the canal server's JVM
                    // has not exited (a half-dead state; a very special case)
                    delayExector.schedule(new Runnable() {

                        public void run() {
                            initRunning();
                        }
                    }, delayTime, TimeUnit.SECONDS);
                }
            }

        };
    }
    ...

    private void initRunning() {
        if (!isStart()) {
            return;
        }

        String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
        // Serialize
        byte[] bytes = JsonUtils.marshalToByte(serverData);
        try {
            mutex.set(false);
            zkClient.create(path, bytes, CreateMode.EPHEMERAL);
            activeData = serverData;
            processActiveEnter(); // fire the active-enter event
            mutex.set(true);
        } catch (ZkNodeExistsException e) {
            bytes = zkClient.readData(path, true);
            if (bytes == null) { // the node no longer exists, retry immediately
                initRunning();
            } else {
                activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
            }
        } catch (ZkNoNodeException e) {
            zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // try to create the parent node
            initRunning();
        }
    }

    /**
     * Check whether the active node is this server instance
     * @param address
     * @return
     */
    private boolean isMine(String address) {
        return address.equals(serverData.getAddress());
    }
}
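Stripped of canal's surrounding machinery, the pattern in initRunning() and handleDataDeleted() reduces to the following minimal sketch. It assumes the same I0Itec ZkClient dependency that canal uses; the path, payload, and class name are illustrative, not canal's real ones:

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

public class EphemeralElectionDemo {

    private final ZkClient zkClient = new ZkClient("127.0.0.1:2181"); // assumed local ZK
    private final String   path     = "/demo/running";                // hypothetical path
    private final String   myId     = "server-1";

    public void start() {
        // Watch the node once; every deletion triggers a new election round,
        // mirroring handleDataDeleted() -> initRunning() in ServerRunningMonitor
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            public void handleDataChange(String dataPath, Object data) { /* another server is active */ }
            public void handleDataDeleted(String dataPath) {
                elect();
            }
        });
        elect();
    }

    private void elect() {
        try {
            // EPHEMERAL node: it vanishes when this session dies, which is
            // exactly what lets the standby take over after a crash
            zkClient.createEphemeral(path, myId);
            System.out.println(myId + " is now active");
        } catch (ZkNodeExistsException e) {
            System.out.println(myId + " is standby");
        }
    }
}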
Next, let's test the HA mechanism.
Steps
I. Configuration
Set canal up in an HA architecture with the following configuration.
1. canal-master
canal.properties
canal.id = 2
canal.ip = 127.0.0.1
canal.port = 32121
canal.metrics.pull.port = 11112
canal.zkServers = 127.0.0.1:2181
# Track the parse position in ZK; required for the HA mechanism
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal.mq.servers = 127.0.0.1:9092
instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=example
canal.mq.partition=0
2. canal-slave
canal.properties
canal.id = 3
canal.ip = 127.0.0.1
canal.port = 32122
canal.metrics.pull.port = 11113
canal.zkServers = 127.0.0.1:2181
# Track the parse position in ZK; required for the HA mechanism
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal.mq.servers = 127.0.0.1:9092
instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=example
canal.mq.partition=0
The instance directory names on the two machines must be exactly the same, since HA mode manages instances by instance name; both sides must also use the default-instance.xml configuration.
II. High-Availability Test
1. Start canal-master and canal-slave
In ZK you can see that canal-master created the running node first and is therefore active, while canal-slave stays in standby.
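For reference, the running node holds ServerRunningData serialized as JSON; with the configuration above its payload should look roughly like this (reconstructed from the fields used in ServerRunningMonitor, not captured output):

{ "active": true, "address": "127.0.0.1:32121", "cid": 2 }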
2. Truncate the table t_test
At this point a message whose type is TRUNCATE appears in Kafka Tool:
{ "data": null, "database": "trade", "es": 1555560679000, "id": 1, "isDdl": true, "mysqlType": null, "old": null, "pkNames": null, "sql": "TRUNCATE `t_test`", "sqlType": null, "table": "t_test", "ts": 1555560920238, "type": "TRUNCATE" } |
3. Insert data into MySQL via Navicat
Execute the t_test.sql file directly to insert 8,000 rows into t_test in ascending id order. (Do not use Navicat's data synchronization feature: it may write rows out of id order, which makes the test results harder to read.)
{ "data": null, "database": "trade", "es": 1555560679000, "id": 1, "isDdl": true, "mysqlType": null, "old": null, "pkNames": null, "sql": "TRUNCATE `t_test`", "sqlType": null, "table": "t_test", "ts": 1555560920238, "type": "TRUNCATE" }
4. Simulate an abnormal JVM crash of the canal server
Force-kill the Java process of canal-master. After a short while, the cid in the running node changes to 3, canal-slave's id, which shows that Canal completed the master/standby switchover.
5. Verify the correctness of the data in Kafka
Using the creation time of the new running node (ctime = 2019-04-18 12:33:07) to locate the messages around that moment, you can see that the inserted rows with ids 1392-1394 were sent to Kafka twice.
Root-cause analysis: Canal's HA mechanism writes the binlog position it has pulled from MySQL into ZK asynchronously. On a switchover, the canal-slave node reads that position back from ZK and resumes parsing the MySQL binlog from there. canal-master may already have sent some rows to Kafka but crashed before updating ZK, so those rows are delivered to Kafka a second time.
The cursor position written to ZK looks like this:
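For reference, the cursor node (under /otter/canal/destinations/{destination}/1001/cursor) holds a JSON-serialized LogPosition roughly like the following; the values are illustrative, and the misspelled postion field is canal's own serialization:

{
    "@type": "com.alibaba.otter.canal.protocol.position.LogPosition",
    "identity": {
        "slaveId": -1,
        "sourceAddress": { "address": "127.0.0.1", "port": 3306 }
    },
    "postion": {
        "included": false,
        "journalName": "mysql-bin.000001",
        "position": 2969,
        "serverId": 1,
        "timestamp": 1555560920000
    }
}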
Suggested solution: the HA mechanism guarantees that no pulled binlog data is lost, but duplicates can be sent to Kafka. For DELETE and UPDATE events the replayed messages do no harm (re-applying them converges to the same state), but INSERT events need an idempotency check based on the original row's id.
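A minimal consumer-side sketch of that check, assuming the FlatMessage-style JSON shown in section IV and the fastjson library; IdempotentInsertHandler and its in-memory seenIds set are hypothetical (in production the target table's unique key, or an external store such as Redis, would play this role):

import com.alibaba.fastjson.JSONObject;

import java.util.HashSet;
import java.util.Set;

public class IdempotentInsertHandler {

    // Hypothetical dedup store; in practice use the target table's
    // primary/unique key constraint or an external store such as Redis
    private final Set<String> seenIds = new HashSet<>();

    /** Handle one canal message pulled from Kafka. */
    public void handle(String kafkaValue) {
        JSONObject msg = JSONObject.parseObject(kafkaValue);
        if (!"INSERT".equals(msg.getString("type"))) {
            // Replayed DELETE/UPDATE events converge to the same final state; apply as-is
            apply(msg);
            return;
        }
        // For INSERT, skip rows whose id has already been applied
        String table = msg.getString("table");
        for (Object row : msg.getJSONArray("data")) {
            JSONObject r = (JSONObject) row;
            if (seenIds.add(table + ":" + r.getString("id"))) {
                applyRow(table, r);
            } // else: a duplicate caused by the HA switchover, drop it
        }
    }

    private void apply(JSONObject msg) { /* write to the target store */ }

    private void applyRow(String table, JSONObject row) { /* insert one row */ }
}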
III. Functional Test
1. Execute a batch update
UPDATE `t_test` a
SET `net_balance` = '123.0000000000',
    `sma_committed` = '123.0000000000',
    `high_water_mark` = '2359539.0000000000',
    `version` = '2018-11-01'
WHERE a.create_time >= '2018-10-11 14:42:22'
  AND a.create_time <= '2019-02-13 04:28:23'
2. Check the data in Kafka
Details of the message at offset=0:
Conclusion: for a batch DML statement, multiple rows (the exact count is not fixed) are combined and sent as a single Kafka message.
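Illustratively, such a message packs the affected rows into one data array, with old carrying the previous values at matching indexes (a made-up, abbreviated example rather than captured output):

{
    "data": [
        { "id": "1", "net_balance": "123.0000000000" },
        { "id": "2", "net_balance": "123.0000000000" }
    ],
    "old": [
        { "net_balance": "98.0000000000" },
        { "net_balance": "76.0000000000" }
    ],
    "table": "t_test",
    "type": "UPDATE"
}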
IV. Data Formats
Format of a canal-parsed DML binlog message as received in Kafka:
{ "data": [ { //修改,删除,新增后的数据 "external_id": "20180522032329", "complete_time": "2018-05-22 20:20:12", "return_time": "2018-05-24 11:34:12" } ], "database": "数据库名", "es": 1555489722000, "id": 主键值, "isDdl": false, "mysqlType": { //字段格式 "external_id": "varchar(32)", "complete_time": "datetime", "return_time": "datetime" }, "old": [ { //老的数据 "complete_time": "2018-05-22 20:20:22", "return_time": "2018-05-24 11:34:10" } ], "pkNames": [ //主键值 "external_id" ], "sql": "", "sqlType": { "external_id": 12, "complete_time": 93, "return_time": 93 }, //表名 "table": "return_time", "ts": 1555489722742, //操作类型 "type": "UPDATE" }
Format of a canal-parsed DDL binlog message as received in Kafka:
{ "data": null, "database": "trade", "es": 1555575860000, "id": 2832, "isDdl": true, "mysqlType": null, "old": null, "pkNames": null, "sql": "ALTER TABLE `t_order`\r\nADD COLUMN `test` varchar(255) NULL AFTER `is_bo_modified`", "sqlType": null, "table": "t_order", "ts": 1555575861188, "type": "ALTER" }