您的位置:首页 > 编程语言 > Java开发

rocketmq入门学习笔记

2017-10-15 18:27 309 查看
特性:

1)支持分布式

2)可以保证严格的消息顺序

3)支持高并发

4)可以通过生产者push或者是消费者pull两种模式来消费消息

5)通过nameServer来协调分布式。

6)支持事务控制,支持失败重试。

集群模式:单主、多主、多主多备(同步/异步)

同时创建多个目录:

mkdir -p home/rocketmq/store/{commitlog,consumequeue,index}

#配置文件解析

#rocketmq集群名字配置

brokerClusterName=rocketmq-cluster

#rocketmq的broker名字配置

brokerName=broker-1

#rocketmq集群主备配置:

brokerId=0 //0表示是主,而大于0则是备,备机只能读不能写

#rocketmq集群nameServer地址的配置

namesrvAddr=ip1:port;ip2:port  //默认的port为9876

#rocketmq集群发送消息,默认重建队列数配置

defaultTopicQueueNums=4

#rocketmq集群中是否允许自动创建不存在的topic的配置

autoCreateTopicEnable=false

#rocketmq集群中是否允许自动创建订阅组的配置

autoCreateSubscriptionGroup=false

#rocketmq集群中当前broker对外服务监听端口的配置

listenPort=10911

#rocketmq集群中删除文件时间点的配置

deleteWhen=04

#rocketmq集群中文件保留时间配置

fileReservedTime=120 //单位为小时

#rocketmq集群中存储日志文件的大小配置

mapedFileSizeCommitLog=1073741824

#rocketmq集群中ConsumeQueue每个文件中存储的条数配置

mapedFileSizeConsumeQueue=300000

#rocketmq集群中监测物理文件磁盘使用情况报警配置

diskMaxUsedSpaceRatio=88

#rocketmq集群文件存储路径配置

storePathRootDir=/home/rocketmq/store

storePathCommitLog=/home/rocketmq/store/commitlog

storePathConsumeQueue=/home/rocketmq/store/consumequeue

storePathIndex=/home/rocketmq/store/index

storeCheckpoint=/home/rocketmq/store/checkpoint

abortFile=/home/rocketmq/store/abort

#集群配置文件结束

1)启动NameServer

nohup sh mqnamesrv &

2) 启动broker,-n指定了ip地址和端口号,-c指定了启动broker所要的配置文件

nohup sh mqbroker -n 127.0.0.1:9876 -c /rocketmq/conf/2m-2s-sync/broker-a.properties &

如果在实战中使用rocketmq来完成消息的存储,转发和接收需要引入rocketmq的依赖

<dependency>

<groupId>com.alibaba.rocketmq</groupId>

<artifactId>rocketmq-client</artifactId>

<version>current-version</version>

</dependency>

#生产者开发关键步骤:

1)生成唯一的mqProducer用于发送消息

DefaultMQProducer producer = new DefaultMQProducer("Uniqueness-ProducerGroup");

2)给生成的mqProducer配置nameSrv的地址和端口号

producer.setNamesrvAddr("ip1:port;ip2:port;...");

3)设置失败重试次数为N次

producer.setRetryTimesWhenSendFailed(N);

4)启动producer

producer.start();

5)准备要发送的消息

Message msg = new Message("TopicName","TagName",messageContentBody.getBytes("utf-8"));

6)发送消息并接收发送结果

SendResult sendResult = producer.send(msg,validTime); //validTime范围内发送失败则重试

#消费者开发关键步骤:以Push模式为例来说明

1)生成唯一的mqConsumer用于消费消息

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Uniqueness-ConsumerGroup");

2)给生成的mqConsumer配置nameSrv的地址和端口号

consumer.setNamesrvAddr("ip1:port;ip2:port;...");

3)设置消费订阅的TopicName和TagName

consumer.subscribe("TopicName","TagName/*");//*表示全部

4)设置消费消息的起始位置

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

5)设置失败重试次数为N次

producer.setRetryTimesWhenSendFailed(N);

6)注册消息监听器到consumer来监听新消息的到来并做出对应处理

consumer.registerMessageListener(new MessageListenerConcurrently(){
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context){
for(MessageExt me:msgs){
//对接受到的消息处理
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

})

6)启动consumer

