C/C++教程

hadoop数据压缩及涉及的相关算法和(MapReduce)代码示例演示

本文主要是介绍hadoop数据压缩及涉及的相关算法和(MapReduce)代码示例演示,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

查看hadoop有哪些压缩算法的命令

[lqs@bdc112 hadoop-3.1.3]$ bin/hadoop checknative
2021-12-15 16:20:12,342 INFO bzip2.Bzip2Factory: Successfully loaded & initialized native-bzip2 library system-native
2021-12-15 16:20:12,345 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
2021-12-15 16:20:12,348 WARN zstd.ZStandardCompressor: Error loading zstandard native libraries: java.lang.InternalError: Cannot load libzstd.so.1 (libzstd.so.1: 无法打开共享对象文件: 没有那个文件或目录)!
2021-12-15 16:20:12,353 WARN erasurecode.ErasureCodeNative: ISA-L support is not available in your platform... using builtin-java codec where applicable
Native library checking:
hadoop:  true /home/lqs/module/hadoop-3.1.3/lib/native/libhadoop.so.1.0.0
zlib:    true /lib64/libz.so.1
zstd  :  false 
snappy:  true /lib64/libsnappy.so.1
lz4:     true revision:10301
bzip2:   true /lib64/libbz2.so.1
openssl: true /lib64/libcrypto.so
ISA-L:   false libhadoop was built without ISA-L support

目录

  • hadoop数据压缩简介
  • MR(MapReduce)支持的压缩编码
  • 选择压缩的条件
  • 几种压缩算法的对比
  • 压缩位置的选择
  • 压缩参数的配置
  • Map 输出端采用压缩

hadoop数据压缩简介


数据压缩的好处:
可以减少磁盘IO、减少磁盘存储空间
压缩缺点:
增加CPU开销

压缩原则:
a、运行密集型的作业,尽量少用压缩
b、IO密集型的作业,尽量多用压缩

MR(MapReduce)支持的压缩编码

压缩算法基础对比简介

压缩格式是否自带算法文件扩展名切片?转换成压缩文件后使用是否需要切片
DEFLATE是,直接使用DEFLATE.deflate否,直接使用
Gzip是,直接使用DEFLATE.gz否,直接使用
bzip2是,直接使用bzip2.bz2否,直接使用
LZO否,需要安装LZO.lzo是,需要建索引,还需要指定输入格式
Snappy是,直接使用Snappy.snappy否,直接使用

压缩性能对比

压缩算法原始文件大小(G)压缩文件大小(G)压缩速度(mb/s)解压速度(mb/s)
gzip8.31.817.558
bzip28.31.12.49.5
LZO8.32.949.374.6

选择压缩的条件

压缩方式的选择择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片。同时还要考虑网络、IO流、磁盘、集群情况等进行选择。

几种压缩算法的对比

压缩名优点缺点
Gzip压缩率比较高不支持Split(切片),压缩或者解压的速度一般
Bzip2压缩率高,且支持切片压缩或者解压的速度慢
Lzo压缩或者解压的速度比较快,支持切片压缩率一般,想要支持切片需要额外创建索引
Snappy压缩或者解压的速度都快不支持切片,压缩率一般

压缩位置的选择

Map之前Map和Reducer之间Reducer之后
一般情况下:
无须显示指定使用的编解码方式。Hadoop自动检查文件扩展名,如果扩展名能够匹配,就会用恰当的编解码方式对文件进行压缩和解压。

企业开发考虑因素 :
a、数据量小于块大小,重点考虑压缩和解压缩速度比较快的LZO/Snappy
b、数据量非常大,重点考虑支持切片的Bzip2和LZO
这里主要是指MapTask之后的输出。

企业开发中如何选择:
为了减少MapTask和ReduceTask之间的网络IO。重点考虑压缩和解压缩快的LZO、Snappy。
看需求:

a、如果数据永久保存,考虑压缩率比较高的Bzip2和Gzip。
b、 如果作为下一个MapReduce输入,需要考虑数据量和是否支持切。

压缩参数的配置

压缩格式对应编码器、解码器
DEFLATEorg.apache.hadoop.io.compress.DefaultCodec
gziporg.apache.hadoop.io.compress.GzipCodec
bzip2org.apache.hadoop.io.compress.BZip2Codec
LZOcom.hadoop.compression.lzo.LzopCodec
Snappyorg.apache.hadoop.io.compress.SnappyCodec

启用压缩配置需要在hadoop中配置如下参数

