消息队列解决什么问题?
模型
P:消息生产者
红色:阶列
C:消息消费者
耦合性高,生产者—消费者一一对应。队列名变更都得变理
模型
为什么会出现工作队列
Simple队列是一一对应的,而且我们实际开发,生产者发送消息是不费力的,而消费者一般是跟业务相结合的。,消息者接收到消息之后就需要处理。需要花费时间。队列就需要更多的消费者。
现象:
消费者1和2处理的消息是一样的
消费者1:奇数
消费者2:偶数
其实是一个轮询分发(roun-robin)
公平分发(Fail dispatch)
使用公平发分要关闭autoACK,改成手动。
公共的消费类
public class BHFailCustomConsumer extends DefaultConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(BHFailCustomConsumer.class); private String consumerName = "defaultConsumer"; private long delayTime = 100L; public BHFailCustomConsumer(Channel channel,long delayTime, String ...name) { super(channel); try { //接收消息,在没有应答前只接收1条消息 channel.basicQos(1); } catch (IOException e) { e.printStackTrace(); } this.delayTime = delayTime; if(name!=null) { this.consumerName = name[0]; }else{ this.consumerName = "defaultConsumer "+ UUID.randomUUID().toString(); } } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); LOGGER.debug(this.consumerName +"_"+msg); try { Thread.sleep(delayTime); } catch (InterruptedException e) { e.printStackTrace(); } //手动应答 getChannel().basicAck(envelope.getDeliveryTag(),false); } }
C1,C2,使用不是的delayTime延迟
Connection connection = messageBrokerHelper.getConnection(); Channel channel = messageBrokerHelper.getChannel(connection,QUEUE_NAME); Consumer consumer = new BHFailCustomConsumer(channel,delayTime,name); try { LOGGER.info("Consumer {} waiting consume,delay time {}",name,delayTime); //关掉自动应答,由消费者手动处理应答 channel.basicConsume(QUEUE_NAME,false,consumer); } catch (IOException e) { LOGGER.debug("consumer listener failure",e); }
W 生产者
public void sender(int senderCount){ Connection connection = messageBrokerHelper.getConnection(); Channel channel = messageBrokerHelper.getChannel(connection,QUEUE_NAME); try { int senderTime = senderCount < 1 ? 1 : senderCount; for (int i=0;i<senderTime;i++) { String body = "hello,rabbit mq" + i; channel.basicPublish("", QUEUE_NAME, null, body.getBytes()); } } catch (IOException e) { e.printStackTrace(); }finally { messageBrokerHelper.closeChannel(channel); messageBrokerHelper.closeConnection(connection); } }
模型
场景:
注册->邮件->短信
生产者
消费者
exchange(交换机 转发器)
接收生产者消息,并接收到的消转发给队列
fanout:不处理路由键
Direct:处理路由键
‘# 匹配一个或者多个
*匹配一个
两种方式:
AMQP实现了事务机制
Confirm模式
txselect
用户将当前channel设置成transation模式
txCommit
用于提交事务
回滚事务
Confirm模式
生产者的实现原理
开启confirm模式