Go to the job directory and create the JSON job file:

cd /usr/local/datax/job
vi zabbixmysql2mysql.json
The target table must have the same structure as the reader's source table, so create it first. Then write the JSON job file:
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "test",
            "password": "123",
            "column": ["itemid", "clock", "timestamp", "source", "severity", "value", "logeventid", "ns"],
            "splitPk": "itemid",
            "connection": [
              {
                "table": ["history_log"],
                "jdbcUrl": ["jdbc:mysql://172.16.3.89:3306/zabbix"]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "insert",
            "username": "test",
            "password": "123",
            "column": ["itemid", "clock", "timestamp", "source", "severity", "value", "logeventid", "ns"],
            "preSql": ["truncate history_log_copy1"],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://172.16.3.89:3306/chenzhenhua2?useUnicode=true&characterEncoding=utf8",
                "table": ["history_log_copy1"]
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 6
      }
    }
  }
}
Note: "writeMode": "insert" can also be set to update, which is somewhat safer when a job is re-run.
"preSql": ["truncate history_log_copy1"] empties the target table before the write starts.
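To see why truncating in preSql matters, here is a minimal sketch of the writer's behavior (not DataX's actual code), using an in-memory sqlite3 database in place of MySQL and a simplified two-column version of the table: preSql runs once before the inserts, so re-running the same job does not duplicate rows.

```python
import sqlite3

# In-memory stand-in for the target MySQL database (simplified schema).
conn = sqlite3.connect(":memory:")
conn.execute("create table history_log_copy1 (itemid integer, value text)")

def write_batch(rows, pre_sql=None):
    """Mimic mysqlwriter: optionally run preSql first, then insert the batch."""
    if pre_sql:
        conn.execute(pre_sql)  # sqlite has no TRUNCATE; DELETE is the equivalent here
    conn.executemany("insert into history_log_copy1 values (?, ?)", rows)

# Run the "job" twice, as if re-syncing the same source data.
write_batch([(1, "a"), (2, "b")], pre_sql="delete from history_log_copy1")
write_batch([(1, "a"), (2, "b")], pre_sql="delete from history_log_copy1")

count = conn.execute("select count(*) from history_log_copy1").fetchone()[0]
print(count)  # 2 -- without preSql the second run would have left 4 rows
```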
If the target database is MySQL 8 or later, you must replace the mysql-connector-java driver jar in the writer plugin:

cd /usr/local/datax/plugin/writer/mysqlwriter/libs
mv mysql-connector-java-5.1.34.jar mysql-connector-java-5.1.34.jar-bak

I uploaded mysql-connector-java-8.0.16.jar, downloaded from https://static.runoob.com/download/mysql-connector-java-8.0.16.jar
The other hard problem DataX needs to solve is incremental sync.
First, note that most DataX reader plugins already provide a where configuration item for incremental updates. The mysqlreader documentation describes it as follows:
* **where**
  * Description: filter condition. MysqlReader splices the configured column, table, and where into a SQL statement and uses that SQL to extract data. In practice you often sync just the current day's data, e.g. by setting where to gmt_create > $bizdate. Note: where must not be set to something like limit 10 — LIMIT is not a valid SQL WHERE clause. A where condition is an effective way to do incremental sync; if where is omitted (either the key or the value), DataX syncs the full table.
  * Required: no
  * Default: none
* **querySql**
  * Description: in some scenarios where is not expressive enough, and this option lets you supply the filter SQL yourself. When querySql is configured, DataX ignores the table and column settings and uses this SQL directly, e.g. for a multi-table join: select a,b from table_a join table_b on table_a.id = table_b.id. `When querySql is set, MysqlReader ignores the table, column, and where settings`; querySql takes priority over all three.
  * Required: no
  * Default: none
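The splicing described above can be illustrated with a small sketch (this is an illustration of the documented behavior, not DataX's actual implementation): querySql, when present, wins outright; otherwise column, table, and where are assembled into a SELECT.

```python
def build_reader_sql(parameter):
    """Sketch of how mysqlreader derives its extraction query from the
    job's parameter block: querySql overrides everything else."""
    conn = parameter["connection"][0]
    if "querySql" in conn:
        return conn["querySql"][0]
    sql = "select %s from %s" % (",".join(parameter["column"]), conn["table"][0])
    if parameter.get("where"):
        sql += " where " + parameter["where"]
    return sql

# column/table/where are spliced together:
print(build_reader_sql({
    "column": ["id", "num"],
    "where": "created_at > $bizdate",
    "connection": [{"table": ["rpt_warning_hour"]}],
}))
# -> select id,num from rpt_warning_hour where created_at > $bizdate
```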
Example. Create a new job file:

vi new.json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123",
            "where": "created_at > FROM_UNIXTIME(${create_time}) and created_at < FROM_UNIXTIME(${end_time})",
            "column": ["id", "rpt_date", "rpt_hour", "unit_id", "build_id", "num", "run_state", "created_at"],
            "splitPk": "id",
            "connection": [
              {
                "table": ["rpt_warning_hour"],
                "jdbcUrl": ["jdbc:mysql://172.16.5.11:3306/smart_fire"]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "update",
            "username": "test",
            "password": "123",
            "column": ["id", "rpt_date", "rpt_hour", "unit_id", "build_id", "num", "run_state", "created_at"],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://172.16.3.89:3306/chenzhenhua2?useUnicode=true&characterEncoding=utf8",
                "table": ["rpt_warning_hour"]
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 6
      }
    }
  }
}
Note the use of FROM_UNIXTIME here: it converts the Unix timestamp we pass in into the DATETIME format stored in the table, so the comparison works. If the column already stores Unix timestamps, no conversion is needed.
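As a rough Python analogue of what FROM_UNIXTIME does on the MySQL side:

```python
from datetime import datetime

# MySQL's FROM_UNIXTIME(ts) is roughly datetime.fromtimestamp(ts) in Python:
# it turns epoch seconds into a wall-clock date/time (in the local/session
# time zone), which can then be compared against a DATETIME column.
ts = 1600000000
dt = datetime.fromtimestamp(ts)
print(dt.year)  # 2020 (the exact hour depends on the local time zone)
```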
The ${...} placeholders are how variables are passed in: create_time is the time of the last sync run, and end_time is the current local time.
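Conceptually, the values supplied on the command line via -p "-Dcreate_time=... -Dend_time=..." are substituted into the ${...} placeholders in the job JSON before it runs. A minimal sketch of that substitution (an illustration, not DataX's own code):

```python
import re

def substitute_params(template, params):
    """Replace ${name} placeholders in a job-config string with the
    values passed via datax.py -p "-Dname=value"."""
    return re.sub(r"\$\{(\w+)\}", lambda m: str(params[m.group(1)]), template)

where = ("created_at > FROM_UNIXTIME(${create_time}) "
         "and created_at < FROM_UNIXTIME(${end_time})")
resolved = substitute_params(where, {"create_time": 1, "end_time": 1600000000})
print(resolved)
# -> created_at > FROM_UNIXTIME(1) and created_at < FROM_UNIXTIME(1600000000)
```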
Then write a Python script that passes these parameters into the job:

vi dataxScheduler.py
import time, os, sys

print("going to execute")
configFilePath = sys.argv[1]         # path to the DataX job JSON
logFilePath = sys.argv[2]            # file to append DataX output to
lastTimeExecuteRecord = sys.argv[3]  # file holding the previous run's timestamp

# Read the timestamp of the previous run; fall back to 1 on the first run
# (or when the record file is missing or empty).
try:
    fo = open(lastTimeExecuteRecord, "r")
    lastExecuteTime = fo.read().strip() or "1"
    fo.close()
except IOError:
    lastExecuteTime = "1"
lastExecuteTime = int(lastExecuteTime)
print("last time execute time: " + str(lastExecuteTime))

currentTime = int(time.time())
print("currentTime is :" + str(currentTime))

# Pass the two timestamps into the job via -D variables (create_time/end_time).
script2execute = "python /usr/local/datax/bin/datax.py %s -p \"-Dcreate_time=%s -Dend_time=%s\" >> %s" % (
    configFilePath, lastExecuteTime, currentTime, logFilePath)
print("to be execute script:" + script2execute)
os.system(script2execute)
print("script execute ending")

# Record this run's timestamp for the next incremental run.
fo = open(lastTimeExecuteRecord, "w+")
fo.write(str(currentTime))
fo.close()
print("ending---" + lastTimeExecuteRecord)
Run it:

python /usr/local/datax/job/dataxScheduler.py '/usr/local/datax/job/new.json' '/usr/local/datax/job/test_job.log' '/usr/local/datax/job/test_job.record'
To test it, add some rows to the source table and run the script again: the new rows are synced over. Put the script in a scheduled task (e.g. cron) and incremental sync is done.
This scripted approach is still quite clumsy, though; the next post introduces datax-web, which solves incremental sync much more cleanly.