参数默认值阶段建议
io.compression.codecs
(在 core-site.xml 中配置)
无,这个需要在命令行输入
hadoop checknative 查看
输入压缩Hadoop 使用文件扩展名判断是否支持某种编解码器
mapreduce.map.output.compress
(在 mapred-site.xml 中配置)
falsemapper 输出这个参数设为 true 启用压缩
mapreduce.map.output.compress.codec
(在 mapredsite.xml 中配置)
org.apache.hadoop.io.
compress.DefaultCodec
mapper 输出企业多使用 LZO 或Snappy 编解码器在此阶段压缩数据
mapreduce.output.fileoutputformat.compress
(在mapred-site.xml 中配置)
falsereducer 输出这个参数设为 true 启用压缩
mapreduce.output.fileoutputformat.compress.codec
(在mapred-site.xml 中配置)
org.apache.hadoop.io.
compress.DefaultCodec
reducer 输出使用标准工具或者编解码器,如 gzip 和bzip2

Map 输出端采用压缩

Map类

package com.lqs.mapreduce.zip;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/8 19:48
 */

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k;

    IntWritable v;

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context) {
        k = k = new Text();
        v = new IntWritable(1);
    }

    /**
     * @param key     偏移量
     * @param value   一行数据
     * @param context 上下文对象,传输,将数据传给reduce
     * @throws IOException          异常
     * @throws InterruptedException 异常
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //1、获取一行,转换字符串
        String line = value.toString();

        //2、切割
        String[] words = line.split(" ");

        //3、输出,迭代写出
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}

Reducer类

package com.lqs.mapreduce.zip;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/8 19:52
 */

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    int sum = 0;
    /**
     * 封装int类型
     */
    IntWritable v;

    @Override
    protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context) {
        v = new IntWritable();
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

        //1、累计求和
        for (IntWritable count : values) {
            sum += count.get();
        }

        //2、输出
        v.set(sum);
        context.write(key, v);

    }
}

Driver类

package com.lqs.mapreduce.zip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/8 19:55
 */

public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        //1、获取配置信息以及获取job对象
        Configuration configuration = new Configuration();

        // 开启 map 端输出压缩
        configuration.setBoolean("mapreduce.map.output.compress", true);
		// 设置 map 端输出压缩方式
        configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(configuration);

        //2、关联本Driver程序的jar,或者是驱动类
        job.setJarByClass(WordCountDriver.class);

        //3、关联Mapper和Reducer的jar
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //4、设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5、设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6、设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path("F:\\hdpData\\Input\\inputword"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\hdpData\\Output\\outputWord"));

        //7、提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }

}

Reducer输出端使用压缩
和上面的案例一样,这里只需要改Driver类

package com.lqs.mapreduce.zip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/8 19:55
 */

public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        //1、获取配置信息以及获取job对象
        Configuration configuration = new Configuration();

//        // 开启 map 端输出压缩
//        configuration.setBoolean("mapreduce.map.output.compress", true);
 设置 map 端输出压缩方式
//        configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(configuration);

        //2、关联本Driver程序的jar,或者是驱动类
        job.setJarByClass(WordCountDriver.class);

        //3、关联Mapper和Reducer的jar
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //4、设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5、设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6、设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path("F:\\hdpData\\Input\\inputword"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\hdpData\\Output\\outputWord"));

        // 设置 reduce 端输出压缩开启
        FileOutputFormat.setCompressOutput(job, true);
        // 设置压缩的方式
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        //7、提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }

}

以下是常用的使用方式,直接在Driver类里的configuration里面配置,代码如下:

package com.lqs.mapreduce.zip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/8 19:55
 */

public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        //1、获取配置信息以及获取job对象
        Configuration configuration = new Configuration();

        //Map
        configuration.set("mapreduce.map.output.compress","true");
        configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.BZip2Codec");

        //Reducer
        configuration.set("mapreduce.output.fileoutputformat.compress","true");
        configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

//        // 开启 map 端输出压缩
//        configuration.setBoolean("mapreduce.map.output.compress", true);
 设置 map 端输出压缩方式
//        configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(configuration);

        //2、关联本Driver程序的jar,或者是驱动类
        job.setJarByClass(WordCountDriver.class);

        //3、关联Mapper和Reducer的jar
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //4、设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5、设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6、设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path("F:\\hdpData\\Input\\inputword"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\hdpData\\Output\\outputWord"));

        // 设置 reduce 端输出压缩开启
        FileOutputFormat.setCompressOutput(job, true);
        // 设置压缩的方式
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        //7、提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }

}

这篇关于hadoop数据压缩及涉及的相关算法和(MapReduce)代码示例演示的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!