A need like this comes up regularly: on top of the existing warehouse tables, write some SQL, materialize the result as a Hive table, and sync it to MySQL.
After doing it enough times, I wanted a tool that handles the whole job.
1. The warehouse stores data in Hive, DataX moves the data, and Airflow handles the scheduling.
2. I don't know how to get Hive to parse a SQL statement and hand back its schema, but Spark can (spark.sql(sql).schema.toList), so the tool is written in Scala; see the sketch below.
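Point 2 is why Spark is involved at all: spark.sql(query).schema returns the column names and types of an arbitrary query without actually running it. A minimal sketch, with a placeholder query:

import org.apache.spark.sql.SparkSession

object SchemaDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("SchemaDemo").enableHiveSupport().getOrCreate()
    // Analyzing the query is enough to get its schema; nothing is executed or stored.
    val fields = spark.sql("SELECT id, name, price FROM dwd.some_table").schema.toList
    fields.foreach(f => println(f.name + "|" + f.dataType.typeName)) // e.g. id|long, name|string
    spark.stop()
  }
}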
In short, a config file drives the creation of the Hive and MySQL tables and the generation of the Airflow scheduling task:
1. Configure the MySQL connections.
2. From the input Spark SQL, derive the corresponding Hive and MySQL table structures and create both tables (the type mapping is sketched right after this list).
3. Generate the Airflow scheduling task (insert the data into Hive, then call DataX to sync it to MySQL).
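For step 2, the heart of the tool is a small mapping from Spark's type names to Hive and MySQL column types. Below is a condensed sketch of the mapping used by the full code further down; the concrete widths (bigint(10), decimal(36,6), varchar(36)) are just the defaults chosen for this tool:

// Spark type name -> MySQL column type, as used when generating the MySQL DDL.
def toMysqlType(sparkTypeName: String): String = sparkTypeName match {
  case t if t.contains("integer") || t.contains("long")                          => "bigint(10)"
  case t if t.contains("float") || t.contains("double") || t.contains("decimal") => "decimal(36,6)"
  case t if t.contains("boolean")                                                => "boolean"
  case _                                                                         => "varchar(36)" // string, date, timestamp
}

// Spark type name -> Hive column type, as used when generating the Hive DDL.
def toHiveType(sparkTypeName: String): String = sparkTypeName match {
  case t if t.contains("integer") || t.contains("long")                          => "bigint"
  case t if t.contains("float") || t.contains("double") || t.contains("decimal") => "double"
  case t if t.contains("boolean")                                                => "boolean"
  case _                                                                         => "string" // string, date, timestamp
}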
HiveToMysql.properties
# the alias must match the alias of the database to write to (one of the jdbcNalias entries below)
jdbcalias=ptx_read
# table to sync
table=be_product
# owner of the airflow task
owner=owner
# lifecycle of the hive table; expired data is cleaned up on the data-product side
lifecycle=180
# directory where the generated airflow task files are written
airflowpath=/airflow/dags/ods/
# several mysql connections can be listed, so you don't have to keep editing a single one
jdbc1alias=hive
jdbc1host=127.0.0.1
jdbc1port=3306
jdbc1user=root
jdbc1passwd=**
jdbc1db_name=test
jdbc2alias=read
jdbc2host=127.0.0.1
jdbc2port=3306
jdbc2user=root
jdbc2passwd=**
jdbc2db_name=test
HiveToMysql.scala
import java.io.{BufferedReader, File, FileInputStream, IOException, InputStreamReader, PrintWriter}
import java.sql.DriverManager
import java.util.Properties

import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer

object HiveToMysql {

  // MySQL connection settings
  case class Database(host: String, port: Int, user: String, passwd: String, db_name: String)

  // Read the properties file
  def readDbPropertiesFile(fileName: String, spark: SparkSession, sql: String): Unit = {
    val pp = new Properties
    val fps = new FileInputStream(fileName)
    // val fps = Thread.currentThread.getContextClassLoader.getResourceAsStream(fileName)
    pp.load(fps)
    parseProperties(pp, spark, sql)
    fps.close()
  }

  // Parse the settings, then create the tables and generate the Airflow job
  def parseProperties(pp: Properties, spark: SparkSession, sql: String): Unit = {
    val table = pp.getProperty("table")
    val owner = pp.getProperty("owner")
    val lifecycle = pp.getProperty("lifecycle")
    val jdbcalias = pp.getProperty("jdbcalias")
    val airflowpath = pp.getProperty("airflowpath")
    val tableColumn = new ArrayBuffer[String]()

    // Find the jdbcN block whose alias matches jdbcalias
    var dbindex = 1
    while (pp.getProperty("jdbc" + dbindex + "alias") != null &&
           !pp.getProperty("jdbc" + dbindex + "alias").equals(jdbcalias)) {
      dbindex += 1
    }
    val database = Database(
      pp.getProperty("jdbc" + dbindex + "host"),
      pp.getProperty("jdbc" + dbindex + "port").toInt,
      pp.getProperty("jdbc" + dbindex + "user"),
      pp.getProperty("jdbc" + dbindex + "passwd"),
      pp.getProperty("jdbc" + dbindex + "db_name"))

    // Use the schema of the Spark SQL query to drive the Hive and MySQL DDL
    val mysqlSelectBuilder = new StringBuilder
    val schemaList = spark.sql(sql).schema.toList
    for (field <- schemaList) {
      println(field.name + "|" + field.dataType.typeName)
      tableColumn += field.name + "|" + field.dataType.typeName
      mysqlSelectBuilder.append(field.name + ",")
    }
    mysqlSelectBuilder.deleteCharAt(mysqlSelectBuilder.length - 1)

    buildExecuteHiveSql(table, tableColumn, lifecycle, owner)
    buildExecuteMysql(table, tableColumn, database)
    printAirflowJob(airflowpath, table, owner, jdbcalias, mysqlSelectBuilder.toString(), sql)
  }

  // The in-house Airflow wrappers are too bulky to include, so only the skeleton is shown here
  def printAirflowJob(airflowpath: String, table: String, owner: String, jdbcalias: String,
                      mysqlSelect: String, sql: String): Unit = {
    val db = table.substring(0, table.indexOf("."))
    val tableNoDatabase = table.substring(table.indexOf(".") + 1)
    println(airflowpath + db + "/" + tableNoDatabase)
    if (new File(airflowpath + db + "/" + tableNoDatabase).exists()) {
      println("folder exists, please delete the folder " + airflowpath + db + "/" + tableNoDatabase)
    } else {
      val dir = new File(airflowpath + db + "/" + tableNoDatabase)
      dir.mkdirs()
      val pw = new PrintWriter(airflowpath + db + "/" + tableNoDatabase + "/" + tableNoDatabase + "_dag.py")
      pw.println("import airflow")
      pw.println("from airflow import DAG")
      // ... the rest of the generated DAG (default_args, operators, dependencies) is omitted
      pw.println("")
      pw.flush()
      pw.close()
    }
  }

  @throws[IOException]
  def buildExecuteMysql(table: String, tableColumn: ArrayBuffer[String], database: Database): Unit = {
    val mysqlSqlBuilder = new StringBuilder
    mysqlSqlBuilder.append("CREATE TABLE " + table.substring(table.indexOf(".") + 1) + " ( \n")
    mysqlSqlBuilder.append("dt varchar(10) DEFAULT NULL," + "\n")
    println("tableColumn size: " + tableColumn.size)
    val tableColumnCopy = tableColumn.toArray[String]
    for (i <- 0 until tableColumnCopy.length) {
      val fieldAndType = tableColumnCopy(i).split("\\|")
      mysqlSqlBuilder.append(fieldAndType(0) + " ")
      if (fieldAndType(1).contains("integer") || fieldAndType(1).contains("long"))
        mysqlSqlBuilder.append(" bigint(10)")
      else if (fieldAndType(1).contains("float") || fieldAndType(1).contains("double") || fieldAndType(1).contains("decimal"))
        mysqlSqlBuilder.append(" decimal(36,6)")
      else if (fieldAndType(1).contains("string"))
        mysqlSqlBuilder.append(" varchar(36)")
      else if (fieldAndType(1).contains("boolean"))
        mysqlSqlBuilder.append(" boolean")
      else if (fieldAndType(1).contains("date") || fieldAndType(1).contains("timestamp"))
        mysqlSqlBuilder.append(" varchar(36)")
      mysqlSqlBuilder.append(" DEFAULT NULL," + "\n")
    }
    mysqlSqlBuilder.deleteCharAt(mysqlSqlBuilder.length - 2) // drop the trailing comma before the newline
    mysqlSqlBuilder.append(") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4")
    println(mysqlSqlBuilder.toString)
    Class.forName("com.mysql.cj.jdbc.Driver")
    val con = DriverManager.getConnection(
      "jdbc:mysql://" + database.host + ":" + database.port + "/" + database.db_name + "?serverTimezone=UTC",
      database.user, database.passwd)
    val st = con.createStatement
    st.execute(mysqlSqlBuilder.toString)
    st.close()
    con.close()
  }

  @throws[IOException]
  @throws[InterruptedException]
  def buildExecuteHiveSql(table: String, tableColumn: ArrayBuffer[String], lifecycle: String, owner: String): Unit = {
    val hiveSqlBuilder = new StringBuilder
    hiveSqlBuilder.append("CREATE TABLE " + table + " ( \n")
    println("tableColumn size: " + tableColumn.size)
    val tableColumnCopy = tableColumn.toArray[String]
    for (i <- 0 until tableColumnCopy.length) {
      val fieldAndType = tableColumnCopy(i).split("\\|")
      if (fieldAndType(1).contains("integer") || fieldAndType(1).contains("long"))
        hiveSqlBuilder.append(fieldAndType(0) + " bigint,")
      else if (fieldAndType(1).contains("float") || fieldAndType(1).contains("double") || fieldAndType(1).contains("decimal"))
        hiveSqlBuilder.append(fieldAndType(0) + " double,")
      else if (fieldAndType(1).contains("string"))
        hiveSqlBuilder.append(fieldAndType(0) + " string,")
      else if (fieldAndType(1).contains("boolean"))
        hiveSqlBuilder.append(fieldAndType(0) + " boolean,")
      else if (fieldAndType(1).contains("date") || fieldAndType(1).contains("timestamp"))
        hiveSqlBuilder.append(fieldAndType(0) + " string,")
      hiveSqlBuilder.append("\n")
    }
    hiveSqlBuilder.deleteCharAt(hiveSqlBuilder.length - 2) // drop the trailing comma before the newline
    hiveSqlBuilder.append(") PARTITIONED BY ( dt string COMMENT '(primary partition)' ) \n")
    hiveSqlBuilder.append("ROW FORMAT DELIMITED STORED AS PARQUET \n")
    hiveSqlBuilder.append("TBLPROPERTIES ('lifecycle'='" + lifecycle + "','owner'='" + owner + "','parquet.compression'='snappy');")
    println(hiveSqlBuilder.toString)
    // Run the DDL through the hive CLI and echo its output
    val process = new ProcessBuilder("hive", "-e", "\"" + hiveSqlBuilder.toString + "\"").redirectErrorStream(true).start
    val br = new BufferedReader(new InputStreamReader(process.getInputStream))
    var line = br.readLine()
    while (line != null) {
      println(line)
      line = br.readLine()
    }
    process.waitFor
  }

  def main(args: Array[String]): Unit = {
    // val sparkconf = new SparkConf().setAppName("test_Spark_sql").setMaster("local[2]")
    // val spark = SparkSession.builder().config(sparkconf).config("spark.driver.host", "localhost").getOrCreate()
    val spark = SparkSession.builder.appName("HiveToMysql").enableHiveSupport().getOrCreate()
    readDbPropertiesFile("HiveToMysql.properties", spark, args(0))
  }
}
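The printAirflowJob body is deliberately truncated because the in-house Airflow wrappers are too bulky to reproduce. Purely as an illustration of what the omitted part writes, here is a hypothetical version that emits a DAG from stock Airflow 1.x operators; the operator imports, the schedule, and the DataX command line are assumptions for this sketch, not the actual wrappers:

// Hypothetical sketch only: one way the generated _dag.py could look if stock operators
// were used instead of the in-house wrappers. Paths, conn settings and schedule are assumptions.
def writeSketchDag(pw: java.io.PrintWriter, table: String, tableNoDatabase: String,
                   owner: String, sql: String): Unit = {
  pw.println("import airflow")
  pw.println("from airflow import DAG")
  pw.println("from airflow.operators.hive_operator import HiveOperator")
  pw.println("from airflow.operators.bash_operator import BashOperator")
  pw.println("from datetime import datetime")
  pw.println("")
  pw.println("default_args = {'owner': '" + owner + "', 'start_date': datetime(2020, 1, 1)}")
  pw.println("dag = DAG('" + tableNoDatabase + "_dag', default_args=default_args, schedule_interval='0 2 * * *')")
  pw.println("")
  // Step 1: insert the query result into the Hive table's daily partition
  pw.println("insert_hive = HiveOperator(")
  pw.println("    task_id='insert_" + tableNoDatabase + "',")
  pw.println("    hql=\"INSERT OVERWRITE TABLE " + table + " PARTITION (dt='{{ ds }}') " + sql + "\",")
  pw.println("    dag=dag)")
  pw.println("")
  // Step 2: call DataX to sync the partition to MySQL (job json assumed to exist)
  pw.println("to_mysql = BashOperator(")
  pw.println("    task_id='datax_" + tableNoDatabase + "',")
  pw.println("    bash_command='python /opt/datax/bin/datax.py /opt/datax/job/" + tableNoDatabase + ".json',")
  pw.println("    dag=dag)")
  pw.println("")
  pw.println("insert_hive >> to_mysql")
}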
#!/bin/bash
mv bigData.jar .
mv HiveToMysql.properties .
sql=`cat /sql`
spark-submit \
  --class HiveToMysql \
  --master yarn \
  --deploy-mode client \
  --num-executors 1 \
  --executor-memory 4g \
  --executor-cores 1 \
  --driver-memory 1g \
  --name "HiveToMysql" \
  --conf spark.speculation=true \
  --conf spark.speculation.interval=30000 \
  --conf spark.speculation.quantile=0.8 \
  --conf spark.speculation.multiplier=1.5 \
  --conf spark.dynamicAllocation.enabled=false \
  --files HiveToMysql.properties \
  --jars fastjson-1.2.62.jar,mysql-connector-java-8.0.18.jar \
  bigData.jar "$sql"
1. The Scala is rough and the code is hard to read.
2. Every generated job gets the same schedule time (can be changed as needed).
3. The data-type mapping should be adjusted to the business requirements.