上篇:并行的Source定义
基于文件的Source,本质上就是使用指定的FileInputFormat格式读取数据,可以指定3种格式,分别是:TextInputFormat格式、CsvInputFormat格式、BinaryInputFormat格式。
基于文件的Source底层都是ContinuousFileMonitoringFunction,这个类继承了RichSourceFunction,它们都是非并行的Source
1、readFile(FileInputFormat inputFormat, String filePath) 方法
可以指定读取文件的FileInputFormat 格式,有个重载的方法readFile(FileInputFormat inputFormat, String filePath, FileProcessingMode watchType, long interval)
可以指定FileProcessingMode,有两个枚举类型
PROCESS_ONCE模式:
PROCESS_CONTINUOUSLY模式:
注意:使用这种模式,文件的内容发生变化后,会将以前的内容和新的内容全部都读取出来,进而造成数据重复读取
package cn._51doit.flink.day01; import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; /** * Source-readFile的使用 * readFile创建的Source是一个多并行度的Source,而且是一个无限的数据流(但是会重复读取数据),这个也会打印4个并行度 * 场景:有新数据就会马上读进来 * */ public class ReadFileDemo { public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); configuration.setInteger("rest.port", 8181); //设置web ui的端口 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration); //改造,发现有新数据就读进来 //PROCESS_CONTINUOUSLY模式是一直监听指定的文件或目录,2秒钟检测一次文件是否发生变化 String path = "E:\\englin"; DataStreamSource<String> lines = env.readFile(new TextInputFormat(null), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 2000); int parallelism = lines.getParallelism(); System.out.println("fromElements创建的并行度的DataStreamSource为:"+parallelism); lines.print(); env.execute(); } }
场景:有新数据就会马上读进来,打印输出:
readTextFile(String filePath) 方法
可以从指定的目录或文件读取数据,默认使用的是TextInputFormat格式读取数据,有一个重载的方法readTextFile(String filePath, String charsetName)可以传入读取文件指定的字符集,默认是UTF-8编码
该方法是一个有限的数据源,数据读完后,程序就会退出,不能一直运行
该方法底层调用的是readFile方法,FileProcessingMode为PROCESS_ONCE
readTextFile的使用只读取文件中的数据一次,读取完成后,程序退出,它是一个有限的数据流,会读指定的目录文件下的数据
readTextFile创建的DataStream也是一个多并行的,fromElements创建的并行度的DataStreamSource为:4
代码:
package cn._51doit.flink.day01; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * Source-readTextFile的使用 *Source只读取文件中的数据一次,读取完成后,程序退出,它是一个有限的数据流,会读指定的目录文件下的数据 * readTextFile创建的DataStream也是一个多并行的 */ public class ReadTextFileDemo { public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); configuration.setInteger("rest.port", 8181); //设置web ui的端口 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration); //改造,发现有新数据就读进来 //PROCESS_CONTINUOUSLY模式是一直监听指定的文件或目录,2秒钟检测一次文件是否发生变化 String path = "E:\\englin"; DataStreamSource<String> lines = env.readTextFile(path); int parallelism = lines.getParallelism(); System.out.println("fromElements创建的并行度的DataStreamSource为:"+parallelism); lines.print(); env.execute(); } }