和起航瞎几把学之RocketMQ学习总结(3):RocketMQ消息模式总结(自用)
2018-12-25 16:29
288 查看
1:同步发送消息
简单来说就是你发消息后接收方立马收到消息
public class SyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); }}
2:异步发送消息
可以再setRetryTimesWhenSendAsyncFailed()方法设置时间;主要应用响应时间敏感的场景
public class AsyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); for (int i = 0; i < 100; i++) { final int index = i; //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } //Shut down once the producer instance is not longer in use. producer.shutdown(); }}
3:单向模式发送消息
单向传输用于需要中等可靠性的情况,例如日志收集。
public class OnewayProducer { public static void main(String[] args) throws Exception{ //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. producer.sendOneway(msg); } //Shut down once the producer instance is not longer in use. producer.shutdown(); }}
4:订单消息
实现订单顺序排序
public class OrderedProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. MQProducer producer = new DefaultMQProducer("example_group_name"); //Launch the instance. producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 100; i++) { int orderId = i % 10; //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } //server shutdown producer.shutdown(); }}
5:广播
public class BroadcastProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.start(); for (int i = 0; i < 100; i++){ Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } producer.shutdown(); }}
6:预定信息
预定消息与普通消息的不同之处在于,它们将在稍后提供的时间之前发送
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; public class ScheduledMessageProducer { public static void main(String[] args) throws Exception { // Instantiate a producer to send scheduled messages DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); // Launch producer producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // This message will be delivered to consumer 10 seconds later. message.setDelayTimeLevel(3); // Send the message producer.send(message); } // Shutdown producer after use. producer.shutdown(); } }
相关文章推荐
- rocketmq学习笔记 二 官方实例<消息过滤>
- RocketMQ源码学习--消息存储篇
- rocketmq学习笔记 六 流程之取消息
- (转)RocketMQ源码学习--消息存储篇
- RocketMq学习一:RocketMq的安装
- 和起航瞎几把学之Dubbo学习总结(1):发展介绍
- RocketMq的学习二:广播模式
- 和起航瞎几把学之Dubbo学习总结(2):zookeeper的下载安装
- 和起航瞎几把学之Dubbo学习总结(3):监控中心下载安装
- RocketMQ——Consumer篇:PUSH模式下的消息拉取(DefaultMQPushConsumer)
- 和起航瞎几把学之Dubbo学习总结(4):简单的demo测试
- rocketmq学习笔记 五 源码之rocketmq-tools
- RocketMQ——Consumer篇:PULL模式下的消息消费(DefaultMQPullConsumer)
- rocketmq学习笔记 五 源码之rocketmq-broker
- rocketmq学习笔记 五 源码之rocketmq-filtersrv
- rocketmq学习笔记 六 流程之拉消息
- WCF学习总结——WCF消息交换模式
- 小熊STM32学习总结:STM32的定时器--输入捕捉模式
- 设计模式学习笔记七:常用设计模式原则总结
- Halcon PDF文档(hdevelop_users_guide)学习总结之四——关于3D显示模式