这是本人根据黑马视频学习 Redis 的相关笔记,系列文章导航:《Redis设计与实现》笔记与汇总
业务流程:
在 service 层中完成相应的逻辑,即上图左侧的逻辑:
@Service @Slf4j public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService { @Override public Result sendCode(String phone, HttpSession session) { // 1. 校验手机号 if (RegexUtils.isPhoneInvalid(phone)) { // 2. 如果不符合,返回错误信息 return Result.fail("手机格式错误"); } // 3. 符合,生成验证码 String code = RandomUtil.randomNumbers(6); // 4. 保存验证码到 Session session.setAttribute("code", code); // 5. 模拟发送验证码 log.debug("验证码为: {}", code); // 6. 返回 ok return Result.ok(); } }
对应中间的流程图
@Override public Result login(LoginFormDTO loginForm, HttpSession session) { // 1. 校验手机号和验证码 if (RegexUtils.isPhoneInvalid(loginForm.getPhone())) { return Result.fail("手机格式错误"); } // 2. 校验验证码 Object code = session.getAttribute("code"); if (loginForm.getCode() == null || !loginForm.getCode().equals(code)) { // 3. 不一致,报错 return Result.fail("验证码格式错误"); } // 4. 一致,根据手机号查询用户 User user = query().eq("phone", loginForm.getPhone()).one(); // 5. 判断用户是否存在 if (user == null) { user = createUserWithPhone(loginForm.getPhone()); } session.setAttribute("user", user); return Result.ok(); } private User createUserWithPhone(String phone) { User user = new User(); user.setPhone(phone); user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10)); save(user); return user; }
public class LoginInterceptor implements HandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { // 1. 获取 session HttpSession session = request.getSession(); // 2. 获取 session 中的用户 User user = (User) session.getAttribute("user"); // 3. 判断用户是否存在 if (user == null) { response.setStatus(401); // 4. 不存在,拦截 return false; } // 5. 存在,保存用户信息到 ThreadLocal UserDTO userDTO = new UserDTO(); userDTO.setId(user.getId()); userDTO.setNickName(user.getNickName()); UserHolder.saveUser(userDTO); // 6. 放行 return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { UserHolder.removeUser(); } }
@Configuration public class MvcConfig implements WebMvcConfigurer { @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new LoginInterceptor()).excludePathPatterns( "/user/code", "/user/login", "/blog/hot", "/shop/**", "/shop-type/**", "/upload/**", "/voucher/**" ); } }
@Override public Result sendCode(String phone, HttpSession session) { // 1. 校验手机号 if (RegexUtils.isPhoneInvalid(phone)) { // 2. 如果不符合,返回错误信息 return Result.fail("手机格式错误"); } // 3. 符合,生成验证码 String code = RandomUtil.randomNumbers(6); // 4. 保存验证码到 Session //session.setAttribute("code", code); stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES); // 5. 模拟发送验证码 log.debug("验证码为: {}", code); // 6. 返回 ok return Result.ok(); }
@Override public Result login(LoginFormDTO loginForm, HttpSession session) { // 1. 校验手机号和验证码 if (RegexUtils.isPhoneInvalid(loginForm.getPhone())) { return Result.fail("手机格式错误"); } // 2. 校验验证码 // Object code = session.getAttribute("code"); String code = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + loginForm.getPhone()); if (loginForm.getCode() == null || !loginForm.getCode().equals(code)) { // 3. 不一致,报错 return Result.fail("验证码格式错误"); } // 4. 一致,根据手机号查询用户 User user = query().eq("phone", loginForm.getPhone()).one(); // 5. 判断用户是否存在 if (user == null) { user = createUserWithPhone(loginForm.getPhone()); } // 保存用户信息到 Redis 当中 // session.setAttribute("user", user); String token = UUID.randomUUID().toString(); UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class); Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(), CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((filedName, filedValue) -> filedValue.toString() )); String tokenKey = LOGIN_USER_KEY + token; stringRedisTemplate.opsForHash().putAll(tokenKey, userMap); stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES); return Result.ok(); }
第一个用来刷新过期时间
public class RefreshTokenInterceptor implements HandlerInterceptor { private final StringRedisTemplate stringRedisTemplate; public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { // 1. 获取 session // HttpSession session = request.getSession(); String token = request.getHeader("authorization"); if (StrUtil.isBlank(token)) { return true; } String tokenKey = RedisConstants.LOGIN_USER_KEY + token; Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(tokenKey); // 3. 判断用户是否存在 if (userMap.isEmpty()) { return true; } // 2. 获取 session 中的用户 UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false); // 5. 存在,保存用户信息到 ThreadLocal UserHolder.saveUser(userDTO); // 刷新过期时间 stringRedisTemplate.expire(tokenKey, RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES); // 6. 放行 return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { UserHolder.removeUser(); } }
第二个用来判断是否需要登录
public class LoginInterceptor implements HandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { // 1. 判断是否需要拦截 if (UserHolder.getUser() == null) { response.setStatus(401); return false; } return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { UserHolder.removeUser(); } }
配置一下加载顺序:
@Configuration public class MvcConfig implements WebMvcConfigurer { @Resource private StringRedisTemplate stringRedisTemplate; @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new LoginInterceptor()).excludePathPatterns( "/user/code", "/user/login", "/blog/hot", "/shop/**", "/shop-type/**", "/upload/**", "/voucher/**" ).order(1); registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0); } }
order
设置了拦截器加载的顺序@Service public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService { @Resource private StringRedisTemplate stringRedisTemplate; @Override public Result queryById(Long id) { String key = RedisConstants.CACHE_SHOP_KEY + id; // 尝试从Redis查询商铺缓存 String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(shopJson)) { Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } // 2. 不存在,根据id查询数据库 Shop shop = getById(id); if (shop == null) { return Result.fail("店铺不存在"); } // 写入 Redis stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop)); return Result.ok(shop); } }
主动更新 的三种策略:
左侧方案的三个问题:
先删除缓存还是先操作数据库?
都有问题,右边可能性低,选右边
这里的策略是:当更新时顺便删除缓存
@Override @Transactional public Result update(Shop shop) { Long id = shop.getId(); if (id == null) { return Result.fail("店铺ID不能为空"); } // 1. 更新数据库 updateById(shop); // 2. 删除缓存 String key = RedisConstants.CACHE_SHOP_KEY + id; stringRedisTemplate.delete(key); return Result.ok(); }
这里采用缓存空对象的方式进行解决,需要改动的地方有:
public Result queryById(Long id) { String key = RedisConstants.CACHE_SHOP_KEY + id; // 尝试从Redis查询商铺缓存 String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(shopJson)) { Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } if (shopJson != null) { // 说明之前被加入到Redis中了! return Result.fail("店铺信息不存在"); } // 2. 不存在,根据id查询数据库 Shop shop = getById(id); if (shop == null) { // 添加空值到 Redis stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES); return Result.fail("店铺不存在"); } // 写入 Redis stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES); return Result.ok(shop); }
两种解决方案:
两种解决方案的对比:
下面来代码实践一下
这里的流程如下:
所谓的锁不是在学 JUC 时用到的各种锁,而是用 Redis 的 setnx 来设置
private boolean tryLock(String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } private void unlock(String key) { stringRedisTemplate.delete(key); }
业务流程:
public Shop queryWithPassThroughMutex(Long id) { String key = RedisConstants.CACHE_SHOP_KEY + id; // 尝试从Redis查询商铺缓存 String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(shopJson)) { return JSONUtil.toBean(shopJson, Shop.class); } if (shopJson != null) { return null; } // 2. 不存在,根据id查询数据库 // 获取互斥锁 String lockKey = null; Shop shop = null; try { lockKey = RedisConstants.LOCK_SHOP_KEY + id; if (!tryLock(lockKey)) { Thread.sleep(50); queryWithPassThroughMutex(id); } shop = getById(id); // 模拟等待 Thread.sleep(200); if (shop == null) { stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES); return null; } // 写入 Redis stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 释放互斥锁 unlock(lockKey); } return shop; }
我们将基于 StringRedisTemplate 封装一个工具类,满足如下需求:
代码地址:huyi612/hm-dianping - Gitee.com
(上面代码有部分有问题, 见下面的评论)
利用 Redis 的 icr 方法, 加上时间戳等信息生成 id
@Component public class RedisIdWorker { private final StringRedisTemplate stringRedisTemplate; public RedisIdWorker(StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } private static final long BEGIN_TIMESTAMP = 1640995200L; private static final int COUNT_BITS = 32; public long nextId(String keyPrefix) { // 1. 生成时间戳 LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timestamp = nowSecond - BEGIN_TIMESTAMP; System.out.println(timestamp); // 2. 生成序列号 String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd")); long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); System.out.println(count); // 3. 拼接返回 return timestamp << COUNT_BITS | count; } }
@Override @Transactional public Result seckillVoucher(Long voucherId) { // 1. 查询 SeckillVoucher voucher = seckillVoucherService.getById(voucherId); // 2. 判断秒杀是否开始或结束 if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始!"); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!"); } // 3. 判断库存 if (voucher.getStock() < 1) { return Result.fail("库存不足!"); } // 4. 扣减库存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1").eq("voucher_id", voucherId).update(); if (!success) { return Result.fail("库存不足!"); } // 5. 创建订单 VoucherOrder voucherOrder = new VoucherOrder(); long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); voucherOrder.setUserId(UserHolder.getUser().getId()); voucherOrder.setPayType(1); voucherOrder.setVoucherId(voucherId); save(voucherOrder); // 6. 返回订单 return Result.ok(orderId); }
用 JMeter 模拟多个用户同时参与秒杀, 有可能会出现线程安全问题(不过很奇怪, 我这里自己测试的时候并没有出现这个问题)
乐观锁的实现方案:
CAS, 这里的场景似乎也不需要担心 ABA 问题, 所以只用一个库存量就行了, 不用专门搞一个版本
当然, 这里 eq 显然有些严格, 很容易造成还有券, 但是都没抢到的问题 (这里可没有失败了重试之类的说法)
所以实践中我们只需让 库存大于 0 即可
.gt("stock", 0);
同样会有线程安全的问题, 这里似乎不太适合用乐观锁, 毕竟是添加不同的信息, 而不是对一条信息的多次修改, 所以用悲观锁
@Override public Result seckillVoucher(Long voucherId) { // 1. 查询 SeckillVoucher voucher = seckillVoucherService.getById(voucherId); // 2. 判断秒杀是否开始或结束 if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始!"); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!"); } // 3. 判断库存 if (voucher.getStock() < 1) { return Result.fail("库存不足!"); } // 6. 返回订单 return createVoucherOrder(voucherId); } @Transactional public Result createVoucherOrder(Long voucherId) { Long userId = UserHolder.getUser().getId(); VoucherOrder voucherOrder = null; synchronized (userId.toString().intern()) { int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); if (count > 0) { return Result.fail("用户已经购买过了!"); } // 4. 扣减库存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1").eq("voucher_id", voucherId).update(); if (!success) { return Result.fail("库存不足!"); } // 5. 创建订单 voucherOrder = new VoucherOrder(); long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setPayType(1); voucherOrder.setVoucherId(voucherId); save(voucherOrder); } return Result.ok(voucherOrder); }
这段代码有两个学到的地方:
但是集群状态下, 仅依靠 JVM 自身的锁是不能实现的! 要用分布式锁
定义接口:
public interface ILock { boolean tryLock(long timeoutSec); void unlock(); }
实现类:
public class SimpleRedisLock implements ILock{ private final String name; private final StringRedisTemplate stringRedisTemplate; public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:"; @Override public boolean tryLock(long timeoutSec) { // 获取线程标识 long threadId = Thread.currentThread().getId(); // 获取锁 Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS); // 防止空指针的风险 return Boolean.TRUE.equals(success); } @Override public void unlock() { stringRedisTemplate.delete(KEY_PREFIX + name); } }
修改业务
@Transactional public Result createVoucherOrder(Long voucherId) { Long userId = UserHolder.getUser().getId(); SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate); boolean isLock = simpleRedisLock.tryLock(1200); if (!isLock) { // 获取锁失败 return Result.fail("不允许重复下单"); } VoucherOrder voucherOrder = null; try { int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); if (count > 0) { return Result.fail("用户已经购买过了!"); } // 4. 扣减库存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1").eq("voucher_id", voucherId).update(); if (!success) { return Result.fail("库存不足!"); } // 5. 创建订单 voucherOrder = new VoucherOrder(); long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setPayType(1); voucherOrder.setVoucherId(voucherId); save(voucherOrder); } catch (Exception e) { e.printStackTrace(); } finally { simpleRedisLock.unlock(); } return Result.ok(voucherOrder); }
当前方案的问题一:
超时释放后, 线程会在运行结束后把别人的锁解开了
解决方法: 解锁前判断一下标识
private static final String KEY_PREFIX = "lock:"; private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; @Override public boolean tryLock(long timeoutSec) { // 获取线程标识 String threadId = ID_PREFIX + Thread.currentThread().getId(); // 获取锁 Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS); // 防止空指针的风险 return Boolean.TRUE.equals(success); } @Override public void unlock() { // 获取线程标识 String threadId = ID_PREFIX + Thread.currentThread().getId(); String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); if (threadId.equals(id)) { stringRedisTemplate.delete(KEY_PREFIX + name); } }
当前的问题二:
如何保证原子性? 比如判断锁和释放锁两个步骤是分开的, 没有一致性, 所以仍然有可能把别人的锁给释放了
答: 借助 Lua 脚本 Lua 的基础教程入门 : Lua基础教程笔记
Redis 提供了 Lua 脚本功能, 在一个脚本中编写多条 Redis 命令, 确保多条命令执行时的原子性
Lua 脚本用如下方式执行 redis 的命令:
redis.call("set", "name", "rose");
Redis 用如下方式执行脚本: (1 表示有一对参数)
EVAL "return redis.call(xx, xx, xx)" 1 name rose
创建一个 unlock.lua
local id = redis.call('get', KEYS[1]) if (id == ARGV[1]) then return redis.call('del', KEYS[1]) end return 0
在 Lock 类中添加相关逻辑:
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static { UNLOCK_SCRIPT = new DefaultRedisScript<>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); UNLOCK_SCRIPT.setResultType(Long.class); } @Override public void unlock() { stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId()); }
目录 · redisson/redisson Wiki
自己实现的锁的问题:
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.6</version> </dependency>
创建一个 bean
@Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient() { Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.137.112:6379").setPassword("abc123"); return Redisson.create(config); } }
获取锁:
RLock lock = redissonClient.getLock("lock:order:" + userId);
原来的程序, 当用户进行秒杀操作时, 许多操作都在一个线程中完成, 用户必须要等待所有操作完成后才能看到结果, 并且其中很多操作要直接和 MySQL 打交道:
考虑现实生活中的情况, 假如你去学校食堂吃饭, 某一家的饭特别好吃, 供不应求, 那么一种方法是排一个长长的队, 付款后就站着等, 等饭好了再端走.
我们上面的实现就是这种情况. 而现在更多的一种情况是, 下完单后, 给你一张小票, 之后等做好了会叫号让你来取餐.
所以另一种思路是: 把一些操作放在 Redis 中完成
可以用 Lua 脚本来完成一些逻辑功能, 用 Set 结构来判断是否购买过
在创建秒杀券时同时保存到 redis 当中
@Override @Transactional public void addSeckillVoucher(Voucher voucher) { // 保存优惠券 save(voucher); // 保存秒杀信息 SeckillVoucher seckillVoucher = new SeckillVoucher(); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); // 秒杀优化: 保存优惠券信息到 Redis 当中 stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString()); }
创建 lua 脚本
-- 1. 参数列表 -- 1.1 优惠券id local voucherId = ARGV[1] -- 1.2 用户 Id local userId = ARGV[2] -- 2. 数据 key -- 2.1 库存 key local stockKey = "seckill:stock:" .. voucherId -- 2.2 订单 key local orderKey = "seckill:order:" .. voucherId -- 3. 脚本业务 -- 3.1 库存不足 if (tonumber(redis.call('get', stockKey)) <= 2) then return 1 end -- 3.2 判断用户是否下单 if (redis.call('sismember', orderKey, userId) == 1) then return 2 end -- 3.3 扣库存, 加用户 redis.call('incrby', stockKey, -1) redis.call('sadd', orderKey, userId) return 0
更改逻辑为直接修改 Redis 中的数据
private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); } @Override public Result seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getId(); // 1. 执行 lua 脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString() ); // 2. 判断结果 assert result != null; int r = result.intValue(); if (r != 0) { return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } // 3. 返回 long orderId = redisIdWorker.nextId("order"); // TODO 保存到阻塞队列 return Result.ok(orderId); }
利用一个阻塞队列来异步处理耗时的操作
private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); @PostConstruct private void init() { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } private class VoucherOrderHandler implements Runnable { @Override public void run() { while (true) { try { VoucherOrder take = orderTasks.take(); createVoucherOrder(take); } catch (InterruptedException e) { log.error("订单处理异常",e); } } } } private void createVoucherOrder(VoucherOrder voucherOrder) { long userId = voucherOrder.getUserId(); long voucherId = voucherOrder.getVoucherId(); RLock lock = redissonClient.getLock("lock:order:" + userId); boolean isLock = lock.tryLock(); if (!isLock) { // 获取锁失败 log.error("不允许重复下单"); return; } try { int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); if (count > 0) { log.error("用户已经购买过了!"); return; } // 4. 扣减库存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1").eq("voucher_id", voucherId).update(); if (!success) { log.error("库存不足!"); return; } save(voucherOrder); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }
但是 BlockQueue 是基于内存的,有如下的问题
可以借助一些成熟的方案来解决这个问题
Redis 提供了三种不同的方式来实现消息队列:
list结构
:基于 List 结构模拟消息队列PubSub
:基本的点对点消息模型Stream
:比较完善的消息队列模型Redis 的 list 数据结构是一个双向链表,很容易模拟出队列效果
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP 来实现
不过要注意的是,当队列中没有消息时 RPOP 或 LPOP 操作会返回 null,并不像 JVM 的阻塞队列那样会阻塞并等待消息。因此这里应该使用 BRPOP 或者 BLPOP 来实现阻塞效果
优点:
缺点:
优点:
缺点:
特点:
特点:
在 Redis 中创建一个组:
XGROUP CREATE stream.orders g1 0 MKSTREAM
修改之前的 Lua 脚本:
业务逻辑方面:
添加到消息队列的逻辑放在了 Lua 脚本中
@Override public Result seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getId(); long orderId = redisIdWorker.nextId("order"); // 1. 执行 lua 脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); // 2. 判断结果 assert result != null; int r = result.intValue(); if (r != 0) { return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } // 3. 返回 return Result.ok(orderId); }
处理消息队列的代码如下:
private class VoucherOrderHandler implements Runnable { @Override public void run() { String queueName = "stream.orders"; while (true) { try { // 1. 获取消息队列中的订单信息 // XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(queueName, ReadOffset.lastConsumed()) ); // 2. 判断订单信息是否为空 if (list == null || list.isEmpty()) { // 如果为null 没有消息, 继续循环 continue; } System.out.println("=================" + list.size()); // 3. 解析消息 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); // 4. 创建订单 createVoucherOrder(voucherOrder); // 5. 确认消息 // XACK s1 g1 id stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId()); } catch (Exception e) { log.error("订单处理异常",e); handlePendingList(); } } } }
处理 Pending 队列的代码如下,和上面的逻辑相仿:
private void handlePendingList() { String queueName = "stream.orders"; while (true) { try { // 1. 获取PendingList 中的订单信息 // XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), StreamOffset.create(queueName, ReadOffset.from("0")) ); // 2. 判断订单信息是否为空 if (list == null || list.isEmpty()) { // 如果为null 没有消息, 继续循环 break; } // 3. 解析消息 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); // 4. 创建订单 createVoucherOrder(voucherOrder); // 5. 确认消息 // XACK s1 g1 id stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId()); } catch (Exception e) { log.error("Pending 订单处理异常",e); try { Thread.sleep(1000); } catch (InterruptedException ex) { ex.printStackTrace(); } } } }