Canal的介绍
基于数据库增量日志解析,提供增量数据订阅和消费
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务Cache刷新
带业务逻辑的增量数据处理
MySQL Master将数据变更的操作写入二进制日志binary log中, 其中记录的内容叫做二进制日志事件binary log events,可以通过show binlog events命令进行查看
MySQL Slave会将Master的binary log中的binary log events拷贝到它的中继日志relay log
MySQL Slave重读并执行relay log中的事件,将数据变更映射到它自己的数据库表中
Canal模拟MySQL Slave的交互协议,伪装自己为MySQL Slave,向MySQL Master发送dump协议
MySQL Master收到dump请求,开始推送binary log给Slave(也就是Canal)
Canal解析binary log对象(数据为byte流)
文件和meta data的集合(root filesystem)
分层的,并且每一层都可以添加改变删除文件,成为一个新的image
不同的image可以共享相同的layer
Image本身是read-only的
通过Image创建(copy)
在Image layer之上建立一个container layer(可读写)
类比面向对象:类和实例
Image负责APP的存储和分发,Container负责运行APP
Bridge:桥接网络。默认情况下启动的Docker容器,都是使用Bridge,Docker安装时创建的桥接网络,每次Docker容器重启时,会按照顺序获取对应的IP地址,这个就导致重启下,Docker的IP地址就变了。
None:无指定网络。使用 --network=none,Docker容器就不会分配局域网的IP。
Host:主机网络。使用--network=host,此时,Docker容器的网络会附属在主机上,两者是互通的。例如,在容器中运行一个Web服务,监听8080端口,则主机的8080端口就会自动映射到容器中。
docker network create --subnet=172.18.0.0/16 mynetwork
查看存在的网络类型docker network ls:
搭建Canal环境
附上Docker的下载安装地址:https://www.docker.com/products/docker-desktop。
下载Canal镜像docker pull canal/canal-server:
下载MySQL镜像docker pull mysql,下载过的则如下图:
查看已经下载好的镜像docker images:
接下来通过镜像生成MySQL容器与canal-server容器:
查看Docker中运行的容器docker ps:
MySQL的配置修改
以上只是初步准备好了基础的环境,但是怎么让Canal伪装成Salve并正确获取MySQL中的binary log呢?
对于自建MySQL,需要先开启Binlog写入功能,配置binlog-format为ROW模式,通过修改MySQL配置文件来开启bin_log,使用find / -name my.cnf查找my.cnf,修改文件内容如下:
[mysqld]log-bin=mysql-bin # 开启binlogbinlog-format=ROW # 选择ROW模式server_id=1 # 配置MySQL replaction需要定义,不要和Canal的slaveId重复
进入MySQL容器docker exec -it mysql bash。
创建链接MySQL的账号Canal并授予作为MySQL slave的权限,如果已有账户可直接GRANT:
mysql -uroot -proot# 创建账号CREATE USER canal IDENTIFIED BY 'canal'; # 授予权限GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;# 刷新并应用FLUSH PRIVILEGES;
数据库重启后,简单测试 my.cnf 配置是否生效:
show variables like 'log_bin';show variables like 'log_bin';show master status;
canal-server的配置修改
进入canal-server容器docker exec -it canal-server bash。
编辑canal-server的配置vi canal-server/conf/example/instance.properties:
更多配置请参考:https://github.com/alibaba/canal/wiki/AdminGuide。
重启canal-server容器docker restart canal-server 进入容器查看启动日志:
docker exec -it canal-server bashtail -100f canal-server/logs/example/example.log
至此,我们的环境工作准备完成!
拉取数据并同步保存到ElasticSearch
环境已经准备好了,现在就要开始我们的编码实战部分了,怎么通过应用程序去获取Canal解析后的binlog数据。首先我们基于Spring Boot搭建一个canal demo应用。结构如下图所示:
Student.java
CanalConfig.java
CanalDataParser.java
由于这个类的代码较多,文中则摘出其中比较重要的部分,其它部分代码可从GitHub上获取:
public static class TwoTuple<A, B> {
ElasticUtils.java
BinLogElasticSearch.java
CanalDemoApplication.java(Spring Boot启动类)
application.properties
Canal集群高可用的搭建
1、机器准备:
运行Canal的容器IP:172.18.0.4 , 172.18.0.8
ZooKeeper容器IP:172.18.0.3:2181
MySQL容器IP:172.18.0.6:3306
canal.port=11113canal.zkServers=172.18.0.3:2181canal.instance.global.spring.xml = classpath:spring/default-instance.xml
4、创建example目录,并修改instance.properties:
canal.instance.mysql.slaveId = 1235 #之前的canal slaveId是1234,保证slaveId不重复即可canal.instance.master.address = 172.18.0.6:3306
注意:两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置。
启动两个不同容器的Canal,启动后,可以通过tail -100f logs/example/example.log查看启动日志,只会看到一台机器上出现了启动成功的日志。
比如我这里启动成功的是 172.18.0.4:
查看一下ZooKeeper中的节点信息,也可以知道当前工作的节点为172.18.0.4:11111:
[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running{"active":true,"address":"172.18.0.4:11111","cid":1}
客户端链接, 消费数据
可以通过指定ZooKeeper地址和Canal的instance name,canal client会自动从ZooKeeper中的running节点获取当前服务的工作节点,然后与其建立链接:
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running{"active":true,"address":"172.18.0.4:11111","cid":1}
对应的客户端编码可以使用如下形式,上文中的CanalConfig.java中的canalHaConnector就是一个HA连接:
CanalConnector connector = CanalConnectors.newClusterConnector("172.18.0.3:2181", "example", "", "");
链接成功后,canal server会记录当前正在工作的canal client信息,比如客户端IP,链接的端口信息等(聪明的你,应该也可以发现,canal client也可以支持HA功能):
[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/running{"active":true,"address":"192.168.124.5:59887","clientId":1001}
数据消费成功后,canal server会在ZooKeeper中记录下当前最后一次消费成功的binlog位点(下次你重启client时,会从这最后一个位点继续进行消费):
[zk: localhost:2181(CONNECTED) 5] get /otter/canal/destinations/example/1001/cursor{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalName":"binlog.000004","position":2169,"timestamp":1562672817000}}
停止正在工作的172.18.0.4的canal server:
docker exec -it canal-server bashcd canal-server/binsh stop.sh
这时172.18.0.8会立马启动example instance,提供新的数据服务:
[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running{"active":true,"address":"172.18.0.8:11111","cid":1}
与此同时,客户端也会随着canal server的切换,通过获取ZooKeeper中的最新地址,与新的canal server建立链接,继续消费数据,整个过程自动完成。
异常与总结
修改完配置文件后需重启es服务。
elasticsearch-head查询报406 Not Acceptable
解决方法:
1、进入head安装目录;
2、cd _site/
3、编辑vendor.js 共有两处
使用elasticsearch-rest-high-level-client报org.elasticsearch.action.index.IndexRequest.ifSeqNo
相关参考:https://github.com/elastic/elasticsearch/issues/43023。
为什么ElasticSearch要在7.X版本不能使用type?
参考:https://www.waitig.com/为什么elasticsearch要在7-x版本去掉type.html
使用spring-data-elasticsearch.jar报org.elasticsearch.client.transport.NoNodeAvailableException由于本文使用的是elasticsearch7.x以上的版本,目前spring-data-elasticsearch底层采用es官方TransportClient,而es官方计划放弃TransportClient,工具以es官方推荐的RestHighLevelClient进行调用请求。
可参考:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-supported-apis.html。
设置Docker容器开启启动
如果创建时未指定 --restart=always ,可通过update 命令docker update --restart=always [containerID]
Docker for Mac network host模式不生效
Host模式是为了性能,但是这却对Docker的隔离性造成了破坏,导致安全性降低。在性能场景下,可以用--netwokr host开启Host模式,但需要注意的是,如果你用Windows或Mac本地启动容器的话,会遇到Host模式失效的问题。原因是Host模式只支持Linux宿主机。
参见官方文档:https://docs.docker.com/network/host/。
客户端连接ZooKeeper报authenticate using SASL(unknow error)
zookeeper.jar与Dokcer中的ZooKeeper版本不一致
zookeeper.jar使用了3.4.6之前的版本