C/C++教程

Flink流处理-Task之KafkaSourceDataTask

本文主要是介绍Flink流处理-Task之KafkaSourceDataTask,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

KafkaSourceDataTask

package pers.aishuang.flink.streaming.task;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import pers.aishuang.flink.streaming.entity.ItcastDataObj;
import pers.aishuang.flink.streaming.utils.JsonParseUtil;

/**

  • 主要完成从Kafka集群读取车辆的json数据并将其转换成ItcastDataObj,并将其

  • 通过errorData过滤出来正常的数据或者错误的数据,将正确的数据保存到HDFS上

  • 和HBase上,将错误的数据保存到HDFS上
    */
    public class KafkaSourceDataTask extends BaseTask {
    public static void main(String[] args) throws Exception{
    //1. 获取当前流执行环境-env
    StreamExecutionEnvironment env = getEnv(KafkaSourceDataTask.class.getSimpleName());

     //2. 获取Kafka中的车辆数据json字符串
     DataStreamSource<String> source = getKafkaStream(
             env,
             "__vehicle_consumer_",
             SimpleStringSchema.class
     );
     //-- 打印输出
     source.printToErr();
    
     //3. 将读取出来的json字符串转换为ItcastDataObj
     SingleOutputStreamOperator<ItcastDataObj> vehicleDataStream = source.map(
             new MapFunction<String, ItcastDataObj>() {
                 @Override
                 public ItcastDataObj map(String line) throws Exception {
                     return JsonParseUtil.parseJsonToObject(line);
                 }
             }
     );
     //-- 另种写法
     DataStream<ItcastDataObj> vehicleDataStream02 = source.map(JsonParseUtil::parseJsonToObject);
     //vehicleDataStream.printToErr();
     vehicleDataStream02.printToErr();
    
     //触发执行
     env.execute();
    

    }
    }


                    
这篇关于Flink流处理-Task之KafkaSourceDataTask的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!