消息队列MQ

RabbitMQ

本文主要是介绍RabbitMQ,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Rabbitmq基本概念

RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。


 

搭建Rabbitmq服务器:

rabbitmq安装

1.使用docker环境,下载rabbitmq:management镜像

有压缩包的直接使用即可

docker pull rabbitmq:management

将压缩包放入root目录下并进行导入镜像:

docker load -i rabbit-image.gz   #导入rabbit镜像

docker images   #查看 

2.关闭防火墙

systemctl stop firewalld
systemctl disable firewalld
 
# 重启 docker 系统服务
systemctl restart docker

3.配置管理员用户名和密码

mkdir /etc/rabbitmq
vim /etc/rabbitmq/rabbitmq.conf

# 添加两行配置:
default_user = admin
default_pass = admin

4.启动Rabbitmq

docker run -d --name rabbit \
-p 5672:5672 \
-p 15672:15672 \
-v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
--restart=always \
rabbitmq:management

访问管理控制台 http://192.168.64.140:15672
用户名密码是 admin

 

 Rabbitmq 六中工作模式 在idea中应用

添加依赖

 <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.4.3</version>
        </dependency>

简单模式:只有一个消费者

生产者发送消息:

package rabbitmq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Test1 {
	public static void main(String[] args) throws Exception {
		//创建连接工厂,并设置连接信息
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.141");
		f.setPort(5672);//可选,5672是默认端口
		f.setUsername("admin");
		f.setPassword("admin");

		/*
		 * 与rabbitmq服务器建立连接,
		 * rabbitmq服务器端使用的是nio,会复用tcp连接,
		 * 并开辟多个信道与客户端通信
		 * 以减轻服务器端建立连接的开销
		 */
		Connection c = f.newConnection();
		//建立信道
		Channel ch = c.createChannel();

		/*
		 * 声明队列,会在rabbitmq中创建一个队列
		 * 如果已经创建过该队列,就不能再使用其他参数来创建
		 * 
		 * 参数含义:
		 *   -queue: 队列名称
		 *   -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在
		 *   -exclusive: 排他,true表示限制仅当前连接可用
		 *   -autoDelete: 当最后一个消费者断开后,是否删除队列
		 *   -arguments: 其他参数
		 */
		ch.queueDeclare("helloworld", false,false,false,null);

		/*
		 * 发布消息
		 * 这里把消息向默认交换机发送.
		 * 默认交换机隐含与所有队列绑定,routing key即为队列名称
		 * 
		 * 参数含义:
		 * 	-exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null 
		 * 	-routingKey: 对于默认交换机,路由键就是目标队列名称
		 * 	-props: 其他参数,例如头信息
		 * 	-body: 消息内容byte[]数组
		 */
		ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());

		System.out.println("消息已发送");
		c.close();
	}
}

消费者接收消息:

 

package rabbitmq.simple;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Test2 {
	public static void main(String[] args) throws Exception {
		//连接工厂
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.141");
		f.setUsername("admin");
		f.setPassword("admin");
		//建立连接
		Connection c = f.newConnection();
		//建立信道
		Channel ch = c.createChannel();
		//声明队列,如果该队列已经创建过,则不会重复创建
		ch.queueDeclare("helloworld",false,false,false,null);
		System.out.println("等待接收数据");
		
		//收到消息后用来处理消息的回调对象
		DeliverCallback callback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				String msg = new String(message.getBody(), "UTF-8");
				System.out.println("收到: "+msg);
			}
		};
		
		//消费者取消时的回调对象
		CancelCallback cancel = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		ch.basicConsume("helloworld", true, callback, cancel);
	}
}

FANOUT群发模式

生产者发送消息时会发送给每一个接收者

