效果图
目录结构说明:以下这些目录必须存在。
效果说明:
导出指定表的数据到指定目录,并输出两个目录下的同一文件里的数据以供比较。
yaml配置文件说明:
html页面代码:
<!DOCTYPE html> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> <title>json串转换</title> </head> <div> 所有的表:{{msg3}}<br><br> <form action="/" method="POST" name="form1" action="tijiao.asp" onSubmit="return checkform(this)"> 表名:<input type="text" placeholder="表名" name="test_table_name" /> <br></br> 选项:</br> <input type="radio" name="choice" value="pre">导出交易/清算前的数据【test目录】</br> <input type="radio" name="choice" value="fin">导出交易/清算后的数据【test1目录】</br> <input type="radio" name="choice" value="com">对比数据</br> <br></br> <input type="submit" value="提交" id="input2"/> </form> </div> <div class="contant"> <p class="top-title">对比的数据</p> </div> <div id="content"> {{msg}} {{msg2}} </div> <style type="text/css"> #input1{ height: 20px; width: 1600px; border-radius:12px ; } #input2{ height: 30px; width: 700px; border-radius:12px ; } </style> </body> <script type="text/javascript"> function formatStr(msg) { var mainStr = ""; var remark="格式错误,请重新输入"; mainStr += '<table border="1" cellspacing="0" bordercolor="#ADADAD">'; msg=msg.split("|"); for (var i=0; i<msg.length; i++){ line_arr = msg[i].split(",") mainStr += "<tr>"; for(var j=0; j<line_arr.length; j++){ mainStr += "<th>" + line_arr[j] + "</th>"; } mainStr += "<tr>"; } //msg.forEach(e => { //mainStr += "<td>" + `${e}`.replace('[','').replace(']','').replace('\'','').replace('\'','') + "</td>" //}) //mainStr += "</tr>"; return mainStr; } document.getElementById('content').innerHTML = formatStr(document.getElementById('content').innerHTML); </script> </html> python代码:
# --*-- coding:utf8 --*-- import pymysql, xlwt,xlrd import pandas as pd import openpyxl import csv import json import requests from flask_script import Manager from flask import Flask, request, jsonify, render_template import yaml import os import shutil import datetime def load_config(): f = open('conf2.yaml', 'r', encoding='utf-8') cfg = f.read() conf = yaml.load(cfg,Loader=yaml.FullLoader) # 读取配置文件 return conf def backup(back_path): #file_path_and_name = os.path.abspath(__file__)#获取执行程序所在目录及程序名 file_full_path = os.path.dirname(os.path.abspath(__file__))#获取程序所在目录 print('file_full_path',file_full_path) bc_serial_no = datetime.datetime.now().strftime('%Y%m%d%H%M%S%f') tar_path=file_full_path+'\\backup\\'+back_path+'\\test_backup_'+bc_serial_no #自定义文件夹目录 os.mkdir(tar_path)#创建文件夹 primary_dir=str(file_full_path)+'\\test' # 遍历primary_dir下所有文件,包括目录 files=os.listdir(primary_dir) # print("遍历的结果:",files) for i in files: primary_dir=str(primary_dir) tar_path=str(tar_path) #print("目标目录222",tar_path) old=os.path.join(primary_dir,i)#源文件夹字符串拼接 new=os.path.join(tar_path,i)#目标文件夹字符串拼接 # print('源目录是:', old) # print('备份目录是:', new) if os.path.exists(old):#如果文件不存在,存在了就不拷贝了 print("----文件备份----") shutil.copyfile(old,new) def html_data_format(data): res_str = "" if data=='交易/清算前数据导出完成': res_str=data elif data=='交易/清算后数据导出完成': res_str=data elif data=='请输入表名': res_str = data elif data=='test目录下不存在该表': res_str = data elif data == 'test1目录下不存在该表': res_str = data elif data=='请做出选择': res_str = data else: if len(data) > 0: for res_item in data: res_str += ','.join(["%s" % i for i in res_item]) res_str += "|" return res_str def export_excel(table_name,sql,path,cfg): # 连接数据库,查询数据 host, user, passwd, db = cfg['host'], cfg['user'], cfg['passwd'], cfg['db'] conn = pymysql.connect(user=user, host=host, port=33061, passwd=passwd, db=db, charset='utf8') cur = conn.cursor() # sql = 'select * from %s' % table_name cur.execute(sql) # 返回受影响的行数 fields = [field[0] for field in cur.description] # 获取所有字段名 all_data = cur.fetchall() # 所有数据 # 写入excel book = xlwt.Workbook() sheet = book.add_sheet('sheet1') for col, field in enumerate(fields): sheet.write(0, col, field) row = 1 for sql_data in all_data: for col, field in enumerate(sql_data): sheet.write(row, col, field) row += 1 #book.save("C:/%s/%s.xls" % (path,table_name)) book.save("./%s/%s.xls" % (path, table_name)) app = Flask(__name__) @app.route("/", methods=["GET", "POST"]) def get_sum(): table = cfg['table_data'] show=[] for table_name, sql in table.items(): #print(table_name, sql) show.append(table_name) print("show is:",show) if request.method == "POST": choice = '' test_table_name='' for k, v in request.form.to_dict().items(): if k == 'choice': choice = str(v) elif k=='test_table_name': test_table_name=str(v) if choice == 'pre': path = 'test' backup(path) for table_name, sql in table.items(): export_excel(table_name, sql, path,cfg) data = '交易/清算前数据导出完成' data_new = '!' elif choice == 'fin': path = 'test1' backup(path) for table_name, sql in table.items(): export_excel(table_name, sql, path,cfg) data='交易/清算后数据导出完成' data_new='!' elif choice == '': data = '请做出选择' data_new = '!' elif choice == 'com': if test_table_name=='': data='请输入表名' data_new='!' else: k1 = test_table_name+'.xls' #k_path='C:\\test\\'+k1 k_path = '.\\test\\' + k1 k_path2 = '.\\test1\\' + k1 flag=os.path.lexists(k_path) flag2 = os.path.lexists(k_path2) if flag==False: data='test目录下不存在该表' data_new='!' elif flag2==False: data = 'test1目录下不存在该表' data_new = '!' else: data = [] data_new = [] old_data = xlrd.open_workbook(os.path.join('.\\test', k1)) new_data = xlrd.open_workbook(os.path.join('.\\test1', k1)) table = old_data.sheets()[0] # 选取要读的sheet表单 nrows = table.nrows table2 = new_data.sheets()[0] # 选取要读的sheet表单 nrows2 = table2.nrows for i in range(nrows): data.append(table.row_values(i)) for i in range(nrows2): data_new.append(table2.row_values(i)) html_data = html_data_format(data) html_data2 = html_data_format(data_new) #html_data3 = html_data_format(show) return render_template("test_data.html", error_code='0', msg=html_data, msg2=html_data2,msg3=show) elif request.method == "GET": return render_template("test_data.html") return render_template("test_data.html") if __name__ == '__main__': cfg = load_config() # 读取配置文件 manager = Manager((cfg['web_ip'], cfg['web_port']), app) app.config["JSON_AS_ASCII"] = False app.run(host=cfg['web_ip'], port=cfg['web_port']) manager.run()