C/C++教程

Lettuce源码阅读(一)之普通的get命令

本文主要是介绍Lettuce源码阅读(一)之普通的get命令,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

一、简介

基于Lettuce连接Redis单机和集群的客户端代码,在这篇文章中,给出了使用 Lettuce API 的大致流程。
本文将着力分析一下背后的源码。

首先,回顾一下使用 Lettuce 客户端调用 get 命令的大致流程:

 // 步骤1:连接信息
RedisURI redisURI = RedisURI.create("localhost", 6379);

// 步骤2:创建Redis客户端
RedisClient client = RedisClient.create(redisURI);

// 步骤3:建立连接
StatefulRedisConnection<String, String> connection = client.connect();

// 步骤4:异步调用命令
RedisAsyncCommands<String, String> asyncCommands = connection.async();
// asyncCommands 是类 RedisAsyncCommandsImpl 的实例对象
RedisFuture<String> future = asyncCommands.get("hello");

RedisAsyncCommandsImplAbstractRedisAsyncCommands 子类,以下就是其 get 方法

@Override
public RedisFuture<V> get(K key) {
    return dispatch(commandBuilder.get(key));
}

二、构建Redis命令对象

2.1 RedisCommandBuilder.get

Command<K, V, V> get(K key) {
  notNullKey(key);
  // 第一个参数类型是 CommandType
  // 第二个参数类型是 CommandOutput,ValueOutput 是它的子类
  // 第三个参数类型是 泛型,表示 Key 的类型
  return createCommand(GET, new ValueOutput<>(codec), key);
}

2.1 RedisCommandBuilder.createCommand

BaseRediscreateCommandRedisCommandBuilder 的基类,看它的 createCommand 方法:

protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, K key) {
  // addKey 会把参数 K 保存到 singularArguments 列表中,列表的成员类型是 SingularArgument
  // KeyArgument 是 SingularArgument 的子类,用来存放 Key
  // ValueArgument 也是  SingularArgument 的子类,用来存放 Value
  CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key);
  // 第一个参数类型是 CommandType
  // 第二个参数类型是 CommandOutput,表示命令的输出结果
  // 第三个参数类型是 CommandArgs,表示命令的参数
  return createCommand(type, output, args);
}

protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {
  // Command
  //   - ProtocolKeyword,是 CommandType 的父类,用于把命令关键字转化为 byte数组
  //   - CommandOutput,用来保存命令的response
  //   - CommandArgs
  //     - singularArguments,保存命令的request参数
  return new Command<>(type, output, args);
}

三、发送Redis命令对象

3.1 AbstractRedisAsyncCommands.dispatch

RedisAsyncCommandsImplAbstractRedisAsyncCommands 子类,所以实际上是调用了 RedisAsyncCommandsImpl 对象的 dispatch 方法:

public <T> AsyncCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
  // 用 AsyncCommand 增强 Command 类:异步redis命令及其结果。
  // 所有成功执行的命令最终都将返回 CommandOutput
  AsyncCommand<K, V, T> asyncCommand = new AsyncCommand<>(cmd);
  // StatefulRedisConnectionImpl,这是单机模式的连接对象,如果是Redis集群或者主从或者哨兵还有不同的实现
  RedisCommand<K, V, T> dispatched = connection.dispatch(asyncCommand);
  if (dispatched instanceof AsyncCommand) {
    return (AsyncCommand<K, V, T>) dispatched;
  }
  return asyncCommand;
}

3.2 StatefulRedisConnectionImpl.dispatch

@Override
public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {
    // AUTH / SELECT / READONLY / READWRITE / DISCARD / EXEC / MULTI
    // 以上这些命令会执行该方法,其他命令什么都不做
    RedisCommand<K, V, T> toSend = preProcessCommand(command);
    // MULTI 命令会执行该方法,其他命令什么都不做
    potentiallyEnableMulti(command);
    // 继续发送
    return super.dispatch(toSend);
}

3.3 RedisChannelHandler.dispatch

protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
  if (debugEnabled) {
    logger.debug("dispatching command {}", cmd);
  }
  // 提供追踪功能
  if (tracingEnabled) {
    RedisCommand<K, V, T> commandToSend = cmd;
    TraceContextProvider provider = CommandWrapper.unwrap(cmd, TraceContextProvider.class);
    if (provider == null) {
      commandToSend = new TracedCommand<>(cmd,
                    clientResources.tracing().initialTraceContextProvider().getTraceContext());
    }
    return channelWriter.write(commandToSend);
  }
  // 接下来调用 RedisChannelWriter 的写方法,发送Redis命令
  return channelWriter.write(cmd);
}

3.4 RedisChannelWriter.write

RedisChannelWriter 的实现类根据Redis部署模式不同,对应的实现类也不同。例如单机对应的是 DefaultEndpoint,集群对应的是 ClusterDistributionChannelWriter

// DefaultEndpoint.java
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
  LettuceAssert.notNull(command, "Command must not be null");
  RedisException validation = validateWrite(1);
  if (validation != null) {
    // 校验失败,指令异常退出
    command.completeExceptionally(validation);
    return command;
  }
  try {
    sharedLock.incrementWriters();
    if (inActivation) {
      command = processActivationCommand(command);
    }
    // 是否自动刷新缓冲区
    if (autoFlushCommands) {
      // 判断是否与Redis服务器保持连接
      if (isConnected()) {
        // 最终会调用 Netty 的 NioSocketChannel 的 writeAndFlush 方法,完成命令的发送
        writeToChannelAndFlush(command);
      } else {
        // 添加到 disconnectedBuffer 中,不立即发送
        writeToDisconnectedBuffer(command);
      }
    } else {
      // 保存到 commandBuffer 中,不立即发送
      writeToBuffer(command);
    }
  } finally {
    sharedLock.decrementWriters();
    if (debugEnabled) {
      logger.debug("{} write() done", logPrefix());
    }
  }
  return command;
}

