在本文中,我们将实现另一个功能 —— 只订阅一部分消息。例如,我们只需要把严重的错误日志信息写入日志文件(存储到磁盘),但同时仍然把所有的日志信息输出到控制台中
了解springcloud架构可以加求求:三五三六二四七二五九
绑定(Binding)
在之前的例子中,我们已经创建了绑定。可以在我们的 Tut3Config 文件中回忆一下这样的代码:
@Bean public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1).to(fanout); }
绑定(binding)是指交换器(exchange)和队列(queue)的关系。可以简单理解为:这个队列(queue)对这个交换器(exchange)的消息感兴趣。
绑定可以使用一个额外的参数 routingKey。我们将交换器和队列传入到 BindingBuilder,并将 routingKey 绑定到交换器,如下所示:
@Bean public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(direct) .with("orange"); }
routingKey 含义取决于交换类型。比如我们之前使用的 fanout exchange,会忽略它的值。
发布消息
我们将使用以上这个模型作为我们的路由系统,将消息发送到 direct exchange 而不是 fanout exchange。我们将使用颜色作为路由键,这样消费者将能通过选择想要接收(或订阅)的颜色来消费对应的消息。
我们在 Tut4Config 中做一些 Spring 启动配置,需要先建立一个交换器
@Bean public DirectExchange direct() { return new DirectExchange("tut.direct"); }
接收消息的方式与上一个教程中的一样,但也有一些不同 —— 我们需要为每个感兴趣的颜色创建一个新的绑定。
了解springcloud架构可以加求求:三五三六二四七二五九
@Bean public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(direct) .with("orange");
生产者
public class Tut4Sender { @Autowird private AmqpTemplate template; @Autowird private DirectExchange direct; private int index; private int count; private final String[] keys = {"orange", "black", "green"}; @Scheduled(fixedDelay = 1000, initialDelay = 500) public void send() { StringBuilder builder = new StringBuilder("Hello to "); if (++this.index == 3) { this.index = 0; } String key = keys[this.index]; builder.append(key).append(' ').append(Integer.toString(++this.count)); String message = builder.toString(); template.convertAndSend(direct.getName(), key, message); System.out.println(" [x] Sent '" + message + "'"); } }
消费者
public class Tut4Receiver { @RabbitListener(queues = "#{autoDeleteQueue1.name}") public void receiver1(String in) throws InterruptedException { receiver(in, 1); } @RabbitListener(queues = "#{autoDeleteQueue2.name}") public void receiver2(String in) throws InterruptedException { receiver(in, 2); } private void receiver(String in, int instance) throws InterruptedException { StopWatch watch = new StopWatch(); watch.start(); System.out.println("instance " + instance + " [x] Received '" + in + "'"); doWork(in); watch.stop(); System.out.println("instance " + instance + " [x] Done in " + watch.getTotalTimeSeconds() + "s"); } private void doWork(String in) throws InterruptedException { for (char ch : in.toCharArray()) { if (ch == '.') { Thread.sleep(1000); } } } }
配置类
@Profile({"tut4", "routing"}) @Configuration public class Tut4Config { @Bean public DirectExchange direct() { return new DirectExchange("tut.direct"); } @Profile ("receiver") private static class ReceiverConfig { @Bean public Queue autoDeleteQueue1() { return new AnonymousQueue(); } @Bean public Queue autoDeleteQueue2() { return new AnonymousQueue(); } @Bean public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(direct) .with("orange"); } @Bean public Binding binding1b(DirectExchange direct, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(direct) .with("black"); } @Bean public Binding binding2a(DirectExchange direct, Queue autoDeleteQueue2) { return BindingBuilder.bind(autoDeleteQueue2) .to(direct) .with("green"); } @Bean public Binding binding2b(DirectExchange direct, Queue autoDeleteQueue2) { return BindingBuilder.bind(autoDeleteQueue2) .to(direct) .with("black"); } @Bean public Tut4Receiver receiver() { return new Tut4Receiver(); } } @Profile("sender") @Bean public Tut4Sender sender() { return new Tut4Sender(); } }