On a multi-tenant platform, several tenants may have access to the same table. Hive provides a lock mechanism to keep the data safe, but when one tenant holds a lock for a long time, the jobs of the other tenants fall behind. If we can capture the SQL statement that locks the table together with the tenant that issued it, that information can be handed to the operator to optimize the business workflow and speed up job execution. This document records one workable approach for reference.
Scenario: Hive uses the distributed locks provided by the distributed coordination service (ZooKeeper); Hive queries are executed as MapReduce jobs, with YARN doing the unified resource scheduling. The implementation in this document is written in Python.
# xxxx
# 2021/11/4 8:54
import configparser
import os
import time

from kazoo.client import KazooClient

import ConnMysqlTmpl

'''
Read the ZooKeeper cluster information.
ZookeeperInfo.ini looks like:
[zookeeper-conn]
url=zk_host_name:port
'''
def zookeeper_info():
    cf = configparser.ConfigParser()
    pass1 = os.path.dirname(os.path.abspath('.'))
    cf.read(pass1 + "/config/ZookeeperInfo.ini")
    items = cf.items("zookeeper-conn")
    list = []
    for line in items:
        list.append(line[1])
    return list

'''
A small ZooKeeper wrapper class: connect to and close the ZK cluster.
'''
class Zookeeper:
    def connection(self, zk_url):
        self.zk_url = zk_url
        try:
            self.conn_path = KazooClient(self.zk_url)
            self.conn_path.start()
            return self.conn_path
        except Exception as e:
            print(f"ZooKeeper connection failed: {e}")

    def conn_close(self):
        self.conn_path.stop()
        print("ZooKeeper connection closed...")

'''
Read the list of tables to monitor.
The format of HiveMonLockInfo is free-form, as long as it yields the table paths to look up in ZK.
'''
def monitor_info():
    filename = 'datainfo/HiveMonLockInfo'
    if os.path.exists(filename):
        with open(filename, 'r', encoding='utf8') as rfile:
            lines = rfile.readlines()
        return lines
    else:
        print("File [ {0} ] not found, please contact the platform administrator".format(filename))

'''
Collect the lock znodes of the monitored tables.
'''
def get_lock():
    zk = Zookeeper()
    zk_info = zookeeper_info()
    zk_conn = zk.connection(zk_info[0])
    table_info = monitor_info()
    locks = []
    for table in table_info:
        if zk_conn.exists("/hive_zookeeper_namespace/" + table.rstrip('\n')):
            lock_name = zk_conn.get_children("/hive_zookeeper_namespace/" + table.rstrip('\n'))
            for lock in lock_name:
                if "LOCK" in lock:
                    locks.append("/hive_zookeeper_namespace/" + table.rstrip('\n') + "/" + lock)
                else:
                    pass
        else:
            print("/hive_zookeeper_namespace/" + table.rstrip('\n'), "has no lock", get_time())
    zk.conn_close()
    return locks

'''
Fetch the details of every table lock.
ConnMysqlTmpl.table_locks_info executes the insert into the database.
'''
def get_lock_info():
    zk = Zookeeper()
    zk_info = zookeeper_info()
    zk_conn = zk.connection(zk_info[0])
    all_locks = get_lock()
    print(all_locks)
    if all_locks:
        for path in all_locks:
            # Fetch the full data of the lock znode
            all_info = zk_conn.get(path)
            # Extract the query_id and the SQL statement
            all_sql_info = str(all_info).split("ZnodeStat")[0]
            query_id = (all_sql_info.split(":")[0])[3:]
            sql_info_tmp = all_sql_info.split(":")[3]
            sql_info_tmp1 = sql_info_tmp.replace("\n", "\\n")
            sql_info = sql_info_tmp1.replace("\'", "\\'")
            # Extract the znode creation info
            time_info = str(all_info).split("ZnodeStat")[1]
            create_time = time_info.split(",")[2]
            # Convert the timestamp
            timeStamp = int(create_time.split("=")[1])
            '''
            The timestamp read from ZK is a 13-digit number (milliseconds), while time.localtime
            expects a 10-digit value (seconds), so divide it by 1000.
            '''
            timeArray = time.localtime(timeStamp / 1000)
            otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
            try:
                ConnMysqlTmpl.table_locks_info(query_id, sql_info, otherStyleTime, path.split("/")[3])
            except Exception as e:
                print(f"Failed to execute the SQL, please check: {e}")
    else:
        print("None of the monitored tables is locked~", get_time())
    zk.conn_close()

def get_time():
    create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    return create_time

if __name__ == '__main__':
    get_lock_info()
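The results are persisted through ConnMysqlTmpl.table_locks_info, a local helper module that is not shown here. A minimal sketch of what it might look like, assuming pymysql and a MySQL table zookeeper_lock_info with columns (query_id, sql_info, ctime, table_name); the connection parameters are placeholders, not part of the original code:

# ConnMysqlTmpl.py -- minimal sketch of the (assumed) helper module, not the original code
import pymysql

def _get_conn():
    # Connection parameters are placeholders; in practice they would be read from a config file.
    return pymysql.connect(host="mysql_host", port=3306, user="monitor",
                           password="******", database="hive_monitor", charset="utf8mb4")

def table_locks_info(query_id, sql_info, ctime, table_name):
    # Insert one lock record; sql_info already has newlines and quotes escaped by the caller.
    conn = _get_conn()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "INSERT INTO zookeeper_lock_info (query_id, sql_info, ctime, table_name) "
                "VALUES (%s, %s, %s, %s)",
                (query_id, sql_info, ctime, table_name))
        conn.commit()
    finally:
        conn.close()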
The key step is parsing each lock znode: fetch the full node data, then pull out the query_id and the SQL statement that is holding the lock:

# Fetch the full data of the lock znode
all_info = zk_conn.get(path)
# Extract the query_id and the SQL statement
all_sql_info = str(all_info).split("ZnodeStat")[0]
query_id = (all_sql_info.split(":")[0])[3:]
sql_info_tmp = all_sql_info.split(":")[3]
sql_info_tmp1 = sql_info_tmp.replace("\n", "\\n")
sql_info = sql_info_tmp1.replace("\'", "\\'")

This yields the SQL statement and the query_id of that SQL statement.
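Splitting on "ZnodeStat" works on the repr of the tuple returned by kazoo, but it is brittle. kazoo's get() returns a (data, ZnodeStat) tuple, and ZnodeStat already carries the node creation time, so the same values can be read without parsing the repr. A minimal alternative sketch, assuming (as the index-based parsing above implies) that the lock payload is the colon-joined string queryId:lockTime:lockMode:queryStr:

# Alternative parsing sketch: unpack the (data, ZnodeStat) tuple that kazoo's get() returns
# instead of splitting its repr on "ZnodeStat". Assumes the lock payload is the colon-joined
# string "queryId:lockTime:lockMode:queryStr", which is what the index-based parsing above implies.
import time
from kazoo.client import KazooClient

def parse_lock_node(zk_conn: KazooClient, path: str):
    data, stat = zk_conn.get(path)          # data: bytes payload, stat: ZnodeStat
    fields = data.decode("utf8").split(":")
    query_id = fields[0]
    # Like the parsing above, this keeps only the segment after the third colon of the payload.
    sql_info = fields[3].replace("\n", "\\n").replace("'", "\\'")
    # ZnodeStat.ctime is the node creation time in milliseconds since the epoch.
    create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(stat.ctime / 1000))
    return query_id, sql_info, create_time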
import requests
import configparser
import os

import AppSourceInfo
import ConnMysqlTmpl
from json import JSONDecodeError
from requests.auth import HTTPBasicAuth

'''
Read the configuration: the access URL, user and password.
'''
def get_app_info():
    cf = configparser.ConfigParser()
    pass1 = os.path.dirname(os.path.abspath("."))
    cf.read(pass1 + "/config/AppExecSqlInfo.ini")
    items = cf.items("appexecsql-conn")
    list = []
    for line in items:
        list.append(line[1])
    return list

'''
Fetch the query_id and the tenant (queue) of every running job.
'''
def job_info():
    conn_info = get_app_info()
    app_ids = AppSourceInfo.get_each_app1()
    if app_ids:
        for line in app_ids:
            try:
                all_info_tmp = requests.get(
                    conn_info[0] + line + "/ws/v1/mapreduce/jobs/" + str(line).replace("application", "job") + "/conf",
                    auth=HTTPBasicAuth(conn_info[1], conn_info[2]))
                try:
                    info_json = all_info_tmp.json()
                    result = info_json["conf"]
                    all_info = result["property"]
                    list = []
                    for line in all_info:
                        '''
                        if line["name"] == "hive.query.string":
                            #list.append(str(line["value"]))
                            sql_info_tmp = line["value"]
                            if sql_info_tmp:
                                sql_info_tmp1 = sql_info_tmp.replace("\n", "\\n")
                                sql_info = sql_info_tmp1.replace("\'", "\\'")
                                list.append(sql_info)
                            else:
                                sql_info = "0"
                                list.append(sql_info)
                        elif line["name"] == "mapreduce.jdbc.input.query":
                            #list.append(str(line["value"]))
                            jdbc_query_tmp = line["value"]
                            #print(type(jdbc_query_tmp))
                            if len(jdbc_query_tmp):
                                jdbc_query_tmp1 = jdbc_query_tmp.replace("\n", "\\n")
                                jdbc_query = jdbc_query_tmp1.replace("\'", "\\'")
                                list.append(jdbc_query)
                            else:
                                jdbc_query = "0"
                                list.append(jdbc_query)
                        '''
                        # Extract the query_id
                        if line["name"] == "hive.query.id":
                            #list.append(str(line["value"]))
                            query_id_tmp = line["value"]
                            if query_id_tmp:
                                query_id = line["value"]
                                list.append(query_id)
                            else:
                                query_id = "0"
                                list.append(query_id)
                        # Extract the queue the job runs in, which maps to the tenant
                        elif line["name"] == "mapreduce.job.queuename":
                            #list.append(str(line["value"]))
                            user_info = (line["source"])[1]
                            user = str(user_info).split("/")
                            queue_name = user[4]
                            list.append(queue_name)
                        else:
                            pass
                    print(list)
                    print(len(list))
                    try:
                        ConnMysqlTmpl.user_app_query_info1(list)
                    except Exception as e:
                        print(f"Failed to write to the database, please check: {e}")
                except JSONDecodeError as e:
                    print("Application {0} is not a MapReduce job, please check~".format(line))
            except Exception as e:
                print("Failed to fetch the application info, please check: {0}".format(e))
    else:
        print("Failed to fetch the application IDs, please check~")
· Jobs that have not started yet (ACCEPTED state) cannot be read through the YARN REST API; the returned content has an encoding problem.
· Finished jobs (FINISHED state) normally release their locks promptly; if the log service is enabled on the cluster, the corresponding logs can still be read from the JobHistoryServer.
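The script above also depends on AppSourceInfo.get_each_app1() to supply the application IDs to inspect. Given the two limitations above, one way to implement it is to ask the ResourceManager REST API for the currently RUNNING applications. A minimal sketch; the ResourceManager address and credentials are placeholders (in the real script they would come from a config file such as AppExecSqlInfo.ini):

# AppSourceInfo.py -- sketch of a possible implementation of get_each_app1(), not the original module
import requests
from requests.auth import HTTPBasicAuth

def get_each_app1():
    # The ResourceManager address and the credentials are placeholders.
    rm_url = "http://resourcemanager:8088"
    resp = requests.get(rm_url + "/ws/v1/cluster/apps",
                        params={"states": "RUNNING", "applicationTypes": "MAPREDUCE"},
                        auth=HTTPBasicAuth("user", "password"))
    apps = (resp.json().get("apps") or {}).get("app", [])
    # Return the application IDs, e.g. ["application_1636000000000_0001", ...]
    return [app["id"] for app in apps]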
Fetch the information of a job in the RUNNING state; the URL is:
http://resourcemanager:port/applicationID/ws/v1/mapreduce/jobs/jobId/conf
Convert the returned data to JSON format:
info_json = all_info_tmp.json()
result = info_json["conf"]
all_info = result["property"]
Extract the query_id:
if line["name"] == "hive.query.id": #list.append(str(line["value"])) query_id_tmp = line["value"]
Extract the resource queue name (the queue identifies the tenant):
line["name"] == "mapreduce.job.queuename": #list.append(str(line["value"])) user_info = (line["source"])[1] user = str(user_info).split("/") queue_name = user[4] list.append(queue_name)
SELECT a.query_id,
       a.queue_name,
       b.sql_info,
       b.table_name,
       b.ctime
FROM user_app_query_info a
JOIN zookeeper_lock_info b ON a.query_id = b.query_id;

The two tables are joined on query_id, which gives us, for every table lock, the SQL statement that holds the lock and the tenant information.
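For completeness, a small sketch that runs this join from Python and prints, for every lock, the blocking SQL together with the tenant queue; it assumes the same placeholder pymysql connection parameters as the helper sketch earlier:

# Sketch: run the join above and print which tenant's SQL is holding each table lock.
import pymysql

conn = pymysql.connect(host="mysql_host", port=3306, user="monitor",
                       password="******", database="hive_monitor", charset="utf8mb4")
try:
    with conn.cursor() as cursor:
        cursor.execute(
            "SELECT a.query_id, a.queue_name, b.sql_info, b.table_name, b.ctime "
            "FROM user_app_query_info a "
            "JOIN zookeeper_lock_info b ON a.query_id = b.query_id")
        for query_id, queue_name, sql_info, table_name, ctime in cursor.fetchall():
            print(f"{ctime}  queue={queue_name}  table={table_name}  query_id={query_id}\n{sql_info}")
finally:
    conn.close()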