关于 spring-data-redis 和 lettuce,笔者写过不少文章:
最近,私信还有留言中,网友提到 spring-data-redis 和 lettuce 一起使用,pipeline 通过抓包一看,并没有生效,这个如何配置才能生效呢?
首先,在上面的文章中,我们分析过 Spring-data-redis + Lettuce 的基本原理,在这种环境下 RedisTemplate 使用的连接内部包括:
execute(RedisCallback)
,流程是:
对于 executePipelined(RedisCallback)
,如果使用正确的话,会使用 asyncDedicatedConn
私有连接执行。那么怎么算使用正确呢?
需要使用回调的连接进行 Redis 调用,不能直接使用 redisTemplate
调用,否则 pipeline 不生效:
Pipeline 生效:
List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { connection.get("test".getBytes()); connection.get("test2".getBytes()); return null; } });
Pipeline 不生效:
List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { redisTemplate.opsForValue().get("test"); redisTemplate.opsForValue().get("test2"); return null; } });
这样我们就能使用保证 API 层正确使用 pipeline 了,但是默认配置的情况下, 底层还是没有执行 Pipeline,这是怎么回事呢?
Redis Pipeline 是 Redis 中的 批量操作,它能将一组 Redis 命令进行组装,通过一次传输给 Redis 并返回结果集,大大减少了如果命令时一条条单独传输需要的 RTT 时间(包括 Redis 客户端,Redis 服务端切换系统调用发送接收数据的时间,以及网络传输时间)。
如果原来的命令是这么发送的:
Client -> Server: INCR X\r\n Server -> Client: 1 Client -> Server: INCR X\r\n Server -> Client: 2 Client -> Server: INCR X\r\n Server -> Client: 3 Client -> Server: INCR X\r\n Server -> Client: 4
那么使用 PIPELINE 之后,命令就是类似于这么发送的
Client -> Server: INCR X\r\nINCR X\r\nINCR X\r\nINCR X\r\n Server -> Client: 1\r\n2\r\n3\r\n4
我们可以看出,其实它的原理,就是客户端先将所有命令拼接在一起然后本地缓存起来,之后统一发到服务端,服务端执行所有命令之后,统一响应。
Lettuce 的连接有一个 AutoFlushCommands 配置,就是指在这个连接上执行的命令,如果发送到服务端。默认是 false,即收到一个命令就发到服务端一个。如果配置为 false,则将所有命令缓存起来,手动调用 flushCommands 的时候,将缓存的命令一起发到服务端,这样其实就是实现了 Pipeline。
Spring-data-redis 从 2.3.0 版本开始,对于 Lettuce 也兼容了 Pipeline 配置,参考:
我们可以这样配置:
@Bean public BeanPostProcessor lettuceConnectionFactoryBeanProcessor() { return new BeanPostProcessor() { @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { //在 LettuceConnectionFactory 这个 Bean 初始化之后,设置 PipeliningFlushPolicy 为 flushOnClose if (bean instanceof LettuceConnectionFactory) { LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) bean; lettuceConnectionFactory.setPipeliningFlushPolicy(LettuceConnection.PipeliningFlushPolicy.flushOnClose()); } return bean; } }; }
我们来看下这个 PipeliningFlushPolicy 的源码就知道这个 flushOnClose 的意义:
public interface PipeliningFlushPolicy { //其实就是默认的每个命令都直接发到 Redis Server static PipeliningFlushPolicy flushEachCommand() { return FlushEachCommand.INSTANCE; } //在连接关闭的时候,将命令一起发到 Redis static PipeliningFlushPolicy flushOnClose() { return FlushOnClose.INSTANCE; } //手动设置在多少条命令之后,统一发到 Redis,但是同样的,连接关闭的时候也会发到 Redis static PipeliningFlushPolicy buffered(int bufferSize) { return () -> new BufferedFlushing(bufferSize); } }
这三个类也都实现了 PipeliningFlushState
接口:
public interface PipeliningFlushState { //对于 executePipelined,刚开始就会调用 connection.openPipeline(); 开启 pipeline,里面会调用这个方法 void onOpen(StatefulConnection<?, ?> connection); //对于 executePipelined 中的每个命令都会调用这个方法 void onCommand(StatefulConnection<?, ?> connection); //在 executePipelined 的最后会调用 connection.closePipeline(),里面会调用这个方法 void onClose(StatefulConnection<?, ?> connection); }
默认的每个命令都直接发到 Redis Server 的实现是:其实就是方法里什么都不做。
private enum FlushEachCommand implements PipeliningFlushPolicy, PipeliningFlushState { INSTANCE; @Override public PipeliningFlushState newPipeline() { return INSTANCE; } @Override public void onOpen(StatefulConnection<?, ?> connection) {} @Override public void onCommand(StatefulConnection<?, ?> connection) {} @Override public void onClose(StatefulConnection<?, ?> connection) {} }
对于 flushOnClose:
private enum FlushOnClose implements PipeliningFlushPolicy, PipeliningFlushState { INSTANCE; @Override public PipeliningFlushState newPipeline() { return INSTANCE; } @Override public void onOpen(StatefulConnection<?, ?> connection) { //首先配置连接的 AutoFlushCommands 为 false,这样命令就不会立刻发到 Redis connection.setAutoFlushCommands(false); } @Override public void onCommand(StatefulConnection<?, ?> connection) { //收到命令时什么都不做 } @Override public void onClose(StatefulConnection<?, ?> connection) { //在 pipeline 关闭的时候发送所有命令 connection.flushCommands(); //恢复默认配置,这样连接如果退回连接池不会影响后续使用 connection.setAutoFlushCommands(true); } }
对于 buffered:
private static class BufferedFlushing implements PipeliningFlushState { private final AtomicLong commands = new AtomicLong(); private final int flushAfter; public BufferedFlushing(int flushAfter) { this.flushAfter = flushAfter; } @Override public void onOpen(StatefulConnection<?, ?> connection) { //首先配置连接的 AutoFlushCommands 为 false,这样命令就不会立刻发到 Redis connection.setAutoFlushCommands(false); } @Override public void onCommand(StatefulConnection<?, ?> connection) { //如果命令达到指定个数,就发到 Redis if (commands.incrementAndGet() % flushAfter == 0) { connection.flushCommands(); } } @Override public void onClose(StatefulConnection<?, ?> connection) { //在 pipeline 关闭的时候发送所有命令 connection.flushCommands(); //恢复默认配置,这样连接如果退回连接池不会影响后续使用 connection.setAutoFlushCommands(true); } }
微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer: