Java教程

跟我学习Java Spring boot 整合RabbitMQ(二):工作队列(Work queues)-B2B2C小程序电子商务

本文主要是介绍跟我学习Java Spring boot 整合RabbitMQ(二):工作队列(Work queues)-B2B2C小程序电子商务,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

现在,我们将发送一些字符串,把这些字符串当作复杂的任务。我们并没有一个真实的复杂任务,类似于图片大小被调整或 pdf 文件被渲染,所以我们通过 sleep () 方法来模拟这种情况。我们在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)将会耗时 1 秒钟。比如 “Hello…” 就会耗时 3 秒钟。


如果您尚未设置项目,请参阅第一个教程中的设置。我们将遵循与第一个教程相同的模式:创建一个包(tut2)并创建 Tut2Config、Tut2Receiver 和 Tut2Sender。


代码整合

首先创建一个新的包(tut2),我们将在这里放置我们的三个类。在配置类 Tut2Config 中,我们设置了两个配置文件 ——tut2 和 work-queues。我们利用 Spring 来将队列 Queue 暴露为一个 bean。我们配置消费者,并定义两个 bean 以对应于上图中的工作进程 receiver1 和 receiver2。

配置类

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

@Profile({"tut2", "work-queues"})

@Configuration

public class Tut2Config {

   @Bean

   public Queue queue() {

       return new Queue("work-queues");

   }

   /**

    * 定义两个消费者,并且给了他们不同的标识

    */

 @Profile ("receiver")

   private static class ReceiverConfig {

       @Bean

       public Tut2Receiver receiver1() {

           return new Tut2Receiver(1);

       }

       @Bean

       public Tut2Receiver receiver2() {

           return new Tut2Receiver(2);

       }

   }

   @Profile("sender")

    @Bean

   public Tut2Sender sender() {

       return new Tut2Sender();

   }

}

生产者
我们简单修改一下生产者的代码,以添加点号(.)的方式来人为的增加该任务的时长,字符串中的每个点号(.)都会增加 1s 的耗时。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

public class Tut2Sender {

@Autowired

private AmqpTemplate template;

@Autowired  

private Queue queue;

   int dots = 0;

   int count = 0;

  @Scheduled(fixedDelay = 1000, initialDelay = 500)

   public void send(){

       StringBuilder builder = new StringBuilder("Hello");

       if (dots++ == 3) {

           dots = 1;

       }

       for (int i = 0; i < dots; i++) {

           builder.append('.');

       }

       builder.append(Integer.toString(++count));

       String message = builder.toString();

       template.convertAndSend(queue.getName(), message);

       System.out.println(" [x] Sent '" + message + "'");

   }

}

消费者
我们的消费者 Tut2Receiver 通过 doWork () 方法模拟了一个耗时的虚假任务,它需要为消息体中每一个点号(.)模拟 1 秒钟的操作。并且我们为消费者增加了一个实例编号,以知道是哪个实例消费了消息和处理的时长。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

@RebbitListener(queues = "work-queues")

public class Tut2Receiver {

   private int instance;

   public Tut2Receiver(int instance) {

       this.instance = instance;

   }

   @RabbitHandler

   public void receive(String in) throws InterruptedException {

       StopWatch watch = new StopWatch();

       watch.start();

       System.out.println("instance " + this.instance +

               " [x] Received '" + in + "'");

       doWork(in);

       watch.stop();

       System.out.println("instance " + this.instance +

               " [x] Done in " + watch.getTotalTimeSeconds() + "s");

   }

   private void doWork(String in) throws InterruptedException {

       for (char ch : in.toCharArray()) {

           if (ch == '.') {

               Thread.sleep(1000);

           }

       }

   }

}

运行

maven 编译


mvn clean package -Dmaven.test.skip=true

运行


java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut2,sender --tutorial.client.duration=60000

java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut2,receiver --tutorial.client.duration=60000

输出


// Sender

Ready … running for 10000ms

[x] Sent ‘Hello.1’

[x] Sent ‘Hello…2’

[x] Sent ‘Hello…3’

[x] Sent ‘Hello.4’

[x] Sent ‘Hello…5’

[x] Sent ‘Hello…6’

[x] Sent ‘Hello.7’

