原文链接:https://mp.weixin.qq.com/s/uyX9eRnd2xPOEr6lwax8Yw
在日常开发中有很多地方都有类似扣减库存的操作,比如电商系统中的商品库存,抽奖系统中的奖品库存等。
在上面的第一种和第二种方式都是基于数据来扣减库存。
第一种方式在所有请求都会在这里等待锁,获取锁有去扣减库存。
在并发量不高的情况下可以使用,但是一旦并发量大了就会有大量请求阻塞在这里,导致请求超时,进而整个系统雪崩;而且会频繁的去访问数据库,大量占用数据库资源,所以在并发高的情况下这种方式不适用。
第二种方式其实是第一种方式的优化版本,在一定程度上提高了并发量,但是在还是会大量的对数据库做更新操作大量占用数据库资源。
基于数据库来实现扣减库存还存在的一些问题:
用数据库扣减库存的方式,扣减库存的操作必须在一条语句中执行,不能先selec在update,这样在并发下会出现超扣的情况。如:
update number set x=x-1 where x > 0
理论上即使是这样由于MySQL事务的特性,这种方法只能降低超卖的数量,但是不可能完全避免超扣。
因为数据库默认隔离级别是repeatable read,假如库存是5,有A、B两个请求分别创建了事务并且都没有提交,当A事务提交了,改了库存为4,但是因为是事务隔离级别是可重复读的,所有B看不到A事务改的库存。到时B看到的库存还是5,所以B修改库存为4,这样就出现了超扣问题。
所以我们扣库存的时候需要将事务隔离级别设置成read commit才可以。(我自己测试没有出现这种情况)
MySQL自身对于高并发的处理性能就会出现问题,一般来说,MySQL的处理性能会随着并发thread上升而上升,但是到了一定的并发度之后会出现明显的拐点,之后一路下降,最终甚至会比单thread的性能还要差。
当减库存和高并发碰到一起的时候,由于操作的库存数目在同一行,就会出现争抢InnoDB行锁的问题,导致出现互相等待甚至死锁,从而大大降低MySQL的处理性能,最终导致前端页面出现超时异常。
针对上述问题的问题我们就有了第三种方案,将库存放到缓存,利用redis的incrby特性来扣减库存,解决了超扣和性能问题。但是一旦缓存丢失需要考虑恢复方案。
比如抽奖系统扣奖品库存的时候,初始库存=总的库存数-已经发放的奖励数,但是如果是异步发奖,需要等到MQ消息消费完了才能重启redis初始化库存,否则也存在库存不一致的问题。
Redis Incrby 命令将 key 中储存的数字加上指定的增量值。
本操作的值限制在 64 位(bit)有符号数字表示之内。
redis Incrby 命令基本语法如下:
redis 127.0.0.1:6379> INCRBY KEY_NAME INCR_AMOUNT
>= 1.0.0
加上指定的增量值之后, key 的值。
具体关于lua脚本的内容使用请移步至 redis命令参考–Script脚本 :
http://doc.redisfans.com/script/index.html
Lua脚本,是一种轻量级的脚本语言。设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。Lua脚本的应用也很多,比如Nginx+Lua实现的OpenResty,Redis+Lua配合使用(Redisson中大量使用了Lua脚本)。
1、减少网络开销:Lua脚本在执行的时候,是先发送到Redis服务器的,然后在服务器上执行脚本。多个命令和业务逻辑都封装到脚本里,一次性提交到服务器。
2、原子性操作:我们都知道redis在执行命令时是单线程的,但是每个命令之间就存在并发的情况,就存在先查询再操作时,两个命令没办法保证线程安全。但使用Lua脚本时,redis把这个脚本操作当成是一个命令,那么这个脚本中的多条操作也就保证了原子性。(注意:只保证原子性,不是事务)
1、Lua脚本可以在redis单机模式、主从模式、Sentinel集群模式下正常使用,但是无法在分片集群模式下使用。(脚本操作的key可能不在同一个分片)。(其实集群模式不支持问题也是可以解决的,在使用spring的RedisTemplate执行lua脚本时,报错EvalSha is not supported in cluster environment
,不支持cluster。但是redis是支持lua脚本的,只要拿到原redis的connection对象,通过connection去执行即可,在后面会说下这个问题)
2、Lua脚本中尽量避免使用循环操作(可能引发死循环问题),尽量避免长时间运行。
3、redis在执行lua脚本时,默认最长运行时间时5秒,当脚本运行时间超过这一限制后,Redis将开始接受其他命令但不会执行(以确保脚本的原子性,因为此时脚本并没有被终止),而是会返回“BUSY”错误。
/** * 获取库存回调 * @author yuhao.wang */ public interface IStockCallback { /** * 获取库存 * @return */ int getStock(); }
扣减库存服务(StockService)
ackage com.xiaolyuh.service; import com.xiaolyuh.lock.RedisLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; /** * 扣库存 * * @author yuhao.wang */ @Service public class StockService { Logger logger = LoggerFactory.getLogger(StockService.class); /** * 库存还未初始化 */ public static final long UNINITIALIZED_STOCK = -3L; /** * Redis 客户端 */ @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 执行扣库存的脚本 */ public static final String STOCK_LUA; static { /** * * @desc 扣减库存Lua脚本 * 库存(stock)-1:表示不限库存 * 库存(stock)0:表示没有库存 * 库存(stock)大于0:表示剩余库存 * * @params 库存key * @return * -3:库存未初始化 * -2:库存不足 * -1:不限库存 * 大于等于0:剩余库存(扣减之后剩余的库存),直接返回-1 */ StringBuilder sb = new StringBuilder(); // exists 判断是否存在KEY,如果存在返回1,不存在返回0 sb.append("if (redis.call('exists', KEYS[1]) == 1) then"); // get 获取KEY的缓存值,tonumber 将redis数据转成 lua 的整形 sb.append(" local stock = tonumber(redis.call('get', KEYS[1]));"); sb.append(" local num = tonumber(ARGV[1]);"); // 如果拿到的缓存数等于 -1,代表改商品库存是无限的,直接返回1 sb.append(" if (stock == -1) then"); sb.append(" return -1;"); sb.append(" end;"); // incrby 特性进行库存的扣减 sb.append(" if (stock >= num) then"); sb.append(" return redis.call('incrby', KEYS[1], 0-num);"); sb.append(" end;"); sb.append(" return -2;"); sb.append("end;"); sb.append("return -3;"); STOCK_LUA = sb.toString(); } /** * @param key 库存key * @param expire 库存有效时间,单位秒 * @param num 扣减数量 * @param stockCallback 初始化库存回调函数 * @return -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存 */ public long stock(String key, long expire, int num, IStockCallback stockCallback) { long stock = stock(key, num); // 初始化库存 if (stock == UNINITIALIZED_STOCK) { RedisLock redisLock = new RedisLock(redisTemplate, key); try { // 获取锁 if (redisLock.tryLock()) { // 双重验证,避免并发时重复回源到数据库 stock = stock(key, num); if (stock == UNINITIALIZED_STOCK) { // 获取初始化库存 final int initStock = stockCallback.getStock(); // 将库存设置到redis redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS); // 调一次扣库存的操作 stock = stock(key, num); } } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { redisLock.unlock(); } } return stock; } /** * 扣库存 * * @param key 库存key * @param num 扣减库存数量 * @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存】 */ private Long stock(String key, int num) { // 脚本里的KEYS参数 List<String> keys = new ArrayList<>(); keys.add(key); // 脚本里的ARGV参数 List<String> args = new ArrayList<>(); args.add(Integer.toString(num)); long result = redisTemplate.execute(new RedisCallback<Long>() { @Override public Long doInRedis(RedisConnection connection) throws DataAccessException { Object nativeConnection = connection.getNativeConnection(); // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行 // 集群模式 if (nativeConnection instanceof JedisCluster) { return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args); } // 单机模式 else if (nativeConnection instanceof Jedis) { return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args); } return UNINITIALIZED_STOCK; } }); return result; } /** * 加库存(还原库存) * * @param key 库存key * @param num 库存数量 * @return */ public long addStock(String key, int num) { return addStock(key, null, num); } /** * 加库存 * * @param key 库存key * @param expire 过期时间(秒) * @param num 库存数量 * @return */ public long addStock(String key, Long expire, int num) { boolean hasKey = redisTemplate.hasKey(key); // 判断key是否存在,存在就直接更新 if (hasKey) { return redisTemplate.opsForValue().increment(key, num); } Assert.notNull(expire,"初始化库存失败,库存过期时间不能为null"); RedisLock redisLock = new RedisLock(redisTemplate, key); try { if (redisLock.tryLock()) { // 获取到锁后再次判断一下是否有key hasKey = redisTemplate.hasKey(key); if (!hasKey) { // 初始化库存 redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS); } } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { redisLock.unlock(); } return num; } /** * 获取库存 * * @param key 库存key * @return -1:不限库存; 大于等于0:剩余库存 */ public int getStock(String key) { Integer stock = (Integer) redisTemplate.opsForValue().get(key); return stock == null ? -1 : stock; } }
@RestController public class StockController { @Autowired private StockService stockService; @RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public Object stock() { // 商品ID long commodityId = 1; // 库存ID String redisKey = "redis_key:stock:" + commodityId; long stock = stockService.stock(redisKey, 60 * 60, 2, () -> initStock(commodityId)); return stock >= 0; } /** * 获取初始的库存 * * @return */ private int initStock(long commodityId) { // TODO 这里做一些初始化库存的操作 return 1000; } @RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public Object getStock() { // 商品ID long commodityId = 1; // 库存ID String redisKey = "redis_key:stock:" + commodityId; return stockService.getStock(redisKey); } @RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public Object addStock() { // 商品ID long commodityId = 2; // 库存ID String redisKey = "redis_key:stock:" + commodityId; return stockService.addStock(redisKey, 2); } }
库存新增的操作一般不存在高并发的情况,因为不可能某一种商品一直在新增库存,这属于管理员后台管理的一种操作。
这里新增库存采用了redis的
1.库存发生新增操作,调用层一般传过来商品的id标识和新增量,调用库存新增服务
2.库存新增服务
/** * 加库存(还原库存) * @param key 库存key * @param num 库存数量 * @return */ public long addStock(String key, int num) { return addStock(key, null, num); }
库存新增服务主要是使用了redis的increment自增操作。
3.辛苦新增服务
boolean hasKey = redisTemplate.hasKey(key); // 判断key是否存在,存在就直接更新 if (hasKey) { return redisTemplate.opsForValue().increment(key, num); }
第一种情况是先判断redis中是否有这个商品库存的缓存,如果存在该商品库存,就直接进行增加操作;
Assert.notNull(expire,"初始化库存失败,库存过期时间不能为null"); RedisLock redisLock = new RedisLock(redisTemplate, key); try { if (redisLock.tryLock()) { // 获取到锁后再次判断一下是否有key hasKey = redisTemplate.hasKey(key); if (!hasKey) { // 初始化库存 redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS); } } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { redisLock.unlock(); } return num;
然后是第二种情况了,就是redis中没有库存缓存了。所以就需要去初始化库存。因为初始化库存有一些非原子的操作,在分布式环境下不安全,所以这里先通过这个商品id获取分布式锁,拿到锁之后,再去判断一下redis中是否有这个缓存,确认没有,则可以进行初始化操作,然会返回数量,初始化操作可以从数据库查出真实库存的值,然后更新到缓存。
我这里的案列是直接把第一次传进来的库存数量进行初始化。
在对某key进行increment()方法时,可能会报错:
redis ERR value is not an integer or out of range
这里库存新增我们使用的是RedisTemplate
的increment
的自增方法。
Spring对Redis序列化的策略有两种,分别是StringRedisTemplate
和RedisTemplate
,其中StringRedisTemplate
用于操作字符串,RedisTemplate
使用的是JDK默认的二进制序列化。
大家都知道redis序列化是将key,value值先转换为流的形式,再存储到redis中。
RedisTemplate
是使用的JdkSerializationRedisSerializer
序列化,序列化后的值包含了对象信息,版本号,类信息等,是一串字符串,所以无法进行数值自增操作。
而StringRedisTemplate
序列化策略是字符串的值直接转为字节数组,所以存储到redis中是数值,所以可以进行自增操作。
所以问题出在这里,我们需要自定义序列化策略,在application启动类中添加如下:
@Bean public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) { StringRedisTemplate template = new StringRedisTemplate(factory); //定义key序列化方式 //RedisSerializer<String> redisSerializer = new StringRedisSerializer();//Long类型会出现异常信息;需要我们上面的自定义key生成策略,一般没必要 //定义value的序列化方式 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); // template.setKeySerializer(redisSerializer); template.setValueSerializer(jackson2JsonRedisSerializer); template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template;
利用redis的incrby特性来扣减库存,解决了超扣和性能问题。但是一旦缓存丢失需要考虑恢复方案。
库存发生扣减操作,调用层一般传过来商品的id标识和扣减量,调用库存扣减服务
long stock = stock(key, num);
第一步是进行扣减操作,在正常情况下,如果缓存中存在库存数据,则会进行正常的扣减操作,并且返回结果。
// 初始化库存 if (stock == UNINITIALIZED_STOCK) { RedisLock redisLock = new RedisLock(redisTemplate, key); try { // 获取锁 if (redisLock.tryLock()) { // 双重验证,避免并发时重复回源到数据库 stock = stock(key, num); if (stock == UNINITIALIZED_STOCK) { // 获取初始化库存 final int initStock = stockCallback.getStock(); // 将库存设置到redis redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS); // 调一次扣库存的操作 stock = stock(key, num); } } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { redisLock.unlock(); } }
第二种是缓存中还没有数据的情况,则需要进行初始化操作。初始化库存存在非原子操作,所以需要使用分布式锁来实现。拿到锁之后,在进行一次库存扣减操作,看返回的结果还是不是没有缓存,这是进行一次双重验证,避免并发时重复回源到数据库。第二次验证的结果还是没有缓存的话,则需要进行一次初始化缓存操作。初始化操作可以从数据库查出真实库存的值,然后更新到缓存。然后再进行一次扣减操作。
RedisTemplate执行lua脚本,集群模式下报错解决
在使用spring的RedisTemplate执行lua脚本时,报错EvalSha is not supported in cluster environment
,不支持cluster。
但是redis是支持lua脚本的,只要拿到原redis的connection对象,通过connection去执行即可:
//spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本异常,此处拿到原redis的connection执行脚本 String result = (String)redisTemplate.execute(new RedisCallback<String>() { public String doInRedis(RedisConnection connection) throws DataAccessException { Object nativeConnection = connection.getNativeConnection(); // 集群模式和单点模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行 // 集群 if (nativeConnection instanceof JedisCluster) { return (String) ((JedisCluster) nativeConnection).eval(LUA, keys, args); } // 单点 else if (nativeConnection instanceof Jedis) { return (String) ((Jedis) nativeConnection).eval(LUA, keys, args); } return null; } });