详解RocketMQ中的Producer
2016-04-30 15:07
316 查看
上一篇博客讲解了如何安装RocketMQ,并且也简单的介绍了一下相关RocketMq的概念,那么这篇博客,来剖析一下MQ中的producer的角色,看看它是来干什么的?
上图就是MQ中Producer的有关结构图,下面来着重分析一下每个类的用途
1.MQAdmin:作为MQ应用层最底层的类,为我们提供了所有公共的方法,常用的有如下 根据key、主题名和队列来创建Topic
void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException;
查询消息队列中的偏移量 long maxOffset(final MessageQueue mq) throws MQClientException; 根据各种条件来查询Message信息 QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end) throws MQClientException, InterruptedException;
2.MQProducer:用来发送生产者中的消息,包含了start和shutdown以及各种send方法,其中send方法返回值为sendResult,里面包含着SendStatus也就是发送的状态。send 消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在 sendResult 里定义。
SEND_OK
消息发送成功
FLUSH_DISK_TIMEOUT
消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
FLUSH_SLAVE_TIMEOUT
消息发送成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消
息才会丢失
SLAVE_NOT_AVAILABLE
消息发送成功,但是此时 slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢
3.ClientConfig:Client端公共的配置信息,例如心跳数、持久化的时间间隔等 4.DefaultMQProducer:基础的MQProducer,有一些基本的默认设置,供我们使用。例如默认的队列数目、默认的超时时间等
下面通过一个实例来了解一下Producer中常用的操作 [java] view plain copy print?<span style="font-family:Comic Sans MS;font-size:18px;">/**
* @FileName: Producer.java
* @Package:com.test
* @Description: TODO
* @author: LUCKY
* @date:2015年12月28日 下午2:32:22
* @version V1.0
*/
package com.test;
import java.util.List;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendCallback;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
/**
* @ClassName: Producer
* @Description: 模拟生产者
* @author: LUCKY
* @date:2015年12月28日 下午2:32:22
*/
public class ProducerTest {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("Producer");
// 必须要设置nameserver地址
producer.setNamesrvAddr("100.66.154.81:9876");
try {
// producer.setClientIP("**");
//设置实例名称
producer.setInstanceName("dd");
//设置重试的次数
producer.setRetryTimesWhenSendFailed(3);
//开启生产者
producer.start();
//创建一条消息
Message msg = new Message("PushTopic", "push", "1",
"内容一".getBytes());
//发送消息
SendResult result = producer.send(msg);
//发送,并触发回调函数
producer.send(msg, new SendCallback() {
@Override
//成功的回调函数
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult.getSendStatus());
System.out.println("成功了");
}
@Override
//出现异常的回调函数
public void onException(Throwable e) {
System.out.println("失败了"+e.getMessage());
}
});
//获取某个主题的消息队列
List<MessageQueue> messageQueues = producer
.fetchPublishMessageQueues("PushTopic");
System.out.println(messageQueues.size());
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
}
</span>
上图就是MQ中Producer的有关结构图,下面来着重分析一下每个类的用途
1.MQAdmin:作为MQ应用层最底层的类,为我们提供了所有公共的方法,常用的有如下 根据key、主题名和队列来创建Topic
void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException;
查询消息队列中的偏移量 long maxOffset(final MessageQueue mq) throws MQClientException; 根据各种条件来查询Message信息 QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end) throws MQClientException, InterruptedException;
2.MQProducer:用来发送生产者中的消息,包含了start和shutdown以及各种send方法,其中send方法返回值为sendResult,里面包含着SendStatus也就是发送的状态。send 消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在 sendResult 里定义。
SEND_OK
消息发送成功
FLUSH_DISK_TIMEOUT
消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
FLUSH_SLAVE_TIMEOUT
消息发送成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消
息才会丢失
SLAVE_NOT_AVAILABLE
消息发送成功,但是此时 slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢
3.ClientConfig:Client端公共的配置信息,例如心跳数、持久化的时间间隔等 4.DefaultMQProducer:基础的MQProducer,有一些基本的默认设置,供我们使用。例如默认的队列数目、默认的超时时间等
下面通过一个实例来了解一下Producer中常用的操作 [java] view plain copy print?<span style="font-family:Comic Sans MS;font-size:18px;">/**
* @FileName: Producer.java
* @Package:com.test
* @Description: TODO
* @author: LUCKY
* @date:2015年12月28日 下午2:32:22
* @version V1.0
*/
package com.test;
import java.util.List;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendCallback;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
/**
* @ClassName: Producer
* @Description: 模拟生产者
* @author: LUCKY
* @date:2015年12月28日 下午2:32:22
*/
public class ProducerTest {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("Producer");
// 必须要设置nameserver地址
producer.setNamesrvAddr("100.66.154.81:9876");
try {
// producer.setClientIP("**");
//设置实例名称
producer.setInstanceName("dd");
//设置重试的次数
producer.setRetryTimesWhenSendFailed(3);
//开启生产者
producer.start();
//创建一条消息
Message msg = new Message("PushTopic", "push", "1",
"内容一".getBytes());
//发送消息
SendResult result = producer.send(msg);
//发送,并触发回调函数
producer.send(msg, new SendCallback() {
@Override
//成功的回调函数
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult.getSendStatus());
System.out.println("成功了");
}
@Override
//出现异常的回调函数
public void onException(Throwable e) {
System.out.println("失败了"+e.getMessage());
}
});
//获取某个主题的消息队列
List<MessageQueue> messageQueues = producer
.fetchPublishMessageQueues("PushTopic");
System.out.println(messageQueues.size());
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
}
</span>
相关文章推荐
- allegro Disable custom colors is enabled
- 背包系列第六篇----完全背包(求解最大价值的个数)
- RocketMq相关介绍
- ARP协议
- 复制指定目录下的指定文件,并修改后缀名
- Windbg、dump分析类资源链接
- 有一个文本文件中存储了几个名称,写一个程序实现随机获取一个人的名字(抽奖)
- test
- 负载均衡算法
- HNOI2016 大数(number)<莫队>
- opencv的CV_EXPORT
- hrbust/哈理工oj 1877 区间【水题】
- 文科状元转CS
- 数据结构相同情况下数据表之间数据的快速"copy"
- maven学习系列3----仓库
- Java注解(二) 系统注解
- 将文件中字符串赋值到ArrayList中
- 关于Wireshark无法启动 一直在加载 loading configrue 或者 initializing解决
- IClient for js开发之地图的加载
- 满满的干货:推荐提升 Android 性能的建议