生产者:

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
//当你生产者有一个队列时,消费者也必须一个队列进行接收。
//通过交换机进行发送消息,可以定义消息是否持久。交换机不存储消息需要队列接收消息

        ConnectionFactory f = new ConnectionFactory();  //通过连接工厂进行连接
        f.setHost("192.168.64.141");    //进行连接对应服务的ip
        f.setPort(5672);                //访问的消息服务的端口号
        f.setUsername("admin");         //账号密码
        f.setPassword("admin");

        Connection connection = f.newConnection();
        Channel c = connection.createChannel();     //通信通道
        //创建Fanout交换机: logs为消息队列的名字   发布和订阅模式 群发机制
        c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);

        while (true){
            System.out.println("输入传递的信息:");
            String s = new Scanner(System.in).nextLine();
            c.basicPublish("logs","",null,s.getBytes());
        }

    }
}

消费者:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory f = new ConnectionFactory();  //通过连接工厂进行连接
        f.setHost("192.168.64.141");    //进行连接对应服务的ip
        f.setPort(5672);                //访问的消息服务的端口号
        f.setUsername("admin");         //账号密码
        f.setPassword("admin");

        Connection connection = f.newConnection();
        Channel c = connection.createChannel();     //通信通道

        //1.创建队列 2.创建交换机 3.进行消息队列的绑定
        String queue = UUID.randomUUID().toString();//创建随机队列 保存得到的消息
                    //队列名     非持久     独占     自动删除
        c.queueDeclare(queue,false,true,true,null);
        //创建交换机  生产者是什么消息模式就创造什么消息模式  fanout交换机
        c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
        //进行绑定  对fanout交换机来说 第三个参数是无效的
        c.queueBind(queue,"logs","");

        //正常接收消息 创建回调对象
        DeliverCallback deliverCallback =(consumerTag,message) -> {

            //此处处理消息
            String s = new String(message.getBody());
            System.out.println("收到:"+s);
        };
        CancelCallback cancelCallback =consumerTag -> {
        };
        //开始接受消息,把消息传递给一个回调对象进行处理
        c.basicConsume(queue,true,deliverCallback,cancelCallback);

    }

}

 路由模式  Direct

路由模式,设定关键词,会根据生产者发送的关键词进行接收消息。如果关键词不匹配是不会接收消息的。

生产者:

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class Producer1 {

    public static void main(String[] args) throws IOException, TimeoutException {
//当你生产者有一个队列时,消费者也必须一个队列进行接收。
//通过交换机进行发送消息,可以定义消息是否持久。交换机不存储消息需要队列接收消息

        ConnectionFactory f = new ConnectionFactory();  //通过连接工厂进行连接
        f.setHost("192.168.64.141");    //进行连接对应服务的ip
        f.setPort(5672);                //访问的消息服务的端口号
        f.setUsername("admin");         //账号密码
        f.setPassword("admin");

        Connection connection = f.newConnection();
        Channel c = connection.createChannel();     //通信通道

        //创建路由模式交换机 : Direct
        c.exchangeDeclare("direct_logs",BuiltinExchangeType.DIRECT);
        //向交换机发送消息,并携带路由关键词
        while (true){
            System.out.println("输入消息:");
            String s = new Scanner(System.in).nextLine();
            System.out.println("输入路由键:");
            String k = new Scanner(System.in).nextLine();
            //对默认交换机“”,会自动使用队列名作为路由键
            c.basicPublish("direct_logs",k,null,s.getBytes());
        }


    }
}

 消费者:

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory f = new ConnectionFactory();  //通过连接工厂进行连接
        f.setHost("192.168.64.141");    //进行连接对应服务的ip
        f.setPort(5672);                //访问的消息服务的端口号
        f.setUsername("admin");         //账号密码
        f.setPassword("admin");

        Connection connection = f.newConnection();
        Channel c = connection.createChannel();     //通信通道

        //1.创建队列 2.创建交换机 3.进行消息队列的绑定
        String queue = UUID.randomUUID().toString();//创建随机队列 保存得到的消息
                    //队列名     非持久     独占     自动删除
        c.queueDeclare(queue,false,true,true,null);
        c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);//创建路由交换机
        System.out.println("输入绑定的关键词,用空格隔开");
        String s = new Scanner(System.in).nextLine();
        String[] a = s.split("\\s+");  //  \s是空白格 + 指一到多个
        for (String k : a){
            c.queueBind(queue,"direct_logs",k);  //进行遍历绑定
        }
        //从队列接收消息
        DeliverCallback deliverCallback =(consumerTag, message) -> {

            //此处处理消息
            String msg = new String(message.getBody());
            String key = message.getEnvelope().getRoutingKey();//得到路由键
            System.out.println(key+"----"+msg);
        };
        CancelCallback cancelCallback =consumerTag -> {
        };
        //开始接受消息,把消息传递给一个回调对象进行处理
        c.basicConsume(queue,true,deliverCallback,cancelCallback);

    }
}

 主题模式:

