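I started by generating a simple demo with Spring Initializr and then modified it, which is quicker than building a project from scratch. The project structure is shown in the figure below:
The structure is quite simple.
PS: since this is a Maven project, it also needs a pom.xml. The screenshot does not show it, so I am adding it here.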
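The project needs the following dependencies: spring-boot-starter-web, spring-boot-starter-data-redis, lombok, slf4j-api, logback-classic, and spring-boot-starter-test (test scope).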
The complete pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example.demo</groupId>
    <artifactId>pubsub</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>pubsub</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <slf4j.version>1.7.32</slf4j.version>
        <logback.version>1.2.6</logback.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Once a published message arrives, we need some logic to handle it. That logic lives in PrintMessageListener:
package com.example.demo.pubsub.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * Logs every Redis message it receives.
 *
 * @author geekziyu
 * @version 1.0.0
 */
@Slf4j
public class PrintMessageListener implements MessageListener {

    private final StringRedisSerializer stringRedisSerializer;

    public PrintMessageListener(StringRedisSerializer stringRedisSerializer) {
        this.stringRedisSerializer = stringRedisSerializer;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // Channel and body arrive as raw bytes, so decode both to strings first.
        String channel = stringRedisSerializer.deserialize(message.getChannel());
        String body = stringRedisSerializer.deserialize(message.getBody());
        handleMessage(channel, body);
    }

    private void handleMessage(String channel, String body) {
        log.info("Consumed Redis message\n channel:{}\n body:{}", channel, body);
    }
}
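The listener receives the channel and body as byte arrays, and StringRedisSerializer turns them into strings using UTF-8 by default. As a rough illustration of that decoding step, here is a stdlib-only sketch. The class name PayloadDecodingSketch and its decode method are hypothetical stand-ins, not part of Spring Data Redis.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for illustration only; this is not the Spring class.
final class PayloadDecodingSketch {

    // Null-safe UTF-8 decoding, similar in spirit to what a string serializer does.
    static String decode(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Simulate the raw bytes a listener would get for channel and body.
        byte[] channel = "dream".getBytes(StandardCharsets.UTF_8);
        byte[] body = "engineer".getBytes(StandardCharsets.UTF_8);
        System.out.println("channel=" + decode(channel) + ", body=" + decode(body));
    }
}
```

If the publisher writes values with a different serializer (for example a JSON or JDK serializer), the listener would need a matching deserializer instead of plain string decoding.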
As mentioned earlier, publishing and subscribing through the API provided by spring-boot-starter-data-redis requires RedisTemplate and RedisMessageListenerContainer, so let's register both as beans in the Spring container:
package com.example.demo.pubsub.config;

import com.example.demo.pubsub.listener.PrintMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * Redis configuration.
 *
 * @author geekziyu
 * @version 1.0.0
 */
@Configuration
public class RedisConfiguration {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer result = new RedisMessageListenerContainer();
        // Required: the container needs a connection factory to subscribe.
        result.setConnectionFactory(redisConnectionFactory);
        return result;
    }

    @Bean("redisTemplate")
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, String> result = new RedisTemplate<>();
        // Required: the template fails at startup without a connection factory.
        result.setConnectionFactory(factory);
        // Use plain strings for keys, values, hash keys, and hash values.
        result.setKeySerializer(stringRedisSerializer());
        result.setHashKeySerializer(stringRedisSerializer());
        result.setValueSerializer(stringRedisSerializer());
        result.setHashValueSerializer(stringRedisSerializer());
        return result;
    }

    @Bean
    public PrintMessageListener printMessageListener() {
        return new PrintMessageListener(stringRedisSerializer());
    }

    @Bean
    public StringRedisSerializer stringRedisSerializer() {
        return new StringRedisSerializer();
    }
}
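A few points to note:
First, if you do not call setConnectionFactory(RedisConnectionFactory) to give the RedisMessageListenerContainer a connection factory, a NullPointerException is thrown when addMessageListener is called to subscribe. The exact location of the exception is shown in the figure below:
Second, if you do not call setConnectionFactory on the RedisTemplate to set the Redis connection factory, an exception is thrown at startup, as shown below: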
// This shows that a RedisConnectionFactory is mandatory for RedisTemplate!
Caused by: java.lang.IllegalStateException: RedisConnectionFactory is required
    at org.springframework.util.Assert.state(Assert.java:76)
    at org.springframework.data.redis.core.RedisAccessor.afterPropertiesSet(RedisAccessor.java:38)
    at org.springframework.data.redis.core.RedisTemplate.afterPropertiesSet(RedisTemplate.java:128)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1845)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1782)
I use AdminController to accept the publish, subscribe, and unsubscribe requests. The source code is as follows:
package com.example.demo.pubsub.controller;

import com.example.demo.pubsub.listener.PrintMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Admin controller.
 *
 * @author geekziyu
 * @version 1.0.0
 */
@RestController
@RequestMapping("/admin")
public class AdminController {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private RedisMessageListenerContainer container;

    @Autowired
    private StringRedisSerializer stringRedisSerializer;

    // One listener per channel. The controller is a singleton that serves
    // concurrent requests, so the map must be thread-safe.
    private final Map<String, MessageListener> registeredListener = new ConcurrentHashMap<>();

    @GetMapping("/pub")
    public String publish(String channel, String body) {
        redisTemplate.convertAndSend(channel, body);
        return "ok";
    }

    @GetMapping("/sub")
    public String subscribe(String channel) {
        // Reuse the listener already created for this channel, if any.
        MessageListener listener = registeredListener.computeIfAbsent(channel,
                ch -> new PrintMessageListener(stringRedisSerializer));
        container.addMessageListener(listener, new ChannelTopic(channel));
        return "ok";
    }

    @GetMapping("/unsub")
    public String unsubscribe(String channel) {
        MessageListener messageListener = registeredListener.get(channel);
        if (messageListener != null) {
            container.removeMessageListener(messageListener, new ChannelTopic(channel));
        }
        return "ok";
    }
}
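The controller keeps one listener per channel in a map, so subscribing to the same channel twice reuses the same listener, and unsubscribing from a channel that was never subscribed is a no-op. The stdlib-only sketch below isolates that bookkeeping. ListenerRegistrySketch, Listener, and PrintingListener are hypothetical names introduced for illustration, and using ConcurrentHashMap is my assumption for a map that is shared between request threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the per-channel listener bookkeeping; no Redis involved.
final class ListenerRegistrySketch {

    // Stand-in for a message listener.
    interface Listener {
        void onMessage(String channel, String body);
    }

    static final class PrintingListener implements Listener {
        @Override
        public void onMessage(String channel, String body) {
            System.out.println(channel + " -> " + body);
        }
    }

    private final Map<String, Listener> registered = new ConcurrentHashMap<>();

    // Returns the existing listener for the channel, or creates one on first use.
    Listener subscribe(String channel) {
        return registered.computeIfAbsent(channel, ch -> new PrintingListener());
    }

    // Reports whether a listener is known for the channel (i.e. whether there is anything to remove).
    boolean unsubscribe(String channel) {
        return registered.get(channel) != null;
    }

    public static void main(String[] args) {
        ListenerRegistrySketch registry = new ListenerRegistrySketch();
        Listener first = registry.subscribe("dream");
        Listener second = registry.subscribe("dream");
        System.out.println("same listener reused: " + (first == second));
        System.out.println("unsubscribe known channel: " + registry.unsubscribe("dream"));
        System.out.println("unsubscribe unknown channel: " + registry.unsubscribe("other"));
    }
}
```

Reusing the listener instance matters because removing a listener from the container needs the same object that was added.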
To get log output on the console, you may also need the complete logback.xml:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="info">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>
With that in place, publish and subscribe already work.
First, subscribe:
http://localhost:8080/admin/sub?channel=dream
Then publish:
http://localhost:8080/admin/pub?channel=dream&body=engineer
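The URLs above work as written because "dream" and "engineer" contain only plain letters. If the channel or body contains spaces or non-ASCII characters, the query parameters need to be encoded. Here is a minimal sketch of building the publish URL in code. The class PublishUrlSketch and the method buildPublishUrl are hypothetical helpers, and the base URL is assumed to be the local demo address used above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for illustration; not part of the demo project.
final class PublishUrlSketch {

    // Form-encodes a single query parameter value as UTF-8.
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    // Builds the /admin/pub request URL with encoded channel and body.
    static String buildPublishUrl(String baseUrl, String channel, String body) {
        return baseUrl + "/admin/pub?channel=" + encode(channel) + "&body=" + encode(body);
    }

    public static void main(String[] args) {
        System.out.println(buildPublishUrl("http://localhost:8080", "dream", "engineer"));
        System.out.println(buildPublishUrl("http://localhost:8080", "dream", "hello world"));
    }
}
```

URLEncoder uses form encoding, so a space becomes "+"; the first call reproduces the publish URL shown above unchanged.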
Check the console. The Redis message was consumed successfully:
Note that the Redis connection defaults to localhost:6379, which corresponds to the following settings in application.properties:
spring.redis.host=localhost
spring.redis.port=6379
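Make sure Redis is running locally and listening on port 6379. If you are not familiar with setting up Redis yourself, change the connection settings to point at a Redis service that is available to you.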
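Before starting the application, it can help to confirm that something is actually listening on the configured host and port. The following stdlib-only sketch attempts a TCP connection with a timeout. RedisPortCheck and isReachable are hypothetical names, and the check only shows that the port accepts connections, not that the service behind it is really Redis.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of the demo project.
final class RedisPortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults taken from the application.properties settings above.
        boolean up = isReachable("localhost", 6379, 1000);
        System.out.println(up
                ? "Something is listening on localhost:6379"
                : "Nothing reachable on localhost:6379; start Redis or change the connection settings");
    }
}
```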
Implementing Redis message publish/subscribe with Spring Boot