MQ,Message Queue,基于TCP协议构建的简单协议,区别于具体的通信协议。
基于通信协议定义和抽象的更高层次的通信模型,一般都是生产者和消费者模型,又或者说服务端和客户端模型。
生产者/消费者模型:一般通过定义生产者和消费者实现消息通信从而屏蔽复杂的底层通信协议。应用于分布式应用系统,而且为之提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量堆积,高吞吐和可靠性重试机制的特性。
数据结构:消息队列的数据结构采用FIFO方式来定义与实现
设计模式:采用观察者模式
观察者模式:定义对象间一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知自动更新
NameServer:名称服务器[MQ命名空间服务器],大致相当于 jndi技术,更新和发现 broker服务。用于保存Broker相关元信息,并给生产者和消费者查找Broker消息。每个Broker在启动都会在名称服务器[NameServer]注册,生产者在发送消息前会根据消息主题到名称服务器查询获取Broker路由消息,消费者也会定时获取主题的路由消息。
ps:属于无状态服务设计,可横向扩展,节点之间无通信,可以部署多台机器来标识伪集群。
Broker:消息存储中心[消息中转角色],负责存储和转发消息。接收来自生产者的消息并进行存储,消费者从这拉取消息。存储与消息相关的元数据,主要包括用户组,消息进度偏移量,队列消息等。其中Broker分为Master和Slave节点:
ps:一个Master可以对应多个Slave,但是一个Slave只能对应一个Master。Master和Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义。BrokerId为0表示Master,BrokerId非0表示Slave。然后所有的Broker和Name Server上的节点建立长连接,定时注册Topic信息到所有Name Server。
Producer:消息生产者->负责生产消息,生产者向消息服务器发送业务应用程序生成的消息。主要有同步发送和异步发送方式两种,其中:
ps:Producer与Name Server其中一个节点建立连接。定期从Name Server取Topic信息。并与提供该Topic信息的Master建立长连接。Producer也可以集群部署。
Consumer:消息消费者 负责消费消息,从消息服务器拉取消息并将其输入用户应用程序中。主要分为拉取型消费者和推送型消费者:
拉取型消费者:Pull Consumer->主动从消息服务器拉取消息,只要批量拉取消息,用户就会启动消费过程
推送型消费者:Push Consumer->封装消息的拉取,消费进度和其它内部维护工作,消息到达之后便执行回调接口留给用户应用程序来实现。属于被动消费类型,Push拉取时需要注册消息费者监听器,当监听器被触发之后开始消费消息。
ps:Consumer 与Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从Name Server 取Topic 路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
拉取rocketmq镜像:docker pull foxiswho/rocketmq
1.查询镜像images:docker search rocketmq
[root@centos-meteor ~]# docker search rocketmq NAME DESCRIPTION STARS OFFICIAL AUTOMATED styletang/rocketmq-console-ng rocketmq-console-ng 20 rocketmqinc/rocketmq Image repository for Apache RocketMQ 17 foxiswho/rocketmq rocketmq 14 laoyumi/rocketmq 10 [OK] xlxwhy/rocketmq alibaba's rocketmq 4 huanwei/rocketmq-broker 2 2019liurui/rocketmq-broker RocketMQ broker image for RocketMQ-Operator 1 2019liurui/rocketmq-namesrv RocketMQ name service image for RocketMQ-Ope… 1 apacherocketmq/rocketmq Docker Image for Apache RocketMQ 1 rocketmqinc/rocketmq-operator The Kubernetes operator for RocketMQ 0 2019liurui/rocketmq-operator Kubernetes Operator for RocketMQ ! 0 apacherocketmq/rocketmq-operator RocketMQ Operator is to manage RocketMQ serv… 0 coder4/rocketmq rocketmq 0 [OK] rocketmqinc/rocketmq-namesrv Customized RocketMQ Name Server Image for Ro… 0 rocketmqinc/rocketmq-broker Customized RocketMQ Broker Image for RocketM… 0 slpcat/rocketmq-console-ng 0 huanwei/rocketmq 0 huanwei/rocketmq-broker-k8s 0 king019/rocketmq rocketmq 0 pengzu/rocketmq-console-ng web console for rocketmq ,this code is from … 0 fengzt/rocketmq-broker apache rocketmq 4.2.0 broker server(官方文档… 0 huanwei/rocketmq-operator 0 slpcat/rocketmq 0 fengzt/rocketmq-nameserver apache rocketmq 4.2.0 nameserver 0 icyblazek/rocketmq RocketMQ 0 [root@centos-meteor ~]#
2.执行:docker pull foxiswho/rocketmq
3.创建docker存储根目录并且授权:
mkdir docker && chmod -R 777 docker/
[root@centos-meteor /]# cd docker/ [root@centos-meteor docker]# pwd /docker [root@centos-meteor docker]#
4.部署名称服务器rocketmq-namesrv-server[9876]:
#rocketmq-namesrv-server
docker run -itd --restart=always --privileged=true -p 9876:9876 --name rocketmq-namesrv-server -v /docker/rocketmq/namesrv/logs:/root/rocketmq/logs -v /docker/rocketmq/namesrv/store:/root/rocketmq/store -e “MAX_POSSIBLE_HEAP=100000000” -e “JAVA_OPTS=-Duser.home=/opt” -e “JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m” rocketmqinc/rocketmq sh mqnamesrv
docker run -itd --restart=always --privileged=true -p 9876:9876 --name rocketmq-namesrv-server -v /docker/rocketmq/namesrv/logs:/root/rocketmq/logs -v /docker/rocketmq/namesrv/store:/root/rocketmq/store -e "MAX_POSSIBLE_HEAP=100000000" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m" rocketmqinc/rocketmq sh mqnamesrv
ps:创建/docker/rocketmq/namesrv/logs和/docker/rocketmq/namesrv/store目录
5.部署消息服务器rocketmq-broker-server[10911]:
#rocketmq-broker-server
docker run -itd --restart=always --privileged=true -p 10909:10909 -p 10911:10911 -p 10912:10912 --name rocketmq-broker-server --link rocketmq-namesrv-server:namesrv -v /docker/rocketmq/broker/logs:/root/rocketmq/logs -v /docker/rocketmq/broker/store:/root/rocketmq/store -v /docker/rocketmq/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -e “NAMESRV_ADDR=namesrv:9876” -e “MAX_POSSIBLE_HEAP=200000000” -e “JAVA_OPTS=-Duser.home=/opt” -e “JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m” rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
docker run -itd --restart=always --privileged=true -p 10909:10909 -p 10911:10911 -p 10912:10912 --name rocketmq-broker-server --link rocketmq-namesrv-server:namesrv -v /docker/rocketmq/broker/logs:/root/rocketmq/logs -v /docker/rocketmq/broker/store:/root/rocketmq/store -v /docker/rocketmq/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
ps:
1.创建/docker/rocketmq/broker/logs和/docker/rocketmq/broker/store目录
2.编写broker.conf配置文件:
brokerClusterName = rocketmq-cluster brokerName = broker-server brokerId = 0 deleteWhen = 04 fileReservedTime = 48 # Broker 的角色 # - ASYNC_MASTER 异步复制Master # - SYNC_MASTER 同步双写Master # - SLAVE brokerRote=ASYNC_MASTER brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH # 如果是本地程序调用云主机 mq,这个需要设置成 云主机 IP brokerIP1=Server-IP #限制的消息大小 maxMessageSize=65536 # 检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 mapedFileSizeCommitLog=1073741824 mapedFileSizeConsumeQueue=50000000 #并发send线程数,多线程来发送消息可能会出现broker busy sendMessageThreadPoolNums=128 useReentrantLockWhenPutMessage=true #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=8 highSpeedMode=false commercialBaseCount=1 maxErrorRateOfBloomFilter=20 accessMessageInMemoryMaxRatio=40 #无读写客户端存活时间 clientChannelMaxIdleTimeSeconds=120 flushDelayOffsetInterval=10000 serverSocketRcvBufSize=131072 #单次 Pull 消息(内存)传输的 最大字节数 maxTransferBytesOnMessageInMemory=262144 clientManageThreadPoolNums=32 serverChannelMaxIdleTimeSeconds=120 serverCallbackExecutorThreads=0 enablePropertyFilter=false transientStorePoolSize=5 enableConsumeQueueExt=false #rocketmq server config serverPooledByteBufAllocatorEnable=true serverSocketRcvBufSize=131072 #rocketmq client config
6.部署控制后台rocketmq-consloe-server[8082]:
#rocketmq-consloe-server
docker run -itd -p 8082:8080 --restart=always --privileged=true --name rocketmq-console-server -v /docker/rocketmq/console/data:/tmp -e “JAVA_OPTS=-Drocketmq.namesrv.addr=47.104.22.10:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false” -e “JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m -Duser.home=/opt” styletang/rocketmq-console-ng:latest
docker run -itd -p 8082:8080 --restart=always --privileged=true --name rocketmq-console-server -v /docker/rocketmq/console/data:/tmp -e "JAVA_OPTS=-Drocketmq.namesrv.addr=Server-IP:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m -Duser.home=/opt" styletang/rocketmq-console-ng:latest
ps:
1.创建/docker/rocketmq/console/data目录
2.Server-IP为broker.conf配置brokerIP1值
7.最终部署结果:
1.配置Rocketmq的Maven依赖:
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
ps:
1.目前rocketmq最新版本是4.7.1:
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency>
2.这里选择的是rocketmq版本[4.4.0],因为部署是4.4.0版本,推荐部署版本相同的rocketmq
2.创建pivotal-cloud-queue工程:
ps:创建消息队列统一模块工程,在业务应用程序服务模块依赖该工程,并封装消息生产者[Producer]和消息消费者[Consumer]处理类。
3.封装rocketmq属性配置类:
添加spring-boot-configuration-processor依赖:
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>
编写RocketmqProperties属性配置类:
package com.pivotal.cloud.queue.properties; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; /** * @className: com.pivotal.cloud.queue.properties.RocketmqProperties * @title: RocketmqProperties * @description: 封装Pivotal项目RocketmqProperties类 * @content: PivotalCloud项目系统RocketmqProperties自定义属性配置类 * @author: marklin * @datetime: 2020-07-07 01:32 * @version: 1.0.0 * @copyright: Copyright © 2018-2020 Pivotal Systems Incorporated. All rights reserved. */ @Data @Configuration @ConfigurationProperties(prefix = "pivotal.cloud.rocketmq") public class RocketmqProperties { /** * rocketmq消息队列生产者-Producer */ private final Producer producer = new Producer(); /** * rocketmq消息队列消费者-Consumer */ private final Consumer consumer = new Consumer(); @Data public static class Producer { } @Data public static class Consumer { } }
编写RocketmqConfiguration配置类:
package com.pivotal.cloud.queue.configuration; import com.pivotal.cloud.queue.properties.RocketmqProperties; import lombok.AllArgsConstructor; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; /** * @className: com.pivotal.cloud.queue.configuration.RocketmqConfiguration * @title: RocketmqConfiguration * @description: 封装Pivotal项目RocketmqConfiguration类 * @content: //TODO * @author: marklin * @datetime: 2020-07-07 02:25 * @version: 1.0.0 * @copyright: Copyright © 2018-2020 Pivotal Systems Incorporated. All rights reserved. */ @AllArgsConstructor @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties({RocketmqProperties.class}) public class RocketmqConfiguration { }
编写RocketmqTemplate生产者模板类:
package com.pivotal.cloud.queue.template; /** * @className: com.pivotal.cloud.queue.template.RocketmqTemplate * @title: RocketmqTemplate * @description: 封装Pivotal项目RocketmqTemplate类 * @content: PivotalCloud项目系统RocketmqTemplate生产者模板类 * @author: marklin * @datetime: 2020-07-07 02:48 * @version: 1.0.0 * @copyright: Copyright © 2018-2020 Pivotal Systems Incorporated. All rights reserved. */ public class RocketmqTemplate { }
编写RocketmqListener消费者监听器类:
package com.pivotal.cloud.queue.listener; /** * @className: com.pivotal.cloud.queue.listener.RocketmqListener * @title: RocketmqListener * @description: 封装Pivotal项目RocketmqListener类 * @content: PivotalCloud项目系统RocketmqListener消费者监听器类 * @author: marklin * @datetime: 2020-07-07 02:52 * @version: 1.0.0 * @copyright: Copyright © 2018-2020 Pivotal Systems Incorporated. All rights reserved. */ public interface RocketmqListener { }
编写META-INF/spring.factories工厂类:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.pivotal.cloud.queue.configuration.RocketmqConfiguration,\ com.pivotal.cloud.queue.configuration.RabbitmqConfiguration,\ com.pivotal.cloud.queue.configuration.ActivemqConfiguration,\ com.pivotal.cloud.queue.configuration.KafkaConfiguration,
版权声明:本文为博主原创文章,遵循相关版权协议,如若转载或者分享请附上原文出处链接和链接来源。