1 Scala 的基本依赖设置
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the ods_flink job: compiles the Scala sources under
  src/main/scala against Flink ${flink.version} (Scala ${scala.version})
  and packages a shaded jar named ods_flink.jar.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.ssjt</groupId>
  <artifactId>ods_flink</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <!-- Scala binary version; used as the suffix of Scala-dependent Flink artifactIds. -->
    <scala.version>2.11</scala.version>
    <flink.version>1.11.1</flink.version>
    <!--
      Scope applied to the Flink dependencies below that reference it.
      "provided" keeps them out of the shaded jar.
    -->
    <scope.type>provided</scope.type>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${scope.type}</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${scope.type}</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${scope.type}</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${scope.type}</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${scope.type}</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${scope.type}</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
      <scope>${scope.type}</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-parquet_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${scope.type}</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${scope.type}</scope>
    </dependency>
    <!--
      The two dependencies below declare no scope, so they use Maven's
      default (compile) scope and end up inside the shaded jar.
    -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <!--
        The artifactId suffix is the Scala binary version, not an HBase
        version. It previously came from a property named hbase.version
        whose value was 2.11; the resolved artifact is unchanged.
      -->
      <artifactId>flink-connector-hbase_${scala.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-prometheus_${scala.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>ods_flink</finalName>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <resources>
      <resource>
        <!--
          NOTE(review): Maven's conventional directory is src/main/resources
          (plural). Confirm the singular name matches the project layout.
        -->
        <directory>src/main/resource</directory>
        <includes>
          <!-- Include every file in this folder and all of its sub-folders. -->
          <include>**/*.*</include>
        </includes>
      </resource>
    </resources>

    <plugins>
      <!--
        Compiles the Scala sources (main and test).
        NOTE(review): org.scala-tools:maven-scala-plugin is the legacy plugin;
        its maintained successor is net.alchim31.maven:scala-maven-plugin.
      -->
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <!-- Pinned so the build does not depend on whichever release Maven resolves. -->
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <!-- Expands to the full compiler version 2.11.8. -->
          <scalaVersion>${scala.version}.8</scalaVersion>
          <args>
            <arg>-target:jvm-1.8</arg>
          </args>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-resources-plugin</artifactId>
        <version>3.0.2</version>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <!-- Pinned; without it the version varies with the installed Maven. -->
        <version>3.8.1</version>
        <configuration>
          <source>8</source>
          <target>8</target>
        </configuration>
      </plugin>

      <!-- Builds the shaded (fat) jar during the package phase. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.0</version>
        <configuration>
          <createDependencyReducedPom>false</createDependencyReducedPom>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <!-- When packaging, replace this with the fully qualified main class of the job. -->
                  <mainClass>com.ssjt</mainClass>
                </transformer>
                <!-- Concatenates every reference.conf found on the classpath into one file. -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>reference.conf</resource>
                </transformer>
              </transformers>
              <filters>
                <filter>
                  <artifact>*:*:*:*</artifact>
                  <excludes>
                    <!-- Drop signature files copied from signed dependency jars. -->
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
2 写出到文件系统
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * Demo job that streams generated rows into a partitioned filesystem table.
 *
 * It registers a `datagen` source table and a `filesystem` sink table, then
 * submits an `INSERT INTO ... SELECT` that copies every source row into the
 * sink, deriving the partition columns (dt, dh, dm) from the row's local_time.
 */
object FileTest {

  /**
   * Entry point: builds the table environment and runs the three SQL statements.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Set up the execution environment.
    val checkpointIntervalMs = 60 * 1000L
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpointing must be enabled (original author's note). Presumably the
    // streaming file sink only finalizes its files on checkpoints: confirm
    // against the Flink filesystem connector documentation.
    streamEnv.enableCheckpointing(checkpointIntervalMs)
    val tableEnv = StreamTableEnvironment.create(streamEnv)

    // Source: generated rows with a 5-character `iid`, 10000 rows per second,
    // plus a computed `local_time` column.
    // NOTE(review): 'timeZone' does not look like a documented datagen option; confirm.
    val sourceDdl =
      """
        |CREATE TABLE sourceTable (
        | `iid` STRING COMMENT '',
        | `local_time` AS LOCALTIMESTAMP
        | )
        | WITH (
        | 'connector' = 'datagen',
        | 'rows-per-second'='10000',
        | 'fields.iid.length'='5',
        | 'timeZone'='Asia/Shanghai'
        |)
        |""".stripMargin

    // Sink: JSON files on the local filesystem, partitioned by dt/dh/dm.
    // NOTE(review): 'timeZone' does not look like a documented filesystem option; confirm.
    val sinkDdl =
      """
        |CREATE TABLE test_fs_table_2 (
        | iid STRING,
        | local_time TIMESTAMP,
        | dt STRING,
        | dh STRING,
        | dm STRING
        |) PARTITIONED BY (dt,dh,dm) WITH (
        | 'connector'='filesystem',
        | 'path'='D:\company\project\ods_flink\data\f_test',
        | 'format'='json',
        | 'sink.rolling-policy.file-size' = '1MB',
        | 'sink.partition-commit.delay'='1 h',
        | 'timeZone'='Asia/Shanghai',
        | 'sink.partition-commit.policy.kind'='success-file'
        |)
        |""".stripMargin

    // Write the data out to the filesystem table.
    val insertDml =
      """
        |INSERT INTO test_fs_table_2
        |SELECT
        | iid,
        | local_time,
        | DATE_FORMAT(local_time,'yyyyMMdd'),
        | DATE_FORMAT(local_time,'HH'),
        | DATE_FORMAT(local_time,'mm')
        |FROM sourceTable
        |""".stripMargin

    // Run in order: both tables must exist before the INSERT is submitted.
    // NOTE(review): the result of executeSql is discarded, so main does not
    // wait for the insert job here; confirm that is intended when run from an IDE.
    Seq(sourceDdl, sinkDdl, insertDml).foreach { statement =>
      tableEnv.executeSql(statement)
    }
  }
}