这个问题在网上已经一搜一大把了,为什么要自己亲手总结一下仅仅是因为最近自己公司新上了HBase然后使用Spark去读取MySQL的数据写入HBase的时候遇到了一些问题,也困扰了挺久
现在就详细描述一下我去编写这个程序的流程,代码是如何去变化的
我们现在就需要做两件事情,一个是MySQL中的表需要迁移过来HBase,这部分是全量同步,还有就是做数据的增量同步,这个现在不列入我们的需求之中
我下面的代码是scala代码,非常简单,首先和MySQL取得连接,然后通过一个DataFrame去接收它就好了
val url = "jdbc:mysql://xxx:xxx/xxx?characterEncoding=utf-8&useSSL=false" val connectProperties = new Properties() connectProperties.setProperty("user","xxx") connectProperties.setProperty("password","xxx") connectProperties.setProperty("driver","com.mysql.jdbc.Driver") connectProperties.setProperty("partitionColumn","xxx") val columnName = "Id" val lowerBound = 1 val upperBound = 30 val numPartitions = 2 val tableName = "xxx" def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("JuejinDemo").setMaster("local[2]") val spark = SparkSession.builder().config(sparkConf).getOrCreate() val jdbcDF = spark.read.jdbc(url, tableName,columnName,lowerBound,upperBound,numPartitions,connectProperties) //展示表结构 jdbcDF.printSchema() //展示数据 jdbcDF.show() } 复制代码
几行代码的事,简单说明一下参数
// MySQL的URL var url = "jdbc:mysql://xxx:xxx/xxx?characterEncoding=utf-8&useSSL=false" // MySQL表名 val tableName = "xxx" // 连接配置 var connectProperties = new Properties() // MySQL用户名 connectProperties.setProperty("user","xxx") // MySQL密码 connectProperties.setProperty("password","xxx") // 驱动 connectProperties.setProperty("driver","com.mysql.jdbc.Driver") val columnName = "Id" // 从id为1开始读 val lowerBound = 1 // 下方解释 val upperBound = 100 val numPartitions = 2 复制代码
打印的结果:
现在有的数据:
注意:upperBound和numPartitions两个参数是有关联的,upperBound / numPartitions = 每个分区需要写入多少条数据,所以最好就是搞清楚数据总量是多少,因为笔者就遇到这么一个问题,本来总量是4000W条数据,笔者设置upperBound = 3千万,numPartitions = 300,那么每个分区就需要写入10W条数据。
而这个分区的规则是,前面299的分区都写入10W条数据,但是最后第300个分区就会写入10W+4000W-3000W = 1010W条数据,导致程序OOM了好几回而且找不出原因,设置executor-memory = 3G都不够吃,所以一定要注意
当然此时你也可以用SQL的方式去查询MySQL,然后把查询出来的结果当做你要写入的数据,因为使用jdbc方法的时候,是固定要把MySQL的整张表给读完的,所以会不可控,代码如下
// 连接MySQL读取数据 val jdbcDF = spark.read.format("jdbc") .option("url", "jdbc:mysql://" +host+ ":xxx/" +dbName+ "?characterEncoding=utf-8&useSSL=false") .option("driver", "com.mysql.jdbc.Driver") .option("user", username) .option("password", password) .option("query","select * from " +tableName).load() 复制代码
这里是使用了format方法,此时你需要提供的参数也很简单,和上面的jdbc是一样的
这个时候我们就已经成功能读到数据了,和数据源的整合就是我们开发的第一步,之后就是对数据进行处理和发送到对应的下游,而对应的下游其实很多都是由API去提供支持,我们只需要把数据转成这个整合的API需要的格式即可
这里我使用了dtype属性,这里.var直接出来的是一个Array(String,String)
从输出的结果可以看到,我们这个type数组是记录了这张MySQL表中的字段和字段类型,所以这个时候我就可以用循环去遍历它并且对每个字段的数据进行处理了,让我们先拿到数据
这时我把DataFrame转换成了RDD进行处理,我先使用foreach输出一下,之后我会用map代替
现在拿到数据了,我们需要和HBase进行映射,那HBase的存储刚刚也说过了是列式存储,就是一个rowkey,对应多个列族的多个字段。这里我假设只有1个列族info。现在我们需要的条件就是,rowkey,columnFamily = info,字段,字段值即可
这里我简单把MySQL的Id作为rowkey的值,而且定义了一个getString方法,就是按照这条数据row的不同字段value_type去取得这个字段所对应的值,因为字段存在多个,所以它们是作为一个数组存在的,i就是这个数组的下标
/** * 根据每个字段的字段类型调用不同的函数,将字段值转换为HBase可读取的字节形式 * 解决数字导入到HBase中变成乱码的问题 * * @param value_type * @param row * @param i * @return */ def getString(value_type: String, row: Row, i: Int): String = { if (row != null && row.length != 0) { var str = "" if ("IntegerType" == value_type) { str = row.getInt(i).toString } else if ("StringType" == value_type) { str = row.getString(i) } else if ("FloatType" == value_type) { str = row.getFloat(i).toString } else if ("DoubleType" == value_type) { str = row.getDouble(i).toString } else if ("TimestampType" == value_type) { str = row.getTimestamp(i).toString } str } else "" } 复制代码
然后我们再在刚刚的rdd中去调用上面的方法,把Id取出来赋值给rowkey
// 遍历所有的字段 for (j <- 0 to (types.length-1)){ // 取出值为Id的那个字段 if (types.apply(j)._1 == "Id"){ // 将Id字段对应的值赋给rowKey rowKey = getString(types.apply(j)._2,row,j) println("rowkey的值为:"+rowKey) } } 复制代码
运行结果如下,因为我现在这张表只有2条数据,所以只有两个rowkey
这时候我们就要创建Put对象了,Put是我们往HBase插入数据需要实例化的对象
丢到百度翻译后我们可以看到参数方面的信息
new一个Put对象需要传入rowkey,然后调用addColumn方法。这里的addColumn需要有4个参数,第一个byte[]数组是rowkey,第二个byte[]数组是列族,long是这条row的时间戳(这个不用我们自己传入),因为hbase删除和更新的数据仅仅只是做了标记,并没有物理移除,更新即是插入一条最新时间戳的数据而已,而最后一个字节数组就是这个字段对应的值
// 列族名,定义在main方法外层即可 val columnFamily = "info" val put = new Put(Bytes.toBytes(rowKey)) for (i <- 0 until row.size){ put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(types.apply(i)._1), Bytes.toBytes(getString(types.apply(i)._2, row, i))) //打印HBase中的数据 println("rowKey: "+rowKey+" , "+"columnFamily: "+columnFamily+" , "+ "column: "+types.apply(i)._1+" , "+"cell: "+getString(types.apply(i)._2, row, i)) } 复制代码
我把这个打印出来的数据给你们对应一下,你们就看的很清楚了,因为addColumn需要字节数组,所以存入的时候和打印出来的数据会有一些出入
此时我们把foreach换回map,然后把这个Put作为返回值即可,这里注意数据结构叫做ImmutableBytesWritable
我把整个结果截图了,方便大家对照一下
这一步就很简单了,调用API的事儿,补充一下,到目前为止我们使用到的依赖有以下
import java.util.Properties import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants} import org.apache.hadoop.hbase.client.{Put, Result} import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapreduce.Job import org.apache.spark.SparkConf import org.apache.spark.sql.{Row, SparkSession} 复制代码
现在开始写入,这是固定的套路
val sc = spark.sparkContext sc.setLogLevel("ERROR") val hadoopConfiguration: Configuration = sc.hadoopConfiguration val hbaseTableName = "zbchatmsg" hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName) val hbaseConf = getHBaseConf(hadoopConfiguration) val job = new Job(hbaseConf) job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Result]) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) 复制代码
getHBaseConf方法只是定义了zookeeper的地址信息
/** * 获取 HBase相关参数 * @param hadoopConf * @return */ def getHBaseConf(hadoopConf: Configuration): Configuration = { val hbaseConf = HBaseConfiguration.create(hadoopConf) // zookeeper的地址 hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "") // zookeeper的端口 hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "") hbaseConf } 复制代码
这里我先把原本的zbchatmsg给disable后删除,然后创建了一个新的,执行程序后,scan它,就可以输出我的数据了
到此为止,我们的MySQLToHBase的程序就已经搞定了
我们可以自己去写一个方法,在没有创建好表的时候让程序帮忙创一个表,或者我们也可以直接自己事先建表
val hBaseTableName = "你的HBase表名" /** * 创建HBase表 * @param tableName 表名 */ def createHTable(tableName: String, hBaseConf : Configuration) = { val connection = ConnectionFactory.createConnection(hBaseConf) val hBaseTableName = TableName.valueOf(tableName) val admin = connection.getAdmin if (!admin.tableExists(hBaseTableName)) { val tableDesc = new HTableDescriptor(hBaseTableName) tableDesc.addFamily(new HColumnDescriptor("info".getBytes)) admin.createTable(tableDesc) } connection.close() } 复制代码
当然你会发现我们现在的这个程序非常的不方便,因为我们跑程序的时候肯定是有生产环境和测试环境的,我们想要通过运行程序时传递参数来判断,这个程序应该是用什么环境,应该读那张MySQL表,应该写入哪张HBase表就好了。
所以我们就开始改了
首先是执行环境的问题,我写了一个 checkArguments 方法去判断,当接收dev的时候就是测试环境,pro就是生产环境,此时我们的url,connectProperties就需要定义为
var url = new String var connectProperties = new Properties() 复制代码
checkArguments方法:
/** * 检验运行时参数的方法 --- args = pro or dev * @param args */ def checkArguments(args: String): Unit ={ val runType = mutable.HashSet("dev", "pro") if (args == null || args.length == 0 || !runType.contains(args)) { throw new Exception("Illegal starting parameter ......") } if (args == "pro"){ url = "" connectProperties.setProperty("user","") connectProperties.setProperty("password","") connectProperties.setProperty("driver","com.mysql.jdbc.Driver") connectProperties.setProperty("partitionColumn","Id") } if (args == "dev"){ url = "" connectProperties.setProperty("user","") connectProperties.setProperty("password","") connectProperties.setProperty("driver","com.mysql.jdbc.Driver") connectProperties.setProperty("partitionColumn","Id") } } 复制代码
逻辑很简单,就是一个集合存放了dev和pro,如果你输入的不是这俩的其中一个,就异常,如果对了,dev和pro里面分别对应生产和测试的MySQL的URL,用户名,密码即可
/** * 获取 HBase相关参数 * (这里需要修改 HConstants.ZOOKEEPER_QUORUM 与 "hbase.master" 参数) * * @param hadoopConf * @return */ def getHBaseConf(hadoopConf: Configuration,runType:String): Configuration = { val hbaseConf = HBaseConfiguration.create(hadoopConf) if (runType== "prototest"){ hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "") hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181") } if (runType == "dev"){ hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "") hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181") } hbaseConf } 复制代码
增加下面代码即可
// 运行环境 dev or pro val runType = args(0) checkArguments(runType) // MySQL表名 val tableName = args(1) // HBase表名 val hbaseTableName = args(2) upperBound = args(3).toInt numPartitions = args(4).toInt 复制代码
因为 upperBound 和 numPartitions 控制了我们的每个task的写入数据量,所以也作为参数一并在运行时传入即可,比如此时要运行这个代码,我就会在IDEA中点击
传入需要输入的参数
即可正常运行
使用这个方式时用小表测试是没有意义的,因为几万数据的小表这个程序都不会出现什么问题,这里我使用的是一张5000W左右的bigtable表,然后记录一下踩坑的记录。
这个套路就是先把MySQL的表数据按照HFile需要的
(rowkey,Array(info,column,columnName)) 复制代码
格式先读到HDFS,然后再直接映射到HBase
BulkloadToHBase的套路是,你的MySQL表存在多少个字段,它插入的时候是按照(rowkey,columnName,field,fieldValue)来进行写入,所以假如我这张表有5000W条数据,表中26个字段,然后程序中设置1000个分区的时候,计算出一个task就要跑5W条数据,然后26个字段5W条数据,那就是总数130W条消息
此时可以看到每个task就要跑130W条消息了,所以对于字段多,数量大的表来说,这个任务量是非常惊人的
其实没有太多需要说明的,就是注意写入HDFS和写入HBase代码是可以分离的。
import java.io.IOException import java.sql.{DriverManager, SQLException} import java.util.Properties import com.dataserver.vzan.confmanager.JobProperties import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hbase._ import org.apache.hadoop.hbase.client.{ClusterConnection, ConnectionFactory, HRegionLocator, Put} import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat} import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier} import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapreduce.Job import org.apache.spark._ import org.apache.spark.sql.{Row, SparkSession} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.control.Breaks /** * 需要传入的参数 * runType:dev or pro * MySQL表名 * HBase表名 * example:pro zbchatmsg2003 zbchatmsg2003 100000000 1000 /user/hive/bulkload * * 需要手动修改的参数为 columnFamily,默认为 info */ object BulkLoadToHBase { var path = new String var url = new String var connectProperties = new Properties() val columnName = "Id" val lowerBound = 1 var upperBound = 0 var numPartitions = 0 def main(args: Array[String]): Unit = { // 运行环境 dev or pro val runType = args(0) checkArguments(runType) // MySQL表名 val tableName = args(1) // HBase表名 val hbaseTableName = args(2) // 决定了每个task分配的数据量 upperBound = args(3).toInt numPartitions = args(4).toInt // HDFS的路径 path = args(5) val sparkConf = new SparkConf().setAppName("BulkLoadToHBase") .set("spark.shuffle.file.buffer","128") .set("spark.reducer.maxSizeInFlight","96") .set("spark.shuffle.io.maxRetries","10") val spark = SparkSession.builder().config(sparkConf).getOrCreate() val jdbcDF = spark.read.jdbc(url, tableName,columnName,lowerBound,upperBound,numPartitions,connectProperties) jdbcDF.printSchema() var fields = jdbcDF.columns // 可以通过这行代码去把不必要的字段给删除 //fields = fields.dropWhile(_ == "") val types = jdbcDF.dtypes val sc = spark.sparkContext val hadoopConfiguration: Configuration = sc.hadoopConfiguration hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName) val hbaseConf = getHBaseConf(hadoopConfiguration,runType) //表不存在则建HBase临时表 createHTable(hbaseTableName, hbaseConf) // 将DataFrame转换成BulkLoad需要的RDD形式 val data = jdbcDF.rdd.map(row => { var rowkey = new String // -----------------------这里开始写你的处理逻辑--------------------------------- // 简单把Id作为rowkey了 for (j <- 0 to (types.length-1)){ if (types.apply(j)._1 == "Id"){ rowkey = getString(types.apply(j)._2,row,j) } } fields.map(field => { val fieldValue = row.getAs[Any](field).toString (Bytes.toBytes(rowkey),Array((Bytes.toBytes("info"), Bytes.toBytes(field), Bytes.toBytes(fieldValue)))) }) }).flatMap({array =>{ (array) }}) //--------------------处理逻辑完成------------------------ //----------------------写入HDFS-------------------------- val hBaseContext = new HBaseContext(sc, hbaseConf) hBaseContext.bulkLoad(data.map(record => { val put = new Put(record._1) record._2.foreach(putValue => put.addColumn(putValue._1, putValue._2, putValue._3)) put }), TableName.valueOf(tableName), (t : Put) => putForLoad(t), path) //----------------------写入HDFS-------------------------- //----------------------写入HBase---------------------------- val conn = ConnectionFactory.createConnection(hbaseConf) val hbTableName = TableName.valueOf(hbaseTableName.getBytes()) val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn)) val realTable = conn.getTable(hbTableName) HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator) // bulk load start val loader = new LoadIncrementalHFiles(hbaseConf) val admin = conn.getAdmin() loader.doBulkLoad(new Path(path),admin,realTable,regionLocator) sc.stop() //----------------------写入HBase---------------------------- } /** * 获取 HBase相关参数 * * @param hadoopConf * @return */ def getHBaseConf(hadoopConf: Configuration,runType:String): Configuration = { val hbaseConf = HBaseConfiguration.create(hadoopConf) if (runType == "dev"){ hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, JobProperties.DEV_ZOOKEEPER_QUORUM) hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, JobProperties.ZOOKEEPER_CLIENT_PORT) } else { hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, JobProperties.PRO_ZOOKEEPER_QUORUM) hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, JobProperties.ZOOKEEPER_CLIENT_PORT) } hbaseConf } /** * 创建HBase表 * @param tableName 表名 */ def createHTable(tableName: String, hBaseConf : Configuration) = { val connection = ConnectionFactory.createConnection(hBaseConf) val hBaseTableName = TableName.valueOf(tableName) val admin = connection.getAdmin if (!admin.tableExists(hBaseTableName)) { val tableDesc = new HTableDescriptor(hBaseTableName) tableDesc.addFamily(new HColumnDescriptor("info".getBytes)) admin.createTable(tableDesc) } connection.close() } /** * 根据每个字段的字段类型调用不同的函数,将字段值转换为HBase可读取的字节形式 * 解决数字导入到HBase中变成乱码的问题 * * @param value_type * @param row * @param i * @return */ def getString(value_type: String, row: Row, i: Int): String = { if (row != null && row.length != 0) { var str = "" if ("IntegerType" == value_type) { str = row.getInt(i).toString } else if ("StringType" == value_type) { str = row.getString(i) } else if ("FloatType" == value_type) { str = row.getFloat(i).toString } else if ("DoubleType" == value_type) { str = row.getDouble(i).toString } else if ("TimestampType" == value_type) { str = row.getTimestamp(i).toString } str } else "" } /** * Prepare the Put object for bulkload function. * @param put The put object. * @throws java.io.IOException * @throws java.lang.InterruptedException * @return Tuple of (KeyFamilyQualifier, bytes of cell value) */ @throws(classOf[IOException]) @throws(classOf[InterruptedException]) def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = { val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList() import scala.collection.JavaConversions._ for (cells <- put.getFamilyCellMap.entrySet().iterator()) { val family = cells.getKey for (value <- cells.getValue) { val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value)) ret.+=((kfq, CellUtil.cloneValue(value))) } } ret.iterator } def getResult(str: String): String = { val resultStr = new StringBuffer val loop = new Breaks for (i <- 0 until str.length) { val getChar = str.charAt(i) loop.breakable{ if (getChar == '-'|| getChar == ':' || getChar == '.'|| getChar == ' ') loop.break() else resultStr.append(getChar) } } resultStr.toString } /** * 检验运行时参数的方法 --- args = pro or dev * @param args */ def checkArguments(args: String): Unit ={ val runType = mutable.HashSet("dev", "pro") connectProperties.setProperty("driver","com.mysql.jdbc.Driver") connectProperties.setProperty("partitionColumn","Id") connectProperties.setProperty("fetchsize","1000") if (args == null || args.length == 0 || !runType.contains(args)) { throw new Exception("Illegal starting parameter ......") } if (args == "pro"){ url = JobProperties.pro_mysql_url connectProperties.setProperty("user",JobProperties.pro_mysql_user) connectProperties.setProperty("password",JobProperties.pro_mysql_password) } if (args == "dev"){ url = JobProperties.dev_mysql_url connectProperties.setProperty("user",JobProperties.dev_mysql_user) connectProperties.setProperty("password",JobProperties.dev_mysql_password) } } } 复制代码
中间穿插了好几次的记录,所以有一张bigtable和一张chatmsg2003的表
来到HDFS的页面,现在文件已经开始写入了
再看看Spark UI的界面
已经正常开始写入到HDFS了
这个Spark的UI的JOB每次读完MySQL往HDFS上存Bulkload文件的时候,都会自动创建16个任务,这个16的决定因素应该就是HBase中的预分区决定的
我的HBase的建表语句为:
create 'bigtable','info',SPLITS=> ['10|','20|','30|','40|','50|','60|','70|','80|','90|', 'd0|','h0|','l0|','p0|','t0|','x0|'] 复制代码
这里加起来就刚好16,有一些task是在咸鱼的,后面可能需要解决一下
而且出现了比较大的数据倾斜问题,这两个任务跑了大部分的数据,等到整个任务跑完的时候,程序就开始报错了,比如
bailing out的问题,整这个问题整了挺久,网上的解决方法比较多,我看到了一个似乎比较靠谱的是修改
hbase.hregion.max.filesize 复制代码
这个参数,这个是HFile能接受的最大的大小的,超过了就要切分,事实证明这个参数和我遇到的情况是没关联的,因为我看了我之后执行成功的表,是这样的
这就是修改了这个参数造成的文件不超过30G就不进行切分的问题
还有一个参数是
hbase.bulkload.retries.number 复制代码
他们说是切分的次数不够,所以要修改这个重试的参数,为0的时候是一直重试到成功
我看了一下集群中这个hbase.hregion.max.filesize的默认值为10,我设置成了30,然后把hbase.bulkload.retries.number设置为0,重跑程序,还是报错,当然小伙伴们千万别整个程序重跑,这太耗费时间了,我是直接把我的程序分割了,因为本身这个代码写入HDFS和写入HBase就是可以分离的
这里面还有一个小插曲,就是我把我的程序分离跑的时候,我把我的HDFS的文件给迁移到了另一个文件夹,只留一个190多M的文件下来单跑,发现我仍然报错
Exception in thread "main" java.io.IOException: Retry attempted 30 times without completing, bailing out 复制代码
我就很绝望,这个错误。
我还尝试读一张同样格式的小表,读出来到HDFS中只有57M的,程序不分离一起跑的时候,是能够在处理它自身的57M的时候顺带吃掉我的190多M的文件的,非常的奇怪,因为我那190M的文件一直留在了HDFS中,程序运行时候顺带把这个190M的文件给读到HBase中了。
为什么我会有这样操作的想法呢,理由也很简单,还记得我之前说的吧,读小表的时候这个bulkload方式是不会有任何问题的,所以就这么操作了
但是我想用相同的套路去吃剩下的那些几G,10几G的文件的时候,它也不行了,也是报上面的错误。
彻底解决这个问题是修改了预分区的数量,在修改预分区数量之前,这个程序还是有问题的,然后我把预分区的数量从16增加到22之后,就不会出现这个bailing out的问题了
现在任务已经跑完了,然后HDFS文件会被清空,然后映射过来HBase这里
还曾经遇到过一个JVM heap的问题,不过那个只要在提交 Spark 程序中的命令手动调大堆外内存即可。就是资源问题。
点进去doBulkload,这个是hbase-spark包下的源码
secureClient猜想没错的话,应该就是 RPC 的客户端的一个代理对象。应该是要像调用本地方法那样调用 hbase-spark 的包的方法
点击prepare的那个方法进来能看见两个方法
discoverLoadQueue方法使用visitBulkHFiles来遍历我们的HDFS的目录,然后对每个hfile会做一系列validation,而且单个hfile的大小不应超过HREGION_MAX_FILESIZE, 该值由参数hbase.hregion.max.filesize控制,默认为10GB。
退回去doBulkLoad,然后点进来 performBulkload
while循环的每次迭代主要执行 groupOrSplit 和 bulkLoad 两个 phase 的操作:
groupOrSplitPhase 方法
把queue中的所有文件根据目标表的region metadata进行分组,把每个文件划分到其所属region。
groupOrSplitPhase 和 bulkLoadPhase 我代码没看懂,所以我百度了
如果某个hfile的[firstkey, lastkey]不在任何region的[starkey, endkey]范围内,则将此hfile拆分成两个文件(拆分后的文件后缀为.top和.bottom),拆分的split key就是firstkey所在region的endkey。
拆分后得到的两个hfile会被封装成LQI再添加回LQI队列,这就是为什么需要一个while循环判断LQI队列是否为空。需注意,拆分后,第一个LQI肯定会在某个region范围内(除非在下次迭代加载该LQI之前目标region又发生了split),第二个LQI有可能仍需拆分。
groupOrSplitPhase完成之后,所有可加载的LQI都会被放到regionGroups中。regionGroups是一个Multimap,key为region的startkey,value为对应的LQI,一个region可对应多个LQI
我觉得逻辑应该就是这个样子
bulkLoadPhase
对于regionGroups中的每个key(即region的startkey),调用方法tryAtomicRegionLoad将其对应的所有LQI加载到目标table中。如果加载失败,则将failed LQI再加入到LQI队列中,
供下一循环检测和加载(我推测这里就是重试17,22的那个地方,regionGroups.asMap().entrySet().iterator()应该就是预分区的数量,regionGroup的iterator遍历这名字看起来也像这么回事)。tryAtomicRegionLoad方法会连接hbase region server,发送SecureBulkLoadHFilesRequest请求。
groupOrSplit和bulkLoad的操作都是通过上面创建的线程池对所有hfile并发执行的。除了这两个phase的操作外,while循环中还会检测一些异常情况:
对于doBulkLoad中while(!queue.isEmpty)循环,如果经过maxRetries次尝试后,LQI队列仍不为空,则抛出异常。maxRetries由参数hbase.bulkload.retries.number控制,默认为10 :
bulkload会将hfile的[firstkey, lastkey]和目标表region的[startkey, endkey]进行匹配,如果匹配失败则会进行文件拆分
之后可能会视情况修改或者补充,如果有问题还希望大家指正
还有一个问题就是bulkloadToHBase存在无缘无故冒出很多读请求的问题,至今我也不知道是咋回事,如果有知道的朋友,还请告知