1、事务
事务保证多条命令要么全部执行,要么都不执行。使用MULTI命令来开始一个事务,使用EXEC来执行一个事务,EXEC返回多行字符串,其内容为事务中每条命令的返回值。
当事务中命令有语法错误的时候,所有的命令都不会执行,EXEC返回错误信息。当事务中命令执行的时候出现错误的情况(比如使用散列类型命令来操作集合),其它命令不受影响会继续执行,EXEC的返回值中出现错误的那条命令的返回值就是命令的错误信息,所以出现这种情况的话要还要自己实现回滚的操作。
有些情况下,命令的参数是上一条命令的结果,比如我们自己实现INCR命令的话,要先使用GET命令获得键值后将其加1,然后使用SET命令设置到键值。为了防止竞态条件,这两条命令使用一个事务,但问题是事务中的命令的执行结果都是最后一起返回的(可以看做事务中的所有命令都是同时执行的),所以无法将上一条命令的结果作为下一条命令的参数。解决方法是使用WATCH:WATCH命令用来监控一个或多个键,当其中有一个键被修改或删除后(被自己修改或者被其它客户端修改),之后执行EXEC的话,事务中的命令并不会被执行。如下为自己实现INCR:
label: WATCH key; auto value = GET key; if (value == not exist) value = 0; ++value; MULTI SET key value; auto result = EXEC; if (result.isEmpty()) //EXEC中命令没有执行话返回为空 goto label;
如果我们使用WATCH命令后并没有使用MULTI/EXEC的话,应该使用UNWATCH来取消监视,防止下一个事务受影响。
2、TTL(TIME TO LIVE)
过期时间常见的使用有验证码、限时优惠活动、缓存等。
相关的命令:
EXPIRE/PEXPIRE命令用来设置一个键的过期时间,单位为秒/毫秒,超时的话自动删除该键,如EXPIRE foo 60为设置foo键的过期时间为1分钟。该命令返回1表示成功,返回0表示键不存在或失败。对一个已经设置了过期时间的键使用该命令的话会重新设置键的过期时间,相当于是调用PERSIST后接着调用EXPIRE。
EXPIREAT/PEXPIREAT:使用UNIX时间戳作为过期时间,如EXPIREAT foo 1351858600。
TTL:获得键的剩余生存时间,单位是秒,该键没有设置TTL的话返回-1,键不存在的话返回-2。如TTL foo。
PERSIST:取消键的过期时间设置,成功返回1,键不存在或者本来就没有设置TTL的话返回0,如PRESIST foo。
SET(GETSET)命令会取消键的过期时间设置,INCR、LPUSH、HSET、ZREM命令不会影响键的过期时间。如果使用WATCH检测一个拥有过期时间设置的键,当该键过期被自动删除后,不会被WATCH认为该键被修改。
下面是设置TTL的应用,设置用户一分钟内访问页面的次数最多为10次:
auto iExists = EXISTS rate.limiting:user.IP; //判断用户一分钟内访问次数是否已存在 if (iExists == 1) { //已存在 auto visitTimes = INCR rate.limiting:user.IP; //递增访问次数 if (visitTimes > 10) //一分钟内访问超过10次 printf("超出访问频率限制,请稍后再试"); } else { //不存在:首次访问,或者TTL超时(键被自动删除) MULTI INCR rate.limiting:user.IP; //递增访问次数 EXPIRE rate.limiting:user.IP 60; //设置该键TTL为60秒 EXEC //使用事务的原因:防止INCR后,EXPIRE因为某些原因退出没有执行,那么该键会永久存在,就没有了访问次数限制 }
上面的代码实际上有一些问题,比如一开始用户浏览了1次网页,然后这分钟的最后一秒又浏览了9次网页,然后下一分钟的第一秒又浏览了10次网页,这相当于是在两秒内浏览了19次网页。一般我们设置用户一分钟内访问页面的次数最多为10次的话,实际上应该是第1次浏览网页和第11次浏览网页的时间差应该大于或等于1分钟,下面是改进的方法,记录用户访问的时间到一个list,当第11次用户访问的时候就要判断当前访问时间和第一次访问时间只差是否大于或等于1分钟:
auto listLen = LLEN rate.limiting:user.IP; //获得访问次数 if (listLen < 10) { //小于10次 LPUSH rate.limiting:user.IP getCurrentTime(); //记录访问时间到list } else { //等于10次 auto time = LINDEX rate.limiting:user.IP -1; //获得第一次访问的时间 auto curTime = getCurrentTime(); //本次访问时间 if (curTime - time < 60) { //本次访问时间与第一次访问时间差在一分钟之内 printf("超出访问频率限制,请稍后再试"); } else { //本次访问时间与第一次访问时间差大于等于一分钟 MULTI LPUSH rate.limiting:user.IP curTime; //将本次访问记录到list LTRIM rate.limiting:user.IP 0 9; //清空上一分钟的访问时间记录 EXEC //使用事务的原因:防止LPUSH后,LTRIM因为某些原因退出没有执行,那么该键大小永远大于10次 } }
Redis将键保存在内存中并可以为减设置过期时间这两个特性使得Redis也非常适合作为缓存系统,对于那些访问频率很高或者对CPU、IO消耗较大的操作,可以将其操作内容保存到Redis键中,并设置过期时间,如下所示。
auto data = GET cache.data; //获得缓存的数据 if (data.isEmpty()) { //第一次请求数据,或者缓存过期被删除 auto dat = calculate(); //获得计算后的数据 MULTI SET cache.data dat; //缓存数据到 EXPIRE cache.data 3600 //设置缓存有效期1个小时 EXEC }else{ //直接使用缓存的数据 ...... }
我们可以修改配置文件的maxmemory参数来设置Redis的最大可用内存,当超出了这个设置后,Redis会根据maxmemory-policy参数指定的策略来不断的删除指定的键直到内存小于设置的值。如下为maxmemory-policy支持的选项,其中LRU表示最近最少使用的的键:
3、排序
SORT命令可以获得list、set、sortSet(根据元素排序,而不是分数)排序后的结果,SORT默认会将元素转换成浮点数来排序:
SORT fooList:获得foo排序后的结果(升序)。
SORT fooList DESC:降序排序。
SORT fooList ALPHA:按照元素的字符字典顺序排序。
SORT fooList LIMIT 5 3:获得foo排序后的结果,并且跳过前5个元素,只获得之后的3个元素,适合分页展示的情况。
SORT...BY...可以根据另一些键来对容器排序,其中这些键跟要排序的容器的元素必须有名称对应关系—健名包含元素值,比如键fooList的元素分别为leon、ada、tom,另外还存在三个键分别为student:leon(值为3)、student:ada(值为2)、student:tom(值为1),那么就可以通过这三个键的值为fooList排序,通过使用BY:SORT fooList BY student:*,排序结果为tom、ada、leon。
使用SORT...BY...也可以根据另一些为散列的键来对容器排序,比如键fooList的元素分别为leon、ada、tom,另外还存在三个键分别为mapStudent_leon(包含元素 {age,3} )、mapStudent_ada(包含元素 {age,2})、mapStudent_tom(包含元素 {age,1}),那么就可以通过这三个键的key为age的元素为fooList排序,通过使用BY:SORT fooList BY mapStudent*->age,排序结果为tom、ada、leon。
SORT...GET...可以通过排序结果获得与其有关联的另一些键的值,如键 idList的元素分别为2、1、3,另外还存在三个键分别为name1(值为leon),name2(值为ada),name3(值为tom),使用SORT idList DESC GET name* 获得的结果是 tom、ada、leon。GET可以使用多次,比如另外还存在三个哈希键分别为1mapStudent(包含元素{age, 11})、2mapStudent(包含元素{age, 21})、3mapStudent(包含元素{age, 31}),使用SORT idList DESC GET name* GET *mapStudent->age 获得的结果是 tom、31、ada、21、leon、11。GET#可以获得元素本身的值,如SORT idList DESC GET name* GET *mapStudent->age GET# 获得的结果是tom、31、3、ada、21、2、leon、11、1。
SORT...STORE可以将排序的结果保存在另一个键中,如SORT foo STORE bar,该命令的返回值为保存结果个数。SORT命令的时间复杂度是O(n + mlog(m)),其中n为要排序容器的大小,m为返回结果的个数,当n或m很大的时候排序性能就很差,所以可以注意以下三点:
以下为SORT...STORE结合EXPIRE命令来缓存排序结果示例:
auto isCacheExists = EXISTS cache.sort; //判断缓存是否存在 if (isCacheExists) {//存在 return LRANGE cache.sort 0 - 1; //直接获得缓存中的数据 } else { //不存在 auto resultSort = SORT dataList STORE cache.sort; //计算排序结果并将结果保存到cache.sort键中 EXPIRE cache.sort 60; //设置缓存过期时间为1分钟 return resultSort; }
4、消息通知
任务队列:
BRPOP命令与RPOP命令功能类似,不同的是当列表中没有元素的时候BROP会一直阻塞,直到有元素或者超时。BRPOP除了返回元素值外,还会返回键名,如BRPOP listFoo 0表示弹出并获得列表最右边的元素,等待时间为0表示一直等待(指定时间超时的话会返回nil),当其它客户端向listFoo插入一个元素100后,该命令会立即返回两个值: listFoo和100。BRPOP支持监视多个列表,比如BRPOP listFoo1 listFoo2 0,其中一个列表有元素的话就会弹出并获得,如果两个列表都有元素的话,会只弹出第一个列表即listFoo1的元素。BLPOP是从列表的左端弹出和获得元素。假如现在有两个任务队列,一个优先级比较高,当其中有元素的时候我们应该先处理其中的数据,当其中没有元素的时候我们再处理另一个队列,那么可以编程如下:
while (1) { auto value = BRPOP listHigh listNormal 0; //ToDo ....... }
发布/订阅:
PUBLISH/SUBSCRIBE命令可以实现“发布/订阅”模式,如一个客户端SUBSCRIBE channel订阅了channel,另一个客户端PUBLISH channel hello向channel发布了hello后,则所有订阅了channel的客户端都会收到hello。PUBLISH命令的返回值表示当前订阅该频道的订阅者数量。SUBSCRIBE可以订阅多个频道,如SUBSCRIBE channel1 channel2。在SUBSCRIBE订阅之前PUBLISH的消息,不会被收到。SUBSCRIBE命令的返回值有三条值,一个是字符串"subscribe"表示订阅成功,一个是订阅的频道名,一个是当前客户端订阅的所有频道的数量。客户端收到订阅的消息也包含三条值,一个是字符串“message”,一个是产生消息的频道名称,一个是消息值。执行SUBSCRIBE命令后进入订阅模式,以后只能执行订阅相关的命令( SUBSCRIBE、UNSUBSCRIBE、 PSUBSCRIBE、PUNSUBSCRIBE)。UNSUBSCRIBE用来取消订阅指定的频道或多个频道,不指定频道名的话表示取消订阅所有频道,其返回值也包含三条值,一个是字符串“unsubscribe”表示取消订阅成功,一个是对应的频道名,一个是当前客户端订阅的所有频道的数量。
PSUBSCRIBE与SUBSCRIBE功能一样,但支持glob风格通配符,如PSUBSCRIBE channel* 表示订阅所有以channel开头的频道。 PSUBSCRIBE命令返回值为三个,一个是字符串"psubscribe"表示订阅成功,一个是订阅的频道(如" channel* "),一个是当前客户端订阅的所有频道的数量。 PSUBSCRIBE收到的订阅消息包含四个值,一个是字符串"pmessage",一个是订阅时使用的通配符(如channel*),一个是实际发送消息的频道名,一个是消息值。当客户端执行了SUBSCRIBE channel1后又执行了PSUBSCRIBE channel*,另一个客户端PUBLISH channel1 100后(该命令会返回2),会收到两条消息(一条类型为"message",一条类型为"pmessage")。使用PUNSUBSCRIBE来取消PSUBSCRIBE订阅的频道,如PUNSUBSCRIBE channel*,不指定参数的话为退订所有频道。需要注意的是,PUNSUBSCRIBE不会影响通过SUBSCRIBE订阅的频道,同样UNSUBSCRIBE不会影响通过PSUBSCRIBE订阅的频道。
5、管道
执行Redis的命令后必须等待命令的回复后才能执行下一条命令,使用管道技术可以一次性向Redis发送多条命令,然后一次性获得多条命令的返回结果。如下为java不适用管道和使用管道的示例:
Jedis jedis = new Jedis("localhost"); jedis.auth("123"); //不使用管道的测试 long startTime = System.currentTimeMillis(); for (int i = 0; i < 100000; i++) { jedis.lpush("key", "" + i); } long endTime = System.currentTimeMillis(); long times = endTime - startTime; //耗时8368 jedis.flushAll(); //使用管道的测试 startTime = System.currentTimeMillis(); Pipeline pipeline = jedis.pipelined(); for (int i = 0; i < 100000; i++) { pipeline.lpush("key", "" + i); } List<Object> list = pipeline.syncAndReturnAll(); endTime = System.currentTimeMillis(); times = endTime - startTime; //耗时243 jedis.flushAll(); jedis.disconnect();