消息队列MQ

RabbitMQ-消息队列的收发模式(二)

本文主要是介绍RabbitMQ-消息队列的收发模式(二),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

为了测试方便代码复用这里封装了一个简单的连接mq的工具类

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName RabbitMqUtil
 * @Author ZC
 * @Date 2022/7/2 21:21
 * @Version 1.0
 * @Description
 */
public class RabbitMqUtil {
    private final String host = "192.168.232.119";
    private final int port = 5672;
    private final String username = "test";
    private final String password = "test12";
    private final String virtualHost = "test_mq";

    /**
     * 连接RabbitMq
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    public Connection getConnection() throws IOException, TimeoutException {
        //连接Mq服务器、主机、端口、用户名、密码
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        //设置虚拟主机
        factory.setVirtualHost(virtualHost);
        Connection connection = factory.newConnection();
        return connection;
    }
}

1、简单队列模式

默认 direct 类型交换机,队列名充当路由键。

默认模式-------------消费者间交替接收消息

  • 弊端:

    无法做到性能好的消费者多消费、性能差点的少消费(资源的浪费)

提供者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.utils.RabbitMqUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName SimpleProvider
 * @Author ZC
 * @Date 2022/7/2 21:33
 * @Version 1.0
 * @Description  消息提供者
 */
public class SimpleProvider {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMqUtil rabbitMqUtil = new RabbitMqUtil();
        //1、连接mq服务
        Connection connection = rabbitMqUtil.getConnection();
        //2、通过连接对象获得一个连接通道
        Channel channel = connection.createChannel();
        //3、声明队列
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         *                                  Map<String, Object> arguments)
         * queue: 队列名称
         * durable: 是否持久化(即服务重启时是否还存在)
         * exclusive: 是否独占(即当前队列是否只被这一个队列消费)
         * autoDelete: 是否自动删除(即当该队列没有被连接使用后是否删除)
         * arguments: 队列其他参数的设置
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //4、发送信息到mq
        String message  = "hello rabbitMq";
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * exchange:交换机
         * routingKey: 如果交换机是默认交换机那么这个相当于队列名称
         * props:消息相关参数信息
         * body: 具体消息
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("send: message<"+message+">发送成功");
    }
}

消费者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName SimpleConsumer
 * @Author ZC
 * @Date 2022/7/2 22:02
 * @Version 1.0
 * @Description
 */
public class SimpleConsumer {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、获取连接
        Connection connection = new RabbitMqUtil().getConnection();
        //2、获取一个连接通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //3、获取队列中的消息
        //3。1、处理消息-回调之后使用
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        //监听队列
        /**
         * basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
         *
         * queue: 队列名称
         * autoAck:
         * deliverCallback: 消息传递时回调
         * cancelCallback: 消费者取消时回调
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

        //非lambada表达式接收消息
//        Consumer consumer = new DefaultConsumer(channel){
//            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                String message = new String(body,"utf-8");
//                System.out.println(" [x] Received '" + message + "'");
//            }
//        };
//        //监听队列
//        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

2、work-模式

同一队列不同消费者的消息分发机制。

解决简单队列模式中无法实现能者多劳的问题

简单队列和work 模式的不同:

  • 简单队列只要消息从队列中获取,无论消费者获取到消息后是否成功消费,比如遇到状况:断电,都认为是消息已经成功消费;

  • work模式消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者反馈,如果消费这一直没有反馈,则该消息一直处于不可用状态。

提供者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.utils.RabbitMqUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName WorkProvider
 * @Author ZC
 * @Date 2022/7/3 10:11
 * @Version 1.0
 * @Description
 */
public class WorkProvider {
    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发送消息
        for (int i=0;i<30;i++){
            String message = "hellwork"+i;
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            Thread.sleep(1000);
            System.out.println("send ["+(i+1)+"]:"+message);
        }
    }
}

消费者

消费者1,接收一次消息休息1s

