1、ETL简介
ETL(Extract-Transform-Load的缩写,即数据抽取、转换、装载的过程),对于开发或者运维人员来说,我们经常会遇到各种数据的处理,转换,迁移,所以了解并掌握一种ETL工具的使用,必不可少,这里我们要学习的ETL工具就是Kettle!
2、Kettle简介
Kettle是一款国外开源的ETL工具,纯Java编写,可以在Windows、Linux、Unix上运行,绿色无需安装,数据抽取高效稳定。Kettle 中文名称叫水壶,该项目的主程序员Matt 希望把各种数据放到一个壶里,然后以一种指定的格式流出。Kettle这个ETL工具集,它允许你管理来自不同数据库的数据,通过提供一个图形化的用户环境来描述你想做什么,而不是你想怎么做。Kettle中有两种脚本文件,transformation和job,transformation完成针对数据的基础转换,job则完成整个工作流的控制。
Kettle现在已经更名为PDI(Pentaho Data Integration,即Pentaho数据集成)。
引入依赖包(此处用的7.1.0.0)
<!-- Kettle (PDI) 7.1.0.0 and the libraries it needs at runtime -->
<!-- Supporting libraries -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-vfs2</artifactId>
    <version>2.0</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>27.0.1-jre</version>
</dependency>
<dependency>
    <groupId>org.scannotation</groupId>
    <artifactId>scannotation</artifactId>
    <version>1.0.3</version>
</dependency>
<!-- Kettle artifacts -->
<dependency>
    <groupId>com.kettle</groupId>
    <artifactId>pentaho-vfs-browser</artifactId>
    <version>7.1.0.0</version>
</dependency>
<dependency>
    <groupId>com.kettle</groupId>
    <artifactId>kettle-engine</artifactId>
    <version>7.1.0.0</version>
</dependency>
<dependency>
    <groupId>com.kettle</groupId>
    <artifactId>kettle-core</artifactId>
    <version>7.1.0.0</version>
</dependency>
<dependency>
    <groupId>com.kettle</groupId>
    <artifactId>metastore</artifactId>
    <version>7.1.0.0</version>
</dependency>
<!-- End of Kettle dependencies -->
1、ktr(转换)
/** * kettle脚本执行 - 转换 * @param initKettleParam kettle脚本入参 * @param crontabMissionLog 输出日志 * @param ktrFilePath 脚本路径(/User/xxx/xxx.ktr) * @return true/flase */ public static boolean runKettleTransfer(Map<String, String> initKettleParam,CrontabMissionLog crontabMissionLog, String ktrFilePath) { Trans trans; String uuid = UUID.randomUUID().toString(); logger_info.info("ExecKettleUtil@runKettleTransfer:" + uuid + " {ktrFilePath:" + ktrFilePath + "}"); try { // 初始化 KettleEnvironment.init(); //EnvUtil.environmentInit(); TransMeta transMeta = new TransMeta(ktrFilePath); // 转换 trans = new Trans(transMeta); // 初始化trans参数,脚本中获取参数值:${variableName} if (initKettleParam != null) { for (String variableName : initKettleParam.keySet()) { trans.setVariable(variableName, initKettleParam.get(variableName)); //trans.setParameterValue(variableName, initKettleParam.get(variableName)); //transMeta.setParameterValue(variableName, initKettleParam.get(variableName)); } } //监听kettle执行日志 KettleLogStore.getAppender().addLoggingEventListener(new KettleLoggingEventListener() { boolean flag = true; @Override public void eventAdded(KettleLoggingEvent logs) { crontabMissionLog.setExecLog(crontabMissionLog.getExecLog()+logs.getMessage().toString()); } }); // 执行转换 trans.execute(null); // 等待转换执行结束 trans.waitUntilFinished(); if (trans.getErrors() > 0) { crontabMissionLog.setExecStatus(ExecStatus.执行失败.getValue()); } else { crontabMissionLog.setExecStatus(ExecStatus.执行成功.getValue()); crontabMissionLog.setSuccess(true); } return true; } catch (Exception e) { crontabMissionLog.setExecStatus(ExecStatus.执行失败.getValue()); return false; } }
2、kjb(作业)
/** * kettle脚本执行 - 作业 * @param initKettleParam kettle脚本入参 * @param crontabMissionLog 输出日志 * @param ktrFilePath 脚本路径(/User/xxx/xxx.kjb) */ public static void runKettlJob(Map<String, String> initKettleParam, CrontabMissionLog crontabMissionLog, String ktrFilePath) { try { KettleEnvironment.init(); // jobname 是Job脚本的路径及名称 JobMeta jobMeta = new JobMeta(ktrFilePath, null); Job job = new Job(null, jobMeta); // 向Job 脚本传递参数,脚本中获取参数值:${参数名} if (initKettleParam != null) { for (String variableName : initKettleParam.keySet()) { job.setVariable(variableName, initKettleParam.get(variableName)); } } //监听kettle执行日志 KettleLogStore.getAppender().addLoggingEventListener(new KettleLoggingEventListener() { boolean flag = true; @Override public void eventAdded(KettleLoggingEvent logs) { crontabMissionLog.setExecLog(crontabMissionLog.getExecLog()+logs.getMessage().toString()); } }); job.start(); job.waitUntilFinished(); if (job.getErrors() > 0) { throw new Exception( "There are errors during job exception!(执行job发生异常)"); } } catch (Exception e) { e.printStackTrace(); } }
3、调用
/** * kettle脚本执行 (.ktr;.kjb) * @param crontabMissionLog 日志 * @param crontabMission 调度实体类 */ public static void runKettleScript(CrontabMissionLog crontabMissionLog, CrontabMission crontabMission,String businessDate) { crontabMissionLog.setExecBeginTime(DateEnum.yyyyMMddHHmmssSSS.now()); String ktrFile =crontabMission.getKettleScript(); Map<String,String> initKettleParam = new HashMap<>(); initKettleParam.put("date",businessDate); // String[] params = new String[0]; // if(StringUtils.hasText(crontabMission.getKettleScriptParam())){ // params = crontabMission.getKettleScriptParam().split(";"); // for(String param : params){ // String[] strings = param.split(":"); // initKettleParam.put(strings[0],strings[1]); // } // } crontabMissionLog.setExecLog(DateEnum.yyyyMMddHHmmssSSS.now()+"-kettle脚本开始执行:\n脚本执行结果:"); String ktrFilePath = fileLocal + "/kettle/"+ crontabMission.getSid() + File.separator + ktrFile; if (ktrFile.endsWith(".ktr")){ runKettleTransfer(initKettleParam,crontabMissionLog,ktrFilePath); }else if (ktrFile.endsWith(".kjb")){ runKettlJob(initKettleParam,crontabMissionLog,ktrFilePath); }else { crontabMissionLog.setExecLog("无法执行非kettle脚本格式文件"); } crontabMissionLog.setExecLog(crontabMissionLog.getExecLog()+"\n"+DateEnum.yyyyMMddHHmmssSSS.now()+"-kettle脚本完成;"); crontabMissionLog.setExecEndTime(DateEnum.yyyyMMddHHmmssSSS.now()); }