通过 Netty 的 NioSocketChannel 发送对象,自然会经过配置到的管道,管道中配置有编码器,用来把 Redis 命令对象编码为 byte数组再通过 TCP 通道发送出去。

四、把Redis命令对象编码为byte数组

通过 Netty 调用 MessageToByteEncoderwrite(ChannelHandlerContext, Object, ChannelPromise) 方法中的

try {
    this.encode(ctx, cast, buf);
} finally {
    ReferenceCountUtil.release(msg);
}

CommandEncoderMessageToByteEncoder 的子类,实现了抽象方法 encode

protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
  out.touch("CommandEncoder.encode(…)");
  if (msg instanceof RedisCommand) {
    RedisCommand<?, ?, ?> command = (RedisCommand<?, ?, ?>) msg;
    // 把单个RedisCommand编码成byte数组保存到ByteBuf中
    encode(ctx, out, command);
  }
  // 把多个RedisCommand编码成byte数组保存到ByteBuf中
  if (msg instanceof Collection) {
    Collection<RedisCommand<?, ?, ?>> commands = (Collection<RedisCommand<?, ?, ?>>) msg;
    for (RedisCommand<?, ?, ?> command : commands) {
      encode(ctx, out, command);
    }
  }
}

4.1 CommandEncoder.encode

private void encode(ChannelHandlerContext ctx, ByteBuf out, RedisCommand<?, ?, ?> command) {
  try {
    // 保存ByteBuf写入前的位置,方便出现异常后回滚
    out.markWriterIndex();
    // 调用RedisCommand的encode方法,编码并写入ByteBuf
    command.encode(out);
  } catch (RuntimeException e) {
    out.resetWriterIndex();
    command.completeExceptionally(new EncoderException(
                "Cannot encode command. Please close the connection as the connection state may be out of sync.", e));
  }
  if (debugEnabled) {
    logger.debug("{} writing command {}", logPrefix(ctx.channel()), command);
    if (traceEnabled) {
      logger.trace("{} Sent: {}", logPrefix(ctx.channel()), out.toString(Charset.defaultCharset()).trim());
    }
  }
}

4.2 Command.encode

Command 实现了 RedisCommand 接口,其 encode 方法源码如下:

public void encode(ByteBuf buf) {
    buf.touch("Command.encode(…)");
    // * 在RESP协议中表示数组的第一个字节
    buf.writeByte('*');
    // 接下来是数组长度
    CommandArgs.IntegerArgument.writeInteger(buf, 1 + (args != null ? args.count() : 0));
    // 在RESP中,协议的不同部分始终以\r\n(CRLF)终止
    buf.writeBytes(CommandArgs.CRLF);
    // 对命令关键字编码
    CommandArgs.BytesArgument.writeBytes(buf, type.getBytes());
    // 对命令参数编码
    if (args != null) {
        args.encode(buf);
    }
}

以下是 CommandArgs 内部类 BytesArgumentwriteBytes 方法

static void writeBytes(ByteBuf buffer, byte[] value) {
  // 在RESP协议中,对于大容量字符串(Bulk String),应答的第一个字节是$
  buffer.writeByte('$');
  // 大容量字符串的长度
  IntegerArgument.writeInteger(buffer, value.length);
  buffer.writeBytes(CRLF);
  // 字符串的内容
  buffer.writeBytes(value);
  buffer.writeBytes(CRLF);
}

然后是对命令参数逐个进行编码:

public void encode(ByteBuf buf) {
  buf.touch("CommandArgs.encode(…)");
  // singularArguments 列表保存了命令的所有参数
  for (SingularArgument singularArgument : singularArguments) {
    singularArgument.encode(buf);
  }
}

KeyArgument 的 encode 方法为例:

void encode(ByteBuf target) {
  if (codec instanceof ToByteBufEncoder) {
    ToByteBufEncoder<K, V> toByteBufEncoder = (ToByteBufEncoder<K, V>) codec;
    // 开辟一个临时的缓冲区,用来保存 单参数 编码成功后的byte数组
    // 调用 target.alloc() 是为了拿到和目标缓冲区的 ByteBufAllocator,保证分配在相同的内存区中
    ByteBuf temporaryBuffer = target.alloc().buffer(toByteBufEncoder.estimateSize(key) + 6);
    try {
      // 比如StringCodec 就是 ToByteBufEncoder 的子类
      // StringCodec 把字符串用 utf8或ascii或者其他编码方式 编码成 byte数组
      toByteBufEncoder.encodeKey(key, temporaryBuffer);
      // 把临时缓冲区中的byte数组写入到目标缓冲区内
      ByteBufferArgument.writeByteBuf(target, temporaryBuffer);
    } finally {
      temporaryBuffer.release();
    }
    return;
  }
  ByteBufferArgument.writeByteBuffer(target, codec.encodeKey(key));
}
这篇关于Lettuce源码阅读(一)之普通的get命令的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!