MySql教程

通过 Canal 将 MySQL 数据实时同步到 Easysearch

本文主要是介绍通过 Canal 将 MySQL 数据实时同步到 Easysearch,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Canal 是阿里巴巴集团提供的一个开源产品,能够通过解析数据库的增量日志,提供增量数据的订阅和消费功能。使用 Canal 模拟成 MySQL 的 Slave,实时接收 MySQL 的增量数据 binlog,然后通过 RESTful API 将数据写入到 Easysearch 中。

前提条件

  1. 部署 Easysearch 集群。

  2. 部署 MySQL 数据库。

  3. 部署 Gateway,Canal Adapter 不支持使用 HTTPS 协议连接,使用 Gateway 代理 Easysearch 。

  4. 部署 Console,方便查看 Easysearch 数据。

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:

[mysqld]

log-bin=mysql-bin # 开启 binlog

binlog-format=ROW # 选择 ROW 模式

server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

创建 canal 用户,授权 canal 连接 MySQL 具有作为 MySQL slave 的权限。

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

FLUSH PRIVILEGES;

操作步骤

在进行数据同步时支持自定义索引 Mapping,但需保证 Mapping 中定义的字段(名称+类型)与 MySQL 中一致。

1. 准备 MySQL 数据源

create database canal;

use canal;

CREATE TABLE `test` (

`id` bigint(32) NOT NULL,

`name` text NOT NULL,

`age` smallint NOT NULL,

PRIMARY KEY (`id`)

) ENGINE=InnoDB

DEFAULT CHARACTER SET=utf8;

2. Easysearch 创建索引

PUT test

{

"settings" : {

"index" : {

"number_of_shards" : "1",

"number_of_replicas" : "1"

}

},

"mappings" : {

"properties" : {

"id": {

"type": "integer"

},

"name": {

"type" : "text"

},

"age" : {

"type" : "integer"

}

}

}

}

3. 安装并启动 Canal-server

下载:https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz

修改配置文件

vi conf/example/instance.properties

启动 canal

sh bin/startup.sh

启动成功日志信息,logs/canal/canal.log

关闭 canal

sh bin/stop.sh

4. 安装并启动 Canal-adapter

下载:https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.adapter-1.1.7.tar.gz

修改配置文件:application.yml

server:

port: 8081

spring:

jackson:

date-format: yyyy-MM-dd HH:mm:ss

time-zone: GMT+8

default-property-inclusion: non_null

  

canal.conf:

flatMessage: true

syncBatchSize: 1000

retries: -1

timeout:

accessKey:

secretKey:

consumerProperties:

canal.tcp.server.host: 127.0.0.1:11111

canal.tcp.batch.size: 500

  

srcDataSources:

defaultDS:

url: jdbc:mysql://127.0.0.1:3306/canal?useUnicode=true

username: canal

password: canal

canalAdapters:

groups:

- groupId: g1

outerAdapters:

- name: logger

- name: es7

properties:

security.auth: admin:4ad8f8f792e81cd0a6de

cluster.name: easysearch

新增 canal-adapter/conf/es7/test.yml,配置索引和表的映射关系。

dataSourceKey: defaultDS

destination: example

groupId: g1

esMapping:

_index: test # es 的索引名称

_id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配

# sql映射

sql: " select a.id as _id,a.id,a.name,a.age from test a "

etlCondition: "where a.id>={}"

commitBatch: 3000 # 提交批大小

启动 canal-adapter

./bin/startup.sh

5. 验证增量数据同步

在 MySQL 数据库中,对 test 表插入两条数据。

inserttest(id,name,age) values(1,'canal_test1',11);

inserttest(id,name,age) values(2,'canal_test2',22);

6. 在 Console 中,执行以下命令查询数据

这篇关于通过 Canal 将 MySQL 数据实时同步到 Easysearch的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!