[x] Sent ‘Hello…8’

[x] Sent ‘Hello…9’


// Receiver

Ready … running for 10000ms

instance 1 [x] Received ‘Hello.1’

instance 2 [x] Received ‘Hello…2’

instance 1 [x] Done in 1.005s

instance 1 [x] Received ‘Hello…3’

instance 2 [x] Done in 2.007s

instance 2 [x] Received ‘Hello.4’

instance 2 [x] Done in 1.005s

instance 1 [x] Done in 3.01s

instance 1 [x] Received ‘Hello…5’

instance 2 [x] Received ‘Hello…6’

instance 1 [x] Done in 2.006s

instance 1 [x] Received ‘Hello.7’

instance 1 [x] Done in 1.002s

instance 1 [x] Received ‘Hello…9’

instance 2 [x] Done in 3.01s

instance 2 [x] Received ‘Hello…8’

prefetch

从消费者这端的输出可以看出来,instance 1 得到的任务编号始终是奇数(Hello.1,Hello…3,Hello…5,Hello.7),而 instance 2 得到的任务编号始终是偶数。了解springcloud架构可以加求求:三五三六二四七二五九

如果感觉这次的输出只是巧合,可以多试几次或通过 --tutorial.client.duration= 调整时长得到更多的输出,而结果肯定都是一样的。


这里设计的问题就是之前在基础概念里讲到的调度策略的问题了。要实现公平调度(Fair dispatch)就是设置 prefetch 的值,实现方式有两种。


全局设置

在 application.yml 中设置 spring.rabbitmq.listener.simple.prefetch=1 即可,这会影响到本 Spring Boot 应用中所有使用默认 SimpleRabbitListenerContainerFactory 的消费者。


网上很多人说改配置 pring.rabbitmq.listener.prefetc,实测已经无效,应该是版本的问题。我所使用的版本(RabbitMQ:3.7.4,Spring Boot: 2.0.1.RELEASE),除了 spring.rabbitmq.listener.simple.prefetch,还有一个 spring.rabbitmq.listener.direct.prefetch 可以配置。


改了配置后再运行,可以看到 instance 1 可以获取到”Hello…6”、”Hello…12” 了。


Ready … running for 60000ms

instance 1 [x] Received ‘Hello.1’

instance 2 [x] Received ‘Hello…2’

instance 1 [x] Done in 1.004s

instance 1 [x] Received ‘Hello…3’

instance 2 [x] Done in 2.008s

instance 2 [x] Received ‘Hello.4’

instance 2 [x] Done in 1.004s

instance 2 [x] Received ‘Hello…5’

instance 1 [x] Done in 3.012s

instance 1 [x] Received ‘Hello…6’

instance 2 [x] Done in 2.007s

instance 2 [x] Received ‘Hello.7’

instance 2 [x] Done in 1.004s

instance 2 [x] Received ‘Hello…8’

instance 1 [x] Done in 3.011s

instance 1 [x] Received ‘Hello…9’

instance 2 [x] Done in 2.007s

instance 2 [x] Received ‘Hello.10’

instance 2 [x] Done in 1.006s

instance 2 [x] Received ‘Hello…11’

instance 1 [x] Done in 3.01s

instance 1 [x] Received ‘Hello…12’

特定消费者

上边是改了全局的消费者,如果只针对特定的消费者的话,又怎么处理呢?

我们可以通过自定义 RabbitListenerContainerFactory 来实现。

1

2

3

4

5

6

7

@Bean

public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchOneRabbitListenerContainerFactory(ConnectionFactory rabbitConnectionFactory) {

   SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

   factory.setConnectionFactory(rabbitConnectionFactory);

   factory.setPrefetchCount(1);

   return factory;

}

然后在特定的消费者上指定 containerFactory

1

2

3

4

@RebbitListener(queues = "hello", containerFactory = "prefetchTenRabbitListenerContainerFactory")

public void receive(String in) {

   System.out.println(" [x] Received '" + in + "'")

}



作者:mblib
链接:https://juejin.im/post/5e60d47351882548fd306a90
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


点击查看更多内容
这篇关于跟我学习Java Spring boot 整合RabbitMQ(二):工作队列(Work queues)-B2B2C小程序电子商务的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!