1 @SpringBootApplication 2 public class RabbitApplication { 3 4 public static void main(String[] args) { 5 SpringApplication.run(RabbitApplication.class, args); 6 } 7 8 @PostConstruct 9 public void producterTest() throws IOException, TimeoutException { 10 // 创建连接工厂 11 ConnectionFactory factory = new ConnectionFactory(); 12 // 设置参数 13 factory.setHost("localhost"); 14 factory.setPort(5672); 15 factory.setUsername("guest"); 16 factory.setPassword("guest"); 17 // 创建连接 18 Connection connection = factory.newConnection(); 19 // 创建channel 20 Channel channel = connection.createChannel(); 21 // testProducter(channel,connection); 22 testConsumer(channel); 23 } 24 25 public void testProducter( Channel channel,Connection connection)throws IOException, TimeoutException{ 26 /** 27 * queue:队列名称 28 * durable :是否持久化,请mq重启之后,还存在 29 * exclusive 30 * 是否独占,只能有一个消费者监听这队列 31 * 当connection关闭时,是否删除队列 32 * autoDelete :是否自动删除,当没有consumer时,自动删除掉 33 */ 34 // channel.queueDeclare("Hello world queue",true,false,false,null); 35 // channel.queueDeclare("fanout_queue1",true,false,false,null); 36 // channel.queueDeclare("fanout_queue2",true,false,false,null); 37 channel.queueDeclare("topic_queue1",true,false,false,null); 38 channel.queueDeclare("topic_queue2",true,false,false,null); 39 /** 40 * String exchange,交换机名称 41 * BuiltinExchangeType type, 交换机类型 42 * boolean durable, 是否持久化 43 * boolean autoDelete, 是否自动删除 44 * boolean internal, 是否内部使用 45 * Map<String, Object> arguments 46 */ 47 // 路由模式 48 // channel.exchangeDeclare("",BuiltinExchangeType.DIRECT,true,false,false,null); 49 // 广播模式 50 // channel.exchangeDeclare("test_fanout",BuiltinExchangeType.FANOUT,true,false,false,null); 51 // 通配符模式 52 channel.exchangeDeclare("test_topic",BuiltinExchangeType.TOPIC,true,false,false,null); 53 // 绑定队列和交换机 54 /** 55 * String queue 队列名称 56 * String exchange 交换机名称 57 * String routingKey 路由键,绑定规则 58 * 如果交换机的类型为fanout,routingKey设置为"" 59 */ 60 // channel.queueBind("fanout_queue1","test_fanout",""); 61 // channel.queueBind("fanout_queue2","test_fanout",""); 62 // channel.queueBind("fanout_queue1","test_fanout",""); 63 // channel.queueBind("fanout_queue2","test_fanout",""); 64 65 channel.queueBind("topic_queue1","test_topic","*.do"); 66 channel.queueBind("topic_queue2","test_topic","info"); 67 68 /** 69 * String exchange:交换机名称,简单模式下的交换机默认是"" 70 * String routingKey 路由名称 71 * BasicProperties props 配置信息 72 * byte[] body 发送的信息数据 73 */ 74 // 发送消息 75 // 简单模式:如果使用默认的交换机,路由名称需要与队列的名称保持一致 76 String msg="Hello world test topic model *.do"; 77 // channel.basicPublish("","Hello world queue",null,msg.getBytes()); 78 // channel.basicPublish("test_fanout","",null,msg.getBytes()); 79 channel.basicPublish("test_topic","*.do",null,msg.getBytes()); 80 channel.close(); 81 connection.close(); 82 } 83 84 public void testConsumer( Channel channel)throws IOException{ 85 86 Consumer consumer=new DefaultConsumer(channel){ 87 /** 88 * 89 * @param consumerTag 标识 90 * @param envelope 获取一些信息 交换机,路由key 91 * @param properties 配置信息 92 * @param body 接收的数据 93 * @throws IOException 94 */ 95 @Override 96 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 97 super.handleDelivery(consumerTag, envelope, properties, body); 98 99 System.out.println(new String(body)); 100 } 101 }; 102 /** 103 * queue 队列名称 104 * autoAck, 是否自动确认 105 * Consumer 回调对象 106 */ 107 channel.basicConsume("topic_queue1",true,consumer); 108 } 109 110 }
rabbitmq工作模式:https://blog.csdn.net/qq_42006120/article/details/100772847