对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口;同时对MapReduce处理好的数据利用Hive实现数据的基本统计。
设计要求:
- 根据数据特征,设计一个任务场景,利用MapReduce编程实现数据的清洗和预处理。(10分)
- 利用HDFS的JavaAPI编写程序将原始数据和预处理后的数据上传到分布式文件系统
数据集:
链接:https://pan.baidu.com/s/1rnUJn5ld45HpLhzbwYIM1A
提取码:7bsd
package com.company.HDFS;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Data-cleaning MapReduce job: de-duplicates the raw input by emitting every
 * input line as a map-output key; the shuffle groups identical lines and the
 * reducer writes each distinct line exactly once.
 */
public class step0 {

    final static String INPUT_PATH = "hdfs://192.168.88.100/data";
    final static String OUTPUT_PATH = "hdfs://192.168.88.100/output";

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        // MapReduce refuses to run if the output directory already exists,
        // so remove a stale one up front. Close the FileSystem when done.
        FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
        try {
            if (fileSystem.exists(new Path(OUTPUT_PATH))) {
                fileSystem.delete(new Path(OUTPUT_PATH), true);
            }
        } finally {
            fileSystem.close();
        }

        // Job.getInstance(...) replaces the deprecated new Job(...) constructor.
        Job job = Job.getInstance(configuration, "step0");
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        job.setJarByClass(step0.class);

        job.setMapperClass(ReMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(ReReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Propagate the job result to the shell instead of always exiting 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Maps each raw line to (line, NullWritable): the whole record is the key
     * so that duplicate lines collapse into one reduce group.
     */
    public static class ReMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /** Writes each distinct line once, discarding the duplicate occurrences. */
    public static class ReReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}
package com.company.HDFS;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * HDFS Java API demos: recursively list the file system, and upload the raw
 * data set into HDFS.
 */
public class step1 {

    /**
     * Recursively lists every file under "/" and prints its name, length,
     * permission, group, and the hosts holding each block replica.
     *
     * Exceptions now propagate so the test actually fails on error — the
     * original empty catch block swallowed every failure silently.
     */
    @Test
    public void demo_03() throws IOException, InterruptedException, URISyntaxException {
        // 1. Obtain the file system as user "root".
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.88.100:8020"), configuration, "root");
        try {
            // 2. Walk the tree (true = recursive).
            RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
            while (listFiles.hasNext()) {
                LocatedFileStatus status = listFiles.next();
                System.out.println(status.getPath().getName());
                System.out.println(status.getLen());
                System.out.println(status.getPermission());
                System.out.println(status.getGroup());
                // Block locations: which datanodes host each block replica.
                BlockLocation[] blockLocations = status.getBlockLocations();
                for (BlockLocation blockLocation : blockLocations) {
                    for (String host : blockLocation.getHosts()) {
                        System.out.println(host);
                    }
                }
                System.out.println("-----------分割线----------");
            }
        } finally {
            // 3. Release the connection even if listing failed.
            fs.close();
        }
    }

    /**
     * Uploads the local iris.data file into HDFS with a replication factor
     * of 2, closing the file system even when the copy fails.
     */
    @Test
    public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException {
        // 1. Obtain the file system; override replication for this upload.
        Configuration configuration = new Configuration();
        configuration.set("dfs.replication", "2");
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.88.100:8020"), configuration, "root");
        try {
            // 2. Copy the local data set into HDFS.
            fs.copyFromLocalFile(
                    new Path("J:\\the_efforts_paid_offf\\HDFS_HBase_HiveApi\\src\\main\\java\\com\\company\\datas\\iris.data"),
                    new Path("hdfs://192.168.88.100/input"));
        } finally {
            // 3. Release the connection.
            fs.close();
        }
        System.out.println("over");
    }
}
package com.company.HDFS;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;

import java.io.IOException;

/**
 * @author ChinaManor
 * #Description HBase Java API: create table / put / scan / delete.
 * #Date: 2021/12/19 18:10
 */
public class step2 {

    // Shared connection settings, hoisted so the four tests stay consistent.
    private static final String ZK_QUORUM = "node1:2181";
    private static final String TABLE_NAME = "demo";
    private static final String FAMILY = "info";
    // BUG FIX: put() inserted row "rk001" while delete() removed "rk0001",
    // so the delete never matched the inserted row. One constant now rules both.
    private static final String ROW_KEY = "rk001";

    /** Builds a Connection configured against the shared ZooKeeper quorum. */
    private static Connection connect() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZK_QUORUM);
        return ConnectionFactory.createConnection(conf);
    }

    /**
     * Creates table "demo" with a single column family "info".
     * (HTableDescriptor/HColumnDescriptor are deprecated in HBase 2.x but kept
     * here for compatibility with the project's HBase client version.)
     */
    @Test
    public void createTable() throws IOException {
        Connection conn = connect();
        try {
            Admin admin = conn.getAdmin();
            try {
                HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
                hTableDescriptor.addFamily(new HColumnDescriptor(FAMILY));
                admin.createTable(hTableDescriptor);
            } finally {
                admin.close();
            }
        } finally {
            conn.close();
        }
    }

    /**
     * Inserts info:name = "zhangsan" under the shared row key.
     * A failed insert now fails the test (cause preserved) instead of being
     * logged with printStackTrace and silently passing.
     */
    @Test
    public void put() {
        try {
            Connection conn = connect();
            try {
                Table table = conn.getTable(TableName.valueOf(TABLE_NAME));
                try {
                    Put put = new Put(ROW_KEY.getBytes());
                    put.addColumn(FAMILY.getBytes(), "name".getBytes(), "zhangsan".getBytes());
                    table.put(put);
                } finally {
                    table.close();
                }
            } finally {
                conn.close();
            }
        } catch (IOException e) {
            // Rethrow so the test actually fails; keep the original cause.
            throw new RuntimeException("put into table '" + TABLE_NAME + "' failed", e);
        }
    }

    /** Scans the table and prints every info:name value. */
    @Test
    public void scan() throws IOException {
        Connection conn = connect();
        try {
            Table table = conn.getTable(TableName.valueOf(TABLE_NAME));
            try {
                Scan scan = new Scan();
                // Restrict the scan to the single column we print.
                scan.addColumn(FAMILY.getBytes(), "name".getBytes());
                ResultScanner rss = table.getScanner(scan);
                try {
                    for (Result rs : rss) {
                        String valStr = Bytes.toString(rs.getValue(FAMILY.getBytes(), "name".getBytes()));
                        System.out.println(valStr);
                    }
                } finally {
                    rss.close();
                }
            } finally {
                table.close();
            }
        } finally {
            conn.close();
        }
    }

    /** Deletes the row previously inserted by put() (same ROW_KEY). */
    @Test
    public void delete() throws IOException {
        Connection conn = connect();
        try {
            Table table = conn.getTable(TableName.valueOf(TABLE_NAME));
            try {
                Delete del = new Delete(ROW_KEY.getBytes());
                table.delete(del);
            } finally {
                table.close();
            }
        } finally {
            conn.close();
        }
    }
}