一、SpringBoot整合ActiveMQ部分
1.引入ActiveMQ依赖
<!-- Spring Boot starter for ActiveMQ. No <version> is given here; presumably it is managed by the project's Spring Boot parent/BOM (confirm in the pom). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2.在application.yml中加入配置参数,放在spring配置下
# Nest both top-level keys below under the existing `spring:` key of application.yml
# (i.e. spring.activemq.* and spring.jms.*).
activemq:
  broker-url: tcp://127.0.0.1:61616
  user: admin
  password: admin
  # Use the external broker at broker-url rather than an embedded in-memory one.
  in-memory: false
  packages:
    # NOTE(review): trusting all packages allows deserialization of any class carried in
    # an ObjectMessage; prefer an explicit `trusted` list outside of local demos.
    trust-all: true
  pool:
    max-connections: 10
    # NOTE(review): with the pool disabled, max-connections and idle-timeout presumably
    # have no effect - confirm whether pooling was meant to be on.
    enabled: false
    # Presumably milliseconds (Spring Boot's default unit for a bare duration) - confirm.
    idle-timeout: 30000
jms:
  # true = publish/subscribe (topics); false = point-to-point (queues).
  pub-sub-domain: true
3.创建topic
package com.mengxiangnongfu.smart_party_school.framework.configure;

import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.stereotype.Component;

import javax.jms.Topic;

/**
 * Declares the JMS topic destinations of the application as Spring beans.
 *
 * <p>NOTE(review): {@code @Bean} methods are conventionally hosted in a
 * {@code @Configuration} class; inside a plain {@code @Component} they still register
 * beans, so this works as written. Consider switching if bean methods ever need to call
 * each other.
 *
 * @author Yang
 * @version 1.0
 * @date 2022/6/23 13:51
 */
@Component
@EnableJms
public class ActiveMqTopicConfig {

    /** Name of the lighting topic on the broker. */
    private static final String LIGHTING_TOPIC_NAME = "lighting-topic";

    /**
     * Registers the lighting topic under the bean name {@code lighting}.
     *
     * @return a topic destination named {@code lighting-topic}
     */
    @Bean(name = "lighting")
    public Topic lig() {
        return new ActiveMQTopic(LIGHTING_TOPIC_NAME);
    }
}
4.发送数据
@Autowired @Qualifier(value = "lighting") //注入配置topice的bean 因为可能配置多个topice的bean所以加入 @Qualifier指定name private Topic lighting;
/** Messaging template used to publish to the broker. */
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;

/** Destination bean registered under the name "lighting". */
@Autowired
@Qualifier(value = "lighting")
private Topic lighting;

/**
 * Publishes a fixed sample payload to the lighting topic.
 *
 * @return the value of {@code AjaxResult.success()}
 */
@Release
@GetMapping("/active-mq/send")
public AjaxResult activeMqSend() {
    final String payload = "F1 F2 F3 F4 F4";
    log.debug("【debug】.......");
    jmsMessagingTemplate.convertAndSend(lighting, payload);
    return AjaxResult.success();
}
二、C#整合ActiveMQ
1.通过NuGet引入ActiveMQ依赖(Apache.NMS.ActiveMQ)
2.建立与activemq的通讯,建议用多线程,在新建的线程中做
/// <summary>
/// Connects to the ActiveMQ broker at tcp://127.0.0.1:61616/ and registers a durable
/// subscriber named "Consumer1" on "lighting-topic". Incoming messages are delivered
/// to <c>consumer_Listener</c>. If setup fails, the error message is appended to
/// <c>rtx_result</c>.
/// </summary>
public void Receive()
{
    IConnection connection = null;
    try
    {
        // Create the connection factory (this is a factory, not a connection pool).
        IConnectionFactory factory = new ConnectionFactory("tcp://127.0.0.1:61616/");
        connection = factory.CreateConnection();
        // A durable subscription is identified by client id + subscriber name,
        // so the client id is assigned before the connection is started.
        connection.ClientId = SystemConstants.LIGHTING_CLIENT_NAME;
        connection.Start();
        ISession session = connection.CreateSession();

        // Create the durable consumer: no selector (null), noLocal = false.
        IMessageConsumer consumer = session.CreateDurableConsumer(
            new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("lighting-topic"),
            "Consumer1",
            null,
            false);
        consumer.Listener += new MessageListener(consumer_Listener);

        // The connection is deliberately left open here: stopping or closing it would
        // end message delivery to the listener.
        // NOTE(review): nothing in this method keeps a reference for a later
        // Stop()/Close() at shutdown - consider storing the connection in a field.
    }
    catch (System.Exception e)
    {
        // Setup failed part-way: release the connection instead of leaking it.
        if (connection != null)
        {
            try
            {
                connection.Close();
            }
            catch (System.Exception)
            {
                // Best effort: the original failure reported below is the one that matters.
            }
        }
        // NOTE(review): if Receive() runs on a worker thread, this touches the UI
        // control off the UI thread - confirm the caller's thread.
        rtx_result.Text += e.Message;
    }
}
/// <summary>
/// Message callback registered on the consumer: appends the text of each received
/// message, followed by "\n", to <c>rtx_result</c>.
/// </summary>
/// <param name="message">The received message; expected to be an <c>ITextMessage</c>.</param>
void consumer_Listener(IMessage message)
{
    try
    {
        // A non-text message makes this cast throw; the catch below reports it.
        ITextMessage msg = (ITextMessage)message;
        string line = msg.Text + "\n";

        // The listener is invoked from the messaging library's dispatch thread, not
        // the UI thread, so the update is marshalled onto the control's own thread.
        // Assumes rtx_result is a Windows Forms control - TODO confirm.
        if (rtx_result.InvokeRequired)
        {
            rtx_result.Invoke(new Action(() => rtx_result.Text = rtx_result.Text + line));
        }
        else
        {
            rtx_result.Text = rtx_result.Text + line;
        }
    }
    catch (System.Exception e)
    {
        Console.WriteLine(e.Message);
    }
}
// Delegate type for handlers that take a received text message; its signature
// matches RevMessage(ITextMessage) below.
public delegate void DelegateRevMessage(ITextMessage message);
/// <summary>
/// Appends one formatted line for the given message to <c>rtx_result</c>: the fixed
/// prefix from the format string, the message text, then a platform newline.
/// </summary>
/// <param name="message">The text message whose <c>Text</c> is displayed.</param>
public void RevMessage(ITextMessage message)
{
    string line = string.Format(@"接收到:{0}{1}", message.Text, Environment.NewLine);
    rtx_result.Text = rtx_result.Text + line;
}