<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.4.3</version> </dependency>
// 1.连接RamabbitMQ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.189.137"); // 连接地址 factory.setPort(5672); // 设置端口号 factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); // 配置虚拟主机(每个团队用自己的消息队列) Connection connection = factory.newConnection();// 创建连接 // 2.通过连接对象获取channel对象(队列,绑定,消息都是通过这个对象操作) Channel channel = connection.createChannel(); // 3.通过channel创建一个队列(1:队列名称[如果队列不存在就创建]) channel.queueDeclare("myqueue",false,false,false,null); // 4.发送消息到指定的队列(1:交换机,2:队列的名称,3:其他属性,4:消息体) String msg = "hello 中文 2"; channel.basicPublish("","myqueue",null,msg.getBytes("utf-8")); // 5.断开连接 connection.close();
public class ConnectionUtils { private static ConnectionFactory factory =null; static { // 1.连接RamabbitMQ factory = new ConnectionFactory(); factory.setHost("192.168.189.137"); // 连接地址 factory.setPort(5672); // 设置端口号 factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); // 配置虚拟主机(每个团队用自己的消息队列) } public static Connection getConnection(){ Connection connection = null;// 创建连接 try { connection = factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return connection; } }
消费者
// 1.获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2.获取Channel Channel channel = connection.createChannel(); // 3.通过channel创建一个队列(1:队列名称) channel.queueDeclare("myqueue",false,false,false,null); // 3.设置监听的队列(1:设置监听的队列,2:,3:消费者的回调方法 channel.basicConsume("myqueue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者:"+new String(body,"utf-8")); } }); // 消费者不能关闭连接
// 创建一个线程池 private static ExecutorService executorService = Executors.newFixedThreadPool(10); public static void main(String[] args) throws IOException { // 1.获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2.获取Channel Channel channel = connection.createChannel(); // 3.通过channel创建一个队列(1:队列名称) channel.queueDeclare("myqueue",false,false,false,null); // 3.设置监听的队列(1:设置监听的队列,2:true为自动回复模式,3:消费者的回调方法) channel.basicConsume("myqueue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { executorService.submit(new Runnable() { @Override public void run() { try { System.out.println("消费者:"+new String(body,"utf-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }); }
work模式和简单模式类似,区别在于它有多个消费者交替获取队列中的内容。实现起来也比简单,以上面的代码为例,重新拷贝一个消费者变成两个消费者即可。
// 1.得到连接对象 Connection connection = ConnectionUtils.getConnection(); // 2.获取channel对象 Channel channel = connection.createChannel(); // 3.创建交换机(fanout:广播类型) channel.exchangeDeclare("myexchange1","fanout"); // 4.发送消息给交换机(1:交换机名称,2:队列名称,3:其他属性,4:消息内容) String msg ="hello RebbitMQ"; channel.basicPublish("myexchange","",null,msg.getBytes("utf-8")); connection.close();
Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 创建一个队列 channel.queueDeclare("queue1",false,false,false,null); // 把队列和路交换机绑定(1:队列名称,2:交换机名称,3:路由键) channel.queueBind("queue1","myexchange1",""); channel.basicConsume("queue1",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumer1:"+new String(body,"utf-8")); } });
Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 创建一个队列 channel.queueDeclare("queue2",false,false,false,null); // 把队列和路交换机绑定(1:队列名称,2:交换机名称,3:路由键) channel.queueBind("queue2","myexchange1",""); // 设置监听的队列 channel.basicConsume("queue2",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumer2:"+new String(body,"utf-8")); } });
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
spring.rabbitmq.host=192.168.189.137 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
import org.springframework.amqp.core.*; @Configuration public class RabbitMQConfig { // 创建一个搜索的队列 @Bean public Queue getSeachQueue() { return new Queue("search_queue"); } // 创建一个静态页面的广播 @Bean public Queue getItemQueue(){ return new Queue("item_queue"); } // 创建一个交换机 @Bean public FanoutExchange getFanoutExchange(){ return new FanoutExchange("goods_exchange"); } // 把搜索队列绑定到交换机 @Bean public Binding getBinDing1(){ return BindingBuilder.bind(getSeachQueue()).to(getFanoutExchange()); } // 把详情页面绑定到交换机 @Bean public Binding getBinDing2(){ return BindingBuilder.bind(getItemQueue()).to(getFanoutExchange()); } }
@Autowired private RabbitTemplate rabbitTemplate; @Override public int add(Goods goods) { int i = goodsMapper.insertSelective(goods); System.out.println("GoodsServiceImpl.add:"+goods.getId()); // 2.同步到索引库 // searchService.addGoods(goods); // 3.创建静态模板 // HttpUtils.sendRequset("http://localhost:8083/createHtml?gID="+goods.getId()); // 发送消息给交换机(1:交换机的名称,2:路由键,3:发送的对象[实现序列化接口]) rabbitTemplate.convertAndSend("goods_exchange","",goods); return i; }
@Component public class MyRabbitMQListener { @Autowired private ISearchService searchService; @RabbitListener(queues = "search_queue") public void goodsSysncToSolr(Goods goods){ searchService.addGoods(goods); } }
@Component public class MyRabbitListener { @Reference private IGoodsService goodsService; @Autowired private Configuration configuration; @RabbitListener(queues = "item_queue") public void createItemPage(Goods goodsParam) throws Exception { //生成静态页面的代码 } catch (Exception e) { e.printStackTrace(); } } }