对应版本: Mysql 5.7 Es 7.4 Canal 1.1.5
https://blog.csdn.net/qq_24950043/article/details/122463372
下载如下:https://github.com/alibaba/canal/releases
上面配置过不需要配置
修改/conf/application.yml
server: port: 8081 #adapter 服务端口 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp # tcp - canal server读取模式, kafka rocketMQ flatMessage: true zookeeperHosts: # adapter集群环境配置,主备锁。 syncBatchSize: 1 #批处理条数 retries: 0 #重试次数 timeout: #同步超时 accessKey: secretKey: consumerProperties: # 消息服务消费端地址,根据mode读取 # canal tcp consumer canal.tcp.server.host: {canalServer}:11111 # cancal地址 canal.tcp.zookeeper.hosts: # 如果配置了canalServerHost, 则以canalServerHost为准 canal.tcp.batch.size: 500 canal.tcp.username: canal.tcp.password: # kafka consumer kafka.bootstrap.servers: 127.0.0.1:9092 kafka.enable.auto.commit: false kafka.auto.commit.interval.ms: 1000 kafka.auto.offset.reset: latest kafka.request.timeout.ms: 40000 kafka.session.timeout.ms: 30000 kafka.isolation.level: read_committed kafka.max.poll.records: 1000 # rocketMQ consumer rocketmq.namespace: rocketmq.namesrv.addr: 127.0.0.1:9876 rocketmq.batch.size: 1000 rocketmq.enable.message.trace: false rocketmq.customized.trace.topic: rocketmq.access.channel: rocketmq.subscribe.filter: # rabbitMQ consumer rabbitmq.host: rabbitmq.virtual.host: rabbitmq.username: rabbitmq.password: rabbitmq.resource.ownerId: srcDataSources: defaultDS: #支持多数据源 url: jdbc:mysql://{ip}:3306/test?useUnicode=true username: canal password: canal canalAdapters: # 同步适配器列表,同步到哪各数据源 - instance: example # canal 实例名或者 MQ topic 名 groups: # 适配器组,支持多个不同入库数据源 - groupId: g1 # 分组id, 如果是MQ模式将用到该值 outerAdapters: # 分组内适配器列表 - name: logger # 打印获取到的消息数据日志 # - name: rdb # key: mysql1 # properties: # jdbc.driverClassName: com.mysql.jdbc.Driver # jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true # jdbc.username: root # jdbc.password: 121212 # - name: rdb # key: oracle1 # properties: # jdbc.driverClassName: oracle.jdbc.OracleDriver # jdbc.url: jdbc:oracle:thin:@localhost:49161:XE # jdbc.username: mytest # jdbc.password: m121212 # - name: rdb # key: postgres1 # properties: # jdbc.driverClassName: org.postgresql.Driver # jdbc.url: jdbc:postgresql://localhost:5432/postgres # jdbc.username: postgres # jdbc.password: 121212 # threads: 1 # commitSize: 3000 # - name: hbase # properties: # hbase.zookeeper.quorum: 127.0.0.1 # hbase.zookeeper.property.clientPort: 2181 # zookeeper.znode.parent: /hbase - name: es7 #用于匹配Adapter,该名称对应数据源文件名称 key: es-user #必须有key,否则是比es里面的配置 hosts: {ip}:9300 # 127.0.0.1:9200 for rest mode properties: mode: transport # 9300对应transport or 9200对应rest # security.auth: test:123456 # only used for rest mode cluster.name: elasticsearch # - name: kudu # key: kudu # properties: # kudu.master.address: 127.0.0.1 # ',' split multi address
修改yml文件
dataSourceKey: defaultDS #源数据源的key, 对应上面配置的srcDataSources中的值 outerAdapterKey: es-user # 对应application.yml中es配置的key destination: example # cannal的instance或者MQ的topic groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据 esMapping: _index: mytest_user # es 的索引名称 _type: _doc # es 的type名称, es7下无需配置此项 _id: id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配 upsert: true # 支持不存在新增操作 # pk: id sql: "select t.id,t.username,t.phone from t_user t" #需要更新的所有字段都需写上 # objFields: # _labels: array:; # 数组或者对象属性, array:; 代表以;字段里面是以;分隔的 etlCondition: "where a.c_time>={}" # etl 的条件参数,通过服务的etl接口传入。
修改完成启动报错
java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
原因:common和escore的druid包冲突。
需要修改源码解决,打开刚刚下载的源码
1, 修改canal-canal-1.1.5\client-adapter\escore\pom.xml
2, 修改canal-canal-1.1.5\pom.xml
添加 true 主要是打包跳过测试,避免报错
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>${maven-surefire.version}</version> <configuration> <useSystemClassLoader>true</useSystemClassLoader> <forkMode>once</forkMode> <argLine>${argline} ${jacocoArgLine}</argLine> <systemProperties> <!-- common shared --> </systemProperties> <skipTests>true</skipTests> <!--默认关掉单元测试 --> </configuration> </plugin>
Maven重新打包
执行完后用canal-canal-1.1.5\client-adapter\es7x\target\client-adapter.es7x-1.1.5-jar-with-dependencies.jar下面包替换conf/plugins/client-adapter.es7x-1.1.5-jar-with-dependencies.jar
之后重启
NoNodeAvailableException[None of the configured nodes are available:
修改ES的elasticsearch.yml文件:
注意:cluster.name 与 adapter下 /conf/application.yml文件的cluster.name名称相同