文章目录
kafka数据定时导入到hive,后续做数据清洗:
flume,confulent都需要单独部署服务,比较繁琐。调查其他可选方案,参考以下文章:参考资料
综合比较,camus 简单,比较方便接入。主要分两步:
1、采用mapreduce过程处理数据从kafka导入hadoop
2、hadoop数据接入hive管理。
1、下载源码,本地构建jar包。
参考文章
camus源码
2、查看camus.properties配置文件,支持的功能选项
期间需要自定义input,output encoder,
需要配置Reader,Writer类,具体参考源码实现。
3、修改camus.properties配置项,最终结果如下:
camus.job.name=Camus-Job-Test etl.destination.path=/tmp/escheduler/root/resources/topics etl.execution.base.path=/tmp/escheduler/root/resources/camus/exec etl.execution.history.path=/tmp/escheduler/root/resources/camus/exec/history # 新增的自定义decoder camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.StringMessageDecoder # 修改写入hadoop的writer etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider kafka.client.name=camus # broker kafka.brokers=.... # topic kafka.whitelist.topics=topic_exec_servicecheck_prod_calConfigId_177 log4j.configuration=false # 禁用压缩,deflate,snappy mapred.output.compress=false etl.output.codec=deflate etl.deflate.level=6 etl.default.timezone=Asia/Shanghai
4、上传jar,properties文件,执行如下命令:实现kafka数据到hadoop的功能:
cd /home/app/transform/libs/ hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties
数据导入到hadoop,
5、数据从hadoop到hive,执行如下脚本:
date_string=$(date '+%Y/%m/%d/%H') partion=$(date '+%Y-%m-%d_%H') topic='topic_exec_servicecheck_prod_calConfigId_177' table_name='dwd.test_exec_servicecheck' filePath="/tmp/escheduler/root/resources/topics/$topic/hourly/"$date_string"/" hive<<EOF create table if not exists $table_name( date TIMESTAMP, node STRING, status STRING ) PARTITIONED BY(dt STRING) row format delimited fields terminated by '|' STORED AS TEXTFILE; load data inpath '$filePath' into table $table_name partition (dt='$partion'); EOF
6、配置定时调度,按小时执行。
附自定义decoder
public class StringMessageDecoder extends MessageDecoder<Message, String> { private static final org.apache.log4j.Logger log = Logger.getLogger(JsonStringMessageDecoder.class); @Override public CamusWrapper<String> decode(Message message) { //log.info(message.getTopic()); return new CamusWrapper<String>(new String(message.getPayload())); } }
hive input/output 支持自定义数据格式,这个是很有意义的,通常来说文本文件,分隔符分割一行,纯文本解析,最简单,但是可读性,可维护性差。
支持json格式数据写入,json处理相关jar文件 放到${HIVE_HOME}/lib目录。