Redis教程

Redis 实现库存扣减操作

本文主要是介绍Redis 实现库存扣减操作,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

原文链接:https://mp.weixin.qq.com/s/uyX9eRnd2xPOEr6lwax8Yw

在日常开发中有很多地方都有类似扣减库存的操作,比如电商系统中的商品库存,抽奖系统中的奖品库存等。

解决方案

  • 使用mysql数据库,使用一个字段来存储库存,每次扣减库存去更新这个字段。
  • 还是使用数据库,但是将库存分层多份存到多条记录里面,扣减库存的时候路由一下,这样子增大了并发量,但是还是避免不了大量的去访问数据库来更新库存。
  • 将库存放到redis使用redis的incrby特性来扣减库存。

分析

在上面的第一种和第二种方式都是基于数据来扣减库存。

基于数据库单库存

第一种方式在所有请求都会在这里等待锁,获取锁有去扣减库存。

在并发量不高的情况下可以使用,但是一旦并发量大了就会有大量请求阻塞在这里,导致请求超时,进而整个系统雪崩;而且会频繁的去访问数据库,大量占用数据库资源,所以在并发高的情况下这种方式不适用。

基于数据库多库存

第二种方式其实是第一种方式的优化版本,在一定程度上提高了并发量,但是在还是会大量的对数据库做更新操作大量占用数据库资源。

基于数据库来实现扣减库存还存在的一些问题:

用数据库扣减库存的方式,扣减库存的操作必须在一条语句中执行,不能先selec在update,这样在并发下会出现超扣的情况。如:

update number set x=x-1 where x > 0

理论上即使是这样由于MySQL事务的特性,这种方法只能降低超卖的数量,但是不可能完全避免超扣。

因为数据库默认隔离级别是repeatable read,假如库存是5,有A、B两个请求分别创建了事务并且都没有提交,当A事务提交了,改了库存为4,但是因为是事务隔离级别是可重复读的,所有B看不到A事务改的库存。到时B看到的库存还是5,所以B修改库存为4,这样就出现了超扣问题。

所以我们扣库存的时候需要将事务隔离级别设置成read commit才可以。(我自己测试没有出现这种情况)

  • MySQL自身对于高并发的处理性能就会出现问题,一般来说,MySQL的处理性能会随着并发thread上升而上升,但是到了一定的并发度之后会出现明显的拐点,之后一路下降,最终甚至会比单thread的性能还要差。

  • 当减库存和高并发碰到一起的时候,由于操作的库存数目在同一行,就会出现争抢InnoDB行锁的问题,导致出现互相等待甚至死锁,从而大大降低MySQL的处理性能,最终导致前端页面出现超时异常。

基于redis

针对上述问题的问题我们就有了第三种方案,将库存放到缓存,利用redis的incrby特性来扣减库存,解决了超扣和性能问题。但是一旦缓存丢失需要考虑恢复方案。

比如抽奖系统扣奖品库存的时候,初始库存=总的库存数-已经发放的奖励数,但是如果是异步发奖,需要等到MQ消息消费完了才能重启redis初始化库存,否则也存在库存不一致的问题。

Redis Incrby 命令

Redis Incrby 命令将 key 中储存的数字加上指定的增量值。

  • 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCRBY 命令。
  • 如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。

本操作的值限制在 64 位(bit)有符号数字表示之内。

语法

redis Incrby 命令基本语法如下:

redis 127.0.0.1:6379> INCRBY KEY_NAME INCR_AMOUNT
可用版本

>= 1.0.0

返回值

加上指定的增量值之后, key 的值。

基于redis实现扣减库存的具体实现

  • 我们使用redis的lua脚本来实现扣减库存
  • 由于是分布式环境下所以还需要一个分布式锁来控制只能有一个服务去初始化库存
  • 需要提供一个回调函数,在初始化库存的时候去调用这个函数获取初始化库存
  • 库存扣减完之后可以进行一个异步的更改数据库数据,保证一致性

具体关于lua脚本的内容使用请移步至 redis命令参考–Script脚本 :

http://doc.redisfans.com/script/index.html

lua脚本注意点:

Lua脚本,是一种轻量级的脚本语言。设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。Lua脚本的应用也很多,比如Nginx+Lua实现的OpenResty,Redis+Lua配合使用(Redisson中大量使用了Lua脚本)。

Lua脚本具有以下好处:

1、减少网络开销:Lua脚本在执行的时候,是先发送到Redis服务器的,然后在服务器上执行脚本。多个命令和业务逻辑都封装到脚本里,一次性提交到服务器。

