常见部署模式：1. 单点模式；2. 主从模式；3. 双主模式；4. 双主双从模式、多主多从模式。
/**
 * Quick-start producer example.
 *
 * <p>Starts a {@code DefaultMQProducer}, synchronously sends 1000 messages to topic
 * {@code "TopicTest"} with tag {@code "TagA"}, prints each send result, and then shuts
 * the producer down.
 */
public class Producer {

    /**
     * Runs the example.
     *
     * @param args unused
     * @throws MQClientException if the producer fails to start
     * @throws InterruptedException if the thread is interrupted while backing off after a
     *     failed send; the producer is still shut down in that case
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        // Name server addresses can be set explicitly, e.g.
        //     producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
        // Alternatively, export the environment variable NAMESRV_ADDR.

        // Launch the instance.
        producer.start();

        // Everything after a successful start() runs under try/finally so that the producer
        // is shut down even when the back-off sleep below is interrupted.
        try {
            for (int i = 0; i < 1000; i++) {
                try {
                    // Create a message instance, specifying topic, tag and message body.
                    Message msg = new Message(
                        "TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                    );

                    // Synchronous send: deliver the message to one of the brokers and wait
                    // for the result.
                    //
                    // Other ways to send:
                    //   - one-way, when the send result does not matter:
                    //         producer.sendOneway(msg);
                    //   - asynchronous, with the result delivered to a callback:
                    //         producer.send(msg, new SendCallback() { ... });
                    //     where the callback implements onSuccess(SendResult) and
                    //     onException(Throwable).
                    SendResult sendResult = producer.send(msg);

                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    // Best effort: report the failure, back off for a second, then move on
                    // to the next message.
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
        } finally {
            // Shut down once the producer instance is no longer in use.
            producer.shutdown();
        }
    }
}
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { /* * Instantiate with specified consumer group name. */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); /* * Specify name server addresses. * <p/> * * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR * <pre> * {@code * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); * } * </pre> */ /* * Specify where to start in case the specific consumer group is a brand-new one. */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /* * Subscribe one more topic to consume. */ consumer.subscribe("TopicTest", "*"); /* * Register callback to execute on arrival of messages fetched from brokers. */ consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /* * Launch the consumer instance. */ consumer.start(); System.out.printf("Consumer Started.%n"); } }