问题现象:测试环境单台部署,没有问题,生产环境多台部署订单都是2条重复数据。
问题描述:我们把每个服务都部署了2台,订单产生后,有redisson的mq发布,如果MQListener监听到就会执行后面的业务逻辑。现实的问题是2台MQListener都会监听到,会重复处理我们的逻辑,插入数据库或修改数据库或写入ES等都会执行2遍。
本文的DEMO中使用的是redisson的mq来测试的,同时RabbitMQ,ActiveMQ,RocketMQ也会有同样的问题,处理逻辑大家可以参照,应该都是大同小异。
解决方法:redisson公平锁【谁先抢到谁先锁,其余需要等待】加锁处理,只需要一台来处理。
------------------------------------------------核心代码----------------------------------------------------------
1、pom依赖及redisson插件
<!-- 插件地址https://gitee.com/ztp/redisson-spring-boot-starter --> <dependency> <groupId>com.zengtengpeng</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>1.0.8</version> </dependency>
2、application.yml配置文件
// 集群配置,先进先出原则 redisson: multiple-server-config: node-addresses[0]: 192.168.1.57:7000 node-addresses[1]: 192.168.1.57:7001 node-addresses[2]: 192.168.1.117:7000 node-addresses[3]: 192.168.1.57:7002 node-addresses[4]: 192.168.1.117:7001 node-addresses[5]: 192.168.1.117:7002 loadBalancer: org.redisson.connection.balancer.RoundRobinLoadBalancer readMode: MASTER subscriptionMode: MASTER password: redismima123 model: CLUSTER // 单节点配置 redisson: singleServerConfig: address: 192.168.1.119:6380 database: 3 password: redisMiMa123 model: SINGLE
3、springboot启动程序
package com; import com.zengtengpeng.annotation.EnableMQ; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; @Slf4j // 这里的注解要加上,不然MQ不生效 @EnableMQ public class OrderJobApplication { public static void main(String[] args) { SpringApplication.run(OrderJobApplication.class, args); log.info("OrderJob项目启动成功!!!"); } }
4、mq生产者
package com.services.impl; /** * * @author: renkai721@163.com * @date: 2021年09月16日 20:41:54 * @description: */ import cn.hutool.json.JSONUtil; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.Wrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.github.pagehelper.util.StringUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.redisson.api.*; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.net.URL; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; @Service @Slf4j public class OrderServiceImpl implements OrderService { @Resource private RedissonClient redissonClient; @Override public void createOrder(OrderParamReqVo vo) { // 虽然order的服务也部署了2台,但是从用户点击创建订单到后台网关, // 所有的nginx都只会转发到一台机器上,所以生产者不需要单独处理。 Long orderId = 0L; // 查询ID RLock idLock = redissonClient.getFairLock("orderIdLock"); try{ idLock.lock(); RBucket<Long> idBucket = redissonClient.getBucket("orderId"); synchronized (idBucket){ Long redisId = idBucket.get(); if(redisId != null){ redisId+=1; }else{ redisId = 1L; } orderId = redisId; idBucket.set(redisId); log.info("生成的的orderId="+orderId); } } catch (Exception e){ e.printStackTrace(); } finally { idLock.unlock(); } // topic名字和监听中的名字要一致,写法也有很多,大家按照自己喜欢的方式去写 RTopic orderMq = redissonClient.getTopic("orderTopic"); OrderParamRespVo obj = OrderParamRespVo.builder() .id(orderId) .userId(vo.getUserId()) .status(0) .createTime(new Date()) .build(); orderMq.publish(obj); log.info("order订单的MQ生成了,快快接收处理吧,obj={}",obj); } }
5、消费者
package com.mq.listener; import com.alibaba.druid.util.StringUtils; import com.zengtengpeng.annotation.MQListener; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBucket; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.util.concurrent.TimeUnit; /** * * @Description: * @author: renkai721@163.com * @date: 2021年09月19日 11:28 上午 */ @Component @Slf4j public class OrderMqListener { @Resource private RedissonClient redissonClient; @MQListener(name = "orderTopic") public void orderTopicSave(CharSequence charSequence, OrderParamRespVo vo, Object object){ // 这里的MQListener大家要注意,部署了3台,那么3台都会监听, // 如果有数据下发,3台会同时触发。 String value = getProcessId(); log.info("value={}",value); RBucket<String> idBucket = redissonClient.getBucket("mqOrderId"+vo.getOrderId); // redisson公平锁,谁先锁住谁使用 RLock idLock = redissonClient.getFairLock("mqLockOrderId"+vo.getOrderId); // 锁2秒,其余的处于等待,2秒后锁会自动解锁,也就是finally不需要单独处理 idLock.lock(2,TimeUnit.SECONDS); try { if(idLock.isLocked()){ synchronized (idBucket) { if(StringUtils.isEmpty(idBucket.get())){ // 这里的逻辑是使用了机器的进程ID+机器名来判断唯一标识的 // 如果最简单的就是idBucket.set("1"); // 只要idBucket有值就说明有人已经锁住在处理了。 idBucket.set(value,5, TimeUnit.MINUTES); } log.info("idBucket.get()={}",idBucket.get()); } } if(value.equals(idBucket.get())){ log.info("让我来处理吧,其它小伙伴休息一下吧!"); // 自己的写库或写redis逻辑处理 OrderDao.save(vo); log.info("orderJob MQ收到消息, 处理完毕。"); idBucket.delete(); }else { log.info("已经有人处理啦"); } } catch (Exception e) { e.printStackTrace(); } finally { if(idLock.isLocked() && idLock.isHeldByCurrentThread()){ idLock.unlock(); } } } public static final String getProcessId() { RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); return runtimeMXBean.getName(); } }