import pandas as pd
import pymysql
# Module-level catalogue of frequently used tables, keyed by sequence number.
repository = {}


def init_repository():
    """Fill the module-level ``repository`` with the known tables and return it.

    Each entry maps a sequence number to a ``(sequence, qualified_table_name)``
    tuple. Calling the function again simply rewrites the same entries.

    Returns:
        dict: the shared ``repository`` mapping (the same object on every call).
    """
    known_tables = (
        (1, 'test.test_account'),
        (2, 'test.loan_apply'),
    )
    for entry in known_tables:
        repository[entry[0]] = entry
    return repository
class MysqlHander:
    """Run SQL queries against a MySQL server and return pandas DataFrames.

    A new connection is opened for every query and closed when the query
    finishes. Both query methods are best-effort: on any failure they print
    the error and return ``None`` instead of raising.

    NOTE(review): the default ``host``/``user``/``passwd``/``db`` values look
    like redacted placeholders; pass real connection settings explicitly.
    """

    def __init__(self, host='xxxxxxx', port=3306, user='xxxxx', passwd='xxxxxxx', db='xxxx',
                 charset='utf8'):
        """Store the connection settings; no connection is opened here.

        Args:
            host: MySQL server host name or address.
            port: MySQL server TCP port (defaults to the standard 3306; the
                previous default was an undefined bare name).
            user: account name used to log in.
            passwd: password for ``user`` (stored as ``self.password``).
            db: default database (schema) to select.
            charset: connection character set.
        """
        self.host = host
        self.port = port
        self.user = user
        self.password = passwd
        self.db = db
        self.charset = charset
        # Most recent connection (closed once the query that opened it ends).
        self.conn = None
        # Chunks collected by the most recent ``connect`` call.
        self.chunks = []

    def _open(self):
        """Open a new connection, remember it on ``self.conn`` and return it."""
        # Forget any previous (already closed) connection first, so a failed
        # connect below cannot leave a stale handle for ``_close`` to re-close.
        self.conn = None
        self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user,
                                    password=self.password, db=self.db,
                                    charset=self.charset)
        self.conn.ping(reconnect=True)
        return self.conn

    def _close(self):
        """Close the connection opened by ``_open``, if one was opened."""
        if self.conn is not None:
            self.conn.close()

    def connect_all(self, sql):
        """Run ``sql`` and load the whole result set in a single read.

        Args:
            sql: SQL text to execute.

        Returns:
            pandas.DataFrame with the result, or ``None`` if connecting or
            querying failed (the error is printed).
        """
        try:
            conn = self._open()
            return pd.read_sql(sql, con=conn)
        except Exception as e:
            print('failed:', e)
            return None
        finally:
            self._close()

    def connect(self, sql, chunksize=10000):
        """Run ``sql`` and load the result in chunks of ``chunksize`` rows.

        The chunks are kept on ``self.chunks`` and concatenated into one frame.

        Args:
            sql: SQL text to execute.
            chunksize: number of rows fetched per chunk.

        Returns:
            pandas.DataFrame with the full result (empty if no chunk was
            produced), or ``None`` if connecting or querying failed (the error
            is printed).
        """
        try:
            print("数据提取中....")
            conn = self._open()
            self.chunks = list(pd.read_sql(sql, con=conn, chunksize=chunksize))
            # pd.concat refuses an empty list, so fall back to an empty frame.
            df = pd.concat(self.chunks, axis=0) if self.chunks else pd.DataFrame()
            print("数据提取完毕!")
            return df
        except Exception as e:
            print('failed', e)
            return None
        finally:
            self._close()
def show_table():
    """Print a greeting followed by the numbered list of known tables.

    The rows come from ``init_repository()``: one line per entry showing its
    sequence number and table name, framed by separator lines.
    """
    catalogue = init_repository()

    print("新的一天,新的工作,新的学习,加油干!")
    print("常用数据表清单:")
    print("--"*30)
    print("%5s%10s" % ("序号","表名"))
    print(".."*30)
    for number in catalogue:
        entry = catalogue[number]
        print("%5s| %-20s" % (number, entry[1]))
    print("--"*30)
def show_command():
    """Ask the user which tables to use and return their names.

    The user first enters how many tables are wanted, then one sequence number
    per table. The numbers are then resolved against ``init_repository()`` in
    the order entered. Resolution stops at the first unknown number (a notice
    is printed), and only the names resolved before it are returned, so the
    result never mixes table names with raw numbers.

    Returns:
        list[str]: qualified table names, in the order they were requested.

    Raises:
        ValueError: if any of the typed values is not an integer.
    """
    # Build the catalogue once instead of on every lookup.
    tables = init_repository()

    require = int(input("请输入你需要使用的表数量:\n"))
    requested = [int(input("请输入要使用的表名序号:\n")) for _ in range(require)]

    tab_num = []
    for seq in requested:
        if seq not in tables:
            print("不要玩了,好不好!")
            break
        name = tables[seq][1]
        print(f"你需要的表名称是:{name}")
        tab_num.append(name)
    return tab_num
# Script entry point: list the known tables, let the user pick some, then dump
# every selected table to a CSV file in the current working directory.
if __name__ == "__main__":
    show_table()
    tab_name = show_command()
    c = MysqlHander()
    for name in tab_name:
        script = f'select * from {name};'
        # The file is named after the table part of "schema.table". The data is
        # written with to_csv, so the extension is ".csv" (it used to be
        # ".xlsx", which produced files spreadsheet tools could not open).
        path = name.split(".")[1] + '.csv'
        df = c.connect(script)
        if df is None:
            # connect() has already printed the failure; skip this table
            # instead of crashing on None.to_csv.
            continue
        df.to_csv(path)