import com.rabbitmq.client.*;
import org.example.utils.RabbitMqUtil;
import java.io.IOException;
import java.sql.SQLOutput;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName WorkConsumer_1
 * @Author ZC
 * @Date 2022/7/3 10:24
 * @Version 1.0
 * @Description
 */
public class WorkConsumer_1 {
    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //工作模式的话,设置一直只接收一条,接收后手动回复
        channel.basicQos(1);
        //接收消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                String message = new String(body,"utf-8");
                System.out.println("消费者(1)接收到的消息:"+message);
                //接收消息完成后需要手动回复
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //监听消息队列-手动回复时候关闭自动回复
        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}

消费者2,接收消息后休息2s

import com.rabbitmq.client.*;
import org.example.utils.RabbitMqUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName WorkConsumer_2
 * @Author ZC
 * @Date 2022/7/3 10:38
 * @Version 1.0
 * @Description
 */
public class WorkConsumer_2 {
    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicQos(1);//跟简单模式的区别
        //接收消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                String message = new String(body,"utf-8");
                System.out.println("消费者(2)接收到的消息:"+message);
                //手动回复
                channel.basicAck(envelope.getDeliveryTag(),false);//跟简单模式的区别
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME,false,consumer); //跟简单模式的区别
     }
}

消费者1比消费者2性能高,所以消费者处理消息快

3、发布订阅模式[*]

同一个交换机绑定多个队列,实现广播消息。

交换机类型为:fanout

一个消息同时发送给多个消费者消费,

消息发送到交换机(变换机中绑定了多个队列)服务消费者绑定了队列,

提供者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.utils.RabbitMqUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Provider
 * @Author ZC
 * @Date 2022/7/3 16:17
 * @Version 1.0
 * @Description 发布订阅模式下的
 */
public class Provider {
    private final static String EXCHANGE_NAME="test_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        //发布订阅模式,消费者的消息发送到交换机上,routingkey为空
        //声明交换机,RabbitMQ常用的交换器类型有: fanout 、 direct 、 topic 、 headers 四种。
        /**
         * 路由模式使用: direct
         * 通配符模式使用: topic
         */
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //发送消息
        for(int i=0;i<5;i++){
            String message = "测试订阅/发布模式"+i;
            System.out.println("send [x]"+message);
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
        }
    }
}

消费者

消费者1

import com.rabbitmq.client.*;
import com.sun.javaws.jnl.RContentDesc;
import org.example.utils.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Consumer1
 * @Author ZC
 * @Date 2022/7/3 16:35
 * @Version 1.0
 * @Description
 */
public class Consumer1 {
    private final static String QUEUE_NAME = "ex_queue_1";
    private final static String EXCHANGE_NAME="test_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        //创建连接通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //队列绑定交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        //接收消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("消费者1接收到消息:"+message);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

消费者2

import com.rabbitmq.client.*;
import org.example.utils.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Consumer2
 * @Author ZC
 * @Date 2022/7/3 16:45
 * @Version 1.0
 * @Description
 */
public class Consumer2 {
    private final static String QUEUE_NAME="ex_queue_2";
    private final static String EXCHANGE_NAME = "test_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        //接收消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("消费者2接收到消息:"+message);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

4、路由模式

交换机根据分发规则将消息分发到不同的队列。

交换机类型为:direct

发布订阅模式下:提供者发送消息时的routingkey为“ ”,消费者队列绑定交换机时也没有指定routingkey,那么绑定在交换机上的所有队列都将消费消息

ps: 路由模式下,指定routingkey ,消费者消费绑定的队列与发布时绑定的routingkey对应时才能消费消息

提供者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import jdk.nashorn.internal.ir.CallNode;
import org.example.utils.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName RoutingProvider
 * @Author ZC
 * @Date 2022/7/3 17:31
 * @Version 1.0
 * @Description
 */
public class RoutingProvider {
    private final static String EXCHANGE_NAME = "test_routing";
    private final static String ROUTING_KEY = "rk";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //发送消息
        for(int i = 0;i<5;i++){
            String message = "测试routing模式"+i;
            channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,message.getBytes());
            System.out.println("send [x]:"+message);
        }
    }
}

