datax源码解析-任务拆分机制详解
此次源码分析的版本是3.0。因为插件是datax重要的组成部分,源码分析过程中会涉及到插件部分的源码,为了保持一致性,插件都已大部分人比较熟悉的mysql为例子说明。
本文我们来看看datax的任务拆分机制。
先来看一幅图,
主要是要通过这幅图,理解datax中关于job和task的关系以及概念。
我们这篇文章其实就是关注的第二个步骤,拆分task。
任务拆分的入口函数是com.alibaba.datax.core.job.JobContainer#split
,我们来一点点分析这个方法。
//计算needChannelNumber this.adjustChannelNumber(); if (this.needChannelNumber <= 0) { this.needChannelNumber = 1; } //切分读插件,返回包含各个切分后的读插件配置列表,后续一个服务使用一个 List<Configuration> readerTaskConfigs = this .doReaderSplit(this.needChannelNumber); ...
首先是计算needChannelNumber
这个变量,这个变量是后面执行具体拆分成task的依据。adjustChannelNumber
方法如下:
private void adjustChannelNumber() { int needChannelNumberByByte = Integer.MAX_VALUE; int needChannelNumberByRecord = Integer.MAX_VALUE; //是否指定了字节限速,来自任务配置文件 boolean isByteLimit = (this.configuration.getInt( CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0); if (isByteLimit) { //全局的限速字节数 long globalLimitedByteSpeed = this.configuration.getInt( CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024); // 在byte流控情况下,单个Channel流量最大值必须设置,否则报错! Long channelLimitedByteSpeed = this.configuration .getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE); if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0) { throw DataXException.asDataXException( FrameworkErrorCode.CONFIG_ERROR, "在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数"); } //计算channel的数量 needChannelNumberByByte = (int) (globalLimitedByteSpeed / channelLimitedByteSpeed); needChannelNumberByByte = needChannelNumberByByte > 0 ? needChannelNumberByByte : 1; LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes."); } //是否指定了记录数量限流 boolean isRecordLimit = (this.configuration.getInt( CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0; if (isRecordLimit) { long globalLimitedRecordSpeed = this.configuration.getInt( CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000); Long channelLimitedRecordSpeed = this.configuration.getLong( CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD); if (channelLimitedRecordSpeed == null || channelLimitedRecordSpeed <= 0) { throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "在有总tps限速条件下,单个channel的tps值不能为空,也不能为非正数"); } needChannelNumberByRecord = (int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed); needChannelNumberByRecord = needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1; LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records."); } // 取较小值 this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ? needChannelNumberByByte : needChannelNumberByRecord; // 如果从byte或record上设置了needChannelNumber则退出 if (this.needChannelNumber < Integer.MAX_VALUE) { return; } //是否直接指定了channel数量 boolean isChannelLimit = (this.configuration.getInt( CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0); if (isChannelLimit) { this.needChannelNumber = this.configuration.getInt( CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL); LOG.info("Job set Channel-Number to " + this.needChannelNumber + " channels."); return; } throw DataXException.asDataXException( FrameworkErrorCode.CONFIG_ERROR, "Job运行速度必须设置"); }
注释写得比较详细了,总结下该方法的逻辑是,如果指定字节数限流,则据此计算并发数目A。如果指定记录数限流,则据此计算一个并发数目B。再取A和B两者中最小值作为needChannelNumber变量的值。如果两者限流都没指定,则看是否配置文件指定了channel并发数目。配置的示例是这样的:
{ "core": { "transport" : { "channel": { "speed": { "record": 100, "byte": 100 } } } }, "job": { "setting": { "speed": { "record": 5000, "byte": 10000, "channel" : 1 } } } }
或者直接指定了channel数量:
"job": { "setting":{ "speed":{ "channel":"2" } } }
继续看split代码,
//切分读插件,返回包含各个切分后的读插件配置列表,后续一个服务使用一个 List<Configuration> readerTaskConfigs = this .doReaderSplit(this.needChannelNumber); //拆分的任务数量 int taskNumber = readerTaskConfigs.size(); //先拆reader,再拆writer List<Configuration> writerTaskConfigs = this .doWriterSplit(taskNumber); ...
这里似乎有点奇怪,为啥reader拆分传入的是needChannelNumber
,而writer拆分入参是taskNumber
。这是因为datax的执行逻辑就是,必须先切分Reader,然后Writer是根据Reader切分后的数目进行切分的。这个仔细想想也可以理解,毕竟传输的源头是reader,根据reader进行分工是自然的。
深入到doReaderSplit
方法继续看,
private List<Configuration> doReaderSplit(int adviceNumber) { classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader( PluginType.READER, this.readerPluginName)); //内部还是调用插件的split List<Configuration> readerSlicesConfigs = this.jobReader.split(adviceNumber); if (readerSlicesConfigs == null || readerSlicesConfigs.size() <= 0) { throw DataXException.asDataXException( FrameworkErrorCode.PLUGIN_SPLIT_ERROR, "reader切分的task数目不能小于等于0"); } LOG.info("DataX Reader.Job [{}] splits to [{}] tasks.", this.readerPluginName, readerSlicesConfigs.size()); classLoaderSwapper.restoreCurrentThreadClassLoader(); return readerSlicesConfigs; }
没啥东西,因为委托给了插件自己的split方法进行拆分,这里以mysql为例,最终调用的是com.alibaba.datax.plugin.rdbms.reader.util.ReaderSplitUtil#doSplit
方法,来看下,
public static List<Configuration> doSplit( Configuration originalSliceConfig, int adviceNumber) { //默认isTableMode是true boolean isTableMode = originalSliceConfig.getBool(Constant.IS_TABLE_MODE).booleanValue(); int eachTableShouldSplittedNumber = -1; if (isTableMode) { // adviceNumber这里是channel数量大小, 即datax并发task数量 // eachTableShouldSplittedNumber是单表应该切分的份数, 向上取整可能和adviceNumber没有比例关系了已经 eachTableShouldSplittedNumber = calculateEachTableShouldSplittedNumber( adviceNumber, originalSliceConfig.getInt(Constant.TABLE_NUMBER_MARK)); } //从配置文件获取列信息 String column = originalSliceConfig.getString(Key.COLUMN); //从配置文件获取where设置,如果配置文件没有指定就是空 String where = originalSliceConfig.getString(Key.WHERE, null); //数据库连接信息,这里仅指reader的连接信息 List<Object> conns = originalSliceConfig.getList(Constant.CONN_MARK, Object.class); List<Configuration> splittedConfigs = new ArrayList<Configuration>(); for (int i = 0, len = conns.size(); i < len; i++) { Configuration sliceConfig = originalSliceConfig.clone(); Configuration connConf = Configuration.from(conns.get(i).toString()); String jdbcUrl = connConf.getString(Key.JDBC_URL); sliceConfig.set(Key.JDBC_URL, jdbcUrl); // 抽取 jdbcUrl 中的 ip/port 进行资源使用的打标,以提供给 core 做有意义的 shuffle 操作 sliceConfig.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl)); sliceConfig.remove(Constant.CONN_MARK); Configuration tempSlice; // 说明是配置的 table 方式 if (isTableMode) { // 已在之前进行了扩展和`处理,可以直接使用 List<String> tables = connConf.getList(Key.TABLE, String.class); Validate.isTrue(null != tables && !tables.isEmpty(), "您读取数据库表配置错误."); //要不要根据主键进一步拆分,如果配置文件没有指定就不需要拆分 String splitPk = originalSliceConfig.getString(Key.SPLIT_PK, null); //最终切分份数不一定等于 eachTableShouldSplittedNumber boolean needSplitTable = eachTableShouldSplittedNumber > 1 && StringUtils.isNotBlank(splitPk); //是否需要对单表进行拆分 //当满足并发数要求较高,并且配置了splitPk(表分割的主键)参数时,则要求进行单表拆分 if (needSplitTable) { if (tables.size() == 1) { //原来:如果是单表的,主键切分num=num*2+1 // splitPk is null这类的情况的数据量本身就比真实数据量少很多, 和channel大小比率关系时,不建议考虑 //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 2 + 1;// 不应该加1导致长尾 //考虑其他比率数字?(splitPk is null, 忽略此长尾) //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5; //为避免导入hive小文件 默认基数为5,可以通过 splitFactor 配置基数 // 最终task数为(channel/tableNum)向上取整*splitFactor Integer splitFactor = originalSliceConfig.getInt(Key.SPLIT_FACTOR, Constant.SPLIT_FACTOR); eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * splitFactor; } // 尝试对每个表,切分为eachTableShouldSplittedNumber 份 for (String table : tables) { tempSlice = sliceConfig.clone(); tempSlice.set(Key.TABLE, table); List<Configuration> splittedSlices = SingleTableSplitUtil .splitSingleTable(tempSlice, eachTableShouldSplittedNumber); splittedConfigs.addAll(splittedSlices); } } else { for (String table : tables) { tempSlice = sliceConfig.clone(); tempSlice.set(Key.TABLE, table); String queryColumn = HintUtil.buildQueryColumn(jdbcUrl, table, column); //sql的示例:select col1,col2,col3 from table1 tempSlice.set(Key.QUERY_SQL, SingleTableSplitUtil.buildQuerySql(queryColumn, table, where)); splittedConfigs.add(tempSlice); } } } else { // 说明是配置的 querySql 方式 List<String> sqls = connConf.getList(Key.QUERY_SQL, String.class); // TODO 是否check 配置为多条语句?? for (String querySql : sqls) { tempSlice = sliceConfig.clone(); tempSlice.set(Key.QUERY_SQL, querySql); splittedConfigs.add(tempSlice); } } } return splittedConfigs; }
这个方法比较长,我加了比较详细的注释。其实就是先判断是否需要进行单表切分,当满足并发数要求较高,并且配置了splitPk(表分割的主键)参数时,则要求进行单表拆分,拆分个数前面已经经过计算得出,如果不需要就是几张表开启几个并发。拆分之后会返回一个Configuration的List,每个Configuration代表原先总配置文件中需要同步的数据的一部分。并加入到总配置文件存储,为后续调用提供配置的支持。
然后继续看writer的拆分方法,最终调用的是com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#doSplit
方法,来看下,
public static List<Configuration> doSplit(Configuration simplifiedConf, int adviceNumber) { List<Configuration> splitResultConfigs = new ArrayList<Configuration>(); int tableNumber = simplifiedConf.getInt(Constant.TABLE_NUMBER_MARK); //处理单表的情况 if (tableNumber == 1) { //由于在之前的 master prepare 中已经把 table,jdbcUrl 提取出来,所以这里处理十分简单 for (int j = 0; j < adviceNumber; j++) { splitResultConfigs.add(simplifiedConf.clone()); } return splitResultConfigs; } ...
其中adviceNumber
传入的是根据reader切分的任务数,simplifiedConf是从配置文件获取的writer相关的配置。为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则会报错,
if (tableNumber != adviceNumber) { throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, String.format("您的配置文件中的列配置信息有误. 您要写入的目的端的表个数是:%s , 但是根据系统建议需要切分的份数是:%s. 请检查您的配置并作出修改.", tableNumber, adviceNumber)); }
拆分完reader和writer之后,接下来有一行代码:
List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
这个是做什么的呢?我举个例子,我们定义任务配置的时候可以指定转换的规则,比如:
{ "job": { "setting": { "speed": { "channel": 2 }, "errorLimit": { "record": 10000, "percentage": 1 } }, "content": [ { // 字段转换部分 "transformer": [ { // 使用字段截取转换 "name": "dx_substr", "parameter": { // 操作读取出来的record的第一列 "columnIndex": 0, // 意思是截取第0到4个字符 "paras": ["0","4"] } } ], ...
如下图所示,在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,可以借助ETL的T过程实现(Transformer)。DataX包含了完整的E(Extract)、T(Transformer)、L(Load)支持。
最后是合并配置,方法是mergeReaderAndWriterTaskConfigs
,
private List<Configuration> mergeReaderAndWriterTaskConfigs( List<Configuration> readerTasksConfigs, List<Configuration> writerTasksConfigs, List<Configuration> transformerConfigs) { //reader和writer切分的数量要相等 if (readerTasksConfigs.size() != writerTasksConfigs.size()) { throw DataXException.asDataXException( FrameworkErrorCode.PLUGIN_SPLIT_ERROR, String.format("reader切分的task数目[%d]不等于writer切分的task数目[%d].", readerTasksConfigs.size(), writerTasksConfigs.size()) ); } List<Configuration> contentConfigs = new ArrayList<Configuration>(); for (int i = 0; i < readerTasksConfigs.size(); i++) { Configuration taskConfig = Configuration.newDefault(); //reader相关的配置 taskConfig.set(CoreConstant.JOB_READER_NAME, this.readerPluginName); taskConfig.set(CoreConstant.JOB_READER_PARAMETER, readerTasksConfigs.get(i)); //writer相关的配置 taskConfig.set(CoreConstant.JOB_WRITER_NAME, this.writerPluginName); taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER, writerTasksConfigs.get(i)); //transform相关的配置,可以为空 if(transformerConfigs!=null && transformerConfigs.size()>0){ taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs); } taskConfig.set(CoreConstant.TASK_ID, i); contentConfigs.add(taskConfig); } return contentConfigs; }
这个其实就是把任务整合后输出,输出的配置文件可以在task中使用。
参考: