C/C++教程

RabbitMQ的 RPC 消息模式你会了吗?

本文主要是介绍RabbitMQ的 RPC 消息模式你会了吗?,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

前文学习了如何使用工作队列在多个工作者之间分配耗时的任务。若需要在远程计算机上运行一个函数并等待结果呢?这种模式通常被称为远程过程调用 (RPC)。

本节使用 RabbitMQ 构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务器。由于我们没有耗时的任务可以分配,因此我们将创建一个返回斐波那契数的虚拟 RPC 服务。

客户端接口

创建一个简单的客户端类,暴露 call 方法,该方法发送一个 RPC 请求并阻塞,直到收到响应:

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println("fib(4) 是 " + result);

虽然 RPC 是计算中很常见的模式,但它经常受到批评。问题在于当程序员不确定函数调用是本地调用还是缓慢的 RPC 调用时,会引发困惑。这种混淆会导致系统不可预测,并增加调试的复杂性。错误使用 RPC 不仅没有简化软件,反而可能导致难以维护的“代码结构混乱”。
鉴于此,请遵循以下建议:
确保明确区分本地函数调用和远程函数调用。
记录你的系统,使组件之间的依赖关系清晰。
处理错误情况。例如,当 RPC 服务器长时间不可用时,客户端应如何响应?
如有疑虑,请尽量避免使用 RPC。如果可能,应该使用异步管道——与 RPC 类似的阻塞操作不同,结果将被异步推送到下一个计算阶段。

回调队列

在 RabbitMQ 上实现 RPC 很简单。客户端发送一个请求消息,服务器通过响应消息进行回复。为接收响应,需要在请求中附上一个“回调”队列地址。可用默认的队列(在 Java 客户端中是独占的)。试试这个代码:

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties.Builder()
    .replyTo(callbackQueueName)
    .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ...然后从 callback_queue 读取响应消息...

需要导入:

import com.rabbitmq.client.AMQP.BasicProperties;

消息属性
AMQP 0-9-1 协议预定义了一组 14 个与消息一起发送的属性。大多数属性很少使用,以下属性是常用的:
deliveryMode:标记消息为持久 (值为 2) 或瞬时 (其他值) 的模式
contentType:用于描述编码的 mime 类型。例如,对于常用的 JSON 编码,建议将此属性设置为:application/json
replyTo:通常用于命名回调队列
correlationId:用于将 RPC 响应与请求相关联

Correlation Id

在前面提到的方法中,我们建议为每个 RPC 请求创建一个回调队列。这很低效,但幸好有一个更好的方法——为每个客户端创建一个回调队列。

这会引发一个新问题:在回调队列中收到响应时,不清楚该响应属于哪个请求。这时 correlationId 属性派上用场。为每个请求设置一个唯一值。稍后,回调队列中收到消息时,看此属性,并根据它来匹配响应和请求。如看到一个未知 correlationId 值,可以安全地丢弃消息——它不属于我们的请求。

为啥应该忽略回调队列中的未知消息,而不非直接失败?因为服务器端可能会发生竞态条件。虽然不太可能,但可能 RPC 服务器在发送完答案后崩溃,但在为请求发送确认消息之前就崩溃了。如果发生这种情况,重启后的 RPC 服务器将重新处理该请求。因此,客户端的我们必须优雅地处理重复的响应,RPC 最好是幂等的。

总结

RPC模式工作流程:

  • 对于一个 RPC 请求,客户端发送一条带有两个属性的消息:replyTo,其值设置为为该请求创建的匿名独占队列;correlationId,其值为每个请求设置的唯一标识。
  • 请求被发送到 rpc_queue 队列。
  • RPC 工作者(即服务器)在该队列上等待请求。一旦收到请求,它将完成任务,并通过 replyTo 字段指定的队列将结果发送回客户端。
  • 客户端在回复队列中等待数据。当消息到达时,它检查 correlationId 属性。如果匹配请求中的值,它将响应返回给应用程序。

实现全流程

斐波那契任务:

private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

我们定义了斐波那契函数。该函数假设只接收有效的正整数输入。(对于较大数字,该算法效率较低,它可能是最慢的递归实现。)

服务器代码可在此处找到:RPCServer.java。

客户端代码略显复杂,完整的示例源代码可参考 RPCClient.java。

编译并设置类路径:

javac -cp $CP RPCClient.java RPCServer.java

我们的 RPC 服务现在已准备就绪。启动服务器:

java -cp $CP RPCServer
# => [x] 正在等待 RPC 请求

要请求斐波那契数,运行客户端:

java -cp $CP RPCClient
# => [x] 请求 fib(30)

此处展示的设计并非 RPC 服务的唯一实现方式,但它有以下优势:

  • 如果 RPC 服务器太慢,你可以通过运行另一个服务器实例进行扩展。试着在新的控制台中运行第二个 RPCServer
  • 在客户端,RPC 只需发送和接收一条消息。无需像 queueDeclare 这样的同步调用。因此,RPC 客户端只需一个网络往返即可完成一次 RPC 请求。

代码仍然相对简单,并未尝试解决更复杂但重要的问题,如:

  • 如果没有服务器运行,客户端应该如何响应?
  • RPC 是否需要某种超时机制?
  • 如果服务器发生故障并引发异常,是否应该将其转发给客户端?
  • 在处理消息前,是否应检查其有效性(如范围、类型)以防止无效消息的进入?

关注我,紧跟本系列专栏文章,咱们下篇再续!

作者简介:魔都架构师,多家大厂后端一线研发经验,在分布式系统设计、数据平台架构和AI应用开发等领域都有丰富实践经验。

各大技术社区头部专家博主。具有丰富的引领团队经验,深厚业务架构和解决方案的积累。

负责:

  • 中央/分销预订系统性能优化
  • 活动&券等营销中台建设
  • 交易平台及数据中台等架构和开发设计
  • 车联网核心平台-物联网连接平台、大数据平台架构设计及优化
  • LLM Agent应用开发
  • 区块链应用开发
  • 大数据开发挖掘经验
  • 推荐系统项目

目前主攻市级软件项目设计、构建服务全社会的应用系统。

参考:

  • 编程严选网
这篇关于RabbitMQ的 RPC 消息模式你会了吗?的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!