
2021 Big Data Flink (37): Table & SQL Case Four

This article covers case four of the Table & SQL series from 2021 Big Data Flink (37). It is a useful reference for anyone tackling similar programming problems, so follow along with us!

Contents

Case Four

Requirements

Code Implementation


Case Four

Requirements

Consume data from Kafka, keep only the records whose status is success, and write them back to Kafka:

{"user_id": "1", "page_id":"1", "status": "success"}

{"user_id": "1", "page_id":"1", "status": "success"}

{"user_id": "1", "page_id":"1", "status": "success"}

{"user_id": "1", "page_id":"1", "status": "success"}

{"user_id": "1", "page_id":"1", "status": "fail"}

Create the input and output topics:

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic input_kafka

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic output_kafka

Start a console producer and paste the sample records into input_kafka:

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic input_kafka

Start a console consumer on output_kafka; once the job is running, only the success records should show up:

/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic output_kafka --from-beginning
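
If you prefer generating the test data programmatically rather than pasting it into the console producer, here is a minimal sketch using the plain Kafka Java client. It assumes the kafka-clients dependency is on the classpath; the class name TestDataProducer is just for illustration, while the broker address and topic match the commands above.

package cn.itcast.sql;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TestDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // try-with-resources flushes and closes the producer on exit
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Four success records and one fail record, matching the sample data above
            for (int i = 0; i < 4; i++) {
                producer.send(new ProducerRecord<>("input_kafka",
                        "{\"user_id\": \"1\", \"page_id\":\"1\", \"status\": \"success\"}"));
            }
            producer.send(new ProducerRecord<>("input_kafka",
                    "{\"user_id\": \"1\", \"page_id\":\"1\", \"status\": \"fail\"}"));
        }
    }
}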

 

Code Implementation

Reference documentation:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html

 

package cn.itcast.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Author itcast
 * Desc Consume JSON events from Kafka, keep only the records whose status is
 * success, and write them back to Kafka using Table & SQL
 */
public class FlinkSQL_Table_Demo06 {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. Source: register the input Kafka topic as a table
        TableResult inputTable = tEnv.executeSql(
                "CREATE TABLE input_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'input_kafka',\n" +
                        "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                        "  'properties.group.id' = 'testGroup',\n" +
                        "  'scan.startup.mode' = 'latest-offset',\n" +
                        "  'format' = 'json'\n" +
                        ")"
        );
        //3. Sink: register the output Kafka topic as a table
        TableResult outputTable = tEnv.executeSql(
                "CREATE TABLE output_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'output_kafka',\n" +
                        "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                        "  'format' = 'json',\n" +
                        "  'sink.partitioner' = 'round-robin'\n" +
                        ")"
        );

        //4. Transformation: keep only the records whose status is success
        String sql = "select " +
                "user_id," +
                "page_id," +
                "status " +
                "from input_kafka " +
                "where status = 'success'";

        Table resultTable = tEnv.sqlQuery(sql);

        // Print the filtered rows for debugging (retract stream: Boolean flag + Row)
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(resultTable, Row.class);
        resultDS.print();

        //5. Sink: concatenating the Table works because Table.toString() registers it
        // in the catalog under an auto-generated name, which this INSERT selects from
        tEnv.executeSql("insert into output_kafka select * from " + resultTable);


        //6. Execute
        env.execute();
    }


}
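
A note on the sink step: concatenating resultTable into the INSERT statement only works because Table.toString() registers the table under an auto-generated name. If that feels too implicit, the Table API offers more direct alternatives; below is a sketch using the same resultTable, tEnv, and output_kafka as above (the view name filtered_result is purely illustrative):

// Alternative 1: write the Table straight to the registered sink table
resultTable.executeInsert("output_kafka");

// Alternative 2: register the Table under an explicit name, then use plain SQL
tEnv.createTemporaryView("filtered_result", resultTable);
tEnv.executeSql("insert into output_kafka select * from filtered_result");

Either way, the auto-generated name disappears from the SQL, which makes the statement easier to read and debug.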

 

This concludes the article on 2021 Big Data Flink (37): Table & SQL Case Four. We hope the articles we recommend are helpful, and we hope you will keep supporting 为之网!