消费者

消费者1 routingkey = rk,根据routingkey指定绑定交换机中的队列

import com.rabbitmq.client.*;
import org.example.utils.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Routingconsumer1
 * @Author ZC
 * @Date 2022/7/3 17:32
 * @Version 1.0
 * @Description
 */
public class Routingconsumer1 {
    private final static String QUEUE_NAME = "rk";
    private final static String EXCHANGE_NAME = "test_routing";
    private final static String ROUTING_KEY = "rk";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
        //接收消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("消费者1接收到消息:"+message);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

消费者2 routingkey=rk2,

import com.rabbitmq.client.*;
import org.example.utils.RabbitMqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Routingconsumer2
 * @Author ZC
 * @Date 2022/7/3 17:32
 * @Version 1.0
 * @Description
 */
public class Routingconsumer2 {
    private final static String QUEUE_NAME = "rk1";
    private final static String EXCHANGE_NAME = "test_routing";
    private final static String ROUTING_KEY = "rk1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //队列绑定交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
        //接收消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("消费者2接收到的消息:"+message);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

5、通配符模式[主题模式]

交换机可以根据模糊的分发规则将消息分发到不同的队列。

交换机类型为: topic

消费者routingKey中使用*或#,实现交换机跟队列中的模糊匹配

实例:test.add、test.del、test.other.other。

  • *单层匹配

    • test.*,只能匹配到test.add、test.del
  • 多成匹配

    • test.# 能匹配所有以test开头的

提供者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.utils.RabbitMqUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName provider
 * @Author ZC
 * @Date 2022/7/3 21:02
 * @Version 1.0
 * @Description
 */
public class Provider {
    private final static String EXCHANGE_NAME="ex_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        String message = "测试通配符模式";
        channel.basicPublish(EXCHANGE_NAME,"test.topic",null,message.getBytes());
        channel.basicPublish(EXCHANGE_NAME,"test.routing",null,message.getBytes());
        channel.basicPublish(EXCHANGE_NAME,"test.other.other",null,message.getBytes());
        System.out.println("send[x]:"+message);
    }
}

消费者

消费者1,*匹配

import com.rabbitmq.client.*;
import org.example.utils.RabbitMqUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Consumer1
 * @Author ZC
 * @Date 2022/7/3 21:13
 * @Version 1.0
 * @Description
 */
public class Consumer1{
    private final static String EXCHANGE_NAME="ex_topic";
    private final static String QUEUE_NAME="queue_topic_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"test.*"); //*匹配
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("消费者1:"+message);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

消费者2,#匹配

import com.rabbitmq.client.*;
import org.example.utils.RabbitMqUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Consumer2
 * @Author ZC
 * @Date 2022/7/3 21:13
 * @Version 1.0
 * @Description
 */
public class Consumer2 {
    private final static String EXCHANGE_NAME="ex_topic";
    private final static String QUEUE_NAME="queue_topic_2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = new RabbitMqUtil().getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"test.#"); //#匹配
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("消费者2:"+message);
            }
        };
        channel.basicConsume(QUEUE_NAME,consumer);
    }
}

6、远程调用模式

客户端发送请求到请求队列,并设置临时的响应队列,服务端订阅请求队列,并发送响应到临时的响应队列。

7、发布确认模式

消息可靠发送。

总结

交换机类型

  • fanout 直连交换机:交换机忽略路由键,直接将消息分发到队列中。
  • direct 路由交换机:交换机通过路由键,将消息分发到不同的队列中。(默认交互机使用队列名作为路由键)
  • topic 主题交换机:交换机可以和队列绑定模糊的分发规则,以匹配不同的路由键。
  • headers 头部交换机:不依赖于路由键,而是绑定时指定一组键值对,并提取消息的键值对是否完全匹配绑定的键值对,如果完全匹配则会把消息路由到该队列中。
这篇关于RabbitMQ-消息队列的收发模式(二)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!