rocketmq入门学习笔记
2017-10-15 18:27
309 查看
特性:
1)支持分布式
2)可以保证严格的消息顺序
3)支持高并发
4)可以通过生产者push或者是消费者pull两种模式来消费消息
5)通过nameServer来协调分布式。
6)支持事务控制,支持失败重试。
集群模式:单主、多主、多主多备(同步/异步)
同时创建多个目录:
mkdir -p home/rocketmq/store/{commitlog,consumequeue,index}
#配置文件解析
#rocketmq集群名字配置
brokerClusterName=rocketmq-cluster
#rocketmq的broker名字配置
brokerName=broker-1
#rocketmq集群主备配置:
brokerId=0 //0表示是主,而大于0则是备,备机只能读不能写
#rocketmq集群nameServer地址的配置
namesrvAddr=ip1:port;ip2:port //默认的port为9876
#rocketmq集群发送消息,默认重建队列数配置
defaultTopicQueueNums=4
#rocketmq集群中是否允许自动创建不存在的topic的配置
autoCreateTopicEnable=false
#rocketmq集群中是否允许自动创建订阅组的配置
autoCreateSubscriptionGroup=false
#rocketmq集群中当前broker对外服务监听端口的配置
listenPort=10911
#rocketmq集群中删除文件时间点的配置
deleteWhen=04
#rocketmq集群中文件保留时间配置
fileReservedTime=120 //单位为小时
#rocketmq集群中存储日志文件的大小配置
mapedFileSizeCommitLog=1073741824
#rocketmq集群中ConsumeQueue每个文件中存储的条数配置
mapedFileSizeConsumeQueue=300000
#rocketmq集群中监测物理文件磁盘使用情况报警配置
diskMaxUsedSpaceRatio=88
#rocketmq集群文件存储路径配置
storePathRootDir=/home/rocketmq/store
storePathCommitLog=/home/rocketmq/store/commitlog
storePathConsumeQueue=/home/rocketmq/store/consumequeue
storePathIndex=/home/rocketmq/store/index
storeCheckpoint=/home/rocketmq/store/checkpoint
abortFile=/home/rocketmq/store/abort
#集群配置文件结束
1)启动NameServer
nohup sh mqnamesrv &
2) 启动broker,-n指定了ip地址和端口号,-c指定了启动broker所要的配置文件
nohup sh mqbroker -n 127.0.0.1:9876 -c /rocketmq/conf/2m-2s-sync/broker-a.properties &
如果在实战中使用rocketmq来完成消息的存储,转发和接收需要引入rocketmq的依赖
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>current-version</version>
</dependency>
#生产者开发关键步骤:
1)生成唯一的mqProducer用于发送消息
DefaultMQProducer producer = new DefaultMQProducer("Uniqueness-ProducerGroup");
2)给生成的mqProducer配置nameSrv的地址和端口号
producer.setNamesrvAddr("ip1:port;ip2:port;...");
3)设置失败重试次数为N次
producer.setRetryTimesWhenSendFailed(N);
4)启动producer
producer.start();
5)准备要发送的消息
Message msg = new Message("TopicName","TagName",messageContentBody.getBytes("utf-8"));
6)发送消息并接收发送结果
SendResult sendResult = producer.send(msg,validTime); //validTime范围内发送失败则重试
#消费者开发关键步骤:以Push模式为例来说明
1)生成唯一的mqConsumer用于消费消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Uniqueness-ConsumerGroup");
2)给生成的mqConsumer配置nameSrv的地址和端口号
consumer.setNamesrvAddr("ip1:port;ip2:port;...");
3)设置消费订阅的TopicName和TagName
consumer.subscribe("TopicName","TagName/*");//*表示全部
4)设置消费消息的起始位置
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
5)设置失败重试次数为N次
producer.setRetryTimesWhenSendFailed(N);
6)注册消息监听器到consumer来监听新消息的到来并做出对应处理
consumer.registerMessageListener(new MessageListenerConcurrently(){
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context){
for(MessageExt me:msgs){
//对接受到的消息处理
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
})
6)启动consumer
consumer.start();
#Rocketmq集群配置注意事项:
1)brokerClusterName必需是唯一且同一个;
2)同一个borker的不同机器brokerName相同,brokerID不同,主为0,从为大于0
3)集群中的namesrvAddr的配置为ip1:port;....ip4:port;
4) 配置正确的同步方式:ASYNC_MASTER,SYNC_MASTER,...
#Rocketmq中的常用模块功能介绍
namesrv:用于保存消息的元消息,包括消息队列和主题名字等信息。
broker:生产者和消费者沟通的枢纽,生产者放消息,消费者取消息
client:给使用者提供接受和发送消息的API
store:消息,索引等的存储
remoting:实现远程通信的机制,基于Netty4 + fastjson序列化 + 自定义二进制协议来完成
#rocketmq提供的三种模式的producer
TransactionProducer:发送的消息支持事务
OrderProducer:发送的消息保证有序,生产者顺序存放有序消息到同一broker并发送,消费端顺序消费即可(MessageListenerOrderly)
NormalProducer:发送的消息不保证有序,可以并发消费(MessageListenerConcurrently),效率最高。
#事务消息发送关键步奏
1)TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
2) TransactionMQProducer producer = new TransactionMQProducer("unique-TransactionProduceName");
3) producer.setTransactionCheckListener(transactionCheckListener); //服务回查监听配置
如果生产者一直没有返回失败或者成功消息,则服务器可以主动回查客户端来询问是否执行成功事务消息
public class TransactionCheckListenerImpl implements TransactionCheckListener{
@Override
public LocalTransactionState checkLocalTransactionState(messageExt msg){
//查看消息处理情况成功与否,并返回状态
return LocalTransactionState.COMMIT_MESSAGE;
}
}
4) TransactionExecuterImpl transactionExe = new TransactionExecuterImpl();//本地事务执行器
public class TransactionExecuterImpl implements LocalTransactionExecuter{
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg,final Object arg){
try{
//执行本地事务内容
}catch(Exception e){
//本地事务执行失败
return LocalTransactionState.ROLLBACK_MESSAGE;//发送失败消息,本次消息对消费者不可见,消费者不处理
}
return LocalTransactionState.COMMIT_MESSAGE;//发送成功消息,本次消息对消费者可见,消费者继续处理
}
}
5) Message msg = new Message("TopicName","TagName","KeyName",messageBody.getBytes());
6) producer.sendMessageInTransaction(msg,transactionExe,null);
#事务消息不支持回查机制的处理方案,主动询问处理结果,从而来判断是否成功执行消息。所以需要对消息的处理状态单独记录
#pull消费模式的关键步奏
1)MQPullConsumerScheduleService mqPullConsumerScheduleService = new MQPullConsumerScheduleService("unique-PullConsumerName");
2)获取Consumer: mqPullConsumerScheduleService.getDefaultMQPullConsumer().setNamesrvAddr("ip:port");
mqPullConsumerScheduleService.setMessageModel(MessageModel.CLUSTERING);//设置消息模式为集群模式
3) 注册回调处理:mqPullConsumerScheduleService.registerPullTaskCallback("TopicName",new PullTaskCallback(){
@Override
public void doPullTask(MessageQueue mq,PullTaskContext ptc){
//如果有消息需要处理,处理消息
MQPullConsumer mpc = ptc.getPullConsumer();
try{
Long offset=mpc.fetchConsumeOffset(mq,false);
if(offset<0){
offset=0;
}
PullResult pr = mpc.pull(mq,"*",offset,32);
switch(pr.getPullStatus()){
case FOUND:
//有消息需要处理
List<MessageExt> msgList = pr.getMsgFoundList();
for(MessageExt me:msgList){
//消费消息以及异常情况处理
}
break;
case NO_MATCHED_MSG:
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
default:
break;
}
mpc.updateConsumeOffset(mq,pr.getNextBeginOffset());
ptc.setPullNextDelayTimeMillis(10*10000);
}catch(Exection e){
}
}
});
4)开始消费消息:mqPullConsumerScheduleService.start();
1)支持分布式
2)可以保证严格的消息顺序
3)支持高并发
4)可以通过生产者push或者是消费者pull两种模式来消费消息
5)通过nameServer来协调分布式。
6)支持事务控制,支持失败重试。
集群模式:单主、多主、多主多备(同步/异步)
同时创建多个目录:
mkdir -p home/rocketmq/store/{commitlog,consumequeue,index}
#配置文件解析
#rocketmq集群名字配置
brokerClusterName=rocketmq-cluster
#rocketmq的broker名字配置
brokerName=broker-1
#rocketmq集群主备配置:
brokerId=0 //0表示是主,而大于0则是备,备机只能读不能写
#rocketmq集群nameServer地址的配置
namesrvAddr=ip1:port;ip2:port //默认的port为9876
#rocketmq集群发送消息,默认重建队列数配置
defaultTopicQueueNums=4
#rocketmq集群中是否允许自动创建不存在的topic的配置
autoCreateTopicEnable=false
#rocketmq集群中是否允许自动创建订阅组的配置
autoCreateSubscriptionGroup=false
#rocketmq集群中当前broker对外服务监听端口的配置
listenPort=10911
#rocketmq集群中删除文件时间点的配置
deleteWhen=04
#rocketmq集群中文件保留时间配置
fileReservedTime=120 //单位为小时
#rocketmq集群中存储日志文件的大小配置
mapedFileSizeCommitLog=1073741824
#rocketmq集群中ConsumeQueue每个文件中存储的条数配置
mapedFileSizeConsumeQueue=300000
#rocketmq集群中监测物理文件磁盘使用情况报警配置
diskMaxUsedSpaceRatio=88
#rocketmq集群文件存储路径配置
storePathRootDir=/home/rocketmq/store
storePathCommitLog=/home/rocketmq/store/commitlog
storePathConsumeQueue=/home/rocketmq/store/consumequeue
storePathIndex=/home/rocketmq/store/index
storeCheckpoint=/home/rocketmq/store/checkpoint
abortFile=/home/rocketmq/store/abort
#集群配置文件结束
1)启动NameServer
nohup sh mqnamesrv &
2) 启动broker,-n指定了ip地址和端口号,-c指定了启动broker所要的配置文件
nohup sh mqbroker -n 127.0.0.1:9876 -c /rocketmq/conf/2m-2s-sync/broker-a.properties &
如果在实战中使用rocketmq来完成消息的存储,转发和接收需要引入rocketmq的依赖
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>current-version</version>
</dependency>
#生产者开发关键步骤:
1)生成唯一的mqProducer用于发送消息
DefaultMQProducer producer = new DefaultMQProducer("Uniqueness-ProducerGroup");
2)给生成的mqProducer配置nameSrv的地址和端口号
producer.setNamesrvAddr("ip1:port;ip2:port;...");
3)设置失败重试次数为N次
producer.setRetryTimesWhenSendFailed(N);
4)启动producer
producer.start();
5)准备要发送的消息
Message msg = new Message("TopicName","TagName",messageContentBody.getBytes("utf-8"));
6)发送消息并接收发送结果
SendResult sendResult = producer.send(msg,validTime); //validTime范围内发送失败则重试
#消费者开发关键步骤:以Push模式为例来说明
1)生成唯一的mqConsumer用于消费消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Uniqueness-ConsumerGroup");
2)给生成的mqConsumer配置nameSrv的地址和端口号
consumer.setNamesrvAddr("ip1:port;ip2:port;...");
3)设置消费订阅的TopicName和TagName
consumer.subscribe("TopicName","TagName/*");//*表示全部
4)设置消费消息的起始位置
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
5)设置失败重试次数为N次
producer.setRetryTimesWhenSendFailed(N);
6)注册消息监听器到consumer来监听新消息的到来并做出对应处理
consumer.registerMessageListener(new MessageListenerConcurrently(){
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context){
for(MessageExt me:msgs){
//对接受到的消息处理
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
})
6)启动consumer
consumer.start();
#Rocketmq集群配置注意事项:
1)brokerClusterName必需是唯一且同一个;
2)同一个borker的不同机器brokerName相同,brokerID不同,主为0,从为大于0
3)集群中的namesrvAddr的配置为ip1:port;....ip4:port;
4) 配置正确的同步方式:ASYNC_MASTER,SYNC_MASTER,...
#Rocketmq中的常用模块功能介绍
namesrv:用于保存消息的元消息,包括消息队列和主题名字等信息。
broker:生产者和消费者沟通的枢纽,生产者放消息,消费者取消息
client:给使用者提供接受和发送消息的API
store:消息,索引等的存储
remoting:实现远程通信的机制,基于Netty4 + fastjson序列化 + 自定义二进制协议来完成
#rocketmq提供的三种模式的producer
TransactionProducer:发送的消息支持事务
OrderProducer:发送的消息保证有序,生产者顺序存放有序消息到同一broker并发送,消费端顺序消费即可(MessageListenerOrderly)
NormalProducer:发送的消息不保证有序,可以并发消费(MessageListenerConcurrently),效率最高。
#事务消息发送关键步奏
1)TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
2) TransactionMQProducer producer = new TransactionMQProducer("unique-TransactionProduceName");
3) producer.setTransactionCheckListener(transactionCheckListener); //服务回查监听配置
如果生产者一直没有返回失败或者成功消息,则服务器可以主动回查客户端来询问是否执行成功事务消息
public class TransactionCheckListenerImpl implements TransactionCheckListener{
@Override
public LocalTransactionState checkLocalTransactionState(messageExt msg){
//查看消息处理情况成功与否,并返回状态
return LocalTransactionState.COMMIT_MESSAGE;
}
}
4) TransactionExecuterImpl transactionExe = new TransactionExecuterImpl();//本地事务执行器
public class TransactionExecuterImpl implements LocalTransactionExecuter{
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg,final Object arg){
try{
//执行本地事务内容
}catch(Exception e){
//本地事务执行失败
return LocalTransactionState.ROLLBACK_MESSAGE;//发送失败消息,本次消息对消费者不可见,消费者不处理
}
return LocalTransactionState.COMMIT_MESSAGE;//发送成功消息,本次消息对消费者可见,消费者继续处理
}
}
5) Message msg = new Message("TopicName","TagName","KeyName",messageBody.getBytes());
6) producer.sendMessageInTransaction(msg,transactionExe,null);
#事务消息不支持回查机制的处理方案,主动询问处理结果,从而来判断是否成功执行消息。所以需要对消息的处理状态单独记录
#pull消费模式的关键步奏
1)MQPullConsumerScheduleService mqPullConsumerScheduleService = new MQPullConsumerScheduleService("unique-PullConsumerName");
2)获取Consumer: mqPullConsumerScheduleService.getDefaultMQPullConsumer().setNamesrvAddr("ip:port");
mqPullConsumerScheduleService.setMessageModel(MessageModel.CLUSTERING);//设置消息模式为集群模式
3) 注册回调处理:mqPullConsumerScheduleService.registerPullTaskCallback("TopicName",new PullTaskCallback(){
@Override
public void doPullTask(MessageQueue mq,PullTaskContext ptc){
//如果有消息需要处理,处理消息
MQPullConsumer mpc = ptc.getPullConsumer();
try{
Long offset=mpc.fetchConsumeOffset(mq,false);
if(offset<0){
offset=0;
}
PullResult pr = mpc.pull(mq,"*",offset,32);
switch(pr.getPullStatus()){
case FOUND:
//有消息需要处理
List<MessageExt> msgList = pr.getMsgFoundList();
for(MessageExt me:msgList){
//消费消息以及异常情况处理
}
break;
case NO_MATCHED_MSG:
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
default:
break;
}
mpc.updateConsumeOffset(mq,pr.getNextBeginOffset());
ptc.setPullNextDelayTimeMillis(10*10000);
}catch(Exection e){
}
}
});
4)开始消费消息:mqPullConsumerScheduleService.start();
相关文章推荐
- Lua编程入门-学习笔记2
- UnityShader入门精要学习笔记(十五):渲染纹理
- Spring入门学习笔记(1)
- shell学习笔记之一——入门
- Spring MVC学习笔记 1:入门篇
- Python学习笔记 01 快速入门
- 前端入门DAY4-学习笔记
- 单片机入门学习笔记----第一课:简单的C语言语法+流水灯实验
- Struts2 入门学习笔记(五)——上传下载
- Android开发入门之学习笔记(一)
- scrapy入门学习笔记之爬取豆瓣9分榜单
- Lua入门系列----pil学习笔记之Type and Values (2)
- Visual C++ 学习笔记( 一 ) C++ 入门(转rockybug)
- (2)Linux入门学习笔记
- java学习笔记(从搭建环境到入门java)
- Python学习入门笔记(一):Python文件类型
- JavaScript 简单入门学习笔记(四)
- 慕课网 Java 入门 第二季 学习笔记
- React学习笔记_Babel 入门