主要讲解:
Kudu 存储引擎
,类似HBase数据库,属于HBase和HDFS折中产品,既能够随机数据读写,又支持批量数据加载分析。
1、物流项目ETL流程 三大业务板块 1)、数据源Source,都是从Kafka消费交易业务数据 2)、编写结构化流程序应用,消费数据Kafka数据,进行ETL存储到各个业务板块存储引擎,比如Kudu、ES等 3)、开发相关业务板块应用程序 - 离线报表和即席查询:Kudu、SparkSQL及Impala和Hue - 实时大屏和数据服务接口:ClickHouse、NodeJS&Vue、SpringCloud - 快递物流信息检索:Es、SpringCloud 2、Kudu 框架概述 为什么要使用Kudu,解决什么问题??? 业务数据需要离线批处理(比如每日统计报表,批量加载数据分析):HDFS Parquet 随机数据读写(比如依据某个字段或主键查询相关数据):HBase | Kudu 诞生背景,小米、网易都在使用Kudu SQL on Hadoop 技术框架发展史 Hive -> HDFS\HBase 最早,最基础 Impala -> HDFS\HBase 内存分析引擎 Impala -> Kudu 快速存储之上快速分析 Kudu是什么 Kudu 架构 数据模型:表、Tablet、副本(leader和follower,Raft协议,数据一致性) 分区策略:Range(范围)、Hash(哈希)、多级分区 列式存储:查询少量列时 IO 少,速度快;数据压缩比高 整体架构:分布式架构,主从架构,主节点和从节点 Master:老大,管理者,元数据管理,Client请求Kudu表数据时,首先找到是Master 使用Raft协议,不需要依赖Zookeeper,奇数个节点,高可用性 TabletServer:小弟,干活的,管理Tablet数据,尤其数据读写 Kudu 安装部署 采用CM部署安装Kudu集群(伪分布式),如果是分布式集群,注意集群中各个机器时间同步
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6Z98S3vb-1621851226298)(/img/1612408861444.png)]
Kudu Client API:三种方式 - JavaClient、C++Client、PythonClient等API调用 - 与Spark集成,使用RDD或DataFrame操作数据 KuduContext SparkSession - 与Impala集成,提供SQL语句进行操作
主要讲解:存储引擎
Kudu
,类似HBase数据库,由Cloudera公司开发,目的取代HDFS和HBase框架,
Kudu API使用 1)、Java Client API使用 DDL操作(创建表、删除表和修改表) DML操作(CRUD,增删改查) 2)、与Spark集成 提供与Spark集成库,直接调用API使用即可
Kudu提供三种方式,操作Kudu数据库,进行DDL操作和DML操作:
Java client
、C++ client、Python client操作Kudu表,要构建Client并编写应用程序;
Kudu-Spark包
集成Kudu与Spark,并编写Spark应用程序来操作Kudu表
无论是Java Client API使用,还是Kudu集成Spark使用,添加Maven 依赖:
<dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client</artifactId> <version>1.9.0-cdh6.2.1</version> </dependency> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-spark_2.11</artifactId> <version>1.9.0-cdh6.2.1</version> </dependency>
KUDU Client 在与服务端交互时,先从 Master Server 获取元数据信息,然后去 Tablet Server读写数据,如下图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-69QNixz5-1621851226300)(/img/1612410080266.png)]
今日工程目录结构说明:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FcxyBXWO-1621851226301)(/img/image-20210524084243810.png)]
首先使用Java Client API操作Kudu数据库,DDL操作(创建表、删除表及修改表)和DML操作(CRUD)。
创建Maven Project设置GAV如下图所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EBTeL1ad-1621851226302)(/img/1612411674589.png)]
创建Maven Module模块,用于编写Java API 操作Kudu,模块GAV设置如下所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-e4cZmSz7-1621851226303)(/img/1612411802819.png)]
构建Maven Project工程或Maven Module模块,POM文件添加依赖如下:
<!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 --> <repositories> <repository> <id>aliyun</id> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> </repository> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> <repository> <id>jboss</id> <url>http://repository.jboss.com/nexus/content/groups/public</url> </repository> </repositories> <!-- 版本属性 --> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <kudu.version>1.9.0-cdh6.2.1</kudu.version> <junit.version>4.12</junit.version> </properties> <dependencies> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client</artifactId> <version>${kudu.version}</version> </dependency> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client-tools</artifactId> <version>${kudu.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
配置IDEA远程连接虚拟机,方便文件传输和远程命令行基本操作。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cHRnXzKW-1621851226304)(/img/1615876822272.png)]
在使用Java Client API之前,首先包package创建完成,此外,使用Java Client API操作Kudu数据库,需要创建客户端实例对象:
KuduClient
对象。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wyUENGxH-1621851226304)(/img/1612412290745.png)]
首先创建KuduClient对象,并且在应用运行结束的时候,需要关闭Client,所以采用JUnit方式构建和关闭。
package cn.itcast.kudu.table; import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduException; import org.junit.After; import org.junit.Before; import org.junit.Test; /** * 基于Java API对Kudu进行CRUD操作,包含创建表及删除表的操作 */ public class KuduTableDemo { // 定声明KuduClient实例对象 private KuduClient kuduClient = null ; @Before public void init() { // KuduMaster地址信息 String masterAddresses = "node2.itcast.cn:7051" ; // 初始化KuduClient实例对象 kuduClient = new KuduClient.KuduClientBuilder(masterAddresses) // 设置对此Kudu进行操作时超时时间,默认值为30s .defaultOperationTimeoutMs(10000) .build(); } @Test public void testKuduClient(){ System.out.println(kuduClient); } @After public void close() throws KuduException { // 测试完成以后,关闭连接 if(null != kuduClient) { kuduClient.close(); } } }
在Kudu提供API中,尤其是Java Client API,构建对象时,比如KuduClient,往往使用
建造者设计模式
,首先创建Builder对象,设置相关属性,最后获取实例对象。
任务:使用Java Client API在Kudu中创建表。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-b4tcGCu9-1621851226305)(/img/1615877522545.png)]
create table itcast_users( id int, name string, age byte, primary key(id) ) paritition by hash(id) partitions 3 stored as kudu ;
Kudu提供面向对象API,将创建表DDL语句,封装到类中,具体如下图所示:
- 1)、
Schema
,里面存储表的所有列信息(列名称和列类型)- 2)、
CreateTableOptions
,封装表的分区策略,分区数目和副本数
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vL1nngmJ-1621851226305)(/img/1612420659082.png)]
创建测试方法,编写创建表的代码:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eCOXXCUD-1621851226306)(/img/1615877820058.png)]
/** * 用于构建Kudu表中每列的字段信息Schema * * @param name 字段名称 * @param type 字段类型 * @param isKey 是否为Key * @return ColumnSchema对象 */ private ColumnSchema newColumnSchema(String name, Type type, boolean isKey) { // 创建ColumnSchemaBuilder实例对象 ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type); // 设置是否为主键 column.key(isKey) ; // 构建 ColumnSchema return column.build() ; } /** * 创建Kudu中的表,表的结构如下所示: create table itcast_users( id int, name string, age byte, primary key(id) ) paritition by hash(id) partitions 3 stored as kudu ; */ @Test public void createKuduTable() throws KuduException { // a. 定义Schema信息,列名称和列类型 List<ColumnSchema> columns = new ArrayList<>(); columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build()); columns.add(newColumnSchema("name", Type.STRING, false)); columns.add(newColumnSchema("age", Type.INT8, false)); Schema schema = new Schema(columns) ; // b. 设置表的属性 CreateTableOptions options = new CreateTableOptions() ; // 设置分区策略 options.addHashPartitions(Arrays.asList("id"), 3); // 设置副本数目 options.setNumReplicas(1) ; // c. 传递参数,创建表 /* public KuduTable createTable(String name, Schema schema, CreateTableOptions builder) */ KuduTable kuduTable = kuduClient.createTable("itcast_users", schema, options); System.out.println("Kudu Table ID = " + kuduTable.getTableId()); }
任务:删除Kudu中表,先判断表是否存在。
/** * 判断表是否存在,如果存在,将表删除 */ @Test public void dropKuduTable() throws KuduException { // 判断表是否存在 if(kuduClient.tableExists("itcast_users")){ // 传递表的名称,进行删除 kuduClient.deleteTable("itcast_users") ; } }
任务Task:向Kudu表中插入数据,先插入单条数据,再批量插入。
- 1)、获取表的句柄:
KuduTable
,通过KuduClient
获取- 2)、插入数据时,创建
Insert
对象,设置每行Row
的值- 3)、当向Kudu表插入数据时,创建会话实例对象
KuduSession
,类似PreparedStatement对象
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Vt2Uwfy2-1621851226306)(/img/1612421761460.png)]
编写代码,向Kudu表插入数据,步骤如下所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yfKOvYMC-1621851226307)(/img/1615878818247.png)]
/** * 将数据插入到Kudu Table中: INSERT INTO (id, name, age) VALUES (1001, "zhangsan", 26) */ @Test public void insertKuduData() throws KuduException { // a. 获取操作表句柄 KuduTable kuduTable = kuduClient.openTable("itcast_users"); // b. 获取KuduSession实例对象 KuduSession kuduSession = kuduClient.newSession(); // c. 插入数据,获取Insert对象 Insert insert = kuduTable.newInsert(); // d. 获取Row对象 PartialRow insertRow = insert.getRow(); // 设置值 insertRow.addInt("id", 1001); insertRow.addString("name", "itcast"); insertRow.addByte("age", (byte)25); // e. 插入数据 kuduSession.apply(insert); // f. 关闭连接 kuduSession.close(); }
上面编写代码,完成单条数据插入,接下来,批量插入数据,代码如下所示:
/** * 将数据插入到Kudu Table中: INSERT INTO (id, name, age) VALUES (1001, "zhangsan", 26) */ @Test public void insertKuduData() throws KuduException { // a. 获取操作表句柄 KuduTable kuduTable = kuduClient.openTable("itcast_users"); // b. 获取KuduSession实例对象 KuduSession kuduSession = kuduClient.newSession(); // 设置手动提交,刷新数据 kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); // 设置缓存数据量 kuduSession.setMutationBufferSpace(1000); Random random = new Random(); for(int index = 0; index < 100; index ++){ // c. 插入数据,获取Insert对象 Insert insert = kuduTable.newInsert(); // d. 获取Row对象 PartialRow insertRow = insert.getRow(); // 设置值 insertRow.addInt("id", 100 + index); insertRow.addString("name", "zhangsan-" + index); insertRow.addByte("age", (byte)(random.nextInt(10) + 21)); // e. 插入数据 kuduSession.apply(insert); } // 手动提交 kuduSession.flush(); // f. 关闭连接 kuduSession.close(); }
任务:从Kudu表中查询数据,属于全量查询数据。
从Kudu中查询数据与从Hbase查询数据代码类似,进行表的数据扫描
Scanner
。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-o8oi16fg-1621851226307)(/img/1615880361316.png)]
注意:从Kudu表加载数据时,思路与HBase不一样,从表的每个Tablet中扫描查询数据,放到迭代器中,最后将所有Tablet查询结果的迭代器放入迭代器中。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ew3KfU03-1621851226307)(/img/1612422739021.png)]
编写代码,从Kudu表全量加载数据,注意,遍历查询数据时,进行双层循环获取数据。
/** * 从Kudu表中全量加载数据 */ @Test public void queryKuduData() throws KuduException { // 1. 获取表的句柄 KuduTable kuduTable = kuduClient.openTable("itcast_users"); // 2. 获取扫描器对象 KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable); KuduScanner kuduScanner = scannerBuilder.build(); // 3. 遍历获取的数据 int index = 0 ; while (kuduScanner.hasMoreRows()){ // 判断是否还有表的Tablet数据为获取 index += 1; System.out.println("tablet index = " + index); // 获取每个tablet中扫描的数据 RowResultIterator rowResults = kuduScanner.nextRows(); // 遍历每个Tablet中数据 while (rowResults.hasNext()){ RowResult rowResult = rowResults.next(); System.out.println( "id = " + rowResult.getInt("id") + ", name = " + rowResult.getString("name") + ", age = " + rowResult.getByte("age") ); } } }
任务:在实际项目中,从Kudu加载数据,肯定有过滤条件,接下来实现,如何进行过滤查询数据。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-u8K3XPkw-1621851226308)(/img/1615880958773.png)]
使用KuduPlus实现上述过滤条件
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p6FNbbma-1621851226308)(/img/1615881042484.png)]
分析思路:
- 1)、功能一、选取字段,在Kudu中或者SQL语句中,称为
project
,投影,选择字段- 2)、功能二、过滤条件,在Kudu中或者SQL语句中,称为
predicate
,谓词,过滤条件
/** * 从Kudu表中全量加载数据 */ @Test public void queryKuduData() throws KuduException { // 1. 获取表的句柄 KuduTable kuduTable = kuduClient.openTable("itcast_users"); // 2. 获取扫描器对象 KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable); // TODO: 设置过滤条件 /* 查询id和age两个字段的值,年龄age小于25,id大于150 */ // TODO: 查询id和age两个字段 scannerBuilder.setProjectedColumnNames(Arrays.asList("id", "age")); // TODO: 年龄age小于25,id大于150 scannerBuilder.addPredicate( KuduPredicate.newComparisonPredicate( newColumnSchema("id", Type.INT32, true), KuduPredicate.ComparisonOp.GREATER, 150 ) ); scannerBuilder.addPredicate( KuduPredicate.newComparisonPredicate( newColumnSchema("age", Type.INT8, false), KuduPredicate.ComparisonOp.LESS, (byte)25 ) ); KuduScanner kuduScanner = scannerBuilder.build(); // 3. 遍历获取的数据 int index = 0 ; while (kuduScanner.hasMoreRows()){ // 判断是否还有表的Tablet数据为获取 index += 1; System.out.println("tablet index = " + index); // 获取每个tablet中扫描的数据 RowResultIterator rowResults = kuduScanner.nextRows(); // 遍历每个Tablet中数据 while (rowResults.hasNext()){ RowResult rowResult = rowResults.next(); System.out.println( "id = " + rowResult.getInt("id") + ", age = " + rowResult.getByte("age") ); } } }
任务:向Kudu表中数据进行更新和删除操作,类似Insert插入数据时操作。
主键
更新数据[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EoYlNBy8-1621851226309)(/img/1615881765826.png)]
/** * 更新Kudu表中数据 */ @Test public void updateKuduData() throws KuduException { // a. 获取操作表句柄 KuduTable kuduTable = kuduClient.openTable("itcast_users"); // b. 获取KuduSession实例对象 KuduSession kuduSession = kuduClient.newSession(); // c. 获取更新数据update对象 Update newUpdate = kuduTable.newUpdate(); // 获取Row对象 PartialRow updateRow = newUpdate.getRow(); // 设置更新的数据 updateRow.addInt("id", 153); updateRow.addString("name", "张三疯"); // e. 更新数据 kuduSession.apply(newUpdate); // f. 关闭连接 kuduSession.close(); }
在Kudu中,除了提供insert和update插入与更新方法外,开提供:
upsert
,表示当表中主键存在时,更新数据;不存在时,插入数据。实际项目中,建议使用upsert操作。
/** * 更新Kudu表中数据 */ @Test public void upsertKuduData() throws KuduException { // a. 获取操作表句柄 KuduTable kuduTable = kuduClient.openTable("itcast_users"); // b. 获取KuduSession实例对象 KuduSession kuduSession = kuduClient.newSession(); // c. 获取更新数据update对象 Upsert newUpsert = kuduTable.newUpsert(); // 获取Row对象 PartialRow upsertRow = newUpsert.getRow(); // 设置更新的数据 upsertRow.addInt("id", 253); upsertRow.addString("name", "张疯"); upsertRow.addByte("age", (byte)50); // e. 更新数据 kuduSession.apply(newUpsert); kuduSession.flush(); // f. 关闭连接 kuduSession.close(); }
对Kudu表数据进行删除时,需要按照主键id删除。
/** * 删除Kudu表中数据 */ @Test public void deleteKuduData() throws KuduException { // a. 获取操作表句柄 KuduTable kuduTable = kuduClient.openTable("itcast_users"); // b. 获取KuduSession实例对象 KuduSession kuduSession = kuduClient.newSession(); // c. 获取删除数据对象 Delete newDelete = kuduTable.newDelete(); // 获取Row对象 PartialRow deleteRow = newDelete.getRow(); // 设置主键 deleteRow.addInt("id", 253); // e. 更新数据 kuduSession.apply(newDelete); kuduSession.flush(); // f. 关闭连接 kuduSession.close(); }
为了提供可扩展性,Kudu 表被划分为称为 tablets 的单元,并分布在许多 tablet servers 上。
- 1)、哈希分区:Hash Partitioning
- 哈希分区通过哈希值,将行分配到不同的 buckets ( 存储桶 )中;
- 哈希分区是一种有效的策略,当不需要对表进行有序访问时,哈希分区对于在 tablet 之间随
机散布这些功能是有效的,这有助于减轻热点和 tablet 大小不均匀;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-octxG2uM-1621851226309)(/img/1615882540436.png)]
- 2)、范围分区:Range Partitioning
- 范围分区可根据存入数据的数据量,均衡的存储到各个机器上,防止机器出现负载不均衡现象;
- 分区键必须是主键 或 主键的一部分;
- Range分区的方式:
id
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TdPfwI0j-1621851226310)(/img/1615882598956.png)]
实现上述案例需求:创建Kudu表,按照id进行范围分区
/** * 创建Kudu中的表,采用对id进行Range范围分区 */ @Test public void createKuduTableByRange() throws KuduException { // a. 定义Schema信息,列名称和列类型 List<ColumnSchema> columns = new ArrayList<>(); columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build()); columns.add(newColumnSchema("name", Type.STRING, false)); columns.add(newColumnSchema("age", Type.INT8, false)); Schema schema = new Schema(columns) ; // b. 设置表的属性 CreateTableOptions options = new CreateTableOptions() ; // 设置分区策略 options.setRangePartitionColumns(Arrays.asList("id")); // 设置范围分区字段名称 /* id < 100 100 <= id < 500 id > 500 */ // id < 100 PartialRow upper100 = new PartialRow(schema); upper100.addInt("id", 100); options.addRangePartition(new PartialRow(schema), upper100); // 100 <= id < 500 PartialRow lower100 = new PartialRow(schema); lower100.addInt("id", 100); PartialRow upper500 = new PartialRow(schema); upper500.addInt("id", 500); options.addRangePartition(lower100, upper500); // id > 500 PartialRow lower500 = new PartialRow(schema); lower500.addInt("id", 500); options.addRangePartition(lower500, new PartialRow(schema)); // 设置副本数目 options.setNumReplicas(1) ; // c. 传递参数,创建表 /* public KuduTable createTable(String name, Schema schema, CreateTableOptions builder) */ KuduTable kuduTable = kuduClient.createTable("itcast_users_range", schema, options); System.out.println("Kudu Table ID = " + kuduTable.getTableId()); }
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9sUqmb2P-1621851226310)(/img/1615883070155.png)]
在Kudu中,创建表时,除了Hash分区和Range范围分区以外, 还支持多级分区:
- 1)、形式一、先哈希分区,再进行范围分区
- 2)、形式二、先哈希分区,再哈希分区
多级分区特点:
- Kudu 允许一个表上组合使用Hash分区 及 Range分区;
- 分区键必须是主键 或 主键的一部分;
- 多级分区可以保留各个分区类型的优点,同时减少每个分区的缺点;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-T5IpLYut-1621851226311)(/img/1615883257060.png)]
创建表,实现上述表分区要求:先按照id进行哈希分区,再按照age做范围分区
/** * 创建Kudu中的表,采用多级分区策略,结合哈希分区和范围分区组合使用 */ @Test public void createKuduTableMulti() throws KuduException { // a. 构建表的Schema信息 List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>(); columnSchemas.add(newColumnSchema("id", Type.INT32, true)) ; columnSchemas.add(newColumnSchema("age", Type.INT8, true)) ; columnSchemas.add(newColumnSchema("name", Type.STRING, false)) ; // 定义Schema信息 Schema schema = new Schema(columnSchemas) ; // b. Kudu表的分区策略及分区副本数目设置 CreateTableOptions tableOptions = new CreateTableOptions() ; // TODO: e.1. 设置哈希分区 List<String> columnsHash = new ArrayList<>() ; columnsHash.add("id") ; tableOptions.addHashPartitions(columnsHash, 5) ; // TODO: e.2. 设值范围分区 /* age 做 range分区,分3个区 - < 21(小于等于20岁) - 21 - 41(21岁到40岁) - 41(41岁以上,涵盖41岁) */ List<String> columnsRange = new ArrayList<>() ; columnsRange.add("age") ; tableOptions.setRangePartitionColumns(columnsRange) ; // 添加范围分区 PartialRow upper21 = new PartialRow(schema) ; upper21.addByte("age", (byte)21); tableOptions.addRangePartition(new PartialRow(schema), upper21) ; // 添加范围分区 PartialRow lower21 = new PartialRow(schema) ; lower21.addByte("age", (byte)21); PartialRow upper41 = new PartialRow(schema) ; upper41.addByte("age", (byte)41); tableOptions.addRangePartition(lower21, upper41) ; // 添加范围分区 PartialRow lower41 = new PartialRow(schema) ; lower41.addByte("age", (byte)41); tableOptions.addRangePartition(lower41, new PartialRow(schema)) ; // 副本数设置 tableOptions.setNumReplicas(1) ; // c. 在Kudu中创建表 KuduTable userTable = kuduClient.createTable("itcast_users_multi", schema, tableOptions); System.out.println(userTable.toString()); }
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-y7pVm4NG-1621851226311)(/img/1615883515978.png)]
任务:对Kudu中表进行修改,要么表添加列,要么表删除列,编程演示代码
- 1)、添加列
addColumn
/** * 对Kudu中表进行修改,增加列:address,String */ @Test public void alterKuduTableAddColumn() throws KuduException { // 添加列 AlterTableOptions ato = new AlterTableOptions() ; ato.addColumn("address",Type.STRING, "shanghai"); // 修改表 AlterTableResponse response = kuduClient.alterTable("itcast_users", ato); System.out.println(response.getTableId()); }
- 2)、删除列
dropColumn
/** * 对Kudu中表进行修改,删除列:address */ @Test public void alterKuduTableDropColumn() throws KuduException { // 添加列 AlterTableOptions ato = new AlterTableOptions() ; ato.dropColumn("address"); // 修改表 AlterTableResponse response = kuduClient.alterTable("itcast_users", ato); System.out.println(response.getTableId()); }
Kudu支持与Spark集成,并且提供集成库jar包,直接引入库,调用API即可,提供2套API:
- 1)、第一套:基于
RDD
数据集操作,KuduContext
上下文对象
- DDL操作,创建Kudu表和删除Kudu表
- 2)、第二套:基于
DataFrame
数据集操作,SparkSession
会话对象
- 从Kudu表中加载
load
和保存save
数据首先,创建Maven Module模块,添加相关依赖,创建包,如下所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HoAb0m1j-1621851226312)(/img/1612427178722.png)]
构建Maven Project工程或Maven Module模块,POM文件添加依赖如下:
<!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 --> <repositories> <repository> <id>aliyun</id> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> </repository> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> <repository> <id>jboss</id> <url>http://repository.jboss.com/nexus/content/groups/public</url> </repository> </repositories> <!-- 版本属性 --> <properties> <scala.version>2.11.12</scala.version> <scala.binary.version>2.11</scala.binary.version> <spark.version>2.4.0-cdh6.2.1</spark.version> <hadoop.version>3.0.0-cdh6.2.1</hadoop.version> <kudu.version>1.9.0-cdh6.2.1</kudu.version> </properties> <!-- 依赖JAR包 --> <dependencies> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client-tools</artifactId> <version>${kudu.version}</version> </dependency> <!-- Kudu Client 依赖包 --> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client</artifactId> <version>${kudu.version}</version> </dependency> <!-- Junit 依赖包 --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 --> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-spark2_2.11</artifactId> <version>${kudu.version}</version> </dependency> <!-- 依赖Scala语言 --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- Spark Core 依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> <!-- Spark SQL 依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> <!-- Hadoop Client 依赖 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies> <build> <outputDirectory>target/classes</outputDirectory> <testOutputDirectory>target/test-classes</testOutputDirectory> <resources> <resource> <directory>${project.basedir}/src/main/resources</directory> </resource> </resources> <!-- Maven 编译的插件 --> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.0</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
编写Spark Application时,设置日志级别,通过log4j.properties
设置,内容如下所示:
# Set everything to be logged to the console log4j.rootCategory=WARN, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Set the default spark-shell log level to WARN. When running the spark-shell, the # log level for this class is used to overwrite the root logger's log level, so that # the user can have different defaults for the shell and regular Spark apps. log4j.logger.org.apache.spark.repl.Main=WARN # Settings to quiet third party logs that are too verbose log4j.logger.org.spark_project.jetty=WARN log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO log4j.logger.org.apache.parquet=ERROR log4j.logger.parquet=ERROR # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
spark-shell时,可以通过--packages或--jars加载依赖jar包: 1)、--packages --packages org.apache.kudu:kudu-spark_2.10:1.5.0 必须联网,基于ivy方式下载所需要的jar包,存储在当前用户宿主目录下$USER_HOME/.ivy/jars/ 2)、--jars --jars /root/jars/xxx.jar,/root/jars/yy.jar 需要将jar包下载完成,放在本地,加载到应用中
任务:使用KuduContext创建Kudu表和删除Kudu表
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-U9zFWH7W-1621851226312)(/img/1615885235177.png)]
package cn.itcast.kudu.table import java.util import org.apache.kudu.client.CreateTableOptions import org.apache.kudu.spark.kudu.KuduContext import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} /** * Kudu与Spark集成,使用KuduContext创建表和删除表 */ object KuduSparkTableDemo { /** * 创建Kudu表,指定名称 * * @param tableName 表的名称 * @param kuduContext KuduContext实例对象 */ def createKuduTable(tableName: String, kuduContext: KuduContext): Unit = { // a. 表的Schema信息 val schema: StructType = StructType( Array( StructField("id", IntegerType, nullable = false), StructField("name", StringType, nullable = true), StructField("age", IntegerType, nullable = true), StructField("gender", StringType, nullable = true) ) ) // b. 表的主键 val keys: Seq[String] = Seq("id") // c. 创建表的选项设置 val options: CreateTableOptions = new CreateTableOptions() options.setNumReplicas(1) options.addHashPartitions(util.Arrays.asList("id"), 3) // 调用创建表方法 /* def createTable( tableName: String, schema: StructType, keys: Seq[String], options: CreateTableOptions ): KuduTable */ val kuduTable = kuduContext.createTable(tableName, schema, keys, options) println("Kudu Table ID: " + kuduTable) } /** * 删除Kudu中表 * @param tableName 表的名称 * @param kuduContext KuduContext实例对象 */ def dropKuduTable(tableName: String, kuduContext: KuduContext) = { // 判断表是否存在,如果存在,就删除表 if(kuduContext.tableExists(tableName)){ kuduContext.deleteTable(tableName) } } def main(args: Array[String]): Unit = { // 1. 构建SparkSession实例对象 val spark: SparkSession = SparkSession.builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[2]") .config("spark.sql.shuffle.partitions", "2") .getOrCreate() import spark.implicits._ // TODO: 创建KuduContext对象 val kuduContext: KuduContext = new KuduContext("node2.itcast.cn:7051", spark.sparkContext) println(s"KuduContext: ${kuduContext}") // 任务1: 创建表 //createKuduTable("kudu_itcast_users", kuduContext) // 任务2: 删除表 dropKuduTable("kudu_itcast_users", kuduContext) // 应用结束,关闭资源 spark.stop() } }
注意:在创建表时,主键不能为null,必须设置为false,字段放在最前面。
任务:编写程序,对Kudu表的数据,进行CRUD操作,与Java Client API类似。
- 1)、Insert插入数据、INSERT-IGNORE 如果存在,忽略
- 2)、DELETE删除数据
- 3)、UPDATE更新数据
- 4)、UPSERT插入更新数据,主键不存在就是插入,存在就是更新
package cn.itcast.kudu.data import cn.itcast.kudu.table.KuduSparkTableDemo.createKuduTable import org.apache.kudu.spark.kudu.KuduContext import org.apache.spark.sql.{DataFrame, SparkSession} /** * 对Kudu表的数据,进行CRUD操作 */ object KuduSparkDataDemo { /** * 向Kudu表中插入数据 */ def insertData(spark: SparkSession, kuduContext: KuduContext, tableName: String): Unit = { // a. 模拟产生数据 // TODO: 当RDD或Seq中数据类型为元组时,直接调用toDF,指定列名称,转换为DataFrame val usersDF: DataFrame = spark.createDataFrame( Seq( (1001, "zhangsan", 23, "男"), (1002, "lisi", 22, "男"), (1003, "xiaohong", 24, "女"), (1004, "zhaoliu2", 33, "男") ) ).toDF("id", "name", "age", "gender") // b. 将数据保存至Kudu表 kuduContext.insertRows(usersDF, tableName) } def main(args: Array[String]): Unit = { // 1. 构建SparkSession实例对象 val spark: SparkSession = SparkSession.builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[2]") .config("spark.sql.shuffle.partitions", "2") .getOrCreate() import spark.implicits._ // TODO: 创建KuduContext对象 val kuduContext: KuduContext = new KuduContext("node2.itcast.cn:7051", spark.sparkContext) //println(s"KuduContext: ${kuduContext}") val tableName = "kudu_itcast_users" // 插入数据 insertData(spark, kuduContext, tableName) // 查询数据 //selectData(spark, kuduContext, tableName) // 更新数据 //updateData(spark, kuduContext, tableName) // 插入更新数据 //upsertData(spark, kuduContext, tableName) // 删除数据 //deleteData(spark, kuduContext, tableName) // 应用结束,关闭资源 spark.stop() } }
/** * 从Kudu表中读取数据,封装到RDD数据集 */ def selectData(spark: SparkSession, kuduContext: KuduContext, tableName: String): Unit = { /* def kuduRDD( sc: SparkContext, tableName: String, columnProjection: Seq[String] = Nil, options: KuduReadOptions = KuduReadOptions() ): RDD[Row] */ val kuduRDD: RDD[Row] = kuduContext.kuduRDD(spark.sparkContext, tableName, Seq("name", "age")) // 遍历数据 kuduRDD.foreach{row => println( "name = " + row.getString(0) + ", age = " + row.getInt(1) ) } }
此外,可以使用KuduContext对表的数据进行update、upsert、delete等操作,类似insert操作。
任务:基于SparkSQL提供外部数据源方式从Kudu数据库中加载load和保存save数据,封装DataFrame中。
从Kudu表加载和保存数据数据时,可选项如下所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SygYXZOA-1621851226313)(/img/1612430230737.png)]
编写SparkSQL程序,从Kudu表加载load数据,进行转换,最终保存到Kudu表中。
// TODO: 2. 从Kudu表加载数据 val kuduDF: DataFrame = spark.read .format("kudu") .option("kudu.table", "kudu_itcast_users") .option("kudu.master", "node2.itcast.cn:7051") .load() kuduDF.printSchema() kuduDF.show(10, truncate = false)
// TODO: 保存数据到Kudu表 etlDF.write .mode(SaveMode.Append) .format("kudu") .option("kudu.table", "kudu_itcast_users") .option("kudu.master", "node2.itcast.cn:7051") .option("kudu.operation", "upsert") .save()
完整代码:从Kudu表读取数据,经过ETL转换,保存到Kudu表
package cn.itcast.kudu.sql import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import org.apache.spark.sql.functions._ /** * 编写SparkSQL程序,从Kudu表加载load数据,进行转换,最终保存到Kudu表中。 */ object KuduSparkSQLDemo { def main(args: Array[String]): Unit = { // 1. 构建SparkSession实例对象 val spark: SparkSession = SparkSession.builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[2]") .config("spark.sql.shuffle.partitions", "2") .getOrCreate() import spark.implicits._ // TODO: 2. 从Kudu表加载数据 val kuduDF: DataFrame = spark.read .format("kudu") .option("kudu.table", "kudu_itcast_users") .option("kudu.master", "node2.itcast.cn:7051") .load() //kuduDF.printSchema() //kuduDF.show(10, truncate = false) /* +----+--------+---+------+ |id |name |age|gender| +----+--------+---+------+ |1001|zhangsan|23 |男 | -> M |1002|lisi |22 |男 | |1004|zhaoliu2|33 |男 | |1003|xiaohong|24 |女 | -> F +----+--------+---+------+ */ // 自定义UDF函数,转换gender性别 val gender_to_udf: UserDefinedFunction = udf( (gender: String) => { gender match { case "男" => "M" case "女" => "F" case _ => "M" } } ) // TODO: 调用UDF函数,进行转换 val etlDF: DataFrame = kuduDF.select( $"id", $"name", // $"age".plus(1).as("age"), gender_to_udf($"gender").as("gender") ) //etlDF.printSchema() //etlDF.show(10, truncate = false) // TODO: 保存数据到Kudu表 etlDF.write .mode(SaveMode.Append) .format("kudu") .option("kudu.table", "kudu_itcast_users") .option("kudu.master", "node2.itcast.cn:7051") .option("kudu.operation", "upsert") .save() // 应用结束,关闭资源 spark.stop() } }
ocal[2]")
.config(“spark.sql.shuffle.partitions”, “2”)
.getOrCreate()
import spark.implicits._
// TODO: 2. 从Kudu表加载数据 val kuduDF: DataFrame = spark.read .format("kudu") .option("kudu.table", "kudu_itcast_users") .option("kudu.master", "node2.itcast.cn:7051") .load() //kuduDF.printSchema() //kuduDF.show(10, truncate = false) /* +----+--------+---+------+ |id |name |age|gender| +----+--------+---+------+ |1001|zhangsan|23 |男 | -> M |1002|lisi |22 |男 | |1004|zhaoliu2|33 |男 | |1003|xiaohong|24 |女 | -> F +----+--------+---+------+ */ // 自定义UDF函数,转换gender性别 val gender_to_udf: UserDefinedFunction = udf( (gender: String) => { gender match { case "男" => "M" case "女" => "F" case _ => "M" } } ) // TODO: 调用UDF函数,进行转换 val etlDF: DataFrame = kuduDF.select( $"id", $"name", // $"age".plus(1).as("age"), gender_to_udf($"gender").as("gender") ) //etlDF.printSchema() //etlDF.show(10, truncate = false) // TODO: 保存数据到Kudu表 etlDF.write .mode(SaveMode.Append) .format("kudu") .option("kudu.table", "kudu_itcast_users") .option("kudu.master", "node2.itcast.cn:7051") .option("kudu.operation", "upsert") .save() // 应用结束,关闭资源 spark.stop() }
}