小伙伴们,你们好呀!我是老寇!废话不多说,跟我一起抢红包
1.运行效果图(b站-地址)
2.技术架构
3.技术选型
4.业务逻辑(重点)
5.分布式锁的设计(个人理解)
6.分布式锁的实现(核心代码)
7.websocket消息推送(核心代码)
redis分布式锁实现抢红包
基础框架:springboot + springcloud
消息队列:rabbitMQ
数据缓存:redis
消息推送:websocket
为什么要用redis实现分布式锁的方案,放弃synchronized的方案,采用synchronized在分布式系统无法做到持有的锁是同一把锁(单体应用适用,不适用于微服务应用),因此这个方案被丢弃啦
* 进阶-> 采用gateway网关限流 -> 令牌桶算法 + redis,限制并发量,请求过多放入队列->rabbitmq * 1.判断红包是否还有剩余的(套路:先在redis里面读,redis没有数据,然后去mysql查询放到redis) * 2.判断该用户是否已经抢了红包 * 3.获取红包金额,并剩余红包数和红包金额写入redis * 4.保存抢红包的记录(记录放入mysql) * 5.响应给前端(调用消息服务,用websocket推送消息到前端->用户点进来的时候就必须初始化连接)
在分布式系统中,会遇到服务宕机的情况,服务宕机有很多情况,比如说内存不足,流量暴增等等都会让服务宕机,因此设计分布式锁就要考虑两点:
1.原子性:理解为一个线程持有锁的时候,其他线程只能等待,当该锁释放时,其他线程才有去抢占该锁的机会,最终只会有一个线程持有该锁,我举个例子:大家都有乘坐电梯或摩天轮的经历,排除满员的情况,在同一时刻,按下电梯按钮,每个人做到电梯的概率是相同的,停到某一层,就是说该人可以做电梯,也就该层的人都持有这个电梯。这一栋的楼层可以看做一个个的线程,电梯大致可以看成锁,特别是吃饭或下班的时候,都想快点做到电梯,这个也是一个高并发的场景。
2.服务宕机:服务宕机导致锁无法释放,然后就造成了死锁的情况,设置过期时间,服务重启后,会自动清除过期的数据
基于原子性设计,因此就用lua脚本语句,获取锁,不要忘了释放锁(大牛二虎上代码)
private static final RedisScript<String> SCRIPT_LOCK = new DefaultRedisScript<>( //当这个key不存在,则设置一个值,并设置过期时间(px表示毫秒) "return redis.call('set',KEYS[1],ARGV[1],'NX','PX',ARGV[2])",String.class ); private static final RedisScript<String> SCRIPT_UNLOCK = new DefaultRedisScript<>( //判断有这个key没有返回'false',有则删除 并返回'true' "if redis.call('get',KEYS[1]) == ARGV[1] then return tostring(redis.call('del',KEYS[1]) == 1) else return 'false' end",String.class ); private static final String LOCK_SUCCESS = "OK"; @Autowired private RedisTemplate redisTemplate; @Override public boolean acquireLock(String lockKey, String lockValue) { Object lockResult = redisTemplate.execute(SCRIPT_LOCK, redisTemplate.getStringSerializer(), redisTemplate.getStringSerializer(), Collections.singletonList(lockKey), lockValue); return LOCK_SUCCESS.equals(lockResult); } @Override public boolean releaseLock(String lockKey,String lockValue) { Object releaseResult = redisTemplate.execute(SCRIPT_UNLOCK, redisTemplate.getStringSerializer(), redisTemplate.getStringSerializer(), Collections.singletonList(lockKey), lockValue); return Boolean.valueOf(releaseResult.toString()); }
package io.laokou.chat.websocket; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * @author 寇申海 */ @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter(){ return new ServerEndpointExporter(); } }
package io.laokou.chat.websocket; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.validation.constraints.NotNull; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; /** * @author 寇申海 */ @Component @Data @Slf4j @ServerEndpoint("/ws/{userId}") public class WebSocketServer { /** * 静态变量,用来记录当前在线连接数。设计成线程安全 */ private static int onlineCount = 0; /** * concurrent包的线程安全Set,用来存放每个客户端对应的websocketserver对象 */ private static CopyOnWriteArraySet<WebSocketServer> webSocketServerCopyOnWriteArraySet = new CopyOnWriteArraySet<>(); /** * 与某些客户端的连接会话,需要通过它来给客户打发送数据 */ private Session session; /** * 接收userId */ private Long userId; /** * 连接成功后回调方法 * @param session * @param userId * @throws IOException */ @OnOpen public void onOpen(Session session, @PathParam("userId")Long userId) throws IOException { this.session = session; //先设置在添加 this.userId = userId; boolean addFlag = webSocketServerCopyOnWriteArraySet.add(this); if (addFlag) { addOnlineCount(); } log.info("新加入:{}",userId,",在线人数:{}",getOnlineCount()); } /** * 连接关闭调用 * @throws IOException */ @OnClose public void onClose() throws IOException { boolean removeFlag = webSocketServerCopyOnWriteArraySet.remove(this); if (removeFlag) { subOnlineCount(); } log.info("当前在线人数:{}",getOnlineCount()); } /** * 收到客户端消息后调用 * @param message * @param session * @throws IOException */ @OnMessage public void onMessage(String message,Session session) throws IOException { log.info("收到来自:{}",this.userId,"的消息:{}",message); for (WebSocketServer webSocketServer:webSocketServerCopyOnWriteArraySet) { log.info("在线用户:{}" , webSocketServer.userId); webSocketServer.SendMessage(message); } } /** * 发生错误时调用 * @param session * @param throwable */ @OnError public void one rror(Session session, @NotNull Throwable throwable){ log.error("发生错误:{}",throwable.getMessage()); throwable.printStackTrace(); } /** * 发送消息 * @param message * @throws IOException */ private void SendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 发送自定义消息 * @param message * @param userId * @throws IOException */ public void sendMessages(String message,Long userId)throws IOException{ for (WebSocketServer webSocketServer:webSocketServerCopyOnWriteArraySet){ if (userId == null) { log.info("推送消息给:{}" , webSocketServer.userId + ",推送内容:{}" , message); webSocketServer.SendMessage(message); } else if (userId.equals(webSocketServer.userId)) { log.info("推送消息给:{}" , webSocketServer.userId + ",推送内容:{}" , message); webSocketServer.SendMessage(message); } } } /** * 返回在线数 * @return */ private static synchronized int getOnlineCount(){ return onlineCount; } /** * 连接人数增加时 */ private static synchronized void addOnlineCount(){ WebSocketServer.onlineCount++; } /** * 连接人数减少时 */ private static synchronized void subOnlineCount(){ WebSocketServer.onlineCount--; } }
如果这都还没理解,那就一键三连,深夜扣我私聊 (咳咳,最近加班严重 那必须得深夜,安排)