查看hadoop有哪些压缩算法的命令
[lqs@bdc112 hadoop-3.1.3]$ bin/hadoop checknative 2021-12-15 16:20:12,342 INFO bzip2.Bzip2Factory: Successfully loaded & initialized native-bzip2 library system-native 2021-12-15 16:20:12,345 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library 2021-12-15 16:20:12,348 WARN zstd.ZStandardCompressor: Error loading zstandard native libraries: java.lang.InternalError: Cannot load libzstd.so.1 (libzstd.so.1: 无法打开共享对象文件: 没有那个文件或目录)! 2021-12-15 16:20:12,353 WARN erasurecode.ErasureCodeNative: ISA-L support is not available in your platform... using builtin-java codec where applicable Native library checking: hadoop: true /home/lqs/module/hadoop-3.1.3/lib/native/libhadoop.so.1.0.0 zlib: true /lib64/libz.so.1 zstd : false snappy: true /lib64/libsnappy.so.1 lz4: true revision:10301 bzip2: true /lib64/libbz2.so.1 openssl: true /lib64/libcrypto.so ISA-L: false libhadoop was built without ISA-L support
压缩算法基础对比简介
压缩格式 | 是否自带 | 算法 | 文件扩展名 | 切片? | 转换成压缩文件后使用是否需要切片 |
---|---|---|---|---|---|
DEFLATE | 是,直接使用 | DEFLATE | .deflate | 否 | 否,直接使用 |
Gzip | 是,直接使用 | DEFLATE | .gz | 否 | 否,直接使用 |
bzip2 | 是,直接使用 | bzip2 | .bz2 | 是 | 否,直接使用 |
LZO | 否,需要安装 | LZO | .lzo | 是 | 是,需要建索引,还需要指定输入格式 |
Snappy | 是,直接使用 | Snappy | .snappy | 否 | 否,直接使用 |
压缩性能对比
压缩算法 | 原始文件大小(G) | 压缩文件大小(G) | 压缩速度(mb/s) | 解压速度(mb/s) |
---|---|---|---|---|
gzip | 8.3 | 1.8 | 17.5 | 58 |
bzip2 | 8.3 | 1.1 | 2.4 | 9.5 |
LZO | 8.3 | 2.9 | 49.3 | 74.6 |
压缩方式的选择择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片。同时还要考虑网络、IO流、磁盘、集群情况等进行选择。
压缩名 | 优点 | 缺点 |
---|---|---|
Gzip | 压缩率比较高 | 不支持Split(切片),压缩或者解压的速度一般 |
Bzip2 | 压缩率高,且支持切片 | 压缩或者解压的速度慢 |
Lzo | 压缩或者解压的速度比较快,支持切片 | 压缩率一般,想要支持切片需要额外创建索引 |
Snappy | 压缩或者解压的速度都快 | 不支持切片,压缩率一般 |
Map之前 | Map和Reducer之间 | Reducer之后 |
---|---|---|
一般情况下: 无须显示指定使用的编解码方式。Hadoop自动检查文件扩展名,如果扩展名能够匹配,就会用恰当的编解码方式对文件进行压缩和解压。 企业开发考虑因素 : a、数据量小于块大小,重点考虑压缩和解压缩速度比较快的LZO/Snappy b、数据量非常大,重点考虑支持切片的Bzip2和LZO | 这里主要是指MapTask之后的输出。 企业开发中如何选择: 为了减少MapTask和ReduceTask之间的网络IO。重点考虑压缩和解压缩快的LZO、Snappy。 | 看需求: a、如果数据永久保存,考虑压缩率比较高的Bzip2和Gzip。 b、 如果作为下一个MapReduce输入,需要考虑数据量和是否支持切。 |
压缩格式 | 对应编码器、解码器 |
---|---|
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
启用压缩配置需要在hadoop中配置如下参数
参数 | 默认值 | 阶段 | 建议 |
---|---|---|---|
io.compression.codecs (在 core-site.xml 中配置) | 无,这个需要在命令行输入 hadoop checknative 查看 | 输入压缩 | Hadoop 使用文件扩展名判断是否支持某种编解码器 |
mapreduce.map.output.compress (在 mapred-site.xml 中配置) | false | mapper 输出 | 这个参数设为 true 启用压缩 |
mapreduce.map.output.compress.codec (在 mapredsite.xml 中配置) | org.apache.hadoop.io. compress.DefaultCodec | mapper 输出 | 企业多使用 LZO 或Snappy 编解码器在此阶段压缩数据 |
mapreduce.output.fileoutputformat.compress (在mapred-site.xml 中配置) | false | reducer 输出 | 这个参数设为 true 启用压缩 |
mapreduce.output.fileoutputformat.compress.codec (在mapred-site.xml 中配置) | org.apache.hadoop.io. compress.DefaultCodec | reducer 输出 | 使用标准工具或者编解码器,如 gzip 和bzip2 |
Map类
package com.lqs.mapreduce.zip; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * @author qingSong liu * @version 1.0 * @time 2021/12/8 19:48 */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { Text k; IntWritable v; @Override protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context) { k = k = new Text(); v = new IntWritable(1); } /** * @param key 偏移量 * @param value 一行数据 * @param context 上下文对象,传输,将数据传给reduce * @throws IOException 异常 * @throws InterruptedException 异常 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1、获取一行,转换字符串 String line = value.toString(); //2、切割 String[] words = line.split(" "); //3、输出,迭代写出 for (String word : words) { k.set(word); context.write(k, v); } } }
Reducer类
package com.lqs.mapreduce.zip; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * @author qingSong liu * @version 1.0 * @time 2021/12/8 19:52 */ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { int sum = 0; /** * 封装int类型 */ IntWritable v; @Override protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context) { v = new IntWritable(); } @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { //1、累计求和 for (IntWritable count : values) { sum += count.get(); } //2、输出 v.set(sum); context.write(key, v); } }
Driver类
package com.lqs.mapreduce.zip; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @author qingSong liu * @version 1.0 * @time 2021/12/8 19:55 */ public class WordCountDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //1、获取配置信息以及获取job对象 Configuration configuration = new Configuration(); // 开启 map 端输出压缩 configuration.setBoolean("mapreduce.map.output.compress", true); // 设置 map 端输出压缩方式 configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class); Job job = Job.getInstance(configuration); //2、关联本Driver程序的jar,或者是驱动类 job.setJarByClass(WordCountDriver.class); //3、关联Mapper和Reducer的jar job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); //4、设置Mapper输出的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5、设置最终输出kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6、设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path("F:\\hdpData\\Input\\inputword")); FileOutputFormat.setOutputPath(job, new Path("F:\\hdpData\\Output\\outputWord")); //7、提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
Reducer输出端使用压缩
和上面的案例一样,这里只需要改Driver类
package com.lqs.mapreduce.zip; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @author qingSong liu * @version 1.0 * @time 2021/12/8 19:55 */ public class WordCountDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //1、获取配置信息以及获取job对象 Configuration configuration = new Configuration(); // // 开启 map 端输出压缩 // configuration.setBoolean("mapreduce.map.output.compress", true); 设置 map 端输出压缩方式 // configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class); Job job = Job.getInstance(configuration); //2、关联本Driver程序的jar,或者是驱动类 job.setJarByClass(WordCountDriver.class); //3、关联Mapper和Reducer的jar job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); //4、设置Mapper输出的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5、设置最终输出kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6、设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path("F:\\hdpData\\Input\\inputword")); FileOutputFormat.setOutputPath(job, new Path("F:\\hdpData\\Output\\outputWord")); // 设置 reduce 端输出压缩开启 FileOutputFormat.setCompressOutput(job, true); // 设置压缩的方式 FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); //7、提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
以下是常用的使用方式,直接在Driver类里的configuration里面配置,代码如下:
package com.lqs.mapreduce.zip; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @author qingSong liu * @version 1.0 * @time 2021/12/8 19:55 */ public class WordCountDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //1、获取配置信息以及获取job对象 Configuration configuration = new Configuration(); //Map configuration.set("mapreduce.map.output.compress","true"); configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.BZip2Codec"); //Reducer configuration.set("mapreduce.output.fileoutputformat.compress","true"); configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec"); // // 开启 map 端输出压缩 // configuration.setBoolean("mapreduce.map.output.compress", true); 设置 map 端输出压缩方式 // configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class); Job job = Job.getInstance(configuration); //2、关联本Driver程序的jar,或者是驱动类 job.setJarByClass(WordCountDriver.class); //3、关联Mapper和Reducer的jar job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); //4、设置Mapper输出的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5、设置最终输出kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6、设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path("F:\\hdpData\\Input\\inputword")); FileOutputFormat.setOutputPath(job, new Path("F:\\hdpData\\Output\\outputWord")); // 设置 reduce 端输出压缩开启 FileOutputFormat.setCompressOutput(job, true); // 设置压缩的方式 FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); //7、提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }