先用java代码手写一遍,方便后续业务逻辑理解
<!-- Spring Boot starters -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<!-- MySQL driver and JDBC helpers -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.24</version>
</dependency>
<dependency>
    <groupId>commons-dbutils</groupId>
    <artifactId>commons-dbutils</artifactId>
    <version>1.6</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- Alibaba Canal client -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
# Server port
server.port=10000
# Application name
spring.application.name=canal-client
# Active profile: dev, test, prod
spring.profiles.active=dev
# MySQL connection for the target database
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/cancal_mysql?serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=603409875
首先声明一个队列来接收sql语句
//sql队列 private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>(); @Resource private DataSource dataSource;
编写主流程方法
// 创建链接 CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "", ""); int batchSize = 1000; try { connector.connect(); connector.subscribe("cancal_mysql\\..*");//订阅cancal_mysql库下的全表 connector.rollback(); try { while (true) { //尝试从master那边拉去数据batchSize条记录,有多少取多少 Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { Thread.sleep(1000); } else { //数据处理 dataHandle(message.getEntries()); } connector.ack(batchId); //当队列里面堆积的sql大于一定数值的时候就模拟执行 if (SQL_QUEUE.size() >= 1) { executeQueueSql(); } } } catch (InterruptedException e) { e.printStackTrace(); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } finally { connector.disconnect(); }
数据处理方法
/**
 * Dispatches row-level binlog entries to the matching SQL builder.
 *
 * @param entrys binlog entries pulled from the canal server
 * @throws InvalidProtocolBufferException if an entry's store value cannot be parsed
 */
