pip install redis
import redis # 导入redis模块 r = redis.Redis(host='localhost', port=6379, password="pwd@321", db=1,decode_responses=True) # host是redis主机,password为认证密码,redis默认端口是6379,db表示指定的数据库, decode_responses=True 表示存储的数据是字符串格式,如果没有密码,则去掉password参数即可 r.set('name', 'phyger-from-python-redis') # key是"name" value是"phyger-from-python-redis" 将键值对存入redis缓存 print(r['name']) # 第一种:取出键name对应的值 print(r.get('name')) # 第二种:取出键name对应的值 print(type(r.get('name')))
import redis # 导入redis模块 def client_redis_pool(): pool = redis.ConnectionPool(host='localhost', port=6379, db=1, decode_responses=True, max_connections=10) r = redis.Redis(connection_pool=pool) r.set('name', 'phyger-from-python-redis') print(r.get('name')) print(r['name']) print(type(r.get('name')))
import redis # 导入redis模块 # 格式: set(name, value, ex=None, px=None, nx=False, xx=False) # ex <int>过期时间(m) # px <int>过期时间(ms) # nx <bool>如果为真,则只有 name 不存在时,当前 set 操作才执行 # xx <bool>如果为真,则只有 name 存在时,当前 set 操作才执行 def client_redis_pool_ex(): pool = redis.ConnectionPool(host='localhost', port=6379, db=1, decode_responses=True) r = redis.Redis(connection_pool=pool) r.set('name', 'phyger-from-python-redis', ex=3) print(r['name']) time.sleep(3) print(r.get('name')) print(type(r.get('name')))
import threading import redis def redis_pool_test(): # 使用阻塞连接池 (当连接池中没有空闲的连接时,会等待timeout秒,直到获取到连接或超时报错。) # pool = redis.BlockingConnectionPool(host='localhost', port=6379, db=1, max_connections=2, timeout=5) # 使用普通连接池 pool = redis.ConnectionPool(host='localhost', port=6379, db=1, max_connections=2, timeout=5) redis_client = redis.Redis(connection_pool=pool) thread1 = RedaisExexThread(redis_client) thread2 = RedaisExexThread(redis_client) thread3 = RedaisExexThread(redis_client) thread4 = RedaisExexThread(redis_client) thread5 = RedaisExexThread(redis_client) thread6 = RedaisExexThread(redis_client) thread7 = RedaisExexThread(redis_client) thread8 = RedaisExexThread(redis_client) thread9 = RedaisExexThread(redis_client) thread10 = RedaisExexThread(redis_client) thread1.start() thread2.start() thread3.start() thread4.start() thread5.start() thread6.start() thread7.start() thread8.start() thread9.start() thread10.start() # 使用普通连接池的时候,连接数不够会直接报错 # 使用阻塞连接池的时候,如果连接数不够,会等待,等待时间在设定的超时时间之后,才会舍弃, # 在实际应用的时候,可以把线程池和超时时间设置的稍微大一些
# !/usr/bin/env python3 # -*- coding: utf-8 -*- import redis RedisConfig = dict(host='127.0.0.1', port=6379, db=1, decode_responses=True, max_connections=10, timeout=5) class RedisCache: _instance = None def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = object.__new__(cls) return cls._instance _redis_pool_data = redis.BlockingConnectionPool(**RedisConfig) # 以管道方式运行redis @classmethod def get_pipe(cls): conn = cls.get_conn() return conn.pipeline(transaction=True) # 实例化redis @classmethod def get_conn(cls): conn = redis.StrictRedis(connection_pool=cls._redis_pool_data) return conn
import redisPool import threading def redis_clientt(): redis_client = redisPool.RedisCache.get_conn() thread1 = RedaisExexThread(redis_client) thread2 = RedaisExexThread(redis_client) thread3 = RedaisExexThread(redis_client) thread4 = RedaisExexThread(redis_client) thread5 = RedaisExexThread(redis_client) thread6 = RedaisExexThread(redis_client) thread7 = RedaisExexThread(redis_client) thread8 = RedaisExexThread(redis_client) thread9 = RedaisExexThread(redis_client) thread10 = RedaisExexThread(redis_client) thread1.start() thread2.start() thread3.start() thread4.start() thread5.start() thread6.start() thread7.start() thread8.start() thread9.start() thread10.start()
def redis_client_set_pip(ss): redis_clientt = redisPool.RedisCache.get_pipe() for i in range(1, 100000): redis_clientt.sadd(ss + str(i), i) redis_clientt.execute() print(time.strftime("%Y-%m-%d-%H_%M_%S", time.localtime())) # 添加10000个key-value 使用管道和不使用管道的区别 def redis_client_set(ss): redis_clientt = redisPool.RedisCache.get_conn() for i in range(1, 100000): redis_clientt.set(ss + str(i), i) # redis_clientt.execute() print(time.strftime("%Y-%m-%d-%H_%M_%S", time.localtime())) # 添加10000个key-value 使用管道和不使用管道的区别 if __name__ == '__main__': print(time.strftime("%Y-%m-%d-%H_%M_%S", time.localtime())) t1 = threading.Thread(target=redis_client_set_pip, args=['no_pipe1:']) t2 = threading.Thread(target=redis_client_set_pip, args=['no_pipe2:']) t3 = threading.Thread(target=redis_client_set_pip, args=['no_pipe3:']) t1.start() t2.start() t3.start() print(time.strftime("%Y-%m-%d-%H_%M_%S", time.localtime()))
同样是创建相同的数据pip方式比普通一个一个set的方式要提高好多倍