配置文件,名称为 config.py:
# config.py
# Connection settings for a MySQL server. database.py passes this mapping to
# DB(...), which reads exactly these five keys and forwards them to
# pymysql.connect.
# NOTE(review): the password is stored in plain text; keep real credentials
# out of version control.
rds_v1 = {
    "host": "127.0.0.1",
    "user": "root",
    "password": "12345678",
    "database": "users",
    "port": 3306,
}
数据库操作类 DB 的内容,文件名为 database.py:
# database.py
import pymysql
import pandas as pd
from config import *


class DB:
    """Small helper around one pymysql connection with DataFrame-based I/O.

    ``engine`` is a mapping with the keys ``host``, ``user``, ``password``,
    ``port`` and ``database``. The connection is opened in ``__init__`` and
    stays open until :meth:`close` is called.

    Table names, database names and conditions are interpolated directly
    into the SQL text, so they must come from trusted code, never from
    end-user input.
    """

    def __init__(self, engine):
        """Open a connection using the settings in ``engine``."""
        self._engine = engine
        self._conn = pymysql.connect(
            host=self._engine['host'],
            user=self._engine['user'],
            password=self._engine['password'],
            port=self._engine['port'],
            database=self._engine['database'],
            charset='utf8',
        )

    def insert(self, dataframe, table):
        """Upsert every row of ``dataframe`` into ``table``.

        The frame's column names are used as the target columns. A row that
        collides with an existing unique key overwrites that row's values
        (``on duplicate key update``).

        Returns:
            The success message string.

        Raises:
            Exception: if executing or committing fails; the transaction is
                rolled back first and the driver error is attached as the
                cause.
        """
        col = ','.join(dataframe.columns)
        par = ','.join(['%s'] * dataframe.shape[1])
        # NOTE(review): VALUES() inside ON DUPLICATE KEY UPDATE is reported as
        # deprecated by recent MySQL 8.0 releases - confirm the target server.
        keys = ','.join([f'{i} = values({i})' for i in dataframe.columns])
        sql = f'insert into {table} ({col}) values({par}) on duplicate key update {keys};'
        # Rows must come from the ``dataframe`` argument; a module-level
        # ``df`` (such as the one the __main__ block creates) is unrelated.
        rows = [tuple(i) for i in dataframe.values]
        with self._conn.cursor() as cursor:
            try:
                # Execute inside the try so a failed statement is rolled
                # back as well, not only a failed commit.
                cursor.executemany(sql, rows)
                self._conn.commit()
            except Exception as err:
                self._conn.rollback()
                msg = "数据写入失败!!!"
                raise Exception(msg) from err
        msg = f"""数据写入成功:共{dataframe.shape[0]}条数据写入{table}表!!!"""
        return msg

    def select(self, sql, kind=True):
        """Execute ``sql`` and optionally return its result set.

        Args:
            sql: The statement to run.
            kind: When true, fetch all rows and return them as a DataFrame
                whose columns are named after the cursor description. When
                false, commit the statement and return the commit result.

        Raises:
            Exception: if execution fails; the transaction is rolled back
                first and the driver error is attached as the cause.
        """
        with self._conn.cursor() as cursor:
            try:
                cursor.execute(sql)
                if kind:
                    result = cursor.fetchall()
                    cols = cursor.description
                    return pd.DataFrame(result, columns=[i[0] for i in cols])
                return self._conn.commit()
            except Exception as err:
                self._conn.rollback()
                msg = f"ERROR - {self._engine['host']} session init failed: {err}" + "\n"
                raise Exception(msg) from err

    def tables(self):
        """List the tables of the current database."""
        sql = "show tables;"
        return self.select(sql)

    def drop(self, table):
        """Drop ``table``."""
        sql = f"drop table {table};"
        return self.select(sql, kind=False)

    def delete(self, table, tag='1=1'):
        """Delete rows from ``table`` matching the condition ``tag``.

        With the default ``tag`` of ``'1=1'`` every row is deleted.
        """
        sql = f"delete from {table} where {tag};"
        return self.select(sql, kind=False)

    def desc(self, table):
        """Return the column description of ``table``."""
        sql = f"desc {table};"
        return self.select(sql)

    def process(self):
        """Return the server's process list."""
        sql = r"show processlist;"
        return self.select(sql)

    def kill(self, process_id):
        """Kill the server process identified by ``process_id``."""
        sql = f"kill {process_id};"
        return self.select(sql, kind=False)

    def use(self, database):
        """Switch the connection to ``database``."""
        sql = f"use {database};"
        return self.select(sql, kind=False)

    def close(self):
        """Close the underlying connection."""
        self._conn.close()

    def create_sql(self, table):
        """Print the ``show create table`` statement for ``table``.

        Returns ``None``; the DDL is only written to standard output.
        """
        sql = f"""show create table {table}"""
        dataframe = self.select(sql)
        return print(dataframe['Create Table'][0])


if __name__ == '__main__':
    db = DB(rds_v1)
    df = db.tables()