Overall structure
Config
package com.fuwei.bigdata.profile.conf

import org.slf4j.LoggerFactory
import scopt.OptionParser

case class Config(
                   env: String = "",
                   username: String = "",
                   password: String = "",
                   url: String = "",
                   cluster: String = "",
                   startDate: String = "",
                   endDate: String = "",
                   proxyUser: String = "",
                   topK: Int = 25
                 )

object Config {

  private val logger = LoggerFactory.getLogger("Config")

  /**
   * Wrap the command-line args into a Config object
   */
  def parseConfig(obj: Object, args: Array[String]): Config = {
    // 1. Derive the program name from the class name
    val programName: String = obj.getClass.getSimpleName.replaceAll("\\$", "")
    // 2. Build a parser for the arguments
    val parser = new OptionParser[Config]("spark sql " + programName) {
      // 2.1 Usage header
      head(programName, "v1.0")
      // 2.2 Populate the Config fields; each option can be passed as -e or --env etc.,
      //     and text() supplies the help message
      opt[String]('e', "env").required().action((x, config) => config.copy(env = x)).text("dev or prod")
      // proxyUser uses the short option 'x' so it does not clash with -n (username);
      // this matches the -x flag used in the spark-submit script in the Testing section
      opt[String]('x', "proxyUser").required().action((x, config) => config.copy(proxyUser = x)).text("proxy username")
      programName match {
        case "LabelGenerator" =>
          logger.info("LabelGenerator")
          opt[String]('n', "username").required().action((x, config) => config.copy(username = x)).text("username")
          opt[String]('p', "password").required().action((x, config) => config.copy(password = x)).text("password")
          opt[String]('u', "url").required().action((x, config) => config.copy(url = x)).text("url")
          opt[String]('c', "cluster").required().action((x, config) => config.copy(cluster = x)).text("cluster")
        case _ =>
      }
    }
    // Parse the arguments; Some(conf) means every required option was supplied
    parser.parse(args, Config()) match {
      case Some(conf) => conf
      case None =>
        logger.error("can not parse args")
        System.exit(-1)
        null
    }
  }
}
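A quick way to see how the options map onto the case class is to feed the parser an argument array directly. A minimal sketch with invented values (ConfigParseExample and the credentials are illustrative, not part of the project):

import com.fuwei.bigdata.profile.LabelGenerator
import com.fuwei.bigdata.profile.conf.Config

object ConfigParseExample {
  def main(args: Array[String]): Unit = {
    // invented argument values, mirroring the options registered for "LabelGenerator"
    val sampleArgs = Array(
      "-e", "dev",                              // env
      "-x", "root",                             // proxyUser
      "-n", "ch-user",                          // username
      "-p", "ch-pass",                          // password
      "-u", "jdbc:clickhouse://localhost:8123", // url
      "-c", "1")                                // cluster
    // passing the LabelGenerator object makes parseConfig register the extra ClickHouse options
    val conf: Config = Config.parseConfig(LabelGenerator, sampleArgs)
    println(conf) // Config(dev,ch-user,ch-pass,jdbc:clickhouse://localhost:8123,1,,,root,25)
  }
}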
LabelGenerator
package com.fuwei.bigdata.profile

import com.fuwei.bigdata.profile.conf.Config
import com.fuwei.bigdata.profile.utils.{SparkUtils, TableUtils}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

/**
 * Generates the basic profile labels
 */
object LabelGenerator {

  private val logger = LoggerFactory.getLogger(LabelGenerator.getClass.getSimpleName)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    // 1. Parse the arguments
    val params: Config = Config.parseConfig(LabelGenerator, args)
    // 2. Get a SparkSession
    val spark: SparkSession = SparkUtils.getSparkSession(params.env, LabelGenerator.getClass.getSimpleName)
    //val spark: SparkSession = SparkUtils.getSparkSession("dev", "test")
    import spark.implicits._

    // 3. Read the phone-number location data
    val df: DataFrame = spark.read.option("sep", "\t").csv("src/main/resources/phoneinfo.txt")
      .toDF("prefix", "phone", "province", "city", "isp", "post_code", "city_code", "area_code", "types")
    df.createOrReplaceTempView("phone_info") // register a temp view

    // 4. Base feature SQL
    // 4.1 User info
    val userSql =
      """
        |select
        |t1.distinct_id as uid,
        |t1.gender,
        |t1.age,
        |case when length(t1.mobile) >= 11 then substring(t1.mobile,-11,length(t1.mobile)) else '' end as mobile,
        |case when size(split(t1.email,'@')) = 2 then split(t1.email,'@')[1] else '' end email_suffix,
        |t2.model
        |from ods_news.user_ro as t1 left join dwb_news.user_base_info as t2
        |on t1.distinct_id = t2.uid
        |""".stripMargin
    val userDF: DataFrame = spark.sql(userSql)
    userDF.createOrReplaceTempView("user_info")

    // 4.2 Base feature SQL
    val baseFeatureSql =
      """
        |select
        |t1.uid,
        |t1.gender,
        |t1.age,
        |t1.email_suffix,
        |t1.model,
        |concat(ifnull(t2.province,''),ifnull(t2.city,'')) as region
        |from user_info as t1 left join phone_info as t2
        |on
        |t2.phone = substring(t1.mobile,0,7)
        |""".stripMargin

    // 4.3 Create the Hive table
    spark.sql(
      """
        |create table if not exists dws_news.user_profile_base(
        |uid string,
        |gender string,
        |age string,
        |email_suffix string,
        |model string,
        |region string
        |)
        |stored as parquet
        |""".stripMargin)

    // 4.4 Run the query
    val baseFeaturedDF: DataFrame = spark.sql(baseFeatureSql)
    baseFeaturedDF.cache() // persist the result in memory; unpersist once finished

    // Write the query result into the Hive table (i.e. onto HDFS)
    baseFeaturedDF.write.mode(SaveMode.Overwrite).saveAsTable("dws_news.user_profile_base")

    // 5. Prepare the ClickHouse table; meta is (column names, placeholders)
    val meta = TableUtils.getClickHouseUserProfileBaseTable(baseFeaturedDF, params)

    // 6. Insert the data into ClickHouse
    // 6.1 The insert SQL
    val insertCHSql =
      s"""
         |insert into ${TableUtils.USER_PROFILE_CLICKHOUSE_DATABASE}.${TableUtils.USER_PROFILE_CLICKHOUSE_TABLE}(${meta._1}) values(${meta._2})
         |""".stripMargin
    logger.warn(insertCHSql)

    // 6.2 Insert into ClickHouse partition by partition
    baseFeaturedDF.foreachPartition(partition => {
      TableUtils.insertBaseFeaturedTable(partition, insertCHSql, params)
    })
    baseFeaturedDF.unpersist() // release the cached data

    // 7. Release resources
    spark.stop()
    logger.info("job has success")
  }
}
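To make the join in step 4.2 concrete, the sketch below builds tiny in-memory stand-ins for phone_info and user_info and applies the same prefix match (t2.phone = substring(t1.mobile, 0, 7)). The object name and the sample rows are invented for illustration; the real phone_info comes from the tab-separated phoneinfo.txt with the nine columns listed above.

import org.apache.spark.sql.SparkSession

object PrefixJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("prefix-join-sketch").getOrCreate()
    import spark.implicits._

    // invented sample rows: (prefix, phone, province, city) and (uid, mobile)
    Seq(("130", "1300571", "Zhejiang", "Hangzhou")).toDF("prefix", "phone", "province", "city")
      .createOrReplaceTempView("phone_info")
    Seq(("u001", "13005710000")).toDF("uid", "mobile")
      .createOrReplaceTempView("user_info")

    // the same join condition used by baseFeatureSql: match on the 7-digit phone prefix
    spark.sql(
      """
        |select t1.uid, concat(ifnull(t2.province,''), ifnull(t2.city,'')) as region
        |from user_info t1 left join phone_info t2
        |on t2.phone = substring(t1.mobile, 0, 7)
        |""".stripMargin).show()
    // expected single row: uid = u001, region = ZhejiangHangzhou
    spark.stop()
  }
}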
ClickHouseUtils
package com.fuwei.bigdata.profile.utils

import ru.yandex.clickhouse.ClickHouseDataSource
import ru.yandex.clickhouse.settings.ClickHouseProperties

object ClickHouseUtils {

  /**
   * Open a ClickHouse data source
   * @param username
   * @param password
   * @param url
   * @return
   */
  def getDataSource(username: String, password: String, url: String): ClickHouseDataSource = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    val properties = new ClickHouseProperties()
    properties.setUser(username)
    properties.setPassword(password)
    val dataSource = new ClickHouseDataSource(url, properties)
    dataSource
  }

  /**
   * Convert a column spec such as "age string,gender string"
   * into the ClickHouse form "age String,gender String"
   */
  def df2TypeName2CH(dfCol: String): String = {
    dfCol.split(",").map(line => {
      val fields: Array[String] = line.split(" ")
      val fName: String = fields(0)
      val fType: String = dfType2chType(fields(1)) // convert the type
      fName + " " + fType // e.g. "age String"
    }).mkString(",")
  }

  /**
   * Map a DataFrame type to the corresponding ClickHouse type
   *
   * @param fieldType
   * @return
   */
  def dfType2chType(fieldType: String): String = {
    fieldType.toLowerCase() match {
      case "string" => "String"
      case "integer" => "Int32"
      case "long" => "Int64"
      case "bigint" => "Int64"
      case "double" => "Float64"
      case "float" => "Float32"
      case "timestamp" => "Datetime"
      case _ => "String"
    }
  }
}
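A small check of the type mapping, using the kind of column spec string that TableUtils builds from a DataFrame schema (TypeMappingExample and the sample spec are illustrative only):

import com.fuwei.bigdata.profile.utils.ClickHouseUtils

object TypeMappingExample {
  def main(args: Array[String]): Unit = {
    // Spark simpleString forms on the left, ClickHouse types on the right
    println(ClickHouseUtils.dfType2chType("string")) // String
    println(ClickHouseUtils.dfType2chType("bigint")) // Int64
    println(ClickHouseUtils.dfType2chType("double")) // Float64

    // a whole column spec in one call, as getClickHouseUserProfileBaseTable does it
    println(ClickHouseUtils.df2TypeName2CH("uid string,age int,score double"))
    // uid String,age String,score Float64  ("int" is not in the mapping, so it falls back to String)
  }
}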
SparkUtils (this helper can be reused by later jobs)
package com.fuwei.bigdata.profile.utils

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object SparkUtils {

  private val logger = LoggerFactory.getLogger(SparkUtils.getClass.getSimpleName)

  def getSparkSession(env: String, appName: String): SparkSession = {
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.hive.metastore.version", "1.2.1")
      .set("spark.sql.cbo.enabled", "true")
      .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable", "true")
      .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
    env match {
      case "prod" =>
        conf.setAppName(appName + "_prod")
        SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      case "dev" =>
        conf.setMaster("local[*]").setAppName(appName + "_dev").set("spark.sql.hive.metastore.jars", "maven")
        SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      case _ =>
        logger.error("not match env")
        System.exit(-1)
        null
    }
  }
}
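Usage is the same in every job. A minimal sketch, assuming the Hive configuration files mentioned in the Testing section are on the classpath so that enableHiveSupport can reach the metastore (SparkUtilsExample is illustrative only):

import com.fuwei.bigdata.profile.utils.SparkUtils

object SparkUtilsExample {
  def main(args: Array[String]): Unit = {
    // "dev" starts a local[*] session; "prod" expects to run under spark-submit on YARN
    val spark = SparkUtils.getSparkSession("dev", "spark-utils-example")
    spark.sql("show databases").show() // quick sanity check against the metastore
    spark.stop()
  }
}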
TableUtils
package com.fuwei.bigdata.profile.utils

import com.fuwei.bigdata.profile.conf.Config
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import org.apache.spark.sql.{DataFrame, Row}
import org.slf4j.LoggerFactory
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseDataSource}

import java.sql.PreparedStatement

/**
 * @author : lifuwei
 * @time : 2022-01-07
 * This class writes the data read from Hive into ClickHouse.
 */
object TableUtils {

  private val logger = LoggerFactory.getLogger(TableUtils.getClass.getSimpleName)

  val USER_PROFILE_CLICKHOUSE_DATABASE = "app_news"        // target database
  val USER_PROFILE_CLICKHOUSE_TABLE = "user_profile_base"  // target table

  /**
   * Insert one partition of rows into ClickHouse
   * @param partition
   * @param insertCHSql
   * @param params
   */
  def insertBaseFeaturedTable(partition: Iterator[Row], insertCHSql: String, params: Config): Unit = {
    // 1. Get a ClickHouse data source and connection
    val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
    val connection: ClickHouseConnection = dataSource.getConnection
    val ps: PreparedStatement = connection.prepareStatement(insertCHSql)

    var batchCount = 0
    val batchSize = 2000
    var lastBatchTime = System.currentTimeMillis()

    // 2. Bind the placeholder values
    partition.foreach(row => {
      var index = 1 // parameter index of the placeholder
      row.schema.fields.foreach(field => {
        field.dataType match {
          case StringType => ps.setString(index, row.getAs[String](field.name))
          case LongType => ps.setLong(index, row.getAs[Long](field.name))
          case IntegerType => ps.setInt(index, row.getAs[Int](field.name))
          case _ => logger.error(s"type is err,${field.dataType}")
        }
        index += 1
      })
      // 3. Add the row to the batch
      ps.addBatch()
      batchCount += 1
      // 4. Flush the batch when it is full or every 3 seconds
      val currentTime = System.currentTimeMillis()
      if (batchCount >= batchSize || lastBatchTime < currentTime - 3000) {
        ps.executeBatch() // execute one batch
        logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(currentTime - lastBatchTime) / 1000} s")
        batchCount = 0
        lastBatchTime = currentTime
      }
    })

    // 5. Flush whatever is left in the batch after the loop
    ps.executeBatch()
    logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(System.currentTimeMillis() - lastBatchTime) / 1000} s")

    // 6. Release resources
    ps.close()
    connection.close()
  }

  /**
   * Create the ClickHouse table from the DataFrame schema
   * @param baseFeaturedDF : the DataFrame
   * @param params : parsed command-line parameters
   * @return the column-name list and the placeholder list of the DataFrame
   */
  /*
   * Schema of baseFeaturedDF:
   *   fieldName: uid,gender,age,region,model,email_suffix
   *   fieldType: string,string,string,string,string,string
   *
   * The insert statement we need has the form
   *   insert into user_profile_base (...) values (?,?,?,?,?,?)
   *
   * so from the schema we derive three things: the column names, the column types,
   * and the placeholders for the values.
   */
  def getClickHouseUserProfileBaseTable(baseFeaturedDF: DataFrame, params: Config): (String, String) = {
    /*
     * schema holds all of the table metadata; foldLeft folds over its fields.
     * baseFeaturedDF.schema        : the schema of the DataFrame
     * baseFeaturedDF.schema.fields : the schema as an array of fields
     * foldLeft(("", "", ""))       : fold over that array, starting from three empty strings
     */
    val (fieldName, fieldType, pl) = baseFeaturedDF.schema.fields.foldLeft(("", "", ""))(
      (z, f) => {
        // target shape: ("uid,gender,...", "uid string,gender string,...", "?,?,...")
        if (z._1.nonEmpty && z._2.nonEmpty && z._3.nonEmpty) {
          // not the first field, so prepend a comma
          (z._1 + "," + f.name, z._2 + "," + f.name + " " + f.dataType.simpleString, z._3 + ",?")
        } else {
          (f.name, f.name + " " + f.dataType.simpleString, "?")
        }
      }
    )

    // 4. Translate the Spark types into ClickHouse types
    //    (spark "string" becomes ClickHouse "String"), giving "age String,gender String ..."
    val chCol = ClickHouseUtils.df2TypeName2CH(fieldType)

    // 5. The ClickHouse cluster to connect to
    val cluster: String = params.cluster

    // 6. Create the database
    val createCHDataBaseSql =
      s"""
         |create database if not exists ${USER_PROFILE_CLICKHOUSE_DATABASE}
         |""".stripMargin

    // 7. Create the table
    /*
     * ENGINE = MergeTree(): every ClickHouse table needs an engine; here we use the MergeTree engine
     */
    val createCHTableSql =
      s"""
         |create table ${USER_PROFILE_CLICKHOUSE_DATABASE}.${USER_PROFILE_CLICKHOUSE_TABLE}(${chCol})
         |ENGINE = MergeTree()
         |ORDER BY(uid)
         |""".stripMargin

    // 8. Drop the table if it already exists
    val dropCHTableSql =
      s"""
         |drop table if exists ${USER_PROFILE_CLICKHOUSE_DATABASE}.${USER_PROFILE_CLICKHOUSE_TABLE}
         |""".stripMargin

    // 9. Connect to ClickHouse and run the three statements
    val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
    val connection: ClickHouseConnection = dataSource.getConnection

    logger.warn(createCHDataBaseSql)
    var ps: PreparedStatement = connection.prepareStatement(createCHDataBaseSql) // create the database
    ps.execute()

    logger.warn(dropCHTableSql)
    ps = connection.prepareStatement(dropCHTableSql) // drop the table
    ps.execute()

    logger.warn(createCHTableSql)
    ps = connection.prepareStatement(createCHTableSql) // create the table
    ps.execute()

    ps.close()
    connection.close()
    logger.info("success!!!!!!!!!")
    (fieldName, pl)
  }
}
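The foldLeft is the part that is easiest to get wrong, so here is a standalone sketch of what it produces for a small schema. FoldLeftSketch and the three-column schema are invented for illustration; the real schema comes from dws_news.user_profile_base.

import org.apache.spark.sql.types.{StringType, StructField, StructType}

object FoldLeftSketch {
  def main(args: Array[String]): Unit = {
    // an invented three-column schema, all strings, like the profile base table
    val schema = StructType(Seq(
      StructField("uid", StringType),
      StructField("gender", StringType),
      StructField("age", StringType)))

    val (names, types, placeholders) = schema.fields.foldLeft(("", "", "")) { (z, f) =>
      if (z._1.nonEmpty) (z._1 + "," + f.name, z._2 + "," + f.name + " " + f.dataType.simpleString, z._3 + ",?")
      else (f.name, f.name + " " + f.dataType.simpleString, "?")
    }

    println(names)        // uid,gender,age
    println(types)        // uid string,gender string,age string
    println(placeholders) // ?,?,?
    // which feed into:
    //   insert into app_news.user_profile_base(uid,gender,age) values(?,?,?)
    //   create table app_news.user_profile_base(uid String,gender String,age String) ...
  }
}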
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.fuwei.bigdata</groupId> <artifactId>user-profile</artifactId> <version>1.0-SNAPSHOT</version> <properties> <scala.version>2.11.12</scala.version> <play-json.version>2.3.9</play-json.version> <maven-scala-plugin.version>2.10.1</maven-scala-plugin.version> <scala-maven-plugin.version>3.2.0</scala-maven-plugin.version> <maven-assembly-plugin.version>2.6</maven-assembly-plugin.version> <spark.version>2.4.5</spark.version> <scope.type>compile</scope.type> <json.version>1.2.3</json.version> <!--compile provided--> </properties> <dependencies> <!--json 包--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${json.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> <scope>${scope.type}</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> <scope>${scope.type}</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>${spark.version}</version> <scope>${scope.type}</scope> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> <scope>${scope.type}</scope> </dependency> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.6</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> <scope>${scope.type}</scope> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-reflect</artifactId> <version>${scala.version}</version> <scope>${scope.type}</scope> </dependency> <dependency> <groupId>com.github.scopt</groupId> <artifactId>scopt_2.11</artifactId> <version>4.0.0-RC2</version> </dependency> <dependency> <groupId>org.apache.hudi</groupId> <artifactId>hudi-spark-bundle_2.11</artifactId> <version>0.5.2-incubating</version> <scope>${scope.type}</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-avro_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>com.hankcs</groupId> <artifactId>hanlp</artifactId> <version>portable-1.7.8</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.11</artifactId> <version>${spark.version}</version> <scope>${scope.type}</scope> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>1.2.1</version> <scope>${scope.type}</scope> <exclusions> <exclusion> <groupId>javax.mail</groupId> <artifactId>mail</artifactId> </exclusion> <exclusion> <groupId>org.eclipse.jetty.aggregate</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.2.4</version> </dependency> </dependencies> <repositories> <repository> <id>alimaven</id> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> 
<releases> <updatePolicy>never</updatePolicy> </releases> <snapshots> <updatePolicy>never</updatePolicy> </snapshots> </repository> </repositories> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>${maven-assembly-plugin.version}</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>${scala-maven-plugin.version}</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-archetype-plugin</artifactId> <version>2.2</version> </plugin> </plugins> </build> </project>
Testing
##1. Copy core-site.xml, yarn-site.xml and hive-site.xml into the project's resources directory
##2. clean and package
##3. The Hive metastore service must be running
##4. YARN/HDFS must be running
##5. ClickHouse/chproxy must be running as well
##6. spark-submit script for the packaged jar
${SPARK_HOME}/bin/spark-submit \
--jars /data/apps/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name log2hudi \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1 \
--class com.fuwei.bigdata.profile.LabelGenerator \
/data/jar/user-profile.jar \
-e prod -u jdbc:clickhouse://10.206.0.4:8321 -n fw-insert -p fw-001 -x root -c 1

##7. Verify with clickhouse-client
clickhouse-client --host 10.206.0.4 --port 9999 --password qwert
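Besides clickhouse-client, the load can also be checked from code through the same JDBC utility the job uses. A sketch, with the connection values copied from the submit script above (VerifyClickHouseLoad is illustrative; adjust the url and credentials to your environment):

import com.fuwei.bigdata.profile.utils.ClickHouseUtils

object VerifyClickHouseLoad {
  def main(args: Array[String]): Unit = {
    // credentials and url taken from the spark-submit example; replace with your own
    val ds = ClickHouseUtils.getDataSource("fw-insert", "fw-001", "jdbc:clickhouse://10.206.0.4:8321")
    val conn = ds.getConnection
    val rs = conn.createStatement().executeQuery("select count(*) from app_news.user_profile_base")
    while (rs.next()) println(s"rows loaded: ${rs.getLong(1)}")
    rs.close()
    conn.close()
  }
}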