[mysqld]
# Server ID of the primary (source) instance
server-id=1
# Enable binary logging; "mysql-bin" is the base name of the log files
log-bin=mysql-bin
# Binlog format must be ROW so canal can read row-level change images
binlog_format=row
# (Optional) restrict binlog output to a single database
binlog-do-db=gmall-2021
没有表就创建一个测试用的表:
-- Throwaway table used to demonstrate canal change-data-capture.
-- Added a primary key: row UPDATE/DELETE events and downstream consumers
-- need a key to identify rows; the original table had none.
-- IF NOT EXISTS makes the tutorial step idempotent (safe to re-run).
CREATE TABLE IF NOT EXISTS user_info (
    `id`   VARCHAR(255) NOT NULL,
    `name` VARCHAR(255),
    `sex`  VARCHAR(255),
    PRIMARY KEY (`id`)
);
和主从复制一样,需要提供一个从库监听的账号:
-- Account canal uses to pose as a MySQL replica.
CREATE USER 'canal'@'%' IDENTIFIED BY '123456';
-- Least privilege: canal only needs to read table schemas (SELECT) and the
-- binlog stream (REPLICATION SLAVE / REPLICATION CLIENT).
-- The original `GRANT ALL ON *.* ... WITH GRANT OPTION` handed the account
-- full superuser-like rights, far beyond what binlog replication requires.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
Linux平台:
# 解压方式一 tar -zxvf canal.deployer-1.1.2.tar.gz mkdir canal.deployer-1.1.2 mv bin canal.deployer-1.1.2/ mv logs canal.deployer-1.1.2/ mv lib canal.deployer-1.1.2/ mv conf canal.deployer-1.1.2/ # 解压方式二 mkdir ~/canal-1.1.2 tar -zxvf canal.deployer-1.1.2.tar.gz -C ~/canal-1.1.2/
Windows平台:
新建一个canal的目录,然后打开目录 把压缩包内容解压到目录中即可
案例只是为了演示,以单机运行的方式进行配置
# 备份 instance.properties文件 cd ~/canal-1.1.2/example cp instance.properties instance.properties.bak
编辑example下的实例文件
vim ~/canal-1.1.2/conf/example/instance.properties
关键参数项:
# ID canal reports while posing as a replica; just has to differ from the
# primary's server-id
canal.instance.mysql.slaveId=20
# Primary host address and port
canal.instance.master.address=192.168.2.225:3308
# Credentials of the replication account created on the primary
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
# Connection character set
canal.instance.connectionCharset=UTF-8
# Default database to watch — NOTE(review): the original comment itself was
# unsure ("默认监听的db?"); confirm against canal documentation
canal.instance.defaultDatabaseName=canal
restart.sh startup.bat startup.sh stop.sh # windows 平台直接运行 startup.bat # 关闭就是直接关闭终端窗口即可 startup.bat # linux 平台 startup.sh # 启动 restart.sh # 重启 stop.sh # 停止
这里使用Java做一个客户端来查看消息
创建普通Maven项目,引入两个依赖
<dependencies>
    <!-- canal client: connects to the canal server and deserializes
         binlog entries -->
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.2</version>
    </dependency>
    <!-- kafka client: used later when canal routes change events to Kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>
客户端类:
import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.List; public class CanalClient { public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException { //TODO 获取连接 CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", ""); while (true) { //TODO 连接 canalConnector.connect(); //TODO 订阅数据库 canalConnector.subscribe("canal.*"); //TODO 获取数据 Message message = canalConnector.get(100); //TODO 获取Entry集合 List<CanalEntry.Entry> entries = message.getEntries(); //TODO 判断集合是否为空,如果为空,则等待一会继续拉取数据 if (entries.size() <= 0) { System.out.println("当次抓取没有数据,休息一会。。。。。。"); Thread.sleep(1000); } else { //TODO 遍历entries,单条解析 for (CanalEntry.Entry entry : entries) { //1.获取表名 String tableName = entry.getHeader().getTableName(); //2.获取类型 CanalEntry.EntryType entryType = entry.getEntryType(); //3.获取序列化后的数据 ByteString storeValue = entry.getStoreValue(); //4.判断当前entryType类型是否为ROWDATA if (CanalEntry.EntryType.ROWDATA.equals(entryType)) { //5.反序列化数据 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); //6.获取当前事件的操作类型 CanalEntry.EventType eventType = rowChange.getEventType(); //7.获取数据集 List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList(); //8.遍历rowDataList,并打印数据集 for (CanalEntry.RowData rowData : rowDataList) { JSONObject beforeData = new JSONObject(); List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { beforeData.put(column.getName(), column.getValue()); } JSONObject afterData = new JSONObject(); List<CanalEntry.Column> afterColumnsList = 
rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { afterData.put(column.getName(), column.getValue()); } //数据打印 System.out.println("Table:" + tableName + ",EventType:" + eventType + ",Before:" + beforeData + ",After:" + afterData); } } else { System.out.println("当前操作类型为:" + entryType); } } } } } }
开启后,向指定的表中写入数据:
客户端输出消息:
当次抓取没有数据,休息一会。。。。。。 当次抓取没有数据,休息一会。。。。。。 当次抓取没有数据,休息一会。。。。。。 当次抓取没有数据,休息一会。。。。。。 当前操作类型为:TRANSACTIONBEGIN Table:user_info,EventType:INSERT,Before:{},After:{"sex":"男","name":"张三","id":"1"} 当前操作类型为:TRANSACTIONEND 当前操作类型为:TRANSACTIONBEGIN Table:user_info,EventType:INSERT,Before:{},After:{"sex":"男","name":"张三","id":"2"} 当前操作类型为:TRANSACTIONEND 当次抓取没有数据,休息一会。。。。。。
# Output mode: push parsed change events to Kafka instead of serving them
# over the TCP client port
canal.serverMode = kafka
# Kafka broker list (a single host:port is fine for a standalone broker)
canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
修改 example/instance.properties
# mq config
# Target topic name and partition count for this instance's events
canal.mq.topic=canal_test
canal.mq.partitionsNum=1
重启canal以加载配置信息
启动Kafka消费者来查看是否运行:
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic canal_test
执行插入SQL:
-- Explicit column list: an INSERT without one breaks silently if the table
-- ever gains or reorders columns.
INSERT INTO user_info (`id`, `name`, `sex`)
VALUES ('1001', 'zhangsan', 'male'),
       ('1002', 'lisi', 'female');
Kafka控制台:
{ "data": [ { "id": "1001", "name": "zhangsan", "sex": "male" }, { "id": "1002", "name": "lisi", "sex": "female" } ], "database": "gmall-2021", "es": 1639360729000, "id": 1, "isDdl": false, "mysqlType": { "id": "varchar(255)", "name": "varchar(255)", "sex": "varchar(255)" }, "old": "null", "sql": "", "sqlType": { "id": 12, "name": 12, "sex": 12 }, "table": "user_info", "ts": 1639361038454, "type": "INSERT" }