The project directory is laid out as follows:
```
│  sql_speed_test.py
│
├─input
│      data-report_in_visit_20240704.log
│      resource_in_sso_20240704.log
│
└─output
        data-report_in_visit_20240704.csv
        resource_in_sso_20240704.csv
```
At the moment every experiment requires a performance test of the SQL in our Java services, otherwise it cannot pass review, which is honestly exhausting.

sql_speed_test.py is a program I wrote in Python. I copy the SQL logged by the Java mybatis-plus console into the data-report_in_visit_20240704.log and resource_in_sso_20240704.log files; after running the program, the output folder automatically receives the corresponding CSV files. The files involved are shown below.
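For reference, the input file names follow a `<project>_in_<database>_<date>.log` convention, which is how the script decides which project and database a log belongs to. A minimal sketch of that split (mirroring what `handle_log_file` does further down):

```python
# Minimal sketch of the <project>_in_<database>_<date>.log naming convention
# that the script relies on (same split as handle_log_file below).
filename = "data-report_in_visit_20240704.log"
stem = filename.split(".")[0]                    # "data-report_in_visit_20240704"
project = stem.split("_in_")[0]                  # "data-report"
database = stem.split("_in_")[1].split("_")[0]   # "visit"
print(project, database)                         # data-report visit
```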
The data-report_in_visit_20240704.log file:
```
-- 301 --
[2024-07-04 14:22:55.055] [org.apache.ibatis.logging.jdbc.BaseJdbcLogger] [http-nio-9006-exec-2] [143] [DEBUG] [traceId:] ==> SELECT resource_id AS resource_id, resource_type AS resource_type, sum(IFNULL(internal_pv, pv)) AS internal_pv, sum(IFNULL(google_pv, pv)) AS google_pv, sum(IFNULL(other_pv, pv)) AS other_pv, sum(IFNULL(pv, 0)) AS pv FROM t_bw_article_daily_statistic where resource_type = 9 GROUP BY resource_id, resource_type order by resource_id, resource_type
-- 302 --
[2024-07-04 14:22:55.055] [org.apache.ibatis.logging.jdbc.BaseJdbcLogger] [http-nio-9006-exec-2] [143] [DEBUG] [traceId:] ==> SELECT resource_id AS resource_id, resource_type AS resource_type, sum(IFNULL(internal_pv, pv)) AS internal_pv, sum(IFNULL(google_pv, pv)) AS google_pv, sum(IFNULL(other_pv, pv)) AS other_pv, sum(IFNULL(pv, 0)) AS pv FROM t_bw_article_daily_statistic_202406 where resource_type = 9 GROUP BY resource_id, resource_type order by resource_id, resource_type
```
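Note that each log line still carries the full BaseJdbcLogger prefix. If you paste the lines as-is, a small pre-processing step like the following (a hypothetical helper, not part of sql_speed_test.py) can strip everything before the `==>` marker so that only the raw SQL is kept:

```python
import re

# Hypothetical helper (not part of sql_speed_test.py): keep only the SQL text
# that follows the "==>" marker of a mybatis BaseJdbcLogger line.
def strip_log_prefix(line: str) -> str:
    match = re.search(r"==>\s*(.*)", line)
    return match.group(1).strip() if match else line.strip()

line = "[2024-07-04 14:22:55.055] [org.apache.ibatis.logging.jdbc.BaseJdbcLogger] [http-nio-9006-exec-2] [143] [DEBUG] [traceId:] ==> SELECT 1"
print(strip_log_prefix(line))  # SELECT 1
```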
The data-report_in_visit_20240704.csv file:
No. | SQL description | SQL | Estimated business data volume | Actual test data volume | Execution time | Execution result | Index used | Project | Database |
---|---|---|---|---|---|---|---|---|---|
1 | | SELECT resource_id AS resource_id, resource_type AS resource_type, sum(IFNULL(internal_pv, pv)) AS internal_pv, sum(IFNULL(google_pv, pv)) AS google_pv, sum(IFNULL(other_pv, pv)) AS other_pv, sum(IFNULL(pv, 0)) AS pv FROM t_bw_article_daily_statistic where resource_type = 9 GROUP BY resource_id, resource_type order by resource_id, resource_type | t_bw_article_daily_statistic | t_bw_article_daily_statistic: 5741 rows | 0.419603 s | 462 | Yes | data-report | visit |
2 | | SELECT resource_id AS resource_id, resource_type AS resource_type, sum(IFNULL(internal_pv, pv)) AS internal_pv, sum(IFNULL(google_pv, pv)) AS google_pv, sum(IFNULL(other_pv, pv)) AS other_pv, sum(IFNULL(pv, 0)) AS pv FROM t_bw_article_daily_statistic_202406 where resource_type = 9 GROUP BY resource_id, resource_type order by resource_id, resource_type | t_bw_article_daily_statistic_202406 | t_bw_article_daily_statistic_202406: 296358 rows | 0.291532 s | 2691 | Yes | data-report | visit |
The sql_speed_test.py file:
```python
import csv
import os
import re
import time

from pymysql import connect

"""
A small tool for SQL performance testing, so the same manual work does not have
to be repeated for every assignment. Each SQL statement produced by the Java
service is captured from the mybatis-plus log, dumped into a log file, and then
executed and measured one by one.
"""


def extract_query_sql_table_names(sql):
    # Regular expression that captures the identifier following FROM/JOIN/INTO/UPDATE/TABLE
    pattern = re.compile(
        r"\b(?:FROM|JOIN|INTO|UPDATE|TABLE)\s+([`\"\[\]]?[a-zA-Z_][\w$]*[`\"\[\]]?)",
        re.IGNORECASE
    )
    # Find all matching table names
    matches = pattern.findall(sql)
    # Strip quotes and brackets
    tables = [re.sub(r'[`"\[\]]', '', match) for match in matches]
    # Only keep the business tables (they all share the t_bw prefix)
    return [table for table in tables if "t_bw" in table]


def check_index_usage(connection, sql):
    # Only SELECT statements are explained
    if not sql.strip().lower().startswith("select"):
        return False
    with connection.cursor() as cursor:
        # Use EXPLAIN to get the query plan
        cursor.execute(f"EXPLAIN {sql}")
        explain_result = cursor.fetchall()

        # Print the EXPLAIN result
        print("EXPLAIN result:")
        for row in explain_result:
            print(row)

        # Locate the "key" column (its position differs between MySQL versions)
        key_index = [desc[0] for desc in cursor.description].index("key")

        # A non-NULL "key" means MySQL chose an index for that table access
        for row in explain_result:
            if row[key_index] is not None:
                print(f"SQL uses index: {row[key_index]}")
                return True
        print("SQL does not use an index")
        return False


def count_rows(connection, table_name):
    try:
        with connection.cursor() as cursor:
            # Count the rows currently stored in the table
            cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
            result = cursor.fetchone()
            return result[0]
    except Exception as e:
        print(e)


class SqlSpeedTest:
    def __init__(self):
        self.input_dir = "./input"
        self.output_dir = "./output"
        # One entry per database referenced by the input file names
        self.databases = {
            "sso": {
                "ip": "xxxxxx",
                "database": "sso",
                "username": "xxxx",
                "password": "xxxx"
            },
            "visit": {
                "ip": "xxxxxx",
                "database": "visit",
                "username": "xxxx",
                "password": "xxxx"
            }
        }

    def get_all_input_files(self):
        return os.listdir(self.input_dir)

    def handle_sql_log(self, project, database, lines):
        sql_lines = []
        row_count = 1
        database_info = self.databases.get(database)
        conn = connect(host=database_info.get("ip"),
                       port=3306,
                       user=database_info.get("username"),
                       password=database_info.get("password"),
                       database=database_info.get("database"),
                       charset='utf8mb4')
        for line in lines:
            # Skip separator lines such as "-- 301 --" and blank lines
            if not line.strip() or line.startswith("--"):
                continue
            current_sql = line.replace("\n", "")
            execute_info = self.execute_sql_and_get_execute_time(conn, database, current_sql)
            tables = extract_query_sql_table_names(current_sql)
            real_rows = ""
            for table in tables:
                total_rows = count_rows(conn, table)
                real_rows += f"{table}: {total_rows} rows "
            sql_lines.append({
                "row_count": row_count,
                "sql_description": "",
                "sql": current_sql,
                "expect_rows": ",".join(tables),
                "real_rows": real_rows,
                "execute_time": execute_info["execute_time"],
                "execute_rows": execute_info["execute_rows"],
                "index_has_work": execute_info["index_has_work"],
                "project": project,
                "database": database
            })
            row_count += 1
        conn.close()
        return sql_lines

    def execute_sql_and_get_execute_time(self, conn, database, sql):
        print(f"==================> Database {database} is executing SQL: {sql}")
        try:
            with conn.cursor() as cursor:
                # Measure the wall-clock time of the statement
                start_time = time.time()
                cursor.execute(sql)
                rows = cursor.fetchall()
                end_time = time.time()
                execution_time = end_time - start_time
            conn.commit()
            print(f"======> Database {database} finished in {execution_time:.6f} s: {sql}")
        except Exception as e:
            print(e)
            return {"execute_rows": "", "execute_time": "", "index_has_work": ""}
        index_has_work = check_index_usage(conn, sql)
        return {"execute_rows": len(rows),
                "execute_time": f"{execution_time:.6f} s",
                "index_has_work": "Yes" if index_has_work else "No"}

    def handle_log_file(self, filename):
        with open(self.input_dir + "/" + filename, "r", encoding="utf-8") as file:
            lines = file.readlines()
        pre_filename = filename.split(".")[0]
        with open(self.output_dir + "/" + pre_filename + ".csv", "w", newline='', encoding='utf-8-sig') as f:
            writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
            csv_title = ["No.", "SQL description", "SQL", "Estimated business data volume",
                         "Actual test data volume", "Execution time", "Execution result",
                         "Index used", "Project", "Database"]
            writer.writerow(csv_title)
            # File names follow the <project>_in_<database>_<date>.log convention
            info = pre_filename.split("_in_")
            project_name = info[0]
            database_name = info[1].split("_")[0]
            sql_lines = self.handle_sql_log(project_name, database_name, lines)
            for sql_line in sql_lines:
                writer.writerow([sql_line["row_count"], sql_line["sql_description"], sql_line["sql"],
                                 sql_line["expect_rows"], sql_line["real_rows"], sql_line["execute_time"],
                                 sql_line["execute_rows"], sql_line["index_has_work"], sql_line["project"],
                                 sql_line["database"]])

    def do_work(self):
        for file in self.get_all_input_files():
            self.handle_log_file(file)


if __name__ == '__main__':
    sql_speed_test = SqlSpeedTest()
    sql_speed_test.do_work()
```