操作Redis集群:
import rediscluster import redis # startup_nodes = [ # {"host":"118.24.3.40","password":"HK139bc&*","port":6379}, # {"host":"118.24.3.41","password":"HK139bc&*","port":6379} # ] # r = rediscluster.RedisCluster(startup_nodes =startup_nodes, decode_responses=True) # print(r.keys()) r = redis.Redis(host="118.24.3.40",password="HK139bc&*",port=6379,db=4,decode_responses=True) r.set("ljq:ljq1","ljj") #文件夹ljq,key:ljq1 print(r.get("ljq:ljq1")) #取值时要把文件夹带上
面向对象:
# class Car: # wheel = 4 # color = "heise" # name = "feimaqiche" # def fly(self): # print(self.name) # # fmz_car = Car()#实例化 fmz_car:实例对象 # fmz_car.fly()#调用类里面的方法 #self 本类对象,实例化的是谁,self代表的是谁 class Car: wheel = 4 #类变量,公共的,通过self.wheel来应用 def __init__(self,color,name): #构造函数,类在实例化的时候,自动执行的函数 self.color = color self.name = name def __del__(self):#析构函数,实例在销毁的时候执行 print("析构函数") def fly(self): name= "www"#局部变量 print(id(self)) print(self.name) def say(self): print("%s,%s"%(self.name,self.color)) fmz_car = Car("红色的","汽车")#实例化 fmz_car:实例对象 fmz_car.fly()#调用类里面的方法 fmz_car.say()#调用类里面的方法 print(id(fmz_car)) fmz_car2 = Car("黄色的","汽车")#实例化 fmz_car:实例对象 fmz_car2.fly()#调用类里面的方法 fmz_car2.say()#调用类里面的方法 #raise 主动抛出异常 # 私有方法,私有变量, :前面加 __ 类里可以调用,出了类之后就不能调用
封装MySQL类:
import pymysql from day8li.homework.const import mysql_info class MySQL: #经典类 def __init__(self,mysql_info,data_type=1): self.mysql_info = mysql_info self.data_type = data_type self.__connect_status = False self.__connect() def __connect(self): print("开始连接mysql") try: self.__conn = pymysql.connect(**self.mysql_info) except: print("数据库连接出错!" ) raise Exception("数据库连接出错") self.__connect_status = True if self.data_type != 1: self.__cur = self.__conn.cursor(pymysql.cursors.DictCursor) else: self.__cur = self.__conn.cursor() print("mysql连接成功!") def execute(self,sql): print("开始执行sql",sql) try: self.__cur.execute(sql) except: print("sql不正确,sql语句是%s" % sql) else: print("sql执行完成!") return True def fetchone(self,sql): if self.execute(sql): return self.__cur.fetchone() def fetchall(self,sql): if self.execute(sql): return self.__cur.fetchall() def __del__(self): self.__close() print("mysql 连接关闭完成") def __close(self): if self.__connect_status: self.__cur.close() self.__conn.close() if __name__ == '__main__': my = MySQL(mysql_info) my.execute("update user_nhy_7 set nick ='杜拉拉' where id = 3") ret = my.fetchone("select * from user_nhy_7 where id = 3") print(ret) ret = my.fetchall("select * from user_nhy_7") print(ret)
日志打印:
# import logging # log = logging.Logger("abc",level="INFO") # log.info("haha") # import loguru # loguru.logger.debug("aaa") # loguru.logger.info("aaa") # loguru.logger.warning("aaa") # loguru.logger.error("aaa") from loguru import logger import sys logger.remove() # 清除它的默认设置设置 # fmt = '{time}||{level}||{file.path}:line:{line}:function_name:{function} ||msg={message}' fmt = '{time}||msg={message}' # level file function module time message # logger.add(sys.stdout, level='DEBUG', format=fmt) # 咱们本地运行的时候,在控制台打印 # # enqueue=True 异步写入日志 # logger.add('fmz.log', level='DEBUG', format=fmt, encoding='utf-8', # enqueue=True, rotation='1 day', # rotation多久产生一个日志文件 # retention='10 days') # 写在日志文件里面 # # logger.info("3253252") class Log: logger.remove()#清除它的默认设置设置 fmt = '[{time}][{level}][{file.path}:line:{line}:function_name:{function}] ||msg={message}' #level file function module time message logger.add(sys.stdout,level="DEBUG",format=fmt)#咱们本地运行的时候,在控制台打印 logger.add("test.log",level="DEBUG",format=fmt,encoding='utf-8',enqueue=True,rotation='1 day',retention='10 days')#写在日志文件里面 debug = logger.debug info = logger.info warning = logger.warning error = logger.error if __name__ == '__main__': Log.info("xxxx")