import pymysql

# Database connection settings; unpacked as keyword arguments into
# pymysql.connect() by DbConnect.
# NOTE(review): MySQL normally listens on 3306. 8080 looks like the HTTP port
# of the service under test -- confirm this value before relying on it.
# NOTE(review): credentials are hard-coded; consider loading them from the
# environment instead.
dbinfo = {
    "host": "127.0.0.1",
    "user": "root",
    "password": "123456",
    "port": 8080
}


class DbConnect(object):
    """Thin wrapper around one PyMySQL connection and one DictCursor.

    The connection is opened in ``__init__`` and stays open until ``close()``
    is called. The object can also be used as a context manager, which closes
    the connection on exit.

    Attributes:
        db: the underlying ``pymysql`` connection.
        cursor: a cursor created from ``db`` (``DictCursor`` class).
    """

    def __init__(self, db_cof, database=""):
        """Open the connection.

        Args:
            db_cof: mapping of keyword arguments for ``pymysql.connect``
                (for example the module-level ``dbinfo``).
            database: name of the schema to select.
        """
        # Open the database connection.
        self.db = pymysql.connect(database=database,
                                  cursorclass=pymysql.cursors.DictCursor,
                                  **db_cof)
        # One cursor is created up front and shared by every method.
        self.cursor = self.db.cursor()
        print("连接成功")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Returning False lets any exception from the with-body propagate.
        return False

    def select(self, sql, args=None):
        """Run a query and return every row from ``fetchall()``.

        Args:
            sql: the statement, e.g. ``'select * from table where name=%s'``.
            args: optional parameters bound to the ``%s`` placeholders by the
                driver. Prefer this over building the SQL string by hand.

        Returns:
            Whatever ``cursor.fetchall()`` returns for the query.
        """
        self.cursor.execute(sql, args)
        results = self.cursor.fetchall()
        print("查询成功")
        return results

    def execute(self, sql, args=None):
        """Run a data-modifying statement (delete / insert / update) and commit.

        On failure the transaction is rolled back, the error is printed and
        ``None`` is returned; the exception is not re-raised, so existing
        callers that relied on this method never raising keep working.

        Args:
            sql: the statement to run.
            args: optional parameters bound to the ``%s`` placeholders.

        Returns:
            The value returned by ``cursor.execute`` (the affected row count)
            on success, or ``None`` if the statement failed.
        """
        try:
            affected = self.cursor.execute(sql, args)  # run the statement
            self.db.commit()  # make the change permanent
        except Exception as exc:
            # Roll back on error. ``Exception`` (not a bare ``except``) so
            # that KeyboardInterrupt / SystemExit are not swallowed.
            self.db.rollback()
            print("执行失败: %s" % exc)
            return None
        # Report success only after the commit has actually gone through.
        print("执行成功")
        return affected

    def close(self):
        """Close the cursor and then the connection."""
        try:
            self.cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self.db.close()


if __name__ == "__main__":
    db = DbConnect(dbinfo, database='info')
    try:
        sql = 'select * from info where id=1;'
        res = db.select(sql)
        print(res)
        sql1 = 'delete from info where id =2;'
        # Run the delete statement (sql1), not the select again.
        res1 = db.execute(sql1)
        print(res1)
    finally:
        db.close()
from utils.connect_mysql import DbConnect, dbinfo


def get_db_info(user_id, key='id'):
    """Look up one column of the ``info`` row whose ``id`` equals *user_id*.

    A new connection to the ``info`` database is opened for the call and is
    always closed before returning.

    Args:
        user_id: value matched against the ``id`` column. It is passed to the
            driver as a bound parameter rather than formatted into the SQL
            text, so it cannot alter the statement.
        key: column name to read from the first matching row.

    Returns:
        The value of ``key`` in the first matching row, or the string ``'空'``
        when no row matches.

    Raises:
        KeyError: if the row has no column named ``key``.
    """
    db = DbConnect(dbinfo, database='info')
    try:
        # Use the wrapper's cursor directly so the id can be bound as a
        # parameter instead of being interpolated into the query string.
        db.cursor.execute('select * from info where id=%s;', (user_id,))
        res = db.cursor.fetchall()
    finally:
        # Release the connection even if the query raised.
        db.close()
    print(res)
    if not res:
        return '空'
    return res[0][key]


if __name__ == "__main__":
    r = get_db_info(user_id=1)
    print(r)
update_info.py
# API test case: update one user's profile via PUT and validate the response.
# The $variable / ${function()} syntax suggests an HttpRunner test case;
# the nesting below follows that layout -- confirm against the runner in use.
config:
    name: 修改用户信息用例
    base_url: http://127.0.0.1:8080
    variables:
        user_id: 1
        username: "zhangsan"
        usertel: 13333333333
        usersex: "m"
        birthaddress: "羊村"
        # Quoted on purpose: an unquoted 2000-01-01 is resolved by YAML
        # loaders to a date object rather than a string, and a date cannot be
        # serialised into the JSON request body.
        birthday: "2000-01-01"
        schoolname: "蓝翔技校"

teststeps:
-   name: 修改用户信息
    request:
        url: /api/test/info/$user_id
        method: PUT
        json:
            username: $username
            usertel: $usertel
            usersex: $usersex
            birthaddress: $birthaddress
            birthday: $birthday
            schoolname: $schoolname
    validate:
        - eq: [status_code, 200]
        - eq: [body.code, 1000]
        - eq: [body.msg, success!]
        # NOTE(review): get_db_info defaults to key='id', so this compares
        # body.data.goodsstatus with the row's id column -- confirm that is
        # the intended column.
        - eq: [body.data.goodsstatus, '${get_db_info($user_id)}']