An MQ (message queue) is a software system used to pass and distribute messages asynchronously between applications or components. It provides reliable message delivery, allowing senders and receivers to communicate without being directly connected, and it supports multiple messaging protocols, such as AMQP, JMS, and the Kafka protocol, for cross-platform and cross-language compatibility. Message queues play an important role in decoupling systems, processing work asynchronously, and smoothing traffic peaks, while also offering high availability and scalability.
The flow for sending and receiving messages is as follows:
To send a message, the sender creates a message object, sets its body and type, and publishes it to the target queue.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
To receive a message, the consumer fetches it from the queue and processes the body, then acknowledges it according to the queue's acknowledgment mechanism to confirm that it has been handled.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
A message object is the basic unit of a message queue. It consists of two parts: a header (or properties section) carrying metadata such as routing and delivery information, and a body carrying the payload; see the sketch after this list of components.
A message queue is a data structure that stores message objects. Queues come in several types, such as point-to-point (P2P) queues and publish/subscribe (Pub/Sub) topics.
A sender (producer) is the client or application that sends messages; it publishes messages to a message queue.
A receiver (consumer) is the client or application that consumes messages; it fetches messages from the queue and processes them.
Message queues are usually backed by middleware, which handles message delivery, routing, and storage. Common message queue middleware includes RabbitMQ, Kafka, and ActiveMQ.
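As a concrete illustration of the header/body split, here is a minimal JMS sketch using ActiveMQ (which this article covers later). The queue name "orders" and the property values are hypothetical examples, not part of any real system:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

// Sketch: a message has two parts -- header/properties (metadata) and body (payload)
public class MessagePartsExample {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("orders");                  // hypothetical queue name
        TextMessage message = session.createTextMessage("order payload"); // body: the payload
        message.setStringProperty("messageType", "order.created");       // header: application metadata
        message.setJMSCorrelationID("order-42");                         // header: standard JMS field
        MessageProducer producer = session.createProducer(queue);
        producer.send(message);
        session.close();
        connection.close();
    }
}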
Apache Kafka is a distributed stream-processing platform; it can be used both for real-time analytics and as a message queue.
A Kafka cluster consists of multiple nodes (brokers), each of which can run on a different machine. Each broker can host multiple topics, and each topic is divided into one or more partitions. Consumers can read from one or more partitions as needed (a partition-assignment sketch follows the consumer example below).
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));
        producer.close();
    }
}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
            consumer.commitSync();
        }
    }
}
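To read specific partitions rather than subscribing to the whole topic, a consumer can use assign() instead of subscribe(). A minimal sketch (partition 0 is chosen arbitrarily for illustration):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class PartitionAssignExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // assign() bypasses the consumer-group protocol, so no group.id is required
        consumer.assign(Arrays.asList(new TopicPartition("my-topic", 0)));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition = %d, offset = %d, value = %s%n",
                    record.partition(), record.offset(), record.value());
        }
        consumer.close();
    }
}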
RabbitMQ is a message queue implementation based on the AMQP protocol. It supports multiple messaging patterns and provides a range of reliability features to ensure message delivery.
RabbitMQ runs as a cluster of nodes, each of which can be on a different machine. Each node can host multiple queues, and exchanges route messages to queues through bindings (see the binding sketch after the consumer example below). Producers and consumers may connect to different nodes.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='hello', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
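The examples above publish through the default exchange. To route through a named exchange, declare the exchange and queue and bind them together. A minimal sketch with the RabbitMQ Java client (assuming the com.rabbitmq:amqp-client library; the exchange, queue, and routing-key names are hypothetical):

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

// Sketch: routing through a named direct exchange instead of the default exchange
public class ExchangeBindingExample {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare("orders-exchange", "direct");          // hypothetical exchange name
            channel.queueDeclare("orders-queue", false, false, false, null);
            channel.queueBind("orders-queue", "orders-exchange", "order.created");
            // messages published with routing key "order.created" reach orders-queue
            channel.basicPublish("orders-exchange", "order.created", null,
                    "Hello World!".getBytes("UTF-8"));
        }
    }
}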
ActiveMQ is a message queue implementation based on the JMS specification. It supports multiple messaging patterns and provides a range of reliability and security features.
ActiveMQ runs as one or more brokers, each of which can be on a different machine. Each broker can host multiple destinations of two kinds: queues (point-to-point) and topics (publish/subscribe); see the topic sketch after the consumer example below. Producers and consumers may connect to different brokers.
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("queue");
        MessageProducer producer = session.createProducer(queue);
        TextMessage message = session.createTextMessage("Hello World!");
        producer.send(message);
        session.close();
        connection.close();
    }
}
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("queue");
        MessageConsumer consumer = session.createConsumer(queue);
        TextMessage message = (TextMessage) consumer.receive();
        System.out.println("Received: " + message.getText());
        session.close();
        connection.close();
    }
}
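A queue delivers each message to exactly one consumer, while a topic broadcasts a copy to every active subscriber. A minimal topic-publishing sketch (the topic name "broadcast" is a hypothetical example):

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;

// Sketch: publishing to a JMS Topic; every active subscriber receives a copy
public class TopicPublisher {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("broadcast"); // hypothetical topic name
        MessageProducer producer = session.createProducer(topic);
        TextMessage message = session.createTextMessage("Broadcast message");
        producer.send(message);
        session.close();
        connection.close();
    }
}

Each subscriber would simply create its own consumer on the same topic with session.createConsumer(topic).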
Installation differs by broker, but each one ships its own startup script, such as bin/rabbitmq-server or bin/kafka-server-start.sh. To install RabbitMQ (here on Debian/Ubuntu):
# Download and install the Debian package (a compatible Erlang/OTP must be installed first)
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.1/rabbitmq-server_3.10.1-1_all.deb
sudo dpkg -i rabbitmq-server_3.10.1-1_all.deb
# Start the server
sudo service rabbitmq-server start
To install and start Kafka (this 2.8.0 release still depends on ZooKeeper):

wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0
# Start ZooKeeper first, then the Kafka broker
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties
To install and start ActiveMQ:

wget http://archive.apache.org/dist/activemq/5.16.0/apache-activemq-5.16.0-bin.tar.gz
tar -xzf apache-activemq-5.16.0-bin.tar.gz
cd apache-activemq-5.16.0
bin/activemq start
RabbitMQ can be configured through a configuration file or command-line parameters. The configuration file is usually located in the /etc/rabbitmq/ directory, e.g. rabbitmq.conf. For example:
# Network listeners
listeners.tcp.default = 5672
listeners.ssl.default = 5671

# Cluster formation using classic (static) peer discovery
cluster_formation.peer_discovery_backend = classic_config
cluster_formation.classic_config.nodes.1 = rabbit@node1
cluster_formation.classic_config.nodes.2 = rabbit@node2

# Note: plugins such as the management UI are enabled outside this file,
# e.g. with: rabbitmq-plugins enable rabbitmq_management
Kafka is configured through a configuration file, usually located in the <KAFKA_HOME>/config directory, e.g. server.properties. For example:
# Broker listener
listeners=PLAINTEXT://:9092

# Directory where Kafka stores its data (log segments)
log.dirs=/var/log/kafka

# Log retention settings
log.retention.hours=168
log.retention.check.interval.ms=300000
ActiveMQ is configured through an XML configuration file, usually located in the <ACTIVEMQ_HOME>/conf directory, e.g. activemq.xml. For example:
<beans xmlns="http://activemq.apache.org/schema/core">
  <broker xmlns="http://activemq.apache.org/schema/core">
    <transportConnectors>
      <transportConnector uri="tcp://0.0.0.0:61616"/>
    </transportConnectors>
  </broker>
</beans>
Sending is one of the most basic message queue operations: it puts data onto a queue. The following examples show the same operation in RabbitMQ, Kafka, and ActiveMQ.
# RabbitMQ (Python, pika)
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
// Kafka (Java)
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));
        producer.close();
    }
}
// ActiveMQ (Java, JMS)
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("queue");
        MessageProducer producer = session.createProducer(queue);
        TextMessage message = session.createTextMessage("Hello World!");
        producer.send(message);
        session.close();
        connection.close();
    }
}
Receiving is the complementary basic operation: it consumes data from a queue. Again, the same operation in RabbitMQ, Kafka, and ActiveMQ:
# RabbitMQ (Python, pika)
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
// Kafka (Java)
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
// ActiveMQ (Java, JMS)
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("queue");
        MessageConsumer consumer = session.createConsumer(queue);
        TextMessage message = (TextMessage) consumer.receive();
        System.out.println("Received: " + message.getText());
        session.close();
        connection.close();
    }
}
The acknowledgment mechanism guarantees reliable delivery. After a message is delivered, the consumer must confirm that it has been processed successfully; if the broker receives no acknowledgment, the message is redelivered (in Kafka's case, the consumer simply re-reads from the last committed offset).
# RabbitMQ: manual acknowledgment (auto_ack defaults to False)
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # ack only after successful processing

channel.basic_consume(queue='hello', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
// Kafka: manual offset commits take the place of per-message acks
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit offsets explicitly
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
            consumer.commitSync(); // "ack" the whole batch after processing it
        }
    }
}
// ActiveMQ: CLIENT_ACKNOWLEDGE lets the consumer acknowledge explicitly
// (the original example called session.commit() on a non-transacted session, which fails)
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("queue");
        MessageConsumer consumer = session.createConsumer(queue);
        TextMessage message = (TextMessage) consumer.receive();
        System.out.println("Received: " + message.getText());
        message.acknowledge(); // unacknowledged messages are redelivered
        session.close();
        connection.close();
    }
}
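The examples above cover consumer-side acknowledgment. On the producer side, RabbitMQ additionally offers publisher confirms, where the broker acknowledges each published message back to the producer. A minimal sketch with the RabbitMQ Java client (assuming the com.rabbitmq:amqp-client library):

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

// Sketch: publisher confirms -- the broker acks each published message to the producer
public class PublisherConfirmExample {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.confirmSelect();            // put the channel into confirm mode
            channel.queueDeclare("hello", false, false, false, null);
            channel.basicPublish("", "hello", null, "Hello World!".getBytes("UTF-8"));
            channel.waitForConfirmsOrDie(5000); // throws if the broker does not confirm within 5s
        }
    }
}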
Kafka is well suited to aggregating and storing log data in real time: many log producers write to a shared topic, and downstream systems consume from it.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class LogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("logs", "key", "log message"));
        producer.close();
    }
}
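A matching consumer that drains the logs topic might look like the following sketch (the group id "log-aggregator" is a hypothetical name):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class LogConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-aggregator"); // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("logs")); // same topic the producer above writes to
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("log entry: %s%n", record.value());
            }
        }
    }
}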
RabbitMQ can be used for asynchronous communication between systems: the sender and the receiver exchange messages through a queue rather than calling each other directly, which decouples the components.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='async_communication')
channel.basic_publish(exchange='', routing_key='async_communication', body='Async message')
connection.close()
ActiveMQ can be used to implement a task queue, for example to distribute batch jobs or scheduled work to a pool of workers.
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TaskProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("task");
        MessageProducer producer = session.createProducer(queue);
        TextMessage message = session.createTextMessage("Task message");
        producer.send(message);
        session.close();
        connection.close();
    }
}
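A worker that pulls tasks off this queue might look like the following sketch; it uses the same task queue name as the producer above and acknowledges each task only after it completes, so unfinished tasks are redelivered if the worker dies:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TaskWorker {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // CLIENT_ACKNOWLEDGE so an unfinished task is redelivered if the worker fails
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("task");
        MessageConsumer consumer = session.createConsumer(queue);
        while (true) {
            TextMessage task = (TextMessage) consumer.receive();
            System.out.println("Processing task: " + task.getText());
            task.acknowledge(); // ack only after the task has been handled
        }
    }
}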