import time
import uuid
from typing import Optional
from urllib import parse

import numpy as np
import pandas as pd
from sqlalchemy import create_engine

# Column order of the task table; every row produced by gen_data() follows it.
_TASK_COLUMNS = [
    'task_id', 'id', 'company_id', 'zid', 'uid', 'type_id', 'sid',
    'seller_id', 'run_type', 'origin_type', 'execution_time',
    'expiration_time', 'finish_time', 'market_id', 'market_code', 'repeat',
    'message_id', 'min_version', 'version', 'priority', 'status', 'data',
    'result', 'gmt_create', 'gmt_modified',
]

# Candidate type_id values and the probability of drawing each one.
_TYPE_IDS = [1, 2, 3, 4, 5]
_TYPE_ID_WEIGHTS = [0.4, 0.3, 0.1, 0.1, 0.1]

# JSON payload stored in the `data` column of every generated row.
_TASK_DATA_JSON = (
    '{"report_date_type":"custom", "report_date_month":"2022-04", '
    '"report_custom_date_end":"2022-04-02", "report_custom_date_start":"2022-04-02"}'
)


def write_data_mysql(db_conf: dict, mode: str = 'r',
                     table_name: Optional[str] = None,
                     dbname: str = 'finance_task_center_0',
                     times: int = 6, batch_size: int = 500000) -> None:
    """Read one sample row from, or bulk-append generated rows to, a MySQL table.

    :param db_conf: connection settings; the keys "user", "pwd", "host" and
        "port" are read.
    :param mode: 'r' prints the column names and the first row of the table;
        'w' appends generated rows. Any other value does nothing.
    :param table_name: name of the (sharded) table to read or write. It is
        interpolated into the SQL text in read mode, so it must come from a
        trusted source.
    :param dbname: database (schema) name.
    :param times: number of write batches in 'w' mode.
    :param batch_size: rows generated and written per batch.
    :return: None. With the defaults, 6 * 500000 = 3,000,000 rows are written.
    :raises ValueError: if ``table_name`` is empty or None.
    """
    if not table_name:
        # Fail early instead of sending "select * from None" to the server.
        raise ValueError("table_name is required")

    user = db_conf["user"]
    # The password may contain URL-reserved characters such as '@'.
    pwd = parse.quote_plus(db_conf["pwd"])
    host = db_conf["host"]
    port = db_conf["port"]
    connect_info = f"mysql+pymysql://{user}:{pwd}@{host}:{port}/{dbname}"

    engine = create_engine(connect_info)
    try:
        if mode == 'r':
            df = pd.read_sql_query(
                'select * from {} limit 1'.format(table_name), engine)
            print('===========读取数据==============:')
            print(df.columns.tolist())
            rows = df.values.tolist()
            # An empty table yields no rows; print an empty row rather than
            # raising IndexError.
            print(rows[0] if rows else [])
        elif mode == 'w':
            for t in range(times):
                started = time.perf_counter()
                df = pd.DataFrame(gen_data(batch_size), columns=_TASK_COLUMNS)
                print(df)
                df.to_sql(name=table_name, con=engine, if_exists='append',
                          schema=dbname, index=False)
                # Measured after to_sql so the reported cost includes the
                # database write, not only the data generation.
                elapsed = time.perf_counter() - started
                print(f"=第{t+1}次写入行数{batch_size}==========finish=============== table_name : {table_name},cost time {elapsed} 秒")
            print(f"\n@@@@@@@@=========={table_name}写入{(times*batch_size)} 行完成=================@@@@@@@@@@@@@@")
    finally:
        # Release pooled connections even when the read or write fails.
        engine.dispose()


def gen_data(nums):
    """Build ``nums`` synthetic task rows in ``_TASK_COLUMNS`` order.

    Only ``task_id`` (derived from a fresh uuid1) and ``type_id`` (weighted
    random draw from 1..5) vary between rows; every other field is a fixed
    sample value.

    :param nums: number of rows to generate; values <= 0 yield an empty list.
    :return: list of 25-element lists, one per row.
    """
    if nums <= 0:
        return []

    # One vectorised draw for the whole batch instead of one call per row.
    type_ids = np.random.choice(_TYPE_IDS, nums, True, _TYPE_ID_WEIGHTS)

    return [
        [
            # str hashes are salted per process, so task ids are not
            # reproducible across runs.
            abs(hash(uuid.uuid1().hex)),  # task_id
            4,  # id
            9012499355034237,  # company_id
            100026,  # zid
            0,  # uid
            type_id,  # type_id
            117,  # sid
            'A303OEQ77COZA8',  # seller_id
            1,  # run_type
            1,  # origin_type
            '2022-05-28 19:26:00',  # execution_time
            '2022-05-28 20:00:00',  # expiration_time
            '0000-00-00 00:00:00',  # finish_time
            1,  # market_id
            'US',  # market_code
            0,  # repeat
            '',  # message_id
            '*',  # min_version
            '',  # version
            12,  # priority
            0,  # status
            _TASK_DATA_JSON,  # data
            '',  # result
            '2022-05-31 17:37:58',  # gmt_create
            '2022-05-31 17:37:58',  # gmt_modified
        ]
        for type_id in type_ids
    ]


if __name__ == '__main__':
    # NOTE(review): credentials are hard-coded here; consider loading them
    # from the environment or a config file kept out of version control.
    db_conf2 = {
        "user": "test",
        "pwd": "test@vW9JMvL",
        "host": "192.188.119.1",
        "port": "3306",
        "db_name": "finance_task_center_0"
    }
    table_name = 'client_tasks_33'  # 'runoob_tbl'
    write_data_mysql(db_conf2, table_name=table_name, mode='w',
                     dbname=db_conf2["db_name"], times=2, batch_size=1)