2、原子性操作:我们都知道redis在执行命令时是单线程的,但是每个命令之间就存在并发的情况,就存在先查询再操作时,两个命令没办法保证线程安全。但使用Lua脚本时,redis把这个脚本操作当成是一个命令,那么这个脚本中的多条操作也就保证了原子性。(注意:只保证原子性,不是事务)

虽然Lua脚本有这么多优点,但是也不能乱用,使用的时候要注意:

1、Lua脚本可以在redis单机模式、主从模式、Sentinel集群模式下正常使用,但是无法在分片集群模式下使用。(脚本操作的key可能不在同一个分片)。(其实集群模式不支持问题也是可以解决的,在使用spring的RedisTemplate执行lua脚本时,报错EvalSha is not supported in cluster environment,不支持cluster。但是redis是支持lua脚本的,只要拿到原redis的connection对象,通过connection去执行即可,在后面会说下这个问题)

2、Lua脚本中尽量避免使用循环操作(可能引发死循环问题),尽量避免长时间运行。

3、redis在执行lua脚本时,默认最长运行时间时5秒,当脚本运行时间超过这一限制后,Redis将开始接受其他命令但不会执行(以确保脚本的原子性,因为此时脚本并没有被终止),而是会返回“BUSY”错误。

初始化库存回调函数(IStockCallback )
/**
 * 获取库存回调
 * @author yuhao.wang
 */
public interface IStockCallback {
 
    /**
     * 获取库存
     * @return
     */
    int getStock();
}

扣减库存服务(StockService)

ackage com.xiaolyuh.service;
 
import com.xiaolyuh.lock.RedisLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
 
/**
 * 扣库存
 *
 * @author yuhao.wang
 */
@Service
public class StockService {
    Logger logger = LoggerFactory.getLogger(StockService.class);
 
    /**
     * 库存还未初始化
     */
    public static final long UNINITIALIZED_STOCK = -3L;
 
    /**
     * Redis 客户端
     */
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
 
    /**
     * 执行扣库存的脚本
     */
    public static final String STOCK_LUA;
 
    static {
        /**
         *
         * @desc 扣减库存Lua脚本
         * 库存(stock)-1:表示不限库存
         * 库存(stock)0:表示没有库存
         * 库存(stock)大于0:表示剩余库存
         *
         * @params 库存key
         * @return
         *      -3:库存未初始化
         *   -2:库存不足
         *   -1:不限库存
         *   大于等于0:剩余库存(扣减之后剩余的库存),直接返回-1
         */
        StringBuilder sb = new StringBuilder();
        // exists 判断是否存在KEY,如果存在返回1,不存在返回0
        sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
        // get 获取KEY的缓存值,tonumber 将redis数据转成 lua 的整形
        sb.append("    local stock = tonumber(redis.call('get', KEYS[1]));");
        sb.append("    local num = tonumber(ARGV[1]);");
        // 如果拿到的缓存数等于 -1,代表改商品库存是无限的,直接返回1
        sb.append("    if (stock == -1) then");
        sb.append("        return -1;");
        sb.append("    end;");
        // incrby 特性进行库存的扣减
        sb.append("    if (stock >= num) then");
        sb.append("        return redis.call('incrby', KEYS[1], 0-num);");
        sb.append("    end;");
        sb.append("    return -2;");
        sb.append("end;");
        sb.append("return -3;");
        STOCK_LUA = sb.toString();
    }
 
