本文已收录至Github,推荐阅读 👉 Java随想录
微信公众号:Java随想录
HBase(Hadoop Database)是一个开源的、分布式的、面向列的NoSQL数据库,它是构建在Hadoop之上的。HBase旨在提供可靠的、高性能的、可扩展的存储和访问大规模数据集的能力。
以下是HBase的一些关键特性和概念:
尽管Hadoop是一个强大的分布式计算框架,但它也存在一些不足之处,与HBase相比,以下是一些Hadoop的限制:
命名空间,类似于关系型数据库的Database概念,每个命名空间下有多个表。
HBase自带两个命名空间,分别是hbase和default,hbase 中存放的是HBase内置的表,default表是用户默认使用的命名空间,这2个命名空间默认是不展示的。
类似于关系型数据库的表概念。不同的是,HBase定义表时只需要声明列族即可,不需要声明具体的列。因为数据存储时稀疏的,空(null)列不占用存储空间,所有往HBase写入数据时,字段可以动态、按需指定。因此,和关系型数据库相比,HBase 能够轻松应对字段变更的场景。
HBase表中的每行数据都由一个RowKey和多个Column(列)组成,数据是按照RowKey的字典顺序存储的,并且查询数据时只能根据RowKey进行检索,所以RowKey的设计十分重要。
HBase中的每个列都由Colunn Family (列族)和Column Qualifier (列限定符)进行限定,例如info: name, info: age。 建表时,只需指明列族,而列限定符无需预先定义。
用于标识数据的不同版本(version),每条数据写入时,系统会自动为其加上该字段,其值为写入HBase的时间。
由{rowkey, column Family:column Qualifier, timestamp} 唯一确定的单元,Cell 中的数据全部是字节码形式存贮。
一条数据有多个版本,每个版本都是一个Cell。
HBase存储结构如下:
上面的这种数据会存储为下面这样,底层存储为Byte:
行分为Region,列分为Store,Region可以放在其他机器上。
HBase是基于HDFS的,而HDFS是不能够修改数据的,所以HBase其实也是不能修改数据的。HBase使用时间戳实现修改功能。取数据的时候取最新时间戳的数据,取出来的就是最新的数据。
HBase数据访问可以通过以下几种形式进行:
以上形式提供了不同的数据访问方式,可以根据具体的需求和查询条件选择适合的方式来访问和操作HBase中的数据。
HBase的架构体系是基于分布式存储和处理的设计。它包含了以下几个重要的组成部分:
这些组成部分共同构成了HBase的架构体系,实现了分布式存储和处理大规模数据集的能力。HMaster负责管理元数据和协调工作,RegionServer存储和处理数据,ZooKeeper提供分布式协调服务,HDFS提供底层的分布式文件存储,而HBase客户端用于与HBase进行交互。表和列族的概念提供了数据的组织和存储方式。
MemStore提供了临时的内存存储,StoreFile提供了持久化的磁盘存储,WAL用于保证数据的持久性。这种架构设计使得HBase能够提供高可用性、高性能和可扩展性的分布式存储和处理能力。
在HBase中,MemStore Flush是将内存中的数据刷新到磁盘上的StoreFile的过程。当MemStore中的数据达到一定大小阈值时,或者达到了一定的时间限制,HBase会触发MemStore Flush操作,以将数据持久化到磁盘,确保数据的持久性和可靠性。
下面是MemStore Flush的基本过程:
通过MemStore Flush操作,HBase可以将内存中的数据持久化到磁盘,以确保数据的持久性和可靠性。Flush操作的频率和成本可以通过配置参数进行调整,以适应不同的应用场景和性能需求。频繁的Flush操作可能会影响写入性能,而较长的Flush间隔可能会增加数据丢失的风险。因此,根据实际情况,需要合理设置Flush操作的参数,以平衡数据的持久性和写入性能的要求。
MemStore Flush在HBase中由以下几个参数进行控制,它们的含义如下:
上述的1和2,满足任一条件都会触发MemStore Flush操作。
这些参数需要根据具体的应用场景和性能要求进行合理的设置。较小的Flush阈值可以提高数据的持久性,但可能会增加Flush的频率和写入的开销;较大的Flush阈值可以减少Flush的频率和开销,但可能会增加数据丢失的风险。因此,需要根据应用的读写特征和数据的重要性,选择合适的参数值。
StoreFile Compaction(文件合并)是 HBase 中的一个重要操作,它用于合并和优化存储在磁盘上的数据文件(StoreFile)。StoreFile Compaction 可以帮助减少磁盘空间占用、提高读取性能,并且在某些情况下可以提高写入性能。
StoreFile Compaction 的基本过程如下:
通过 StoreFile Compaction,HBase 可以减少磁盘上的存储空间占用,提高读取性能,同时合并操作还可以优化数据布局,加速数据的访问。合适的合并策略的选择可以根据数据的访问模式和应用需求,以达到最佳的性能和存储效率。
StoreFile Compaction 过程中涉及到的一些相关参数及其含义如下:
这些参数可以在 HBase 的配置文件(hbase-site.xml)中进行设置。通过调整这些参数的值,可以根据数据量、存储需求和性能要求来优化 Compaction 操作的触发条件和行为。
以下是判断是否触发 Compaction 的过程:
判断是否满足进行 Minor Compaction 的条件:
判断是否满足进行 Major Compaction 的条件:
或者
对于即将进行 Compaction 的 StoreFile:
检查是否启用 Compaction:
判断触发 Compaction 的时间间隔:
根据以上判断过程,HBase 在每个 RegionServer 上的每个 Store(列族)会根据配置参数进行定期的 Compaction 检查。一旦满足触发 Compaction 的条件,相应的 Minor Compaction 或 Major Compaction 将被触发,合并和优化存储的数据文件。这样可以提高读取性能、节省磁盘空间,并且在某些情况下可以提高写入性能。
Region Split(区域分割)是 HBase 中的一个重要操作,它用于在数据增长过程中,将一个较大的 HBase 表的 Region(区域)划分成更小的子区域,以提高读写性能和负载均衡。
当一个 Region 的大小达到了预先配置的阈值时,HBase 将触发 Region Split 操作。Region Split 的基本过程如下:
常见的区域分割方式包括:
通过合理地使用区域分割,可以充分利用集群资源,提高读写性能和负载均衡能力。不同的分割策略和分割方式可以根据数据规模、访问模式和应用需求进行选择,以满足不同场景下的需求。
在 HBase 中进行预分区可以通过 HBase Shell 或 HBase API 进行操作。以下是使用 HBase Shell 进行预分区的示例:
打开 HBase Shell:
$ hbase shell
创建表并指定分区:
hbase(main):001:0> create 'my_table', 'cf', {SPLITS => ['a', 'b', 'c']}
上述命令创建了一个名为 my_table
的表,并指定了三个分区点:'a'、'b' 和 'c'。这将创建四个初始的子区域。
查看表的分区情况:
hbase(main):002:0> describe 'my_table'
这将显示表的详细信息,包括分区信息。
通过上述步骤,你可以在创建表时预先定义分区点,从而实现预分区。每个分区点将成为一个子区域的边界,确保数据在表创建时就能分布在多个子区域中,从而实现负载均衡和性能优化。
请注意,上述示例是使用 HBase Shell 进行预分区的简单示例。如果需要在编程中进行预分区,可以使用 HBase API,例如 Java API,通过在创建表时设置 SPLITS
参数来指定分区点。
以下是使用 HBase Java API 进行预分区的示例代码:
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; public class PreSplitExample { public static void main(String[] args) throws IOException { // 创建 HBase 配置 org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create(); // 创建 HBase 连接 try (Connection connection = ConnectionFactory.createConnection(config)) { // 创建 HBase 管理器 try (Admin admin = connection.getAdmin()) { // 定义表名 TableName tableName = TableName.valueOf("my_table"); // 定义分区点 byte[][] splitKeys = { Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c") }; // 创建表并指定分区 admin.createTable(TableDescriptorBuilder.newBuilder(tableName) .addColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")) .setSplitKeys(splitKeys) .build()); } } } }
上述代码通过 HBase Java API 创建了一个名为 my_table
的表,并指定了三个分区点:'a'、'b' 和 'c'。这将创建四个初始的子区域。
请注意,在使用 Java API 进行预分区时,需要先建立与 HBase 的连接,并通过 HBase 管理器(Admin)执行表的创建操作,并设置 setSplitKeys(splitKeys)
方法来指定分区点。
通过上述示例代码,你可以在编程中使用 HBase Java API 实现预分区功能。
在HBase中,可以通过设置Scan
对象的setCaching()
方法来调整Scan
缓存的大小。Scan
缓存用于指定每次扫描操作从RegionServer返回给客户端的行数。通过调整缓存大小,可以在一定程度上控制数据的读取性能和网络传输的开销。
以下是设置Scan
缓存的示例代码:
Scan scan = new Scan(); scan.setCaching(500); // 设置缓存大小为500行 ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { // 处理扫描结果 } scanner.close();
在上述示例中,setCaching()
方法将缓存大小设置为500行。可以根据实际需求调整这个值,需要根据数据大小、网络带宽和性能要求进行权衡。较大的缓存大小可以减少客户端与RegionServer之间的通信次数,提高读取性能,但同时也会增加内存消耗。较小的缓存大小可以减少内存消耗,但可能会增加通信次数和网络传输开销。
需要注意的是,setCaching()
方法设置的是每次扫描的缓存大小,并不是全局的设置。如果需要对整个表的扫描操作生效,需要在每次扫描时都设置缓存大小。
此外,还可以通过调整HBase的配置参数来全局设置缓存大小。在hbase-site.xml
配置文件中添加以下参数可以设置默认的缓存大小:
<property> <name>hbase.client.scanner.caching</name> <value>500</value> <!-- 设置默认的缓存大小为500行 --> </property>
以上是通过代码和配置文件来设置Scan
缓存大小的方法,根据具体的应用场景和需求,可以选择适当的方式进行设置。
当使用Scan或者GET获取大量的行时,最好指定所需要的列,因为服务端通过网络传输到客户端,数据量太大可能是瓶颈。如果能有效过滤部分数据,能很大程度的减少网络I/O的花费。
在HBase中,可以使用Scan
或Get
操作来显示指定的列。下面分别介绍两种方式的用法:
Scan
操作显示指定列:Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1")); // 指定列族(cf)和列(col1) ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col1")); // 处理列(col1)的值 } scanner.close();
在上述示例中,使用scan.addColumn()
方法来指定要显示的列族和列。在for
循环中,通过result.getValue()
方法获取指定列的值。
Get
操作显示指定列:Get get = new Get(Bytes.toBytes("row1")); // 指定行键(row1) get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1")); // 指定列族(cf)和列(col1) Result result = table.get(get); byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col1")); // 处理列(col1)的值
在上述示例中,使用get.addColumn()
方法来指定要显示的列族和列。通过table.get()
方法获取行数据,并通过result.getValue()
方法获取指定列的值。
无论是使用Scan
还是Get
,都可以通过addColumn()
方法来指定要显示的列族和列。可以根据具体的需求,多次调用addColumn()
方法来显示多个列。
需要注意的是,HBase中的列是以字节数组(byte[]
)形式表示的,因此在使用addColumn()
和getValue()
方法时,需要将列族和列名转换为字节数组。
如果批量进行全表扫描,默认是有缓存的,如果此时有缓存,会降低扫描的效率。
在HBase中,可以通过设置Scan
对象的setCacheBlocks()
方法来禁用块缓存。块缓存是HBase中的一种缓存机制,用于加快数据的读取操作。然而,在某些情况下,禁用块缓存可能是有益的,例如对于某些热点数据或者需要立即获取最新数据的场景。
以下是禁用Scan
块缓存的示例代码:
Scan scan = new Scan(); scan.setCacheBlocks(false); // 禁用块缓存 ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { // 处理扫描结果 } scanner.close();
在上述示例中,setCacheBlocks(false)
方法将禁用Scan
操作的块缓存。
需要注意的是,禁用块缓存可能会增加对HBase存储的实际磁盘读取次数,并且在一些场景下可能导致性能下降。因此,在禁用块缓存之前,建议仔细评估应用需求和场景,确保禁用块缓存的决策是合理的。
对于经常读到的数据,建议使用默认值,开启块缓存。
Htable有一个属性是AutoFlush,该属性用于支持客户端的批量更新,默认是true,当客户端每收到一条数据,立刻发送到服务端,如果设置为false,当客户端提交put请求时候,先将该请求在客户端缓存,到达阈值的时候或者执行hbase.flushcommits(),才向RegionServer提交请求。
在HBase中,可以通过设置Table
对象的setAutoFlush()
方法来控制自动刷新(AutoFlush)行为。AutoFlush决定了在何时将数据从客户端发送到RegionServer并写入到存储中。
以下是设置AutoFlush的示例代码:
// 创建HBase配置对象 Configuration conf = HBaseConfiguration.create(); // 创建HBase连接 Connection connection = ConnectionFactory.createConnection(conf); // 获取表对象 TableName tableName = TableName.valueOf("your_table_name"); Table table = connection.getTable(tableName); // 设置AutoFlush table.setAutoFlush(false); // 关闭AutoFlush // 执行写入操作 Put put = new Put(Bytes.toBytes("row1")); put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1"), Bytes.toBytes("value1")); table.put(put); // 手动刷新数据 table.flushCommits(); // 手动刷新数据到RegionServer // 关闭表和连接 table.close(); connection.close();
在上述示例中,table.setAutoFlush(false)
方法将关闭AutoFlush。这意味着在执行写操作时,数据不会立即被刷新到RegionServer和存储中,而是先缓存在客户端的内存中。只有当调用table.flushCommits()
方法时,数据才会被手动刷新到RegionServer。
需要注意的是,关闭AutoFlush可以提高写入性能,尤其是在批量写入或者频繁写入的场景中。但是,关闭AutoFlush也会增加数据在客户端内存中的暂存时间,并增加了数据丢失的风险。因此,在关闭AutoFlush时,需要在适当的时机手动调用flushCommits()
方法来确保数据的持久性。
同时,还可以通过设置table.setWriteBufferSize()
方法来指定客户端写缓冲区的大小。这可以帮助在缓存中存储更多的数据,减少刷新到RegionServer的次数,提高写入性能。例如:
table.setWriteBufferSize(1024 * 1024); // 设置写缓冲区大小为1MB
在上述示例中,将写缓冲区大小设置为1MB。
总之,通过设置table.setAutoFlush(false)
和table.setWriteBufferSize()
方法,可以控制AutoFlush行为和客户端写缓冲区大小,以优化写入性能和数据刷新的策略。根据具体的应用需求和场景,可以进行适当的配置调整。
属性:zookeeper.session.timeout
解释:默认值为 90000 毫秒(90s)。当某个 RegionServer 挂掉,90s 之后 Master 才能察觉到。可适当减小此值,尽可能快地检测 regionserver 故障,可调整至 20-30s。看你能有都能忍耐超时,同时可以调整重试时间和重试次数
hbase.client.pause(默认值 100ms)
hbase.client.retries.number(默认 15 次)
属性:hbase.regionserver.handler.count
解释:默认值为 30,用于指定 RPC 监听的数量,可以根据客户端的请求数进行调整,读写请求较多时,增加此值。
属性:hbase.hregion.majorcompaction
解释:默认值:604800000 秒(7 天), Major Compaction 的周期,若关闭自动 Major Compaction,可将其设为 0。如果关闭一定记得自己手动合并,因为大合并非常有意义。
属性:hbase.hregion.max.filesize
解释:默认值 10737418240(10GB),如果需要运行 HBase 的 MR 任务,可以减小此值,因为一个 region 对应一个 map 任务,如果单个 region 过大,会导致 map 任务执行时间。过长。该值的意思就是,如果 HFile 的大小达到这个数值,则这个 region 会被切分为两个 Hfile。
属性:hbase.client.write.buffer
解释:默认值 2097152bytes(2M)用于指定 HBase 客户端缓存,增大该值可以减少 RPC调用次数,但是会消耗更多内存,反之则反之。一般我们需要设定一定的缓存大小,以达到减少 RPC 次数的目的。
属性:hbase.client.scanner.caching
解释:用于指定 scan.next 方法获取的默认行数,值越大,消耗内存越大。
添加 Maven 依赖:
<!-- HBase 2.4.3 依赖 --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.4.3</version> </dependency>
配置 HBase 连接:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @Configuration public class HBaseConfig { @Bean public Connection hbaseConnection() throws IOException { Configuration config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum", "localhost"); // HBase ZooKeeper 地址 config.set("hbase.zookeeper.property.clientPort", "2181"); // HBase ZooKeeper 端口 return ConnectionFactory.createConnection(config); } }
编写增删改查代码:
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class HBaseService { @Autowired private Connection hbaseConnection; //添加数据 public void putData(String tableName, String rowKey, String columnFamily, String column, String value) throws IOException { Table table = hbaseConnection.getTable(TableName.valueOf(tableName)); Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value)); table.put(put); table.close(); } //删除数据 public void deleteData(String tableName, String rowKey) throws IOException { Table table = hbaseConnection.getTable(TableName.valueOf(tableName)); Delete delete = new Delete(Bytes.toBytes(rowKey)); table.delete(delete); table.close(); } //获取数据 public String getData(String tableName, String rowKey, String columnFamily, String column) throws IOException { Table table = hbaseConnection.getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); Result result = table.get(get); byte[] valueBytes = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(column)); table.close(); return Bytes.toString(valueBytes); } }
在上述代码中,HBaseConfig
类配置了 HBase 连接,通过 hbaseConnection()
方法创建 HBase 连接。HBaseService
类提供了 putData()
、deleteData()
和 getData()
方法,分别用于插入数据、删除数据和获取数据。
以下是使用Scan 操作的示例代码:
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; public class HBaseScanExample { public static void main(String[] args) throws IOException { // 创建 HBase 配置对象 Configuration conf = HBaseConfiguration.create(); // 创建 HBase 连接 Connection connection = ConnectionFactory.createConnection(conf); // 获取表对象 TableName tableName = TableName.valueOf("your_table_name"); Table table = connection.getTable(tableName); // 创建 Scan 对象 Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1")); // 指定要查询的列族和列 // 执行 Scan 操作 ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { // 处理每一行数据 byte[] row = result.getRow(); byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col1")); System.out.println("Row key: " + Bytes.toString(row) + ", Value: " + Bytes.toString(value)); } // 关闭资源 scanner.close(); table.close(); connection.close(); } }
在上述代码中,首先创建 HBase 配置对象 Configuration
,然后通过 ConnectionFactory
创建 HBase 连接 Connection
。接下来,通过连接获取表对象 Table
,指定要进行 Scan 操作的表名。然后创建 Scan
对象,并使用 addColumn
方法指定要查询的列族和列。最后,使用 getScanner
方法执行 Scan 操作,并遍历 ResultScanner
获取每一行的数据,并进行处理。
Phoenix是一个开源的基于Apache HBase的关系型数据库引擎,它提供了SQL接口来访问HBase中存储的数据。它在HBase的基础上添加了SQL查询和事务功能,使得使用HBase的开发者可以使用熟悉的SQL语言进行数据操作和查询。
Phoenix在HBase中的主要用途包括:
要在HBase中使用Phoenix,需要先安装并配置好Phoenix。以下是一个在HBase中使用Phoenix的示例代码:
pom.xml
文件中添加以下依赖:<!-- Phoenix 依赖 --> <dependency> <groupId>org.apache.phoenix</groupId> <artifactId>phoenix-core</artifactId> <version>4.16.0-HBase-2.4</version> </dependency>
users
的表:CREATE TABLE users ( id BIGINT PRIMARY KEY, name VARCHAR, age INTEGER );
PhoenixConnection
和 PhoenixStatement
来执行 SQL 操作。import java.sql.*; public class PhoenixExample { public static void main(String[] args) throws SQLException { // 创建 Phoenix 连接 String url = "jdbc:phoenix:<HBase ZooKeeper Quorum>:<HBase ZooKeeper Port>"; Connection connection = DriverManager.getConnection(url); // 执行 SQL 查询 String query = "SELECT * FROM users"; Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(query); // 处理查询结果 while (resultSet.next()) { long id = resultSet.getLong("ID"); String name = resultSet.getString("NAME"); int age = resultSet.getInt("AGE"); System.out.println("ID: " + id + ", Name: " + name + ", Age: " + age); } // 关闭资源 resultSet.close(); statement.close(); connection.close(); } }
在上述代码中,需要将 <HBase ZooKeeper Quorum>
和 <HBase ZooKeeper Port>
替换为你的 HBase ZooKeeper 地址和端口。
通过创建 PhoenixConnection
并传递正确的 JDBC URL,可以获得连接对象。接下来,可以使用 createStatement()
方法创建 PhoenixStatement
对象,并使用 executeQuery()
方法执行 SQL 查询。
然后,可以使用 ResultSet
对象遍历查询结果,并提取所需的字段。在此示例中,遍历了 users
表的结果,并打印了每行的 ID、Name 和 Age。
本篇文章就到这里,感谢阅读,如果本篇博客有任何错误和建议,欢迎给我留言指正。文章持续更新,可以关注公众号第一时间阅读。