先说总体步骤:
maven
仓库[上传私服(可选)];pom
文件依赖datax-core
和需要的reader
和writer
datax.home
(或者利用System#setProperty(String)
)和一些需要替换脚本中的变量:脚本中${}
占位符的变量将被系统变量替换。conf
、plugin
等文件放到datax.home目录中。{"-job", "xxx.json", "-mode", "standalone", "-jobid", "-1"}
Engin#main(String[])
或者Engine#entry(String[])
目前官方的使用指南里都是利用python来调用dataX执行任务。而且现有的博客基本上也是利用java来调用python命令Runtime.getRuntime().exec()
来执行。
个人感觉,dataX未提供java集成开发的方法,应该是定位生产系统,运维需要吧?!
我们的业务场景:执行完dataX的job之后,还有一定的业务逻辑,所以希望在java应用里调用dataX执行完job之后,再执行后续逻辑。
笔者简单的看了一下午的DataX的逻辑,完全以使用者的视角分析DataX,必然不能完全了解DataX的整个执行过程。
本文仅分析如果能够在java代码里集成DataX进行开发。
DataX没有将代码上传到maven服务器上,所以需要自己先pull代码到本地,编译,才能在集成开发的使用通过pom引用。有条件的可以上传到自己的私服上。
代码地址
通过pom文件加入datax-core
:
<dependency> <groupId>com.alibaba.datax</groupId> <artifactId>datax-core</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
如果需要对应的reader
和writer
的话,加入到pom文件中,比如需要streamreader和streamwriter:
<dependency> <groupId>com.alibaba.datax</groupId> <artifactId>streamreader</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> <dependency> <groupId>com.alibaba.datax</groupId> <artifactId>streamwriter</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
要依赖datax一定要保证有对应的源码或者编译到本机的maven repository或者在对应的私服上有上传相应的编译版本,不然pom文件是找不到依赖的。
为了集成开发,可能需要一口气引用所有的reader和writer,目前所知,就得一个一个写,如果大家有好办法,麻烦告知!
从com.alibaba.datax.core.util.container.CoreConstant
中可以看到,datax.home
很重要,很多文件的读取都是在datax.home
里面获取的。就如我们在安装版的datax中可以看到里面一些目录一样
$ ll total 4 drwxr-xr-x 2 mcbadm mcb 56 Sep 20 18:28 bin drwxr-xr-x 2 mcbadm mcb 65 Sep 20 18:28 conf drwxr-xr-x 2 mcbadm mcb 21 Sep 20 18:28 job drwxr-xr-x 2 mcbadm mcb 4096 Sep 20 18:28 lib drwxr-xr-x 4 mcbadm mcb 32 Sep 20 18:28 plugin drwxr-xr-x 2 mcbadm mcb 22 Sep 20 18:28 script drwxr-xr-x 2 mcbadm mcb 23 Sep 20 18:28 tmp
目前所知的,Engine#entry
在解析配置的时候会读取conf目录下的文件,还有对应plugin/reader/xxxreader、plugin/writer/xxxwriter的plugin.json文件:
{ "name": "streamreader", "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader", "description": { "useScene": "only for developer test.", "mechanism": "use datax framework to transport data from stream.", "warn": "Never use it in your real job." }, "developer": "alibaba" }
编写job代码:
{ "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "sliceRecordCount": 1, "column": [ { "type": "long", "value": "10" }, { "type": "string", "value": "hello,你好,世界-DataX,现在是${now}" } ] } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "UTF-8", "print": true } } } ], "setting": { "speed": { "channel": 1 } } } }
写个测试类吧:
import java.time.LocalTime; import com.alibaba.datax.core.Engine; public class EngineTest { public static void main(String[] args) { System.setProperty("datax.home", getCurrentClasspath()); System.setProperty("now", LocalTime.now().toString());// 替换job中的占位符 String[] datxArgs = {"-job", getCurrentClasspath() + "/job/stream2stream.json", "-mode", "standalone", "-jobid", "-1"}; try { Engine.entry(datxArgs); } catch (Throwable e) { e.printStackTrace(); } } public static String getCurrentClasspath() { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); String currentClasspath = classLoader.getResource("").getPath(); // 当前操作系统 String osName = System.getProperty("os.name"); if (osName.startsWith("Windows")) { // 删除path中最前面的/ currentClasspath = currentClasspath.substring(1); } return currentClasspath; } }
datax在解析完配置后,会将core.json,job.json,plugin.json合并在一起:
{ "common": { "column": { "dateFormat": "yyyy-MM-dd", "datetimeFormat": "yyyy-MM-dd HH:mm:ss", "encoding": "utf-8", "extraFormats": [ "yyyyMMdd" ], "timeFormat": "HH:mm:ss", "timeZone": "GMT+8" } }, "core": { "container": { "job": { "id": -1, "reportInterval": 10000 }, "taskGroup": { "channel": 5 }, "trace": { "enable": "false" } }, "dataXServer": { "address": "http://localhost:7001/api", "reportDataxLog": false, "reportPerfLog": false, "timeout": 10000 }, "statistics": { "collector": { "plugin": { "maxDirtyNumber": 10, "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector" } } }, "transport": { "channel": { "byteCapacity": 67108864, "capacity": 512, "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel", "flowControlInterval": 20, "speed": { "byte": -1, "record": -1 } }, "exchanger": { "bufferSize": 32, "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger" } } }, "entry": { "jvm": "-Xms1G -Xmx1G" }, "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "column": [ { "type": "long", "value": "10" }, { "type": "string", "value": "hello,你好,世界-DataX" } ], "sliceRecordCount": 1 } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "UTF-8", "print": true } } } ], "setting": { "speed": { "channel": 1 } } }, "plugin": { "reader": { "streamreader": { "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader", "description": { "mechanism": "use datax framework to transport data from stream.", "useScene": "only for developer test.", "warn": "Never use it in your real job." }, "developer": "alibaba", "name": "streamreader", "path": "D:/workspace/datax-test/target/test-classes/\\plugin\\reader\\streamreader" } }, "writer": { "streamwriter": { "class": "com.alibaba.datax.plugin.writer.streamwriter.StreamWriter", "description": { "mechanism": "use datax framework to transport data to stream.", "useScene": "only for developer test.", "warn": "Never use it in your real job." }, "developer": "alibaba", "name": "streamwriter", "path": "D:/workspace/datax-test/target/test-classes/\\plugin\\writer\\streamwriter" } } } }
每个reader和writer都有自己的plugin.json文件,里面最重要的就是class配置了,这个类的全路径配置用于classloader将其加载进来并通过反射将其实例化。加载代码可看com.alibaba.datax.core.util.container.LoadUtil
所以我们在集成的时候,plugin目录下面不需要有jar包了,只需要放json文件就行,因为我们通过pom文件依赖了对应的reader和writer,说白了,就是classpath下面有对应的reader和writer即可。
文章有点长,记录了一个下午的研究结果,应该有很多不完善的地方,希望可以和大家多交流。如果觉得有帮助,可以点个赞。
9人点赞 开发记录