canal最初用于解决阿里异地机房数据同步问题(Otter即基于canal构建)。直接读取数据库进行同步会造成数据库服务器压力过大,所以改为读取binlog增量日志(增删改)来获取增量数据,由此衍生了数据订阅和消费的业务。
基于数据库日志解析完成数据库增量数据同步。
binlog以事件的形式记录了DDL和DML操作,且是事务安全型的。开启binlog大概会有1%的性能损耗。
数据实时收仓,数据恢复,缓存刷新,es同步,Otter组件(原本的目的收到数据后otter同步到不同机房数据库)
a statement 语句级别
节省空间,但可能造成主从数据不一致,如sysdate()、uuid()等不确定性函数(now()不受影响,因为binlog会同时记录语句执行时的时间戳)
b row 行级别
保持数据一致性,只记录sql执行后的结果,占用空间。数据分析一般用这个。
c mixed
二者的结合,不常用
把自己伪装成一个slave,假装从Master中复制数据。
a Mysql 8
(Mysql8 依赖)https://www.microsoft.com/zh-cn/download/details.aspx?id=42642
b 不能远程连接问题
-- Allow the MySQL 8 root account to connect from any host (needed for remote access).
-- NOTE(review): exposing root to '%' is insecure; a dedicated account for canal
-- (e.g. with SELECT, REPLICATION SLAVE, REPLICATION CLIENT) is the safer choice.
USE mysql;

-- Re-point the root account(s) from their current host to '%' (any host).
UPDATE user
SET host = '%'
WHERE user = 'root';

-- Direct edits to the grant tables are not seen by the server until they are reloaded.
-- Without this flush, MySQL 8 rejects the GRANT below for 'root'@'%' (ERROR 1410),
-- because GRANT no longer creates accounts implicitly.
FLUSH PRIVILEGES;

-- Give root@'%' every privilege on every database, including the right to grant them.
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION;

FLUSH PRIVILEGES;
c 创建数据库和表
d 修改配置文件开启Binlog
Windows下需要先开启"显示隐藏的文件",然后在C:\ProgramData\MySQL\MySQL Server 8.0目录下可以找到my.ini
[mysqld]
# Server ID: must be unique among replication participants and must not equal canal's slaveId.
server-id=1
# Enable the binary log; "mysql-bin" is the base name of the binlog files.
log-bin=mysql-bin
# Binlog format: canal consumes row images, so use ROW.
binlog_format=ROW
# Only log changes for this database (the schema the canal client subscribes to).
binlog-do-db=goods_mark
-- Sample rows used to generate binlog events for the canal demo.
-- NOTE(review): columns are inserted positionally, assumed to be (id, name, price);
-- the fruit DDL is not shown here, so add an explicit column list once confirmed.
INSERT INTO fruit VALUES (1, '香蕉', 20.22), (2, '苹果', 40.22);
INSERT INTO fruit VALUES (3, '菠萝', 20.22), (4, '榴莲', 40.22);
-- ids 5 and 6: id 4 is already used above and would raise a duplicate-key error if id is the primary key.
INSERT INTO fruit VALUES (5, '西瓜', 20.22), (6, '葡萄', 40.22);
binlog日志变大
重启服务会新产生一个binlog文件
a 下载链接 https://github.com/alibaba/canal/releases
b 目录结构
c 修改 canal.properties
# Store canal's table-meta tsdb in MySQL instead of the embedded H2 database.
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

# Default H2 URL, kept for reference:
# canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
# The MySQL JDBC URL must NOT end with a semicolon (unlike the H2 URL above).
canal.instance.tsdb.url = jdbc:mysql://192.168.67.222:13306/goods_mark
# NOTE(review): root/root credentials; prefer a dedicated low-privilege account.
canal.instance.tsdb.dbUsername = root
canal.instance.tsdb.dbPassword = root
7启动canal
POM
<?xml version="1.0" encoding="UTF-8"?>
<!-- Demo project for a Canal client; the Spring Boot parent supplies plugin and dependency management. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.example</groupId>
    <artifactId>cannalDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>cannalDemo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <!-- Canal client API (CanalConnector, CanalEntry, Message).
             NOTE(review): the demo also imports fastjson and protobuf classes, which appear to
             arrive transitively; consider declaring them explicitly. -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.2</version>
        </dependency>
        <!-- NOTE(review): not referenced by the CanalClient demo shown alongside this POM. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
测试代码
package com.example.cannaldemo.config; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.List; public class CanalClient { public static void main(String[] args) throws InvalidProtocolBufferException { //1.获取 canal 连接对象 user pwd 为cannal的密码可以不写 CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "", ""); //2.获取连接 只需要获取一次即可 canalConnector.connect(); while (true) { //3.指定要监控的数据库 * 表示数据库下面所有表 canalConnector.subscribe("goods_mark.*"); //4.获取 Message Message message = canalConnector.get(100); List<CanalEntry.Entry> entries = message.getEntries(); if (entries.size() <= 0) { System.out.println("没有数据,休息一会"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { for (CanalEntry.Entry entry : entries) { //TODO 获取表名 String tableName = entry.getHeader().getTableName(); // TODO Entry 类型 CanalEntry.EntryType entryType = entry.getEntryType(); // TODO 判断 entryType 是否为 ROWDATA if (CanalEntry.EntryType.ROWDATA.equals(entryType)) { // TODO 序列化数据 ByteString storeValue = entry.getStoreValue(); // TODO 反序列化 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); //TODO 获取事件类型 CanalEntry.EventType eventType = rowChange.getEventType(); //TODO 获取具体的数据 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); //TODO 遍历并打印数据 for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); JSONObject beforeData = new JSONObject(); for (CanalEntry.Column column : beforeColumnsList) { beforeData.put(column.getName(), column.getValue()); } JSONObject 
afterData = new JSONObject(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { afterData.put(column.getName(), column.getValue()); } System.out.println("TableName:" + tableName + ",EventType:" + eventType + ",After:" + beforeData + ",After:" + afterData); } } } } } } }