发送到Topic交换机的消息,它的的routingKey,必须是由点分隔的多个单词。单词可以是任何东西,但通常是与消息相关的一些特性。几个有效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey可以有任意多的单词,最多255个字节。

bindingKey也必须采用相同的形式。Topic交换机的逻辑与直连交换机类似——使用特定routingKey发送的消息将被传递到所有使用匹配bindingKey绑定的队列。bindingKey有两个重要的特殊点:

* 可以通配单个单词。
# 可以通配零个或多个单词

生产者:

public class Producer5 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //生产者创建交换机   消费者创建队列
        //连接

        //连接
        ConnectionFactory f = new ConnectionFactory();//连接工厂
        f.setHost("192.168.64.129"); //进行连接对应服务的ip
        f.setPort(5672);            //访问的消息服务端口号
        f.setUsername("admin");     //账号
        f.setPassword("admin");    //密码
        Connection con = f.newConnection();
        Channel c = con.createChannel();//通信通道

        //创建Topic交换机 :topic_logs    会自动使用队列作为关键词
        c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
        //向交换机发送消息,并携带路由关键词
        while (true) {
            System.out.println("输入信息:");
            String s = new Scanner(System.in).nextLine();
            System.out.println("输入路由键:");
            String k = new Scanner(System.in).nextLine();
            //对默认交换机“”,会自动使用队列名作为路由键
            c.basicPublish("topic_logs", k, null, s.getBytes());

        }

    }
}

消费者:

public class Consumer5 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //连接
        ConnectionFactory f = new ConnectionFactory();//连接工厂
        f.setHost("192.168.64.141"); //进行连接对应服务的ip
        f.setPort(5672);            //访问的消息服务端口号
        f.setUsername("admin");     //账号
        f.setPassword("admin");    //密码
        Connection con = f.newConnection();
        Channel c = con.createChannel();//通信通道

        //1.创建随机队列  2.创建交换机 3.使用绑定建关键词绑定
        String queue = UUID.randomUUID().toString();
        //非持久,独占,自动删除
        c.queueDeclare(queue, false, true, true, null);
        c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);//创建交换机
        System.out.println("输入绑定建关键词,用空格隔开:");
        String s = new Scanner(System.in).nextLine();
        String[] a = s.split("\\s+");  //   \s是空白字符  + 指一到多个
        for (String k : a) {
            c.queueBind(queue, "topic_logs", k);  //进行循环遍历绑定
        }

        //正常接收消息
        //正常从队列接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            //此处处理消息
            String msg = new String(message.getBody());
            String key = message.getEnvelope().getRoutingKey();//得到路由键
            System.out.println(key + "---" + msg);
        };
        CancelCallback cancelCallback = consumerTag -> {
        };
        c.basicConsume(queue, true, deliverCallback, cancelCallback);

    }
}

 队列的持久化,消息持久化

将创建的队列第二个false改成true即可变成持久操作。

已经创建的队列参数是不能修改的,因为已经上传到服务器中了,可以创建一个新的队列再进行修改后面的参数。生产者与消费者必须一直。

这篇关于RabbitMQ的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!