package pers.aishuang.flink.streaming.task;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import pers.aishuang.flink.streaming.entity.ItcastDataObj;
import pers.aishuang.flink.streaming.utils.JsonParseUtil;
/**
 * Flink streaming job that reads vehicle JSON records from Kafka and converts
 * each record into an {@link ItcastDataObj}.
 *
 * <p>As implemented here, the job only prints the raw JSON strings and the
 * parsed objects to standard error. The original design notes for this task
 * also describe splitting records by their errorData marker, writing valid
 * records to HDFS and HBase, and writing invalid records to HDFS; none of
 * those steps are implemented in this class.
 */
public class KafkaSourceDataTask extends BaseTask {

    /**
     * Builds the Kafka-to-ItcastDataObj pipeline and submits it for execution.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception if building or executing the streaming job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Obtain the stream execution environment from BaseTask, passing
        //    this class's simple name.
        StreamExecutionEnvironment env = getEnv(KafkaSourceDataTask.class.getSimpleName());

        // 2. Read the vehicle JSON strings from Kafka.
        // NOTE(review): "__vehicle_consumer_" looks like a consumer-group id/prefix;
        // confirm against BaseTask.getKafkaStream.
        DataStreamSource<String> source = getKafkaStream(
                env,
                "__vehicle_consumer_",
                SimpleStringSchema.class
        );
        // Debug output: echo every raw record to stderr.
        source.printToErr();

        // 3. Parse each JSON string into an ItcastDataObj. A single map operator
        //    is enough; an equivalent anonymous MapFunction variant was dropped
        //    because its result was never consumed.
        DataStream<ItcastDataObj> vehicleDataStream =
                source.map(JsonParseUtil::parseJsonToObject);
        // Debug output: echo every parsed object to stderr.
        vehicleDataStream.printToErr();

        // 4. Trigger execution. Without this call the pipeline above is only
        //    declared and never runs.
        env.execute();
    }
}