Flink Sink (Kafka, Redis, ES, JDBC)

Flink has no foreach-style method like Spark's that lets users act on each record directly; all output to external systems goes through a Sink. A job's final output is wired up roughly like this: stream.addSink(new MySink(xxxx)). Flink officially provides sinks for a number of frameworks; beyond those, users implement their own sink.
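As a minimal sketch of that pattern (MySink is a hypothetical placeholder, not a Flink class), a custom sink just extends SinkFunction and overrides invoke:

import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Hypothetical placeholder sink: writes each record to stdout instead of a real external system
class MySink(prefix: String) extends SinkFunction[String] {
  override def invoke(value: String, context: SinkFunction.Context): Unit = {
    // Called once per record; a real sink would talk to an external client here
    println(s"$prefix$value")
  }
}

// Usage: stream.addSink(new MySink("out> "))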

5.0 File

package com.zhen.flink.api.sink

import com.zhen.flink.api.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._


/**
  * @Author FengZhen
  * @Date 6/8/22 10:43 PM
  * @Description File sink examples: writeAsCsv and StreamingFileSink
  */
object FileSink {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 0. Read the source data from a text file
    val filePath = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor.txt"
    val inputStream = env.readTextFile(filePath)

    // 1. Parse each line into the SensorReading case class
    val dataStream: DataStream[SensorReading] = inputStream
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }
      )

    dataStream.print()
    val outFilePath = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor_out.txt"
    dataStream.writeAsCsv(outFilePath) // deprecated in recent Flink versions; shown for comparison

    // StreamingFileSink treats this path as a base directory and writes rolling part files under it
    val outFilePath1 = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor_out_1.txt"
    dataStream.addSink(
      StreamingFileSink.forRowFormat(
        new Path(outFilePath1),
        new SimpleStringEncoder[SensorReading]()
      ).build()
    )

    env.execute("file sink.")
  }

}
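StreamingFileSink decides when to roll over to a new part file via a rolling policy. The sketch below shows the builder-style configuration, assuming the Flink 1.13 API in which the thresholds are plain milliseconds/bytes; the concrete values are arbitrary choices, not from the original code:

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

val rollingSink = StreamingFileSink
  .forRowFormat(new Path(outFilePath1), new SimpleStringEncoder[SensorReading]())
  .withRollingPolicy(
    DefaultRollingPolicy.builder()
      .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))  // roll at least every 15 minutes
      .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // roll after 5 minutes without new data
      .withMaxPartSize(128 * 1024 * 1024)                   // roll once a part file reaches 128 MB
      .build()
  )
  .build()

// dataStream.addSink(rollingSink)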

5.1 Kafka

package com.zhen.flink.api.sink

import java.util.Properties

import com.zhen.flink.api.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

/**
  * @Author FengZhen
  * @Date 6/11/22 3:20 PM
  * @Description Kafka source and sink example
  */
object KafkaSink {

  def main(args: Array[String]): Unit = {


    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 0. Read the source data from Kafka
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val streamKafka = env.addSource( new FlinkKafkaConsumer[String](
      "topic_sensor",
      new SimpleStringSchema(),
      properties
    ))

    // 1. Parse each line into a SensorReading, then re-serialize it as a String
    val dataStream: DataStream[String] = streamKafka
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
        }
      )

    // The (brokerList, topic, schema) producer constructor below is deprecated in recent connector versions
    dataStream.addSink(
      new FlinkKafkaProducer[String]("localhost:9092", "topic_flink_kafka_sink", new SimpleStringSchema())
    )

    // Test from a terminal:
    // produce input:   ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_sensor
    // consume output:  ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_flink_kafka_sink

    env.execute("kafka sink.")


  }

}
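If checkpointing is enabled, the producer can also be configured for transactional, exactly-once delivery. A minimal sketch against the Flink 1.13 universal Kafka connector follows; the topic name and timeout value are illustrative assumptions:

import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

// env.enableCheckpointing(5000) // exactly-once requires checkpointing

val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "localhost:9092")
// Must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
producerProps.setProperty("transaction.timeout.ms", "60000")

val exactlyOnceProducer = new FlinkKafkaProducer[String](
  "topic_flink_kafka_sink", // default target topic
  new KafkaSerializationSchema[String] {
    override def serialize(element: String, timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] =
      new ProducerRecord[Array[Byte], Array[Byte]]("topic_flink_kafka_sink", element.getBytes(StandardCharsets.UTF_8))
  },
  producerProps,
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE
)

// dataStream.addSink(exactlyOnceProducer)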

5.2 Redis

package com.zhen.flink.api.sink

import com.zhen.flink.api.SensorReading
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
  * @Author FengZhen
  * @Date 6/12/22 8:23 PM
  * @Description Redis sink example (using the Bahir connector)
  */
// Named RedisSinkTest so it does not shadow the imported connector class RedisSink
object RedisSinkTest {


  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 0. Read the source data from a text file
    val filePath = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor.txt"
    val inputStream = env.readTextFile(filePath)

    // 1. Parse each line into the SensorReading case class
    val dataStream: DataStream[SensorReading] = inputStream
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }
      )

    // Define the connection config (a FlinkJedisConfigBase implementation)
    val conf = new FlinkJedisPoolConfig.Builder()
        .setHost("localhost")
        .setPort(6379)
        .setDatabase(1)
        .build()

    dataStream.addSink( new RedisSink[SensorReading](conf, new MyRedisMapper))

    env.execute("redis sink.")

  }

  // Define a RedisMapper: how each SensorReading becomes a Redis command
  class MyRedisMapper extends RedisMapper[SensorReading]{

    // The command used to save the data: HSET <hash name> <field> <value>
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.HSET, "sensor_temp")
    }


    // Use the sensor id as the field (key)
    override def getKeyFromData(t: SensorReading): String =
      t.id

    // Use the temperature as the value
    override def getValueFromData(t: SensorReading): String =
      t.temperature.toString
  }

}
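To verify the result, the hash can be inspected from redis-cli (the sink writes into database 1, per setDatabase(1) above):

redis-cli -n 1 HGETALL sensor_temp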

5.3 Elasticsearch

package com.zhen.flink.api.sink

import java.util

import com.zhen.flink.api.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

/**
  * @Author FengZhen
  * @Date 6/17/22 3:39 PM
  * @Description Elasticsearch (6.x) sink example
  */
object ElasticsearchSinkTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 0. Read the source data from a text file
    val filePath = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor.txt"
    val inputStream = env.readTextFile(filePath)

    // 1. Parse each line into the SensorReading case class
    val dataStream: DataStream[SensorReading] = inputStream
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }
      )


    // Define the Elasticsearch HTTP hosts
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    // Custom ElasticsearchSinkFunction that turns each record into an index request
    val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReading] {
      override def process(element: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {

        // Wrap the fields in a map to use as the document source
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("id", element.id)
        dataSource.put("temperature", element.temperature.toString)
        dataSource.put("ts", element.timestamp.toString)

        // Build the index request (a document type is still required on ES 6.x)
        val indexRequest = Requests.indexRequest()
          .index("sensor")
          .`type`("reading_data")
          .source(dataSource)

        // Hand the request to the indexer, which sends it to ES
        indexer.add(indexRequest)

      }
    }

    dataStream.addSink(
      new ElasticsearchSink.Builder[SensorReading](httpHosts, myEsSinkFunc)
        .build()
    )
    env.execute("elasticsearch sink.")
  }
}
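By default the sink buffers actions and flushes them in bulk, so a small demo may appear to write nothing until the buffer fills. A sketch of forcing a flush after every record via the builder (demo-friendly, not a production setting):

val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, myEsSinkFunc)
esSinkBuilder.setBulkFlushMaxActions(1) // flush each request immediately
dataStream.addSink(esSinkBuilder.build())

The indexed documents can then be checked with curl "http://localhost:9200/sensor/_search?pretty".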

5.4 JDBC (custom sink)

package com.zhen.flink.api.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.zhen.flink.api.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._


/**
  * @Author FengZhen
  * @Date 7/1/22 2:21 PM
  * @Description Custom JDBC (MySQL) sink example
  */
object JdbcSink {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 0. Read the source data from a text file
    val filePath = "/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/sensor.txt"
    val inputStream = env.readTextFile(filePath)

    // 1. Parse each line into the SensorReading case class
    val dataStream: DataStream[SensorReading] = inputStream
      .map(
        data => {
          val arr = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }
      )

    dataStream.addSink(new MyJdbcSinkFunc())

    env.execute("jdbc sink")

  }

  class MyJdbcSinkFunc() extends RichSinkFunction[SensorReading]{

    // Connection and prepared statements, initialized in open()
    var conn: Connection = _
    var insertStmt: PreparedStatement = _
    var updateStmt: PreparedStatement = _


    // open() runs once per parallel sink instance
    override def open(parameters: Configuration): Unit = {
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "1234qwer")
      insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?,?)")
      updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
    }

    override def invoke(value: SensorReading, context: SinkFunction.Context): Unit = {

      // Try the update first; if the row exists it is updated in place
      updateStmt.setDouble(1, value.temperature)
      updateStmt.setString(2, value.id)
      updateStmt.execute()

      // If the update matched no rows, fall back to an insert
      if(updateStmt.getUpdateCount == 0){
        insertStmt.setString(1, value.id)
        insertStmt.setDouble(2, value.temperature)
        insertStmt.execute()
      }
    }

    override def close(): Unit = {
      insertStmt.close()
      updateStmt.close()
      conn.close()
    }
  }

}
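The sink assumes a sensor_temp table already exists in the test database. A minimal schema inferred from the insert/update statements above (the column types are an assumption, not from the original post):

-- hypothetical schema matching the statements in MyJdbcSinkFunc
CREATE TABLE sensor_temp (
  id   VARCHAR(32) PRIMARY KEY,
  temp DOUBLE
);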


pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhen.flink</groupId>
    <artifactId>flink_learn</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>flink_learn Maven</name>


    <properties>
        <scala_version>2.12</scala_version>
        <flink_version>1.13.1</flink_version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala_version}</artifactId>
            <version>${flink_version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala_version}</artifactId>
            <version>${flink_version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala_version}</artifactId>
            <version>${flink_version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala_version}</artifactId>
            <version>${flink_version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <!-- note: version 1.0 of this connector is only published for Scala 2.11 -->
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala_version}</artifactId>
            <version>${flink_version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>

    </dependencies>

    <build>
        <plugins> <!-- this plugin compiles the Scala sources to class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution> <!-- bound to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


</project>
