MQ :message queue ,消息队列(消息中间件),遵守JMS(java message service)规范的一种软件。
1)异步处理请求。
对于不需要实时响应的请求,可以将消息发送到MQ上,等待消费者在有处理能力的时候再对MQ上的请求进行处理,而不需要实时返回,对于突然暴增的请求或者大量的请求,可以全部堵塞在MQ上,而不会占用更多的系统资源,防止系统资源耗尽而宕机。
2)对系统架构解耦。
如果一个系统依赖多个服务,使用MQ可以将依赖的服务换成MQ,则只需要依赖MQ即可。
activemq、rocketmq,rabbitmq,kafka
https://blog.csdn.net/qq_36761831/article/details/99841388
P2P:消息域使用 queue 作为 Destination,消息可以被同步或异步的发送和接收,每个消息只会给一个 Consumer 传送一次。
Consumer 可以使用 MessageConsumer.receive() 同步地接收消息,也可以通过使用MessageConsumer.setMessageListener() 注册一个 MessageListener 实现异步接收。
多个 Consumer 可以注册到同一个 queue 上,但一个消息只能被一个 Consumer 所接收,然后由该 Consumer 来确认消息(持久化)。并且在这种情况下,Provider 对所有注册的 Consumer 以轮询的方式发送消息。
PUB/SUB:消息域使用 topic 作为 Destination,发布者向 topic 发送消息,订阅者注册接收来自 topic 的消息。发送到 topic 的任何消息都将自动传递给所有订阅者。接收方式(同步和异步)与 P2P 域相同。
除非显式指定,否则 topic 不会为订阅者保留消息。当然,这可以通过持久化(Durable)订阅来实现消息的保存。这种情况下,当订阅者与 Provider 断开时,Provider 会为它存储消息。当持久化订阅者重新连接时,将会受到所有的断连期间未消费的消息。
- 完全支持JMS 1.1和J2EE 1.4规范(持久化,XA消息,事务)
- 支持多种传输协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
- 可插拔的体系结构,可以灵活制定,如:消息存储方式,安全管理等
- 很容易和Application Server集成使用
- 多种语言和协议编写客户端,如:Java,C,C++,C#,Ruby,Perl,Python,PHP
- 从设计上保证了高性能的集群,客户端—服务器,点对点
- 可以很容易的和Spring结合使用
- 支持通过 JDBC 和 journal 提供高速的消息持久化
- 支持和Axis的整合
生产者: //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Queue queue = session.createQueue("test-queue"); //6、使用会话对象创建生产者对象 MessageProducer producer = session.createProducer(queue); //7、使用会话对象创建一个消息对象 TextMessage textMessage = session.createTextMessage("hello!test-queue"); //8、发送消息 producer.send(textMessage); //9、关闭资源 producer.close(); session.close(); connection.close(); 消费者: //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Queue queue = session.createQueue("test-queue"); //6、使用会话对象创建生产者对象 MessageConsumer consumer = session.createConsumer(queue); //7、向consumer对象中设置一个messageListener对象,用来接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // TODO Auto-generated method stub if(message instanceof TextMessage){ TextMessage textMessage = (TextMessage)message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); //8、程序等待接收用户消息 System.in.read(); //9、关闭资源 consumer.close(); session.close(); connection.close();
1、调用consumer.receive()方法,该方法将阻塞直到获得并返回一条消息。
这种情况下,消息返回给方法调用者之后就自动被确认了
2、采用listener回调函数,在有消息到达时,会调用listener接口的onMessage方法。
在这种情况下,在onMessage方法执行完毕后,消息才会被确认,此时只要在方法中抛出异常,该消息就不会被确认
为了防止突然的宕机后的消息数据丢失,需要对MQ消息进行持久化。
在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或远程数据库等再试图将消息发送给接收者,成功则将消息从存储中删除,失败则继续尝试发送。消息中心启动后首先要检查指定的存储位置,若有未发送成功的消息,则需要把消息发送出去。
KahaDB是基于日志文件的持久性数据库,是自ActiveMQ 5.4以来的默认存储机制
消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址:(文件在apache-activemq\data\kahadb下)
1、db-<number>.log(主要存数据)是KahaDB存储消息到预定义大小的数据记录文件,文件命名为db-<number>.log,当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增,它随之消息数量的增多,如每32M一个文件,文件名按照数字进行编号,如db-1.log……当不再有引用到数据文件中的任何消息时,文件会被删除或归档。
2、db.data文件包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-<number>.log里面存储的消息。
3、db.free文件表示当前db.data文件哪些页面是空闲的,文件具体内容是所有空闲页的ID。
4、db.redo文件是用来进行消息恢复,如果KahaDB消息存储在强制退出后启动,用于恢复BTree索引。
5、lock文件表示当前获得KahaDB读写权限的broker。
JMS规范除了为消息生产者端提供事务支持以外,还为消费服务端准备了事务的支持。
可以通过在消费者端操作事务的commit和rollback方法,向服务器告知一组消息是否处理完成。采用事务的意义在于,一组消息要么被全部处理并确认成功,要么全部被回滚并重新处理。
使用方式:
开启本地事务: Connection connection = connectionFactory.createConnection(); session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); ...... session.commit();
ActiveMQ有两种通信方式,点到点形式和发布订阅模式。
1、如果是点到点模式的话,如果消息发送不成功,此消息默认会保存到ActiveMQ服务端直到有消费者将其消费,所以此消息是不会丢失的。
2、如果是发布订阅模式的通信方式,默认情况只通知一次,如果接受不到此消息就没有了,这种场景使用于对消息发送率要求不高的情况,如果要求消息必须送达不可以丢失的话,需要配置持久订阅。每个订阅端定义一个id,在订阅是向ActiveMQ注册,发布消息和接受消息时需要配置发送模式为持久化,此时如果客户端接受不到消息,消息会持久化到服务端,直到客户端正常接收后为止。
1、每条消息新增一个唯一标识uuid,分布式缓存存储uuid,如果接收到相同的标识,将不再处理此条消息。
简单点说就是当网络发送方发送一堆数据,然后调用close关闭连接之后。这些发送的数据都在接收者的缓存里,接收者如果调用read方法仍旧能从缓存中读取这些数据,尽管对方已经关闭了连接。但是当接收者尝试发送数据时,由于此时连接已关闭,所以会发生异常,这个很好理解。不过需要注意的是,当发生SocketException后,原本缓存区中数据也作废了,此时接收者再次调用read方法去读取缓存中的数据,就会报Software caused connection abort: recv failed错误。
ActiveMQ会每隔10秒发送一个心跳包,这个心跳包是服务器发送给客户端的,用来判断客户端死没死。非持久化消息堆积到一定程度会写到文件里,这个写的过程会阻塞所有动作,而且会持续20到30秒,并且随着内存的增大而增大。当客户端发完消息调用connection.close()时,会期待服务器对于关闭连接的回答,如果超过15秒没回答就直接调用socket层的close关闭tcp连接了。这时客户端发出的消息其实还在服务器的缓存里等待处理,不过由于服务器心跳包的设置,导致发生了java.net.SocketException异常,把缓存里的数据作废了,没处理的消息全部丢失。
1、用持久化消息【可以使用对数据进行持久化JDBC,AMQ(日志文件),KahaDB和LevelDB】。
2、非持久化消息及时处理不要堆积。
3、启动事务,启动事务后,commit()方法会负责任的等待服务器的返回,也就不会关闭连接导致消息丢失了。
非持久化的消息是异步发送的,持久化的消息是同步发送的,遇到慢一点的硬盘,发送消息的速度是无法忍受的。但是在开启事务的情况下,消息都是异步发送的,效率会有2个数量级的提升。
所以在发送持久化消息时,请务必开启事务模式。其实发送非持久化消息时也建议开启事务,因为根本不会影响性能。
由于ActiveMQ的prefetch机制,当消费者去获取消息时,不会一条一条去获取,而是一次性获取一批,默认是1000条。
这些预获取的消息,在还没确认消费之前,在管理控制台还是可以看见这些消息的,但是不会再分配给其他消费者,此时这些消息的状态应该算作“已分配未消费”,如果消息最后被消费,则会在服务器端被删除,如果消费者崩溃,则这些消息会被重新分配给新的消费者。
但是如果消费者既不消费确认,又不崩溃,那这些消息就永远躺在消费者的缓存区里无法处理。更通常的情况是,消费这些消息非常耗时,你开了10个消费者去处理,结果发现只有一台机器吭哧吭哧处理,另外9台啥事不干。
解决方案:将prefetch设为1,每次处理1条消息。
预领取(prefetch)的目的:不限制每次push给消费者的消息个数将会是危险的,过多的消息会导致消费者客户端资源耗尽。尤其是当消息的消费速度显著慢于消息的分发速度时。
当broker分发给一个消费者prefetch limit个消息之后,在消费者ACK了至少50%的消息(具体来说,是prefetch limit/2个消费者接收到的消息)之前不会再次分发消息至该消费者。当broker接收到这些消息的ACK之后,会再次分发prefetch limit/2个消息到消费者的消息缓冲区。
消息量很大,为了获得高性能,推荐使用大预取值(prefetch value)。
消息量较小,且每个消息需要处理很长时间时,预取值(prefetch value)应该设置为1。这样可以确保一个消费者一次仅处理一个消息。
指定预取极限(prefetch limit)为0,broker将不会向消费者push消息,取而代之的是,消费者从broker poll消息,每次拉取一条。
需要注意的是,有些消费者不能缓存消息,针对这类消费者,预取值(prefetch value)必须设置为1。例如一些通过Ruby这类脚本语言实现的消费者,通过STOMP协议连接,就没有客户端消息缓冲区的概念。
进入死信队列条件:当一条消息没有被确认并重新退回到服务器重新分配6次之后,仍然没有被确认,则会进入到死信队列中(ActiveMQ.DLQ)。