consumer.start();

#Rocketmq集群配置注意事项:

1)brokerClusterName必需是唯一且同一个;

2)同一个borker的不同机器brokerName相同,brokerID不同,主为0,从为大于0

3)集群中的namesrvAddr的配置为ip1:port;....ip4:port;

4) 配置正确的同步方式:ASYNC_MASTER,SYNC_MASTER,...

#Rocketmq中的常用模块功能介绍

namesrv:用于保存消息的元消息,包括消息队列和主题名字等信息。

broker:生产者和消费者沟通的枢纽,生产者放消息,消费者取消息

client:给使用者提供接受和发送消息的API

store:消息,索引等的存储

remoting:实现远程通信的机制,基于Netty4 + fastjson序列化 + 自定义二进制协议来完成

#rocketmq提供的三种模式的producer

TransactionProducer:发送的消息支持事务

OrderProducer:发送的消息保证有序,生产者顺序存放有序消息到同一broker并发送,消费端顺序消费即可(MessageListenerOrderly)

NormalProducer:发送的消息不保证有序,可以并发消费(MessageListenerConcurrently),效率最高。

#事务消息发送关键步奏

1)TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();

2) TransactionMQProducer producer = new TransactionMQProducer("unique-TransactionProduceName");

3) producer.setTransactionCheckListener(transactionCheckListener); //服务回查监听配置

如果生产者一直没有返回失败或者成功消息,则服务器可以主动回查客户端来询问是否执行成功事务消息

public class TransactionCheckListenerImpl implements TransactionCheckListener{
@Override
public LocalTransactionState checkLocalTransactionState(messageExt msg){
//查看消息处理情况成功与否,并返回状态
return LocalTransactionState.COMMIT_MESSAGE;
}

}

4) TransactionExecuterImpl transactionExe = new TransactionExecuterImpl();//本地事务执行器

public class TransactionExecuterImpl implements LocalTransactionExecuter{
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg,final Object arg){
try{
//执行本地事务内容
}catch(Exception e){

      //本地事务执行失败

      return LocalTransactionState.ROLLBACK_MESSAGE;//发送失败消息,本次消息对消费者不可见,消费者不处理
}
return LocalTransactionState.COMMIT_MESSAGE;//发送成功消息,本次消息对消费者可见,消费者继续处理
}

}

5) Message msg = new Message("TopicName","TagName","KeyName",messageBody.getBytes());

6) producer.sendMessageInTransaction(msg,transactionExe,null);

#事务消息不支持回查机制的处理方案,主动询问处理结果,从而来判断是否成功执行消息。所以需要对消息的处理状态单独记录

#pull消费模式的关键步奏

1)MQPullConsumerScheduleService mqPullConsumerScheduleService = new MQPullConsumerScheduleService("unique-PullConsumerName");

2)获取Consumer: mqPullConsumerScheduleService.getDefaultMQPullConsumer().setNamesrvAddr("ip:port");

mqPullConsumerScheduleService.setMessageModel(MessageModel.CLUSTERING);//设置消息模式为集群模式

3) 注册回调处理:mqPullConsumerScheduleService.registerPullTaskCallback("TopicName",new PullTaskCallback(){
@Override
public void doPullTask(MessageQueue mq,PullTaskContext ptc){
//如果有消息需要处理,处理消息
MQPullConsumer mpc = ptc.getPullConsumer();
try{
Long offset=mpc.fetchConsumeOffset(mq,false);
if(offset<0){
offset=0;
}
PullResult pr = mpc.pull(mq,"*",offset,32);
switch(pr.getPullStatus()){
case FOUND:
   //有消息需要处理
   List<MessageExt> msgList = pr.getMsgFoundList();
   for(MessageExt me:msgList){
   //消费消息以及异常情况处理
   }
   break;
case NO_MATCHED_MSG:
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
default:
break;
}
mpc.updateConsumeOffset(mq,pr.getNextBeginOffset());
ptc.setPullNextDelayTimeMillis(10*10000);
}catch(Exection e){

}
}

});

4)开始消费消息:mqPullConsumerScheduleService.start();
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息