private void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
    for (CanalEntry.Entry entry : entrys) {
        // Only ROWDATA entries carry DML changes; skip transaction begin/end etc.
        if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
            continue;
        }
        CanalEntry.RowChange change = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        switch (change.getEventType()) {
            case DELETE:
                saveDeleteSql(entry);
                break;
            case UPDATE:
                saveUpdateSql(entry);
                break;
            case INSERT:
                saveInsertSql(entry);
                break;
            default:
                // DDL and other event types are ignored.
                break;
        }
    }
}
解析sql并保存sql语句方法
删除(只做了简单的语句删除,下面类似)
/** * 保存删除语句 * * @param entry */ private void saveDeleteSql(CanalEntry.Entry entry) { try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> columnList = rowData.getBeforeColumnsList(); StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where "); for (CanalEntry.Column column : columnList) { if (column.getIsKey()) { //暂时只支持单一主键 if(column.getMysqlType().contains("varchar")){ sql.append(column.getName() + "='" + column.getValue()+"'"); }else { sql.append(column.getName() + "=" + column.getValue()); } break; } } SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } }
新增
/**
 * Builds an INSERT statement from the after-image of each inserted row
 * and enqueues it. Values are single-quoted with embedded quotes doubled
 * (the original code produced broken SQL for values containing ').
 *
 * @param entry binlog entry of type INSERT
 */
private void saveInsertSql(CanalEntry.Entry entry) {
    try {
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            List<CanalEntry.Column> columnList = rowData.getAfterColumnsList();
            if (columnList.isEmpty()) {
                // Nothing to insert for this row.
                continue;
            }
            StringBuilder names = new StringBuilder();
            StringBuilder values = new StringBuilder();
            for (int i = 0; i < columnList.size(); i++) {
                if (i > 0) {
                    names.append(",");
                    values.append(",");
                }
                names.append(columnList.get(i).getName());
                // Escape embedded single quotes so the value cannot break the statement.
                values.append("'")
                      .append(columnList.get(i).getValue().replace("'", "''"))
                      .append("'");
            }
            SQL_QUEUE.add("insert into " + entry.getHeader().getTableName()
                    + " (" + names + ") VALUES (" + values + ")");
        }
    } catch (InvalidProtocolBufferException e) {
        e.printStackTrace();
    }
}
修改
/** * 保存更新语句 * * @param entry */ private void saveUpdateSql(CanalEntry.Entry entry) { try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> newColumnList = rowData.getAfterColumnsList(); StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set "); for (int i = 0; i < newColumnList.size(); i++) { sql.append(" " + newColumnList.get(i).getName() + " = '" + newColumnList.get(i).getValue() + "'"); if (i != newColumnList.size() - 1) { sql.append(","); } } sql.append(" where "); List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : oldColumnList) { if (column.getIsKey()) { //暂时只支持单一主键 if(column.getMysqlType().contains("varchar")){ sql.append(column.getName() + "='" + column.getValue()+"'"); }else { sql.append(column.getName() + "=" + column.getValue()); } break; } } SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } }
jdbc操作
/**
 * Executes one SQL statement against the target datasource.
 * Null or empty statements are ignored.
 *
 * @param sql statement to replay on the target database
 */
public void execute(String sql) {
    if (sql == null || sql.isEmpty()) {
        return;
    }
    // try-with-resources guarantees the connection is closed even on failure,
    // replacing the manual finally + DbUtils.closeQuietly of the original.
    try (Connection con = dataSource.getConnection()) {
        QueryRunner qr = new QueryRunner();
        int row = qr.update(con, sql);
        System.out.println("update: " + row);
    } catch (SQLException e) {
        e.printStackTrace();
    }
}
运行sql语句:
-- Sample row used to trigger a binlog INSERT event on the source database.
INSERT INTO `tb_commodity_info` (
    `id`, `commodity_name`, `commodity_price`, `number`, `description`
) VALUES (
    '030acbd3b71011ecb9760242ac110005', '测试0001', '5.88', 11, '描述信息0001'
);
执行结果:
库1:
库2:
上面的代码初步可以实现一般情况下的数据迁移,接下来我们来实现一下基于 adapter 的方式
# Download the canal adapter release (pick one: 1.1.5 release, or the 1.1.5 snapshot build).
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.adapter-1.1.5-SNAPSHOT.tar.gz
# Unpack the adapter into a working directory.
mkdir /tmp/canal_adapter
tar zxvf canal.adapter-1.1.5.tar.gz -C /tmp/canal_adapter
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    # kafka.bootstrap.servers: 127.0.0.1:9092
    # kafka.enable.auto.commit: false
    # kafka.auto.commit.interval.ms: 1000
    # kafka.auto.offset.reset: latest
    # kafka.request.timeout.ms: 40000
    # kafka.session.timeout.ms: 30000
    # kafka.isolation.level: read_committed
    # kafka.max.poll.records: 1000
    # rocketMQ consumer
    # rocketmq.namespace:
    # rocketmq.namesrv.addr: 127.0.0.1:9876
    # rocketmq.batch.size: 1000
    # rocketmq.enable.message.trace: false
    # rocketmq.customized.trace.topic:
    # rocketmq.access.channel:
    # rocketmq.subscribe.filter:
    # rabbitMQ consumer
    # rabbitmq.host:
    # rabbitmq.virtual.host:
    # rabbitmq.username:
    # rabbitmq.password:
    # rabbitmq.resource.ownerId:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://源数据库ip:端口/cancal_mysql?useUnicode=true
      username: root
      password: root
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      # - name: logger
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://127.0.0.1:3306/cancal_mysql?useUnicode=true
          jdbc.username: root
          jdbc.password: 603409875
      # - name: rdb
      #   key: oracle1
      #   properties:
      #     jdbc.driverClassName: oracle.jdbc.OracleDriver
      #     jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
      #     jdbc.username: mytest
      #     jdbc.password: m121212
      # - name: rdb
      #   key: postgres1
      #   properties:
      #     jdbc.driverClassName: org.postgresql.Driver
      #     jdbc.url: jdbc:postgresql://localhost:5432/postgres
      #     jdbc.username: postgres
      #     jdbc.password: 121212
      #     threads: 1
      #     commitSize: 3000
      # - name: hbase
      #   properties:
      #     hbase.zookeeper.quorum: 127.0.0.1
      #     hbase.zookeeper.property.clientPort: 2181
      #     zookeeper.znode.parent: /hbase
      # - name: es
      #   hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
      #   properties:
      #     mode: transport # or rest
      #     # security.auth: test:123456 # only used for rest mode
      #     cluster.name: elasticsearch
      # - name: kudu
      #   key: kudu
      #   properties:
      #     kudu.master.address: 127.0.0.1 # ',' split multi address
修改conf/rdb/tb_commodity_info.yml
将 yml 文件名修改为需要监听的表名;若要监听多个表,可以生成多个 yml 文件
# Rename the sample mapping file to match the monitored table.
mv mytest_user.yml tb_commodity_info.yml
再修改配置文件
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: cancal_mysql
  table: tb_commodity_info
  targetTable: tb_commodity_info
  targetPk:
    id: id
  mapAll: true
  # targetColumns:
  #   id:
  #   name:
  #   role_id:
  #   c_time:
  #   test1:
  # etlCondition: "where c_time>={}"
  commitBatch: 3000 # batch commit size

## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
#  mirrorDb: true
#  database: mytest
注意:由于我存的是mysql8,所以需要在lib中加入8的mysql-connector-java-8.0.24.jar
执行启动命令
# Start the adapter.
sh bin/startup.sh
不出意外的果然出意外了
由于出现了意外,所以我们把源码拉下来跑了一遍,修正了上面的配置等问题,终于成功了
# Fetch the canal sources in order to debug the adapter startup.
git clone https://github.com/alibaba/canal.git
adapter项目启动入口是client-adapter包下的launcher模块
修改配置文件:application.yml与rdb目录下的tb_commodity_info.yml
启动类CanalAdapterApplication
通过源码追溯
在启动时,依据配置需要加载 rdb 的 SPI;而加载 rdb 之前需要先加载 logger,所以需要在配置中加入 logger
canalAdapters:
- instance: example # canal instance Name or mq topic name
  groups:
  - groupId: g1
    outerAdapters:
    - name: logger # must be loaded before rdb — works around the SPI loading bug
    - name: rdb
      key: mysql1
      properties:
        jdbc.driverClassName: com.mysql.jdbc.Driver
        jdbc.url: jdbc:mysql://localhost:3306/cancal_mysql?serverTimezone=GMT%2B8
        jdbc.username: root
        jdbc.password: 603409875
这时候可以将 client-adapter 的 maven 进行 install 一遍,会出现不少缺 jar 包的问题,我执行时缺了两个 jar
将这两个 jar 打包出来后,再 install adapter 就可以了
注意:若还不行,可以先将 launcher package,然后再执行
进入 ConfigLoader#load(Properties envProperties) 方法中,看到加载表映射关系;跟代码后发现,提示没有映射文件
为了查这个问题,我又跑到spi加载文件的地方去查,查到加载的路径是:
canal/1.1.5/canal-canal-1.1.5/client-adapter/launcher/target/canal-adapter/plugin/client-adapter.rdb-1.1.5-jar-with-dependencies.jar
打开 jar 包,发现是之前未修改时的旧 jar;此时又去检查 install 的 rdb 包,再重新 pack 一遍,终于可以运行了