目标:
mongodb --flink cdc--> kafka --> doris
ll ./lib -rw-r--r-- 1 hadoop hadoop 249567 Oct 12 17:23 flink-connector-jdbc_2.11-1.13.3.jar -rw-r--r-- 1 hadoop hadoop 92313 Oct 13 00:15 flink-csv-1.13.3.jar -rw-r--r-- 1 hadoop hadoop 115418686 Oct 13 00:29 flink-dist_2.11-1.13.3.jar -rw-r--r-- 1 hadoop hadoop 148127 Oct 13 00:13 flink-json-1.13.3.jar -rwxrwxrwx 1 hadoop hadoop 7709740 Jun 8 21:13 flink-shaded-zookeeper-3.4.14.jar -rw-r--r-- 1 hadoop hadoop 3674114 Oct 12 17:35 flink-sql-connector-kafka_2.11-1.13.3.jar -rw-r--r-- 1 hadoop hadoop 15296786 Nov 15 14:30 flink-sql-connector-mongodb-cdc-2.1.0.jar -rw-r--r-- 1 hadoop hadoop 19648146 Nov 15 14:30 flink-sql-connector-mysql-cdc-2.1.0.jar -rw-r--r-- 1 hadoop hadoop 36453353 Oct 13 00:25 flink-table_2.11-1.13.3.jar -rw-r--r-- 1 hadoop hadoop 41061738 Oct 13 00:26 flink-table-blink_2.11-1.13.3.jar -rwxrwxrwx 1 hadoop hadoop 67114 Mar 31 2021 log4j-1.2-api-2.12.1.jar -rwxrwxrwx 1 hadoop hadoop 276771 Mar 31 2021 log4j-api-2.12.1.jar -rwxrwxrwx 1 hadoop hadoop 1674433 Mar 31 2021 log4j-core-2.12.1.jar -rwxrwxrwx 1 hadoop hadoop 23518 Mar 31 2021 log4j-slf4j-impl-2.12.1.jar -rw-r--r-- 1 hadoop hadoop 2475087 Sep 29 02:17 mysql-connector-java-8.0.27.jar
// Seed the `checkresult` collection with three sample documents.
// ObjectId() only accepts a 24-character hexadecimal string, so the ids below
// are valid hex and match the `_id` values that show up downstream in the
// Kafka topic and in the Doris table.
db.checkresult.insertMany([
    {
        "_id": ObjectId("7159a93265b6c375acfa2790"),
        "userid": NumberLong(112480487),
        "mid": "14029647849143177307",
        "tag": [ "mnt_seven" ],
        // Array of embedded documents; the Flink job flattens the first element.
        "checkcontent": [
            {
                "name": "14029647849143177303_112480480_7",
                "type": 1,
                "data": "http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg"
            }
        ],
        "createat": NumberLong(1633265977)
    },
    {
        "_id": ObjectId("8159a93265b6c375acfa2710"),
        "userid": NumberLong(112480488),
        "mid": "14029647849143177303",
        "tag": [ "mnt_eight" ],
        "checkcontent": [
            {
                "name": "14029647849143177303_112480480_8",
                "type": 2,
                "data": "http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92bb.jpg"
            }
        ],
        "createat": NumberLong(1633265978)
    },
    {
        "_id": ObjectId("9059a93265b6c375acfa2790"),
        "userid": NumberLong(112480489),
        "mid": "14029647849143177309",
        "tag": [ "mnt_nigh" ],
        "checkcontent": [
            {
                "name": "14029647849143177303_112480480_9",
                "type": 3,
                "data": "http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92cc.jpg"
            }
        ],
        "createat": NumberLong(1633265979)
    }
]);
-- flink sql

-- Source: the MongoDB `checkresult` collection, read as a changelog through
-- the mongodb-cdc connector.
-- NOTE(review): credentials are written inline in plain text; replace them
-- before sharing or committing this script.
CREATE TABLE mongo_checkresult (
    _id STRING,  -- must be declared
    userid BIGINT,
    mid STRING,
    tag ARRAY<STRING>,
    checkcontent ARRAY<ROW<name STRING, type INT, data STRING>>,  -- embedded document
    createat BIGINT,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = '192.168.3.121:27017',
    'username' = 'mongouser',
    'password' = 'mongodbpwd',
    'database' = 'xxx',
    'collection' = 'checkresult'
);