    /**
     * @param key           库存key
     * @param expire        库存有效时间,单位秒
     * @param num           扣减数量
     * @param stockCallback 初始化库存回调函数
     * @return -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存
     */
    public long stock(String key, long expire, int num, IStockCallback stockCallback) {
        long stock = stock(key, num);
        // 初始化库存
        if (stock == UNINITIALIZED_STOCK) {
            RedisLock redisLock = new RedisLock(redisTemplate, key);
            try {
                // 获取锁
                if (redisLock.tryLock()) {
                    // 双重验证,避免并发时重复回源到数据库
                    stock = stock(key, num);
                    if (stock == UNINITIALIZED_STOCK) {
                        // 获取初始化库存
                        final int initStock = stockCallback.getStock();
                        // 将库存设置到redis
                        redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
                        // 调一次扣库存的操作
                        stock = stock(key, num);
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                redisLock.unlock();
            }
 
        }
        return stock;
    }
 
    /**
     * 扣库存
     *
     * @param key 库存key
     * @param num 扣减库存数量
     * @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存】
     */
    private Long stock(String key, int num) {
        // 脚本里的KEYS参数
        List<String> keys = new ArrayList<>();
        keys.add(key);
        // 脚本里的ARGV参数
        List<String> args = new ArrayList<>();
        args.add(Integer.toString(num));
 
        long result = redisTemplate.execute(new RedisCallback<Long>() {
            @Override
            public Long doInRedis(RedisConnection connection) throws DataAccessException {
                Object nativeConnection = connection.getNativeConnection();
                // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
                // 集群模式
                if (nativeConnection instanceof JedisCluster) {
                    return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);
                }
 
                // 单机模式
                else if (nativeConnection instanceof Jedis) {
                    return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);
                }
                return UNINITIALIZED_STOCK;
            }
        });
        return result;
    }
 
 
 
/**
     * 加库存(还原库存)
     *
     * @param key    库存key
     * @param num    库存数量
     * @return
     */
    public long addStock(String key, int num) {
 
        return addStock(key, null, num);
    }
 
