类型 | 切片方法getSplits | kv方法createRecordReader | 用途说明 |
---|---|---|---|
TextInputFormat | FIF的切片方法 | LineRecordReader | 按照块大小分片,按行行读取记录。 |
KeyValueTextInputFormat | FIF的切片方法 | KeyValueLineRecordReader | 按照块大小分片,按行读取记录。 |
SequenceFileInputFormat | FIF的切片方法 | SequenceFileRecordReader | 按照块大小分片,专用读取上一个任务使用SequenceFileOutputFormat 输出的文件。 |
FixedLengthInputFormat | FIF的切片方法 | FixedLengthRecordReader | 按照块大小分片,定长读取记录。(使用频率低) |
NLineInputFormat | 自定义,N行一片 | LineRecordReader | 通过指定行数进行分片,按行读取记录。 |
CombineFileInputFormat | 自定义 | LineRecordReader | 合并小文件进行分片读取,按行读取记录。 |
RecordReader分类
类型 | 说明 |
---|---|
LineRecordReader | 按行读取。 key: LongWritable ,内容偏移量value: Text ,一行数据 |
KeyValueLineRecordReader | 按照指定分割符进行分割 key: Text ,分割的前一部分value: Text ,分割的后一部分,可以通过configuration中的mapreduce.input.keyvaluelinerecordreader.key.value.separator 属性指定,默认是\t |
FixedLengthRecordReader | 读取固定长度内容(byte) key: LongWritable ,记录偏移量value: BytesWritable ,二进制数据 |
SequenceFileRecordReader | 主要串联job执行,读取上一个job的结果,作为当前job的输入,可以传递对象数据 |
场景:
假设目前有一堆小文件,需要通过一个MR程序转换为一个SequenceFile文件,其中key:文件路径,value:文件内容
思路:
- 自定义一个Format类继承
FileInputFormat
,其中key:Text,value:BytesWritable
(因为文件内容不一定是文本)- 需要重写方法:
public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
,此时我们需要定义一个自己的RecordReader- 同时考虑分片问题,我们FileInputFormat默认是按照文件和块大小分片的,这里我们需要同一个文件不被切片即一个文件在一个分片内,需要重写方法
protected boolean isSplitable(JobContext context, Path filename)
InputFormat.java
/** * 将一堆小文件,转换为一个SequenceFile文件,key:原文件路径,value:原文件内容 * 这里的泛型即为MapTask的记录的输入类型,所以key:Text,value:BytesWritable(因为文件不一定是文本,所以用BytesWritable) */ public class PackageInputFormat extends FileInputFormat<Text, BytesWritable> { @Override public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new PackageSequenceRecordReader(); } @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } }
RecordReader.java
public class PackageSequenceRecordReader extends RecordReader<Text, BytesWritable> { //标识文件是否已经被读取过 private boolean notRead = true; private FileSplit fs = null; private Text key = new Text(); private BytesWritable val = new BytesWritable(); private FSDataInputStream inputStream; //初始化只执行一次 @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { fs = (FileSplit) split; Path path = fs.getPath(); FileSystem fileSystem = path.getFileSystem(context.getConfiguration()); inputStream = fileSystem.open(path); } //是否还有下一个数据,返回true则标识还有数据,否则无数据 @Override public boolean nextKeyValue() throws IOException, InterruptedException { //判断是否已读取过,未读取过进行读取 if(notRead){ //设置key key.set(fs.getPath().getName()); //设置val long fileLength = fs.getLength(); System.out.println("fs.getlength():"+fileLength); byte[] buf = new byte[(int) fileLength]; int length = inputStream.read(buf); val.set(buf,0,length); notRead = false; return true; }else{ return false; } } //获取当前key @Override public Text getCurrentKey() throws IOException, InterruptedException { return key; } //获取当前value @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return val; } //去读进度 @Override public float getProgress() throws IOException, InterruptedException { return notRead ? 0 : 1; } //关闭资源 @Override public void close() throws IOException { IOUtils.closeStream(inputStream); } }
Driver.java
public class PackageDriver { public static void main(String[] args) throws Exception { //1. 创建一个Job Configuration conf = new Configuration(); conf.set("mapred.reduce.child.java.opts", "-Xmx512m"); Job job = Job.getInstance(conf); //2. 设置类路径 job.setJarByClass(FlowDriver.class); //3. 设置Mapper和Reducer:不需要MR //4. 设置Mapper和Reducer的输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); //5. 设置输入输出数据 FileInputFormat.setInputPaths(job,new Path("d://hadoop-study/inputformat/input")); FileOutputFormat.setOutputPath(job,new Path("d://hadoop-study/inputformat/output")); //设置InputFormat job.setInputFormatClass(PackageInputFormat.class); //设置OutputFormat job.setOutputFormatClass(SequenceFileOutputFormat.class); //6. 提交job boolean b = job.waitForCompletion(true); System.exit(b?0:1); } }