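Tidying up some old notes:
Project background:
We need to use Maxwell to capture data from the MySQL business database and send it to Kafka.
Version choice: v1.29.1, which at the time was the release that started supporting HA.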
Reference documentation: https://maxwells-daemon.io/quickstart/
Download: https://github.com/zendesk/maxwell/releases/tag/v1.29.1
Extract: tar -zxvf maxwell-1.29.1.tar.gz -C /opt/soft
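MySQL privileges (the GRANT ... IDENTIFIED BY form works on MySQL 5.7 and earlier; MySQL 8.0 requires a separate CREATE USER statement first):
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY 'maxwell1223';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
mysql> FLUSH PRIVILEGES;
Restart MySQL: service mysqld restart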
Starting Maxwell:
# Filter by database: capture only dw_test
# The Kafka topic dw_test must be created beforehand
/opt/soft/maxwell-1.29.1/bin/maxwell --user='maxwell' --password='maxwell1223' --host='127.0.0.1' \
  --producer=kafka --kafka.bootstrap.servers=cdh01:9092,cdh02:9092,cdh03:9092 \
  --kafka_topic=dw_test --filter 'exclude: *.*, include: dw_test.*'
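To make the --filter string easier to reason about, here is a small Python sketch of how such a rule list can be read. It is a simplified model, not Maxwell's own code: it assumes the rules are checked in order and the last matching rule decides, and that a table matching no rule is captured. It handles only plain `db.table` patterns with `*` wildcards (no regex or column filters), and the helper names parse_filter and is_captured are made up for this illustration.

```python
from fnmatch import fnmatchcase


def parse_filter(spec):
    """Split 'exclude: *.*, include: dw_test.*' into (action, db_pattern, table_pattern) tuples."""
    rules = []
    for part in spec.split(","):
        action, _, pattern = part.strip().partition(":")
        db_pat, _, tbl_pat = pattern.strip().partition(".")
        rules.append((action.strip(), db_pat, tbl_pat))
    return rules


def is_captured(rules, database, table):
    """Assumed semantics: rules are checked in order and the last matching rule wins."""
    captured = True  # assumption: a table that matches no rule is captured
    for action, db_pat, tbl_pat in rules:
        if fnmatchcase(database, db_pat) and fnmatchcase(table, tbl_pat):
            captured = (action == "include")
    return captured


rules = parse_filter("exclude: *.*, include: dw_test.*")
print(is_captured(rules, "dw_test", "test01"))   # True: re-included by the second rule
print(is_captured(rules, "other_db", "orders"))  # False: only the exclude rule matches
```

Under this reading, the leading `exclude: *.*` turns everything off and the trailing `include: dw_test.*` switches the one database back on, which is why the order of the two rules matters.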
Kafka monitoring: kafka-eagle http://cdh02:8048/
Consuming from Kafka:
kafka-console-consumer.sh \
  --bootstrap-server cdh01:9092,cdh02:9092,cdh03:9092 \
  --topic dw_test \
  --from-beginning
Creating the MySQL table:
CREATE TABLE `test01` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
  `host` varchar(45) DEFAULT NULL COMMENT 'ip',
  `port` int(11) DEFAULT NULL COMMENT 'process id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Inserting test data:
insert into test01 (host,port) VALUES ('127.0.0.1',99);
insert into test01 (host,port) VALUES ('127.0.0.1',133);
Output from the Kafka consumer:
{"database":"dw_test","table":"test01","type":"insert","ts":1651319606,"xid":65972,"commit":true,"data":{"id":1,"host":"127.0.0.1","port":99}}
{"database":"dw_test","table":"test01","type":"insert","ts":1651319687,"xid":67733,"commit":true,"data":{"id":2,"host":"127.0.0.1","port":133}}
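For downstream processing, each message is a single JSON object, so it can be decoded with the standard library alone. The sketch below parses the two messages shown above and pulls out the fields a consumer would usually care about. The summarize helper and the output field names (target, time_utc, row) are invented for this example, and treating ts as epoch seconds is an assumption based on the values seen here.

```python
import json
from datetime import datetime, timezone

# The two messages captured from the dw_test topic above, one JSON object per line.
raw_lines = [
    '{"database":"dw_test","table":"test01","type":"insert","ts":1651319606,'
    '"xid":65972,"commit":true,"data":{"id":1,"host":"127.0.0.1","port":99}}',
    '{"database":"dw_test","table":"test01","type":"insert","ts":1651319687,'
    '"xid":67733,"commit":true,"data":{"id":2,"host":"127.0.0.1","port":133}}',
]


def summarize(line):
    """Decode one Maxwell message and keep the commonly used fields."""
    event = json.loads(line)
    return {
        "target": f'{event["database"]}.{event["table"]}',
        "type": event["type"],
        # Assumption: ts is in epoch seconds, as the sample values suggest.
        "time_utc": datetime.fromtimestamp(event["ts"], tz=timezone.utc).isoformat(),
        "row": event["data"],
    }


events = [summarize(line) for line in raw_lines]
for e in events:
    print(e["target"], e["type"], e["row"])
```

The same summarize function could be applied to each message value in a real consumer loop; only the source of the lines would change.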