    /**
     * 加库存
     *
     * @param key    库存key
     * @param expire 过期时间(秒)
     * @param num    库存数量
     * @return
     */
    public long addStock(String key, Long expire, int num) {
        boolean hasKey = redisTemplate.hasKey(key);
        // 判断key是否存在,存在就直接更新
        if (hasKey) {
            return redisTemplate.opsForValue().increment(key, num);
        }
 
        Assert.notNull(expire,"初始化库存失败,库存过期时间不能为null");
        RedisLock redisLock = new RedisLock(redisTemplate, key);
        try {
            if (redisLock.tryLock()) {
                // 获取到锁后再次判断一下是否有key
                hasKey = redisTemplate.hasKey(key);
                if (!hasKey) {
                    // 初始化库存
                    redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            redisLock.unlock();
        }
 
        return num;
    }
 
    /**
     * 获取库存
     *
     * @param key 库存key
     * @return -1:不限库存; 大于等于0:剩余库存
     */
    public int getStock(String key) {
        Integer stock = (Integer) redisTemplate.opsForValue().get(key);
        return stock == null ? -1 : stock;
    }
 
}
调用
@RestController
public class StockController {
 
    @Autowired
    private StockService stockService;
 
    @RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object stock() {
        // 商品ID
        long commodityId = 1;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;
        long stock = stockService.stock(redisKey, 60 * 60, 2, () -> initStock(commodityId));
        return stock >= 0;
    }
 
    /**
     * 获取初始的库存
     *
     * @return
     */
    private int initStock(long commodityId) {
        // TODO 这里做一些初始化库存的操作
        return 1000;
    }
 
    @RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object getStock() {
        // 商品ID
        long commodityId = 1;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;
 
        return stockService.getStock(redisKey);
    }
 
    @RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object addStock() {
        // 商品ID
        long commodityId = 2;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;
 
        return stockService.addStock(redisKey, 2);
    }
}

思路理解

库存新增思路

库存新增的操作一般不存在高并发的情况,因为不可能某一种商品一直在新增库存,这属于管理员后台管理的一种操作。

这里新增库存采用了redis的

1.库存发生新增操作,调用层一般传过来商品的id标识和新增量,调用库存新增服务

2.库存新增服务

/**
 * 加库存(还原库存)
 * @param key    库存key
 * @param num    库存数量
 * @return 
 */
public long addStock(String key, int num) {

    return addStock(key, null, num);
}

库存新增服务主要是使用了redis的increment自增操作。

3.辛苦新增服务

boolean hasKey = redisTemplate.hasKey(key);
// 判断key是否存在,存在就直接更新
if (hasKey) {
    return redisTemplate.opsForValue().increment(key, num);
}

第一种情况是先判断redis中是否有这个商品库存的缓存,如果存在该商品库存,就直接进行增加操作;

Assert.notNull(expire,"初始化库存失败,库存过期时间不能为null");
RedisLock redisLock = new RedisLock(redisTemplate, key);
try {
    if (redisLock.tryLock()) {
        // 获取到锁后再次判断一下是否有key
        hasKey = redisTemplate.hasKey(key);
        if (!hasKey) {
            // 初始化库存
            redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
        }
    }
} catch (Exception e) {
    logger.error(e.getMessage(), e);
} finally {
    redisLock.unlock();
}

return num;

然后是第二种情况了,就是redis中没有库存缓存了。所以就需要去初始化库存。因为初始化库存有一些非原子的操作,在分布式环境下不安全,所以这里先通过这个商品id获取分布式锁,拿到锁之后,再去判断一下redis中是否有这个缓存,确认没有,则可以进行初始化操作,然会返回数量,初始化操作可以从数据库查出真实库存的值,然后更新到缓存。

我这里的案列是直接把第一次传进来的库存数量进行初始化。

可能设计的问题

在对某key进行increment()方法时,可能会报错:

redis ERR value is not an integer or out of range 

这里库存新增我们使用的是RedisTemplateincrement的自增方法。

Spring对Redis序列化的策略有两种,分别是StringRedisTemplateRedisTemplate,其中StringRedisTemplate用于操作字符串,RedisTemplate使用的是JDK默认的二进制序列化。

大家都知道redis序列化是将key,value值先转换为流的形式,再存储到redis中。

RedisTemplate是使用的JdkSerializationRedisSerializer序列化,序列化后的值包含了对象信息,版本号,类信息等,是一串字符串,所以无法进行数值自增操作。

StringRedisTemplate序列化策略是字符串的值直接转为字节数组,所以存储到redis中是数值,所以可以进行自增操作。

所以问题出在这里,我们需要自定义序列化策略,在application启动类中添加如下:

@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
   StringRedisTemplate template = new StringRedisTemplate(factory);
   //定义key序列化方式
   //RedisSerializer<String> redisSerializer = new StringRedisSerializer();//Long类型会出现异常信息;需要我们上面的自定义key生成策略,一般没必要
   //定义value的序列化方式
   Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
   ObjectMapper om = new ObjectMapper();
   om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
   om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
   jackson2JsonRedisSerializer.setObjectMapper(om);
 
   // template.setKeySerializer(redisSerializer);
   template.setValueSerializer(jackson2JsonRedisSerializer);
   template.setHashValueSerializer(jackson2JsonRedisSerializer);
   template.afterPropertiesSet();
   return template;

库存扣减思路

利用redis的incrby特性来扣减库存,解决了超扣和性能问题。但是一旦缓存丢失需要考虑恢复方案。

库存发生扣减操作,调用层一般传过来商品的id标识和扣减量,调用库存扣减服务

long stock = stock(key, num);

第一步是进行扣减操作,在正常情况下,如果缓存中存在库存数据,则会进行正常的扣减操作,并且返回结果。

// 初始化库存
if (stock == UNINITIALIZED_STOCK) {
    RedisLock redisLock = new RedisLock(redisTemplate, key);
    try {
        // 获取锁
        if (redisLock.tryLock()) {
            // 双重验证,避免并发时重复回源到数据库
            stock = stock(key, num);
            if (stock == UNINITIALIZED_STOCK) {
                // 获取初始化库存
                final int initStock = stockCallback.getStock();
                // 将库存设置到redis
                redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
                // 调一次扣库存的操作
                stock = stock(key, num);
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    } finally {
        redisLock.unlock();
    }

}

第二种是缓存中还没有数据的情况,则需要进行初始化操作。初始化库存存在非原子操作,所以需要使用分布式锁来实现。拿到锁之后,在进行一次库存扣减操作,看返回的结果还是不是没有缓存,这是进行一次双重验证,避免并发时重复回源到数据库。第二次验证的结果还是没有缓存的话,则需要进行一次初始化缓存操作。初始化操作可以从数据库查出真实库存的值,然后更新到缓存。然后再进行一次扣减操作。

可能存在的问题:

RedisTemplate执行lua脚本,集群模式下报错解决

在使用spring的RedisTemplate执行lua脚本时,报错EvalSha is not supported in cluster environment,不支持cluster。

但是redis是支持lua脚本的,只要拿到原redis的connection对象,通过connection去执行即可:

//spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本异常,此处拿到原redis的connection执行脚本
String result = (String)redisTemplate.execute(new RedisCallback<String>() {
    public String doInRedis(RedisConnection connection) throws DataAccessException {
        Object nativeConnection = connection.getNativeConnection();
        // 集群模式和单点模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
        // 集群
        if (nativeConnection instanceof JedisCluster) {
            return (String) ((JedisCluster) nativeConnection).eval(LUA, keys, args);
        }

        // 单点
        else if (nativeConnection instanceof Jedis) {
            return (String) ((Jedis) nativeConnection).eval(LUA, keys, args);
        }
        return null;
    }
});

 

这篇关于Redis 实现库存扣减操作的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!