原标题:Spring认证中国教育管理中心-Spring Data Redis框架教程二
Redis Streams 以抽象方法对日志数据结构进行建模。通常,日志是仅附加的数据结构,从一开始就在随机位置或通过流式传输新消息使用。
在Redis 参考文档 中了解有关 Redis Streams 的更多信息。
Redis Streams 大致可以分为两个功能领域:
尽管此模式与Pub/Sub有相似之处,但主要区别在于消息的持久性以及它们的消费方式。
虽然 Pub/Sub 依赖于瞬时消息的广播(即,如果您不听,就会错过一条消息),而 Redis Stream 使用持久的、仅附加的数据类型,该数据类型会保留消息,直到流被修剪为止。消费的另一个区别是 Pub/Sub 注册了服务器端订阅。Redis 将到达的消息推送到客户端,而 Redis Streams 需要主动轮询。
在
org.springframework.data.redis.connection和org.springframework.data.redis.stream软件包提供了对Redis的数据流的核心功能。
要发送记录,您可以像其他操作一样使用低级RedisConnection或高级StreamOperations. 两个实体都提供add( xAdd) 方法,该方法接受记录和目标流作为参数。虽然RedisConnection需要原始数据(字节数组),但StreamOperations允许将任意对象作为记录传入,如下例所示:
// append message through connection RedisConnection con = … byte[] stream = … ByteRecord record = StreamRecords.rawBytes(…).withStreamKey(stream); con.xAdd(record); // append message through RedisTemplate RedisTemplate template = … StringRecord record = StreamRecords.string(…).withStreamKey("my-stream"); template.streamOps().add(record);
流记录携带一个Map, 键值元组,作为它们的有效负载。将记录附加到流会返回RecordId可用作进一步参考的 。
在消费方面,一个人可以消费一个或多个流。Redis Streams 提供读取命令,允许从已知流内容内和流端之外的任意位置(随机访问)消费流以消费新的流记录。
在底层,RedisConnection提供了xRead和xReadGroup方法,分别映射了消费组内读取和读取的Redis命令。请注意,多个流可以用作参数。
Redis 中的订阅命令可能会阻塞。也就是说,调用xRead连接会导致当前线程在开始等待消息时阻塞。只有在读取命令超时或收到消息时才会释放线程。
要使用流消息,可以在应用程序代码中轮询消息,或者通过消息侦听器容器使用两种异步接收之一,命令式或反应式。每次有新记录到达时,容器都会通知应用程序代码。
虽然流消费通常与异步处理相关联,但也可以同步消费消息。重载StreamOperations.read(…)方法提供此功能。在同步接收期间,调用线程可能会阻塞,直到消息可用。该属性StreamReadOptions.block指定接收者在放弃等待消息之前应等待多长时间。
// Read message through RedisTemplate RedisTemplate template = … List<MapRecord<K, HK, HV>> messages = template.streamOps().read(StreamReadOptions.empty().count(2), StreamOffset.latest("my-stream")); List<MapRecord<K, HK, HV>> messages = template.streamOps().read(Consumer.from("my-group", "my-consumer"), StreamReadOptions.empty().count(2), StreamOffset.create("my-stream", ReadOffset.lastConsumed()))
由于其阻塞性质,低级轮询没有吸引力,因为它需要对每个消费者进行连接和线程管理。为了缓解这个问题,Spring Data 提供了消息侦听器,它可以完成所有繁重的工作。如果您熟悉 EJB 和 JMS,您应该会发现这些概念很熟悉,因为它被设计为尽可能接近 Spring Framework 及其消息驱动的 POJO (MDP) 中的支持。
Spring Data 附带了两种针对所使用的编程模型量身定制的实现:
StreamMessageListenerContainer并StreamReceiver负责消息接收和分派到侦听器中进行处理的所有线程。消息侦听器容器/接收器是 MDP 和消息提供者之间的中介,负责注册接收消息、资源获取和释放、异常转换等。这让您作为应用程序开发人员可以编写与接收消息(并对其作出反应)相关的(可能很复杂)业务逻辑,并将样板 Redis 基础设施问题委托给框架。
这两个容器都允许运行时配置更改,以便您可以在应用程序运行时添加或删除订阅,而无需重新启动。此外,容器使用惰性订阅方法,RedisConnection仅在需要时使用。如果所有侦听器都取消订阅,它会自动执行清理,并释放线程。
以类似于 EJB 世界中的消息驱动 Bean (MDB) 的方式,流驱动 POJO (SDP) 充当流消息的接收器。SDP 的一个限制是它必须实现
org.springframework.data.redis.stream.StreamListener接口。还请注意,在您的 POJO 在多个线程上接收消息的情况下,确保您的实现是线程安全的很重要。
class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> { @Override public void onMessage(MapRecord<String, String, String> message) { System.out.println("MessageId: " + message.getId()); System.out.println("Stream: " + message.getStream()); System.out.println("Body: " + message.getValue()); } }
StreamListener 表示一个函数式接口,因此可以使用它们的 Lambda 形式重写实现:
message -> { System.out.println("MessageId: " + message.getId()); System.out.println("Stream: " + message.getStream()); System.out.println("Body: " + message.getValue()); };
一旦你实现了你的StreamListener,就可以创建一个消息监听器容器并注册一个订阅:
RedisConnectionFactory connectionFactory = … StreamListener<String, MapRecord<String, String, String>> streamListener = … StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions .builder().pollTimeout(Duration.ofMillis(100)).build(); StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(connectionFactory, containerOptions); Subscription subscription = container.receive(StreamOffset.fromStart("my-stream"), streamListener);
请参阅各种消息侦听器容器的 Javadoc,以获取每个实现支持的功能的完整描述。
流数据源的反应性消费通常通过一系列Flux事件或消息发生。反应式接收器实现提供了StreamReceiver及其重载的receive(…)消息。与
StreamMessageListenerContainer利用驱动程序提供的线程资源相比,反应式方法需要更少的基础设施资源,例如线程。接收流是一个需求驱动的发布者StreamMessage:
Flux<MapRecord<String, String, String>> messages = … return messages.doOnNext(it -> { System.out.println("MessageId: " + message.getId()); System.out.println("Stream: " + message.getStream()); System.out.println("Body: " + message.getValue()); });
现在我们需要创建StreamReceiver并注册一个订阅来消费流消息:
ReactiveRedisConnectionFactory connectionFactory = … StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder().pollTimeout(Duration.ofMillis(100)) .build(); StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(connectionFactory, options); Flux<MapRecord<String, String, String>> messages = receiver.receive(StreamOffset.fromStart("my-stream"));
请参阅各种消息侦听器容器的 Javadoc,以获取每个实现支持的功能的完整描述。
需求驱动的消费使用背压信号来激活和停用轮询。StreamReceiver如果需求得到满足,订阅将暂停轮询,直到订阅者发出进一步的需求信号。根据ReadOffset策略,这可能会导致消息被跳过。
当您通过 a 阅读消息时Consumer Group,服务器将记住给定的消息已传递并将其添加到待处理条目列表 (PEL)。已发送但尚未确认的消息列表。
消息必须通过确认
StreamOperations.acknowledge才能从待处理条目列表中删除,如下面的片段所示。
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = ... container.receive(Consumer.from("my-group", "my-consumer"), StreamOffset.create("my-stream", ReadOffset.lastConsumed()), msg -> { // ... redisTemplate.opsForStream().acknowledge("my-group", msg); });
从组my-group 中读取为my-consumer。接收到的消息不被确认。
处理后确认消息。
要在接收时自动确认消息,请使用receiveAutoAck而不是receive.
流读取操作接受读取偏移量规范以从给定偏移量开始消费消息。ReadOffset表示读取偏移规范。Redis 支持 3 种偏移量变体,具体取决于您是独立使用流还是在消费者组中使用流:
在基于消息容器的消费上下文中,我们需要在消费消息时提前(或增加)读取偏移量。推进取决于请求ReadOffset和消费模式(有/没有消费者组)。以下矩阵解释了容器如何前进ReadOffset:
从特定的消息 ID 和最后消费的消息中读取可以被视为安全操作,可确保消费附加到流的所有消息。使用最新的消息进行读取可以跳过轮询操作处于死时间状态时添加到流中的消息。轮询引入了一个死区时间,其中消息可以在各个轮询命令之间到达。流消费不是线性连续读取,而是拆分为重复XREAD调用。
发送到流的任何记录都需要序列化为其二进制格式。由于流与散列数据结构的接近性,流键、字段名称和值使用在RedisTemplate.
请确保检查RedisSerializers in use 并注意,如果您决定不使用任何序列化程序,则需要确保这些值已经是二进制的。
StreamOperations允许通过 将简单值ObjectRecord直接附加到流中,而无需将这些值放入Map结构中。然后将该值分配给_有效载荷_字段,并且可以在读回该值时提取该值。
ObjectRecord<String, String> record = StreamRecords.newRecord() .in("my-stream") .ofObject("my-value"); redisTemplate() .opsForStream() .add(record); List<ObjectRecord<String, String>> records = redisTemplate() .opsForStream() .read(String.class, StreamOffset.fromStart("my-stream"));
XADD my-stream * “_class” “java.lang.String” “_raw” “my-value”
ObjectRecords 与所有其他记录都经过完全相同的序列化过程,因此也可以使用返回 a 的无类型读取操作获取 Record MapRecord。
可以通过 3 种方式向流中添加复杂值:
第一个变体是最直接的变体,但忽略了流结构提供的字段值功能,流中的值仍然可以被其他消费者读取。第二个选项与第一个选项具有相同的好处,但可能会导致非常具体的消费者限制,因为所有消费者都必须实现完全相同的序列化机制。该HashMapper方法使用蒸汽散列结构稍微复杂一点,但将源扁平化。只要选择了合适的序列化程序组合,其他消费者仍然能够读取记录。
HashMappers 将有效负载转换为Map具有特定类型的 a。确保使用能够(反)序列化散列的散列键和散列值序列化程序。
ObjectRecord<String, User> record = StreamRecords.newRecord() .in("user-logon") .ofObject(new User("night", "angel")); redisTemplate() .opsForStream() .add(record); List<ObjectRecord<String, User>> records = redisTemplate() .opsForStream() .read(User.class, StreamOffset.fromStart("user-logon"));
XADD 用户登录 * “_class” “com.example.User” “firstname” “night” “lastname” “angel”
StreamOperations默认使用ObjectHashMapper。您可以HashMapper在获取时提供一个适合您的要求StreamOperations。
redisTemplate() .opsForStream(new Jackson2HashMapper(true)) .add(record);
XADD 用户登录 * “firstname” “night” “@class” “com.example.User” “lastname” “angel”
AStreamMessageListenerContainer可能不知道@TypeAlias域类型上使用的任何内容,因为那些需要通过MappingContext. 确保RedisMappingContext使用initialEntitySet.
@Bean RedisMappingContext redisMappingContext() { RedisMappingContext ctx = new RedisMappingContext(); ctx.setInitialEntitySet(Collections.singleton(Person.class)); return ctx; } @Bean RedisConverter redisConverter(RedisMappingContext mappingContext) { return new MappingRedisConverter(mappingContext); } @Bean ObjectHashMapper hashMapper(RedisConverter converter) { return new ObjectHashMapper(converter); } @Bean StreamMessageListenerContainer streamMessageListenerContainer(RedisConnectionFactory connectionFactory, ObjectHashMapper hashMapper) { StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>> options = StreamMessageListenerContainerOptions.builder() .objectMapper(hashMapper) .build(); return StreamMessageListenerContainer.create(connectionFactory, options); }
Redis的提供支持的交易通过multi,exec和discard命令。这些操作在 上可用RedisTemplate。但是,RedisTemplate不能保证在同一个连接中运行事务中的所有操作。
Spring Data Redis 提供了SessionCallback接口,供需要对同一个 执行多个操作connection时使用,例如使用Redis 事务时。以下示例使用该multi方法:
//execute a transaction List<Object> txResults = redisTemplate.execute(new SessionCallback<List<Object>>() { public List<Object> execute(RedisOperations operations) throws DataAccessException { operations.multi(); operations.opsForSet().add("key", "value1"); // This will contain the results of all operations in the transaction return operations.exec(); } }); System.out.println("Number of items added to set: " + txResults.get(0));
RedisTemplate使用其值、散列键和散列值序列化器exec在返回之前反序列化所有结果。还有一种exec方法可以让您为事务结果传递自定义序列化程序。
从 1.1 版开始,exec对RedisConnection和的方法进行了重要更改RedisTemplate。以前,这些方法直接从连接器返回事务的结果。这意味着数据类型通常与从 的方法返回的数据类型不同RedisConnection。例如,zAdd返回一个布尔值,指示元素是否已添加到排序集中。大多数连接器将此值返回为 long,并且 Spring Data Redis 执行转换。另一个常见的区别是,大多数连接器OK为诸如set. 这些回复通常会被 Spring Data Redis 丢弃。在 1.1 之前,未对exec. 此外,结果没有反序列化RedisTemplate,因此它们通常包含原始字节数组。如果此更改破坏了您的应用程序,请设置
convertPipelineAndTxResults为falseon 您RedisConnectionFactory以禁用此行为。
默认情况下,RedisTemplate不参与托管 Spring 事务。如果您想RedisTemplate在使用@Transactional或时使用 Redis 事务TransactionTemplate,则需要RedisTemplate通过设置为每个显式启用事务支持
setEnableTransactionSupport(true)。启用事务支持绑定RedisConnection到由ThreadLocal. 如果事务完成且没有错误,Redis 事务将使用 提交EXEC,否则使用 回滚DISCARD。Redis 事务是面向批处理的。在正在进行的事务期间发出的命令被排队,并且仅在提交事务时应用。
Spring Data Redis 在正在进行的事务中区分只读和写命令。只读命令,例如KEYS,通过管道传输到新的(非线程绑定)RedisConnection以允许读取。写入命令由RedisTemplate提交排队并在提交时应用。
以下示例显示了如何配置事务管理:
示例 3. 启用事务管理的配置
@Configuration @EnableTransactionManagement public class RedisTxContextConfiguration { @Bean public StringRedisTemplate redisTemplate() { StringRedisTemplate template = new StringRedisTemplate(redisConnectionFactory()); // explicitly enable transaction support template.setEnableTransactionSupport(true); return template; } @Bean public RedisConnectionFactory redisConnectionFactory() { // jedis || Lettuce } @Bean public PlatformTransactionManager transactionManager() throws SQLException { return new DataSourceTransactionManager(dataSource()); } @Bean public DataSource dataSource() throws SQLException { // ... } }
配置 Spring Context 以启用声明式事务管理。
配置RedisTemplate为通过将连接绑定到当前线程来参与事务。
事务管理需要一个
PlatformTransactionManager. Spring Data Redis 不附带PlatformTransactionManager实现。假设您的应用程序使用 JDBC,Spring Data Redis 可以使用现有的事务管理器参与事务。
以下示例分别演示了使用限制:
示例 4. 使用限制
// must be performed on thread-bound connection template.opsForValue().set("thing1", "thing2"); // read operation must be run on a free (not transaction-aware) connection template.keys("*"); // returns null as values set within a transaction are not visible template.opsForValue().get("thing1");
Redis 提供对流水线的支持,这涉及向服务器发送多个命令而无需等待回复,然后一步读取回复。当您需要连续发送多个命令时,流水线可以提高性能,例如将许多元素添加到同一个 List。
Spring Data Redis 提供了多种RedisTemplate在管道中运行命令的方法。如果你不关心流水线操作的结果,你可以使用标准execute方法,传递true的pipeline参数。这些executePipelined方法运行提供的RedisCallback或SessionCallback在管道中并返回结果,如以下示例所示:
//pop a specified number of items from a queue List<Object> results = stringRedisTemplate.executePipelined( new RedisCallback<Object>() { public Object doInRedis(RedisConnection connection) throws DataAccessException { StringRedisConnection stringRedisConn = (StringRedisConnection)connection; for(int i=0; i< batchSize; i++) { stringRedisConn.rPop("myqueue"); } return null; } });
前面的示例从管道中的队列中批量右弹出项目。在results List包含了所有的弹出项目。RedisTemplate在返回之前使用其值、哈希键和哈希值序列化器对所有结果进行反序列化,因此前面示例中的返回项是字符串。还有其他executePipelined方法可让您为流水线结果传递自定义序列化程序。
请注意,从 返回的值RedisCallback必须是null,因为为了返回流水线命令的结果而丢弃该值。
Lettuce 驱动程序支持细粒度的刷新控制,允许在命令出现时刷新、缓冲或在连接关闭时发送它们。
LettuceConnectionFactory factory = // …
factory.setPipeliningFlushPolicy(PipeliningFlushPolicy.buffered(3));
在本地缓冲并在每第三个命令后刷新。
从 1.1 版开始,exec对RedisConnection和的方法进行了重要更改RedisTemplate。以前,这些方法直接从连接器返回事务的结果。这意味着数据类型通常与从 的方法返回的数据类型不同RedisConnection。例如,zAdd返回一个布尔值,指示元素是否已添加到排序集中。大多数连接器将此值返回为 long,并且 Spring Data Redis 执行转换。另一个常见的区别是,大多数连接器OK为诸如set. 这些回复通常会被 Spring Data Redis 丢弃。在 1.1 之前,未对exec. 此外,结果没有反序列化RedisTemplate,因此它们通常包含原始字节数组。如果此更改破坏了您的应用程序,请设置
convertPipelineAndTxResults为falseon 您RedisConnectionFactory以禁用此行为。
Redis 2.6 及更高版本支持通过eval和evalsha命令运行 Lua 脚本。Spring Data Redis 为运行脚本提供了高级抽象,该脚本处理序列化并自动使用 Redis 脚本缓存。
脚本可以通过调用运行execute的方法RedisTemplate和ReactiveRedisTemplate。两者都使用可配置ScriptExecutor(或ReactiveScriptExecutor)来运行提供的脚本。默认情况下,ScriptExecutor(or ReactiveScriptExecutor) 负责序列化提供的键和参数并反序列化脚本结果。这是通过模板的键和值序列化器完成的。还有一个额外的重载,允许您为脚本参数和结果传递自定义序列化程序。
默认ScriptExecutor通过检索脚本的 SHA1 并尝试首先运行来优化性能,如果脚本尚未出现在 Redis 脚本缓存中evalsha,eval则回退到。
以下示例使用 Lua 脚本运行常见的“检查并设置”场景。这是 Redis 脚本的理想用例,因为它需要原子地运行一组命令,并且一个命令的行为受另一个命令的结果影响。
@Bean public RedisScript<Boolean> script() { ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("META-INF/scripts/checkandset.lua")); return RedisScript.of(scriptSource, Boolean.class); }
public class Example { @Autowired RedisScript<Boolean> script; public boolean checkAndSet(String expectedValue, String newValue) { return redisTemplate.execute(script, singletonList("key"), asList(expectedValue, newValue)); } }
-- checkandset.lua local current = redis.call('GET', KEYS[1]) if current == ARGV[1] then redis.call('SET', KEYS[1], ARGV[2]) return true end return false
前面的代码配置了一个RedisScript指向名为 的文件checkandset.lua,它应该返回一个布尔值。该脚本resultType应该是一个Long,Boolean,List或反序列化的值类型。null如果脚本返回丢弃状态(特别是OK),也可能是这样。
最好DefaultRedisScript在应用程序上下文中配置单个实例,以避免在每次脚本运行时重新计算脚本的 SHA1。
然后checkAndSet上面的方法运行脚本。脚本可以SessionCallback作为事务或管道的一部分在内部运行。有关更多信息,请参阅“ Redis 事务”和“流水线”。
Spring Data Redis 提供的脚本支持还允许您使用 Spring Task 和 Scheduler 抽象来安排 Redis 脚本定期运行。有关更多详细信息,请参阅Spring 框架文档。
在 2.0 中更改
Spring Redis通过包提供了 Spring缓存抽象的
org.springframework.data.redis.cache实现。要将 Redis 用作支持实现,请添加RedisCacheManager到您的配置中,如下所示:
@Bean public RedisCacheManager cacheManager(RedisConnectionFactory connectionFactory) { return RedisCacheManager.create(connectionFactory); }
RedisCacheManager可以使用 配置行为RedisCacheManagerBuilder,让您设置默认RedisCacheConfiguration、事务行为和预定义缓存。
RedisCacheManager cm = RedisCacheManager.builder(connectionFactory) .cacheDefaults(defaultCacheConfig()) .withInitialCacheConfigurations(singletonMap("predefined", defaultCacheConfig().disableCachingNullValues())) .transactionAware() .build();
如前面的示例所示,RedisCacheManager允许在每个缓存的基础上定义配置。
RedisCachecreated with的行为是用RedisCacheManager定义的RedisCacheConfiguration。该配置允许您设置密钥到期时间、前缀和RedisSerializer实现与二进制存储格式之间的转换,如以下示例所示:
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig() .entryTtl(Duration.ofSeconds(1)) .disableCachingNullValues();
RedisCacheManager默认为无锁RedisCacheWriter读取和写入二进制值。无锁缓存提高了吞吐量。缺少条目锁定可能会导致putIfAbsent和clean方法出现重叠的非原子命令,因为这些命令需要将多个命令发送到 Redis。锁定对应物通过设置显式锁定密钥并检查此密钥的存在来防止命令重叠,这会导致额外的请求和潜在的命令等待时间。
锁定适用于缓存级别,而不是每个缓存条目。
可以选择加入锁定行为,如下所示:
RedisCacheManager cm = RedisCacheManager.build(RedisCacheWriter.lockingRedisCacheWriter(connectionFactory)) .cacheDefaults(defaultCacheConfig()) ...
默认情况下,key缓存条目的any以实际缓存名称作为前缀,后跟两个冒号。此行为可以更改为静态和计算前缀。
以下示例显示了如何设置静态前缀:
// static key prefix RedisCacheConfiguration.defaultCacheConfig().prefixKeysWith("( ͡° ᴥ ͡°)"); The following example shows how to set a computed prefix: // computed key prefix RedisCacheConfiguration.defaultCacheConfig().computePrefixWith(cacheName -> "¯\_(ツ)_/¯" + cacheName);
缓存实现默认使用KEYS和DEL清除缓存。KEYS可能会导致大键空间的性能问题。因此,RedisCacheWriter可以使用 a 创建默认值BatchStrategy以切换到SCAN基于 - 的批处理策略。该SCAN策略需要批量大小以避免过多的 Redis 命令往返:
RedisCacheManager cm = RedisCacheManager.build(RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory, BatchStrategies.scan(1000))) .cacheDefaults(defaultCacheConfig()) ...
该KEYS批次策略是使用任何驱动程序和Redis的操作模式(独立,集群)的全面支持。SCAN使用 Lettuce 驱动程序时完全支持。JedisSCAN仅支持非集群模式。
下表列出了 的默认设置RedisCacheManager:
默认情况下RedisCache,统计信息被禁用。使用
RedisCacheManagerBuilder.enableStatistics()收集当地的命中和未命中通过RedisCache#getStatistics(),返回所收集数据的快照。