Flink 1.9 installed on a test server running CDH 6.3.1.
Contents of the hello.txt file:

```
hello word hello hdfs hello mapreduce hello yarn hello hive hello spark hello flink
```
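The job below reads this file from HDFS, so hello.txt first has to be uploaded there. A minimal sketch, assuming the HDFS client is configured on the machine and the target path matches the `inputPath` used in the job:

```shell
# Upload the local hello.txt to the HDFS path the job will read
hdfs dfs -put hello.txt hdfs://hp1:8020/user/hive/warehouse/hello.txt

# Verify the upload
hdfs dfs -cat hdfs://hp1:8020/user/hive/warehouse/hello.txt
```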
Flink dependency configuration:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>
```
Since the Java code is written locally, packaged into a jar, and then uploaded to the server to run, the main entry point must be declared in the pom; otherwise the job fails with an error.

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <version>3.0.2</version>
    <configuration>
        <archive>
            <manifest>
                <addClasspath>true</addClasspath>
                <mainClass>org.example.wordCount</mainClass> <!-- main entry point -->
            </manifest>
        </archive>
    </configuration>
</plugin>
<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
```
Here `org.example.wordCount` must be adjusted to your own project: `org.example` is the package name and `wordCount` is the class name, as in the code below:
```java
package org.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author 只是甲
 * @date   2021-08-24
 * @remark The first Flink wordCount program
 */
public class wordCount {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read data from a file
        String inputPath = "hdfs://hp1:8020/user/hive/warehouse/hello.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);

        // Process the data set: split each line on spaces, convert every word
        // into a (word, 1) tuple, then aggregate
        DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
                .groupBy(0)  // group by the word in position 0
                .sum(1);     // sum the counts in position 1
        resultSet.print();
        //env.execute();
        //env.execute("Word Count Example");
    }

    // Custom class implementing the FlatMapFunction interface
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split on spaces
            String[] words = value.split(" ");
            // Wrap every word in a (word, 1) tuple and emit it
            for (String word : words) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
```
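For reference, the grouping logic above can be sanity-checked locally in plain Java, without a Flink cluster (a sketch; `WordCountSketch` is a hypothetical helper, not part of the project):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {
    // Same split-then-count logic as the flatMap + groupBy + sum pipeline
    public static Map<String, Integer> count(String line) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : line.split(" ")) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String line = "hello word hello hdfs hello mapreduce hello yarn hello hive hello spark hello flink";
        // prints {hello=7, word=1, hdfs=1, mapreduce=1, yarn=1, hive=1, spark=1, flink=1}
        System.out.println(count(line));
    }
}
```

So for the sample hello.txt, the job should emit a count of 7 for `hello` and 1 for every other word.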
Package the project with the `mvn package` command:
```
C:\Users\Administrator\IdeaProjects\FlinkStudy>mvn package
[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for org.example:FlinkStudy:jar:1.0-SNAPSHOT
[WARNING] 'build.pluginManagement.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.apache.maven.plugins:maven-compiler-plugin @ line 98, column 17
[WARNING] 'build.pluginManagement.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.apache.maven.plugins:maven-surefire-plugin @ line 107, column 17
[WARNING] 'build.pluginManagement.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.apache.maven.plugins:maven-jar-plugin @ line 116, column 17
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] -----------------------< org.example:FlinkStudy >-----------------------
[INFO] Building FlinkStudy 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-resources-plugin:3.0.2:resources (default-resources) @ FlinkStudy ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 0 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.6.0:compile (default-compile) @ FlinkStudy ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 2 source files to C:\Users\Administrator\IdeaProjects\FlinkStudy\target\classes
[INFO]
[INFO] --- maven-resources-plugin:3.0.2:testResources (default-testResources) @ FlinkStudy ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 0 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.6.0:testCompile (default-testCompile) @ FlinkStudy ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-surefire-plugin:2.19:test (default-test) @ FlinkStudy ---
[INFO] Tests are skipped.
[INFO]
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ FlinkStudy ---
[INFO] Building jar: C:\Users\Administrator\IdeaProjects\FlinkStudy\target\FlinkStudy-1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.848 s
[INFO] Finished at: 2021-08-25T09:41:03+08:00
[INFO] ------------------------------------------------------------------------

C:\Users\Administrator\IdeaProjects\FlinkStudy>
```
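Before uploading, it is worth confirming that maven-jar-plugin actually wrote the Main-Class entry into the jar. A sketch (the jar path is taken from the build log above):

```shell
# Print the manifest of the built jar; it should contain
# "Main-Class: org.example.wordCount"
unzip -p target/FlinkStudy-1.0-SNAPSHOT.jar META-INF/MANIFEST.MF
```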
Then upload the generated FlinkStudy-1.0-SNAPSHOT.jar file to the server.
Command (for Flink 1.9 on YARN, `-yn` requests the number of TaskManager containers, while `-yjm` and `-ytm` set the JobManager and TaskManager memory in MB):

```shell
flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 /home/flink/FlinkStudy-1.0-SNAPSHOT.jar
```
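Once submitted, the job shows up as a YARN application, which can be checked from the shell (assuming the YARN client is available on the gateway node):

```shell
# List applications on the cluster; the Flink job should appear here
yarn application -list
```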
Run result:
The Flink web UI shows the execution result:
Although the pom file specifies a main class, Flink only falls back to it when no class is given on the command line. So what if we want to run a different class in the same jar? In that case, use `-c` or `--class` to specify the class to run.
Code:

```java
package org.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author 只是甲
 * @date   2021-08-24
 * @remark The second Flink wordCount program
 */
public class wordCount2 {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read data from a file
        String inputPath = "hdfs://hp1:8020/user/hive/warehouse/hello.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);

        // Process the data set: split each line on spaces, convert every word
        // into a (word, 1) tuple, then aggregate
        DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
                .groupBy(0)  // group by the word in position 0
                .sum(1);     // sum the counts in position 1
        resultSet.print();
        //env.execute();
        //env.execute("Word Count Example");
        System.out.println("This is the second test wordcount");
    }

    // Custom class implementing the FlatMapFunction interface
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split on spaces
            String[] words = value.split(" ");
            // Wrap every word in a (word, 1) tuple and emit it
            for (String word : words) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
```
Run commands:

```shell
# Correct
flink run -m yarn-cluster -c org.example.wordCount2 FlinkStudy-1.0-SNAPSHOT.jar

# Correct
flink run -m yarn-cluster --class org.example.wordCount2 FlinkStudy-1.0-SNAPSHOT.jar

# Wrong (still runs the main class from the pom, because -c appears
# after the jar and is passed to the program as an argument instead)
flink run -m yarn-cluster FlinkStudy-1.0-SNAPSHOT.jar -c org.example.wordCount2
```
Execution screenshot: