您的位置:首页 > 其它

详解RocketMQ中的Producer

2016-04-30 15:07 316 查看
上一篇博客讲解了如何安装RocketMQ,并且也简单的介绍了一下相关RocketMq的概念,那么这篇博客,来剖析一下MQ中的producer的角色,看看它是来干什么的?
 


 上图就是MQ中Producer的有关结构图,下面来着重分析一下每个类的用途
 1.MQAdmin:作为MQ应用层最底层的类,为我们提供了所有公共的方法,常用的有如下 根据key、主题名和队列来创建Topic
 void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException;
 查询消息队列中的偏移量 long maxOffset(final MessageQueue mq) throws MQClientException; 根据各种条件来查询Message信息 QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
                             final long end) throws MQClientException, InterruptedException;

 2.MQProducer:用来发送生产者中的消息,包含了start和shutdown以及各种send方法,其中send方法返回值为sendResult,里面包含着SendStatus也就是发送的状态。send 消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在 sendResult 里定义。 
  SEND_OK 
消息发送成功 
  FLUSH_DISK_TIMEOUT 
消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失 
  FLUSH_SLAVE_TIMEOUT 
消息发送成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消
息才会丢失 
  SLAVE_NOT_AVAILABLE 
消息发送成功,但是此时 slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢
 3.ClientConfig:Client端公共的配置信息,例如心跳数、持久化的时间间隔等 4.DefaultMQProducer:基础的MQProducer,有一些基本的默认设置,供我们使用。例如默认的队列数目、默认的超时时间等
 下面通过一个实例来了解一下Producer中常用的操作 [java] view plain copy print?<span style="font-family:Comic Sans MS;font-size:18px;">/**      
 * @FileName: Producer.java    
 * @Package:com.test    
 * @Description: TODO   
 * @author: LUCKY     
 * @date:2015年12月28日 下午2:32:22    
 * @version V1.0      
 */  
package com.test;  
  
import java.util.List;  
  
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;  
import com.alibaba.rocketmq.client.producer.SendCallback;  
import com.alibaba.rocketmq.client.producer.SendResult;  
import com.alibaba.rocketmq.common.message.Message;  
import com.alibaba.rocketmq.common.message.MessageQueue;  
  
/** 
 * @ClassName: Producer 
 * @Description: 模拟生产者 
 * @author: LUCKY 
 * @date:2015年12月28日 下午2:32:22 
 */  
public class ProducerTest {  
    public static void main(String[] args) throws Exception {  
  
        DefaultMQProducer producer = new DefaultMQProducer("Producer");  
        // 必须要设置nameserver地址  
        producer.setNamesrvAddr("100.66.154.81:9876");  
        try {  
//          producer.setClientIP("**");  
            //设置实例名称  
            producer.setInstanceName("dd");  
            //设置重试的次数  
            producer.setRetryTimesWhenSendFailed(3);  
            //开启生产者  
            producer.start();  
            //创建一条消息  
            Message msg = new Message("PushTopic", "push", "1",  
                    "内容一".getBytes());  
            //发送消息  
            SendResult result = producer.send(msg);  
            //发送,并触发回调函数  
            producer.send(msg, new SendCallback() {  
                  
                @Override  
                //成功的回调函数  
                public void onSuccess(SendResult sendResult) {  
                    System.out.println(sendResult.getSendStatus());  
                    System.out.println("成功了");  
                }  
                  
                @Override  
                //出现异常的回调函数  
                public void onException(Throwable e) {  
                System.out.println("失败了"+e.getMessage());  
                      
                }  
            });  
          
              
            //获取某个主题的消息队列  
            List<MessageQueue> messageQueues = producer  
                    .fetchPublishMessageQueues("PushTopic");  
            System.out.println(messageQueues.size());  
          
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
              
         producer.shutdown();  
        }  
    }  
  
}  
</span>  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: