基于Lettuce连接Redis单机和集群的客户端代码,在这篇文章中,给出了使用 Lettuce API 的大致流程。
本文将着力分析一下背后的源码。
首先,回顾一下使用 Lettuce 客户端调用 get 命令的大致流程:
// 步骤1:连接信息 RedisURI redisURI = RedisURI.create("localhost", 6379); // 步骤2:创建Redis客户端 RedisClient client = RedisClient.create(redisURI); // 步骤3:建立连接 StatefulRedisConnection<String, String> connection = client.connect(); // 步骤4:异步调用命令 RedisAsyncCommands<String, String> asyncCommands = connection.async(); // asyncCommands 是类 RedisAsyncCommandsImpl 的实例对象 RedisFuture<String> future = asyncCommands.get("hello");
RedisAsyncCommandsImpl 是 AbstractRedisAsyncCommands 子类,以下就是其 get 方法
@Override public RedisFuture<V> get(K key) { return dispatch(commandBuilder.get(key)); }
Command<K, V, V> get(K key) { notNullKey(key); // 第一个参数类型是 CommandType // 第二个参数类型是 CommandOutput,ValueOutput 是它的子类 // 第三个参数类型是 泛型,表示 Key 的类型 return createCommand(GET, new ValueOutput<>(codec), key); }
BaseRediscreateCommand 是 RedisCommandBuilder 的基类,看它的 createCommand 方法:
protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, K key) { // addKey 会把参数 K 保存到 singularArguments 列表中,列表的成员类型是 SingularArgument // KeyArgument 是 SingularArgument 的子类,用来存放 Key // ValueArgument 也是 SingularArgument 的子类,用来存放 Value CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key); // 第一个参数类型是 CommandType // 第二个参数类型是 CommandOutput,表示命令的输出结果 // 第三个参数类型是 CommandArgs,表示命令的参数 return createCommand(type, output, args); } protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) { // Command // - ProtocolKeyword,是 CommandType 的父类,用于把命令关键字转化为 byte数组 // - CommandOutput,用来保存命令的response // - CommandArgs // - singularArguments,保存命令的request参数 return new Command<>(type, output, args); }
RedisAsyncCommandsImpl 是 AbstractRedisAsyncCommands 子类,所以实际上是调用了 RedisAsyncCommandsImpl 对象的 dispatch 方法:
public <T> AsyncCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) { // 用 AsyncCommand 增强 Command 类:异步redis命令及其结果。 // 所有成功执行的命令最终都将返回 CommandOutput AsyncCommand<K, V, T> asyncCommand = new AsyncCommand<>(cmd); // StatefulRedisConnectionImpl,这是单机模式的连接对象,如果是Redis集群或者主从或者哨兵还有不同的实现 RedisCommand<K, V, T> dispatched = connection.dispatch(asyncCommand); if (dispatched instanceof AsyncCommand) { return (AsyncCommand<K, V, T>) dispatched; } return asyncCommand; }
@Override public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) { // AUTH / SELECT / READONLY / READWRITE / DISCARD / EXEC / MULTI // 以上这些命令会执行该方法,其他命令什么都不做 RedisCommand<K, V, T> toSend = preProcessCommand(command); // MULTI 命令会执行该方法,其他命令什么都不做 potentiallyEnableMulti(command); // 继续发送 return super.dispatch(toSend); }
protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) { if (debugEnabled) { logger.debug("dispatching command {}", cmd); } // 提供追踪功能 if (tracingEnabled) { RedisCommand<K, V, T> commandToSend = cmd; TraceContextProvider provider = CommandWrapper.unwrap(cmd, TraceContextProvider.class); if (provider == null) { commandToSend = new TracedCommand<>(cmd, clientResources.tracing().initialTraceContextProvider().getTraceContext()); } return channelWriter.write(commandToSend); } // 接下来调用 RedisChannelWriter 的写方法,发送Redis命令 return channelWriter.write(cmd); }
RedisChannelWriter 的实现类根据Redis部署模式不同,对应的实现类也不同。例如单机对应的是 DefaultEndpoint,集群对应的是 ClusterDistributionChannelWriter。
// DefaultEndpoint.java public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) { LettuceAssert.notNull(command, "Command must not be null"); RedisException validation = validateWrite(1); if (validation != null) { // 校验失败,指令异常退出 command.completeExceptionally(validation); return command; } try { sharedLock.incrementWriters(); if (inActivation) { command = processActivationCommand(command); } // 是否自动刷新缓冲区 if (autoFlushCommands) { // 判断是否与Redis服务器保持连接 if (isConnected()) { // 最终会调用 Netty 的 NioSocketChannel 的 writeAndFlush 方法,完成命令的发送 writeToChannelAndFlush(command); } else { // 添加到 disconnectedBuffer 中,不立即发送 writeToDisconnectedBuffer(command); } } else { // 保存到 commandBuffer 中,不立即发送 writeToBuffer(command); } } finally { sharedLock.decrementWriters(); if (debugEnabled) { logger.debug("{} write() done", logPrefix()); } } return command; }
通过 Netty 的 NioSocketChannel 发送对象,自然会经过配置到的管道,管道中配置有编码器,用来把 Redis 命令对象编码为 byte数组再通过 TCP 通道发送出去。
通过 Netty 调用 MessageToByteEncoder
的 write(ChannelHandlerContext, Object, ChannelPromise)
方法中的
try { this.encode(ctx, cast, buf); } finally { ReferenceCountUtil.release(msg); }
CommandEncoder 是 MessageToByteEncoder 的子类,实现了抽象方法 encode
:
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { out.touch("CommandEncoder.encode(…)"); if (msg instanceof RedisCommand) { RedisCommand<?, ?, ?> command = (RedisCommand<?, ?, ?>) msg; // 把单个RedisCommand编码成byte数组保存到ByteBuf中 encode(ctx, out, command); } // 把多个RedisCommand编码成byte数组保存到ByteBuf中 if (msg instanceof Collection) { Collection<RedisCommand<?, ?, ?>> commands = (Collection<RedisCommand<?, ?, ?>>) msg; for (RedisCommand<?, ?, ?> command : commands) { encode(ctx, out, command); } } }
private void encode(ChannelHandlerContext ctx, ByteBuf out, RedisCommand<?, ?, ?> command) { try { // 保存ByteBuf写入前的位置,方便出现异常后回滚 out.markWriterIndex(); // 调用RedisCommand的encode方法,编码并写入ByteBuf command.encode(out); } catch (RuntimeException e) { out.resetWriterIndex(); command.completeExceptionally(new EncoderException( "Cannot encode command. Please close the connection as the connection state may be out of sync.", e)); } if (debugEnabled) { logger.debug("{} writing command {}", logPrefix(ctx.channel()), command); if (traceEnabled) { logger.trace("{} Sent: {}", logPrefix(ctx.channel()), out.toString(Charset.defaultCharset()).trim()); } } }
类 Command 实现了 RedisCommand 接口,其 encode 方法源码如下:
public void encode(ByteBuf buf) { buf.touch("Command.encode(…)"); // * 在RESP协议中表示数组的第一个字节 buf.writeByte('*'); // 接下来是数组长度 CommandArgs.IntegerArgument.writeInteger(buf, 1 + (args != null ? args.count() : 0)); // 在RESP中,协议的不同部分始终以\r\n(CRLF)终止 buf.writeBytes(CommandArgs.CRLF); // 对命令关键字编码 CommandArgs.BytesArgument.writeBytes(buf, type.getBytes()); // 对命令参数编码 if (args != null) { args.encode(buf); } }
以下是 CommandArgs 内部类 BytesArgument 的 writeBytes 方法
static void writeBytes(ByteBuf buffer, byte[] value) { // 在RESP协议中,对于大容量字符串(Bulk String),应答的第一个字节是$ buffer.writeByte('$'); // 大容量字符串的长度 IntegerArgument.writeInteger(buffer, value.length); buffer.writeBytes(CRLF); // 字符串的内容 buffer.writeBytes(value); buffer.writeBytes(CRLF); }
然后是对命令参数逐个进行编码:
public void encode(ByteBuf buf) { buf.touch("CommandArgs.encode(…)"); // singularArguments 列表保存了命令的所有参数 for (SingularArgument singularArgument : singularArguments) { singularArgument.encode(buf); } }
以 KeyArgument 的 encode 方法为例:
void encode(ByteBuf target) { if (codec instanceof ToByteBufEncoder) { ToByteBufEncoder<K, V> toByteBufEncoder = (ToByteBufEncoder<K, V>) codec; // 开辟一个临时的缓冲区,用来保存 单参数 编码成功后的byte数组 // 调用 target.alloc() 是为了拿到和目标缓冲区的 ByteBufAllocator,保证分配在相同的内存区中 ByteBuf temporaryBuffer = target.alloc().buffer(toByteBufEncoder.estimateSize(key) + 6); try { // 比如StringCodec 就是 ToByteBufEncoder 的子类 // StringCodec 把字符串用 utf8或ascii或者其他编码方式 编码成 byte数组 toByteBufEncoder.encodeKey(key, temporaryBuffer); // 把临时缓冲区中的byte数组写入到目标缓冲区内 ByteBufferArgument.writeByteBuf(target, temporaryBuffer); } finally { temporaryBuffer.release(); } return; } ByteBufferArgument.writeByteBuffer(target, codec.encodeKey(key)); }