-- Sink: Kafka topic `checkresult`; the nested fields are flattened into
-- check_name / check_type / check_data.
-- The connector must be upsert-kafka because the CDC source emits upsert
-- (changelog) data.
CREATE TABLE kafka_checkresult (
    _id STRING,
    userid BIGINT,
    mid STRING,
    tag ARRAY<STRING>,
    check_name STRING,
    check_type INT,
    check_data STRING,
    createat BIGINT,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'checkresult',
    'properties.bootstrap.servers' = '192.168.3.124:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- Run the sink job. Flink SQL array indexes start at 1, so checkcontent[1] is
-- the first embedded document; any further array elements are not written.
INSERT INTO kafka_checkresult
SELECT
    _id,
    userid,
    mid,
    tag,
    checkcontent[1].name,
    checkcontent[1].type,
    checkcontent[1].data,
    createat
FROM mongo_checkresult;
# 观察修改是否也产生消息：upsert-kafka 会对消息做 upsert 及 delete 操作，select * from kafka_checkresult 展现的是与源端一致的数据 kafka-console-consumer.sh --bootstrap-server 192.168.3.124:9092 --topic checkresult --from-beginning {"_id":"7159a93265b6c375acfa2790","userid":112480487,"mid":"14029647849143177307","tag":["mnt_seven"],"check_name":"14029647849143177303_112480480_7","check_type":1,"check_data":"http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg","createat":1633265977} {"_id":"9059a93265b6c375acfa2790","userid":112480489,"mid":"14029647849143177309","tag":["mnt_nigh"],"check_name":"14029647849143177303_112480480_9","check_type":3,"check_data":"http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92cc.jpg","createat":1633265979} {"_id":"8159a93265b6c375acfa2710","userid":112480488,"mid":"14029647849143177303","tag":["mnt_eight"],"check_name":"14029647849143177303_112480480_8","check_type":2,"check_data":"http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92bb.jpg","createat":1633265978} # 按 userid 修改 mid db.checkresult.updateOne( { userid: 112480487}, { $set: {mid: "14029647849143177344"} } ); {"_id":"7159a93265b6c375acfa2790","userid":112480487,"mid":"14029647849143177344","tag":["mnt_seven"],"check_name":"14029647849143177303_112480480_7","check_type":1,"check_data":"http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg","createat":1633265977} # 删除，会生成 value 为 null 的记录。A data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE”.
-- Doris target table for the flattened check results read from Kafka.
-- UNIQUE KEY(_id): a later row with the same _id replaces the earlier one.
CREATE TABLE ods_safety_checkresult (
    -- Key / bucketing column: Doris documents STRING as a value-column-only
    -- type, so the key is declared as VARCHAR. The ids seen in this pipeline
    -- are 24-character hex strings, so 64 leaves ample room.
    _id VARCHAR(64),
    userid BIGINT,
    mid STRING,
    tag STRING,  -- receives the JSON text of the source array, e.g. ["mnt_seven"]
    check_name STRING,
    check_type INT,
    check_data STRING,
    createat BIGINT
)
ENGINE=OLAP
UNIQUE KEY(_id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(_id) BUCKETS 8
PROPERTIES (
    "replication_num" = "3",
    "in_memory" = "false",
    "storage_format" = "V2"
);
-- Routine Load job that continuously ingests the JSON messages of Kafka topic
-- `checkresult` into ods_safety_checkresult.
CREATE ROUTINE LOAD etl_ods_safety_checkresult ON ods_safety_checkresult
COLUMNS (
    _id,
    userid,
    mid,
    tag,
    check_name,
    check_type,
    check_data,
    createat
)
PROPERTIES (
    "format" = "json",
    "desired_concurrent_number" = "1",
    "max_error_number" = "1000",
    "max_batch_interval" = "5"  -- seconds
)
FROM KAFKA (
    "kafka_broker_list" = "192.168.3.121:9092",
    "kafka_topic" = "checkresult",
    -- Read partition 0 starting from offset 0.
    "kafka_partitions" = "0",
    "kafka_offsets" = "0"
);
mysql> show routine load for etl_ods_safety_checkresult \G *************************** 1. row *************************** Id: 22055 Name: etl_ods_safety_checkresult CreateTime: 2021-11-20 15:32:18 PauseTime: NULL EndTime: NULL DbName: default_cluster:mongotest TableName: ods_safety_checkresult State: RUNNING DataSourceType: KAFKA CurrentTaskNum: 1 JobProperties: {"partitions":"*","columnToColumnExpr":"_id,userid,mid,tag,check_name,check_type,check_data,createat","maxBatchIntervalS":"5","whereExpr":"*","dataFormat":"json","timezone":"Asia/Shanghai","send_batch_parallelism":"1","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","maxBatchSizeBytes":"104857600","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","deleteCondition":"*","desireTaskConcurrentNum":"1","maxErrorNum":"1000","strip_outer_array":"false","currentTaskConcurrentNum":"1","execMemLimit":"2147483648","num_as_string":"false","fuzzy_parse":"false","maxBatchRows":"200000"} DataSourceProperties: {"topic":"checkresult","currentKafkaPartitions":"0","brokerList":"192.168.3.121:9092"} CustomProperties: {"group.id":"etl_ods_safety_checkresult_d1504eb7-6dc7-4f5b-801e-09ade020ee0b"} Statistic: {"receivedBytes":1313,"errorRows":0,"committedTaskNum":1,"loadedRows":5,"loadRowsRate":0,"abortedTaskNum":2,"errorRowsAfterResumed":0,"totalRows":5,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":5015} Progress: {"0":"4"} ReasonOfStateChanged: ErrorLogUrls: OtherMsg: 1 row in set (0.00 sec) mysql> select * from ods_safety_checkresult; +--------------------------+-----------+----------------------+---------------+----------------------------------+------------+--------------------------------------------------------------+------------+ | _id | userid | mid | tag | check_name | check_type | check_data | createat | 
+--------------------------+-----------+----------------------+---------------+----------------------------------+------------+--------------------------------------------------------------+------------+ | 7159a93265b6c375acfa2790 | 112480487 | 14029647849143177344 | ["mnt_seven"] | 14029647849143177303_112480480_7 | 1 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg | 1633265977 | | 9059a93265b6c375acfa2790 | 112480499 | 14029647849143177309 | ["mnt_nigh"] | 14029647849143177303_112480480_9 | 3 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92cc.jpg | 1633265979 | | 8159a93265b6c375acfa2710 | 112480488 | 14029647849143177303 | ["mnt_eight"] | 14029647849143177303_112480480_8 | 2 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92bb.jpg | 1633265978 | +--------------------------+-----------+----------------------+---------------+----------------------------------+------------+--------------------------------------------------------------+------------+ 3 rows in set (0.00 sec) // 插入数据,更新数据测试 mysql> select * from ods_safety_checkresult; +--------------------------+-----------+----------------------+---------------+-----------------------------------+------------+-----------------------------------------------------------------+------------+ | _id | userid | mid | tag | check_name | check_type | check_data | createat | +--------------------------+-----------+----------------------+---------------+-----------------------------------+------------+-----------------------------------------------------------------+------------+ | 7159a93265b6c375acfa2790 | 112480487 | 14029647849143177344 | ["mnt_seven"] | 14029647849143177303_112480480_7 | 1 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92aa.jpg | 1633265977 | | 1059a93265b6c375acfa2790 | 112480489 | 14029647849143177309 | ["mnt_nigh"] | 14029647849143177303_112480480_10 | 4 | http://sz.aliyuncs.com/db/9591ec01a358b29efdafd9634837e92cc.jpg | 1633268979 | | 9059a93265b6c375acfa2790 | 
112480499 | 14029647849143177399 | ["mnt_nigh"] | 14029647849143177303_112480480_9 | 3 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92cc.jpg | 1633265979 | | 8159a93265b6c375acfa2710 | 112480488 | 14029647849143177303 | ["mnt_eight"] | 14029647849143177303_112480480_8 | 2 | http://sz.aliyuncs.com/db/9591ec01a358b29eb79634837e92bb.jpg | 1633265978 | +--------------------------+-----------+----------------------+---------------+-----------------------------------+------------+-----------------------------------------------------------------+------------+ 4 rows in set (0.01 sec) // 删除呢? unique 为replace 的操作不能支持!!!
总结
测试结果证明，利用 mongodb-cdc 能够通过 Flink 准实时地把数据传输到 Doris 表，但是 delete 不能处理：源端的删除在 Kafka 中只是一条 value 为 null 的记录，Doris 表中对应的行不会被删除。如果 sink 端使用支持 upsert 的数据库，如 ES、Kudu、MySQL，则能支持完整的 upsert 及 delete 操作。
参考
MongoDB CDC Connector
Upsert Kafka SQL Connector
Doris Routine Load