安装库pymysql
pip install pymysql
使用前引入
import pymysql
使用步骤如下:
1.创建连接
db_config = { 'host': 'api.****.com', # 主机 'user': '*****', #用户名 'password': '123456', #密码 'port': 3306, #端口 3306 'database':'*****' #数据库名 } # 创建连接 conn = pymysql.connect(**db_config)
2.创建游标
cursor=conn.cursor()
3.准备并执行sql语句
sql='select leave_amount from member where id=100' cursor.execute(sql)
4.获取查询结果,类似于卸货,查询结果如果有10条,三个获取结果语句均使用,则fetchone()获得一条,fetchmany(3)获得三条,fetchall()即获得6条,三条语句合起来为查询结果为10条,如果仅使用fetchall()则获取10条
# 获取一条查询结果,以元组的形式返回 cursor.fetchone() # 获取三条查询结果 cursor.fetchmany(3) # 获取全部查询结果 cursor.fetchall()
上述结果均以元组形式返回,获取结果以字典形式返回
# 需要引入 DictCursor from pymysql.cursors import DictCursor # 创建游标的时候,指定为DictCursor cursor=conn.cursor(DictCursor)
查询数据库可以封装起来,后续直接导入使用
# 配置文件中db_config MYSQL_CONFIG = { 'host': 'api.*****.com', 'user': '*****', 'password': '123456', 'port': 3306, 'database':'*****', 'engine':'mysql', 'autocommit':True } import pymysql from pymysql.cursors import DictCursor class Handle_SQL: def __init__(self,db_config): # 创建数据库连接 # 根据不同的数据库,创建不同的链接 # 如果engine没有值默认返回mysql engine = db_config.pop('engine', 'mysql') if engine.lower() == 'mysql': self.conn = pymysql.connect(**db_config) else: pass# 此处可以配置其他的数据库 def get_fetchone(self,sql,res_type = 't'): """ 返回一条数据 :param sql: :param size: 默认返回元组 :return: """ # 创建游标 # 返回元组类型的数据 if res_type == 't': with self.conn.cursor() as cursor: cursor.execute(sql) return cursor.fetchone() # 返回字典类型的数据 else: with self.conn.cursor(DictCursor) as cursor: cursor.execute(sql) return cursor.fetchone() # 获取多个数据 def get_fetchmany(self,sql,size,res_type='t'): if res_type == 't': with self.conn.cursor() as cursor: cursor.execute(sql) return cursor.fetchmany(size) # 返回字典类型的数据 else: with self.conn.cursor(DictCursor) as cursor: cursor.execute(sql) return cursor.fetchmany(size) # 获取全部数据 def get_fetchall(self,sql,res_type='t'): if res_type== 't': with self.conn.cursor() as cursor: cursor.execute(sql) return cursor.fetchall() # 返回字典类型的数据 else: with self.conn.cursor(DictCursor) as cursor: cursor.execute(sql) return cursor.fetchall() def exist_sql(self,sql): if self.get_fetchone(sql): return True else: return False def __del__(self): self.conn.close()