For example, suppose we have a table with a few rows of data, and we store it once with row storage and once with column storage.
a. The row-storage layout
Traditional databases, such as MySQL, store data in row format.
b. The column-storage layout
Here the rows are first divided into splits, two rows per split; the idea is similar to HBase's Region partitioning. A small sketch contrasting the two layouts follows.
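Since the original table figure is not reproduced here, a minimal sketch can stand in for it. The table contents below (id, name, age values) are made up for illustration; the point is only how the same records end up ordered on disk under the two layouts.

// Illustrative sketch: contrast how the same records are laid out
// by a row-oriented format versus a column-oriented format.
public class RowVsColumnLayout {

    public static void main(String[] args) {
        String[][] table = {
                {"1", "shaonaiyi", "18"},
                {"2", "shaonaier", "17"},
                {"3", "jeffy", "20"}
        };

        // Row storage: record after record, each record's fields kept together
        StringBuilder rowLayout = new StringBuilder();
        for (String[] row : table) {
            rowLayout.append(String.join(",", row)).append(" | ");
        }
        System.out.println("row storage:    " + rowLayout);

        // Column storage: column after column, each column's values kept together
        StringBuilder columnLayout = new StringBuilder();
        for (int col = 0; col < table[0].length; col++) {
            for (String[] row : table) {
                columnLayout.append(row[col]).append(",");
            }
            columnLayout.append(" | ");
        }
        System.out.println("column storage: " + columnLayout);
    }

}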
5. The storage structure of HBase
a. Avro is a row-oriented format; Parquet is column-oriented.
b. It is also important to understand that both Avro and Parquet carry a schema, i.e. a structure. It is similar to the column definitions of a traditional database table, so the schema must be specified when writing.
a. Add the Avro dependency
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.0</version>
</dependency>
b. Create the Avro schema file, edit it, place it under the src/main/data directory, and name it person.avsc:
{"namespace": "com.shaonaiyi.hadoop.filetype.avro", "type": "record", "name": "Person", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": ["int", "null"]}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]} ] }
In this schema, a union type such as ["int", "null"] marks the field as nullable. We will use this schema file to generate a corresponding Java entity class, so the namespace specifies the package the generated class will live in: com.shaonaiyi.hadoop.filetype.avro.
c. We will use a Maven plugin to generate the Java class, so add the plugin here:
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.7.7</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/data</sourceDirectory>
                <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>
d. Generate the Java class (run clean -> compile).
When it finishes, you will find that a Person class has been generated. It may show errors; if so, simply comment out the offending @Override annotations. The errors are caused by code we wrote earlier, so they can be ignored.
e. The following line inside the Person class is the Schema we need; it corresponds to our person.avsc:
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Person\",\"namespace\":\"com.shaonaiyi.hadoop.filetype.avro\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
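For reference, the same Schema object can also be obtained without the generated constant by parsing the .avsc file at runtime. The sketch below is only an illustration (the class name SchemaLoader is made up, and it assumes the program runs from the project root so the relative path resolves):

package com.shaonaiyi.hadoop.filetype.avro;

import org.apache.avro.Schema;

import java.io.File;
import java.io.IOException;

// Illustrative sketch: parse person.avsc at runtime instead of using Person.SCHEMA$
public class SchemaLoader {
    public static void main(String[] args) throws IOException {
        Schema schema = new Schema.Parser().parse(new File("src/main/data/person.avsc"));
        System.out.println(schema.toString(true)); // pretty-print the parsed schema
    }
}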
g. Complete code for writing an Avro file
package com.shaonaiyi.hadoop.filetype.avro;

import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;

import java.io.File;
import java.io.IOException;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/17 16:17
 * @Description Writing an Avro file
 */
public class AvroFileWriter {

    public static void main(String[] args) throws IOException {

        // Build two records that follow the Person schema
        GenericRecord record1 = new GenericData.Record(Person.SCHEMA$);
        record1.put("name", "shaonaiyi");
        record1.put("age", 18);
        record1.put("favorite_number", 7);
        record1.put("favorite_color", "red");

        GenericRecord record2 = new GenericData.Record(Person.SCHEMA$);
        record2.put("name", "shaonaier");
        record2.put("age", 17);
        record2.put("favorite_number", 1);
        record2.put("favorite_color", "yellow");

        // Write both records to a local file named person.avro
        File file = new File("person.avro");
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(Person.SCHEMA$);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer);
        dataFileWriter.create(Person.SCHEMA$, file);
        dataFileWriter.append(record1);
        dataFileWriter.append(record2);
        dataFileWriter.close();
    }

}
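Because the Maven plugin has already generated the Person class for us, we can also write the same kind of file through the generated class and Avro's SpecificDatumWriter instead of GenericRecord. The following is a minimal sketch (the class name AvroSpecificFileWriter and the output file name person-specific.avro are made up for illustration):

package com.shaonaiyi.hadoop.filetype.avro;

import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.File;
import java.io.IOException;

// Illustrative sketch: write with the generated Person class and SpecificDatumWriter
public class AvroSpecificFileWriter {

    public static void main(String[] args) throws IOException {
        // The generated class gives us typed setters instead of put("field", value)
        Person person = new Person();
        person.setName("shaonaiyi");
        person.setAge(18);
        person.setFavoriteNumber(7);
        person.setFavoriteColor("red");

        File file = new File("person-specific.avro");
        DatumWriter<Person> writer = new SpecificDatumWriter<>(Person.class);
        DataFileWriter<Person> dataFileWriter = new DataFileWriter<>(writer);
        dataFileWriter.create(Person.SCHEMA$, file);
        dataFileWriter.append(person);
        dataFileWriter.close();
    }

}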
h. Complete code for reading an Avro file
package com.shaonaiyi.hadoop.filetype.avro;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

import java.io.File;
import java.io.IOException;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/17 16:48
 * @Description Reading an Avro file
 */
public class AvroFileReader {

    public static void main(String[] args) throws IOException {

        // Open the person.avro file written by AvroFileWriter
        File file = new File("person.avro");
        DatumReader<GenericRecord> reader = new GenericDatumReader<>();
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, reader);

        // Iterate over every record in the file and print its fields
        GenericRecord record = null;
        while (dataFileReader.hasNext()) {
            record = dataFileReader.next();
            System.out.println("name:" + record.get("name").toString());
            System.out.println("age:" + record.get("age").toString());
            System.out.println("favorite_number:" + record.get("favorite_number").toString());
            System.out.println("favorite_color:" + record.get("favorite_color"));
            System.out.println("-----------------------------------");
        }
    }

}
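Likewise, the file can be read back into the generated Person class with SpecificDatumReader rather than GenericRecord; a minimal sketch (the class name AvroSpecificFileReader is made up for illustration):

package com.shaonaiyi.hadoop.filetype.avro;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;

import java.io.File;
import java.io.IOException;

// Illustrative sketch: read person.avro back as typed Person objects
public class AvroSpecificFileReader {

    public static void main(String[] args) throws IOException {
        File file = new File("person.avro");
        DatumReader<Person> reader = new SpecificDatumReader<>(Person.class);
        DataFileReader<Person> dataFileReader = new DataFileReader<>(file, reader);
        while (dataFileReader.hasNext()) {
            Person person = dataFileReader.next();
            System.out.println("name:" + person.getName());
            System.out.println("age:" + person.getAge());
        }
        dataFileReader.close();
    }

}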
a. Writing the Avro file
b. Reading the Avro file
a. Add the required dependency
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-mapred</artifactId>
    <version>1.8.0</version>
</dependency>
b. Complete code for writing an Avro file to HDFS
package com.shaonaiyi.hadoop.filetype.avro;

import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import java.io.IOException;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/17 17:15
 * @Description Writing an Avro file to HDFS
 */
public class MRAvroFileWriter {

    public static void main(String[] args) throws IOException, IllegalAccessException,
            InstantiationException, ClassNotFoundException, InterruptedException {

        //1 Build a Job instance
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);

        //2 Set the job's output properties
        // job.setOutputKeyClass(NullWritable.class);
        // job.setOutputValueClass(Text.class);
        // job.setOutputFormatClass(TextOutputFormat.class);
        //job.setOutputKeyClass(AvroKey.class);
        //job.setOutputValueClass(Person.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        //AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
        AvroJob.setOutputKeySchema(job, Person.SCHEMA$);

        //3 Set the output path
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9999/user/hadoop-sny/mr/filetype/avro"));
        //FileOutputFormat.setCompressOutput(job, true);
        //FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        //4 Build a JobContext
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);

        //5 Build a TaskAttemptContext
        TaskAttemptID attemptId = new TaskAttemptID("jobTrackerId", 123, TaskType.REDUCE, 0, 0);
        TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);

        //6 Build an OutputFormat instance
        OutputFormat format = job.getOutputFormatClass().newInstance();

        //7 Set up the OutputCommitter
        OutputCommitter committer = format.getOutputCommitter(hadoopAttemptContext);
        committer.setupJob(jobContext);
        committer.setupTask(hadoopAttemptContext);

        //8 Get a writer, write the data, then close the writer
        RecordWriter<AvroKey, Person> writer = format.getRecordWriter(hadoopAttemptContext);
        // writer.write(null, new Text("shao"));
        // writer.write(null, new Text("nai"));
        // writer.write(null, new Text("yi"));
        // writer.write(null, new Text("bigdata-man"));
        Person person = new Person();
        person.setName("jeffy");
        person.setAge(20);
        person.setFavoriteNumber(10);
        person.setFavoriteColor("red");
        writer.write(new AvroKey(person), null);
        writer.close(hadoopAttemptContext);

        //9 Commit the task and the job through the committer
        committer.commitTask(hadoopAttemptContext);
        committer.commitJob(jobContext);
    }

}
This is similar to writing the Text format (see the companion article: Hadoop支持的文件格式之Text). The main differences: the output format is AvroKeyOutputFormat instead of TextOutputFormat, the output key schema is set with AvroJob.setOutputKeySchema(job, Person.SCHEMA$), and we write an AvroKey wrapping a Person (with a null value) instead of a Text value.
c. Complete code for reading an Avro file from HDFS
package com.shaonaiyi.hadoop.filetype.avro;

import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;

/**
 * @Author shaonaiyi@163.com
 * @Date 2019/12/17 17:29
 * @Description Reading an Avro file from HDFS
 */
public class MRAvroFileReader {

    public static void main(String[] args) throws IOException, IllegalAccessException, InstantiationException {

        //1 Build a Job instance
        Configuration hadoopConf = new Configuration();
        Job job = Job.getInstance(hadoopConf);

        //2 Set the full path of the file to read
        FileInputFormat.setInputPaths(job, "hdfs://master:9999/user/hadoop-sny/mr/filetype/avro");

        //3 Get the input format used to read the file
        // TextInputFormat inputFormat = TextInputFormat.class.newInstance();
        AvroKeyInputFormat inputFormat = AvroKeyInputFormat.class.newInstance();

        //4 Get the split information of the file's data blocks
        //4.1 Find out how many splits the file has been divided into
        JobID jobID = new JobID("jobId", 123);
        JobContext jobContext = new JobContextImpl(job.getConfiguration(), jobID);
        List<InputSplit> inputSplits = inputFormat.getSplits(jobContext);

        // Read the data of each split
        inputSplits.forEach(new Consumer<InputSplit>() {
            @Override
            public void accept(InputSplit inputSplit) {
                TaskAttemptID attemptId = new TaskAttemptID("jobTrackerId", 123, TaskType.MAP, 0, 0);
                TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), attemptId);
                // RecordReader reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext);
                RecordReader<AvroKey<Person>, NullWritable> reader = null;
                try {
                    // reader.initialize(inputSplit, hadoopAttemptContext);
                    // System.out.println("<key,value>");
                    // System.out.println("-----------");
                    // while (reader.nextKeyValue()) {
                    //     System.out.println("<" + reader.getCurrentKey() + "," + reader.getCurrentValue() + ">");
                    // }
                    reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext);
                    reader.initialize(inputSplit, hadoopAttemptContext);
                    while (reader.nextKeyValue()) {
                        Person person = reader.getCurrentKey().datum();
                        System.out.println("key=>" + person);
                        System.out.println("value=>" + reader.getCurrentValue());
                    }
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

}
This is similar to reading the Text format (see the companion article: Hadoop支持的文件格式之Text). The main differences: the input format is AvroKeyInputFormat instead of TextInputFormat, the record reader is typed as RecordReader&lt;AvroKey&lt;Person&gt;, NullWritable&gt;, and the Person record is unwrapped from the key with reader.getCurrentKey().datum().
a. Result of writing the file
b. Result of reading the file; note that we never set a value in the code, only the key.
About the author: 邵奈一
Full-stack engineer, market observer, column editor
Bonus: 邵奈一的技术博客导航 (the author's technical blog navigation page)
Original content by 邵奈一; creating it is not easy, so please credit the source when reposting.