分布式消息-RocketMQ
2018-03-18 15:46
281 查看
一、中间件介绍:
项目地址:http://rocketmq.apache.org/项目部署(windows环境):
> git clone -b develop https://github.com/apache/incubator-rocketmq.git > mvn clean install -Prelease-all -DskipTests -U > cd distribution/target/apache-rocketmq
修改/bin/runserver.sh以及/bin/runbroker.sh 启动参数
#=========================================================================================== # JVM Configuration #=========================================================================================== JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m -XX:MaxPermSize=320m"
在 git bash 执行命令
> nohup sh bin/mqnamesrv & [1] 8596 nohup: ignoring input and appending output to 'nohup.out' > tail nohup.out The Name Server boot success. serializeType=JSON > nohup sh bin/mqbroker -n localhost:9876 & [2] 10420 nohup: ignoring input and appending output to 'nohup.out' > export NAMESRV_ADDR=localhost:9876
二、整体架构图
如上图所示, RocketMQ的部署结构有以下特点:
* Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
* Broker部署相对复杂,建议采用多Master多Slave 模式,异步复制;每个 Master 配置一个 Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟,毫秒级。优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master 宕机后,消费者仍然可以从 Slave消费,此过程对应用透明。不需要人工干预。性能同多Master 模式几乎一样。缺点:Master 宕机,磁盘损坏情况,会丢失少量消息
* Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳(实际就是producer集群名称)。Producer完全无状态,可集群部署。
* Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
三、RMQ特性
1、高效吞吐性
独特的数据结构,Rocketmq的消息的存储是由consume queue和 commitLog 配合完成的consume queue放在内存中,相当于消息的目录,commitLog为消息实际存放的物理位置,消息持久化,可以是文件系统、kv型数据库、关系性数据库,由系统异步线程写
commitLogOffset是指这条消息在commitLog文件实际偏移量,size指消息的大小,tag是消息的hash值
2、顺序问题
RocketMQ通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)很多业务有顺序消息的需求,RocketMQ支持全局和局部的顺序,一般推荐使用局部顺序,将具有顺序要求的一类消息hash到同一个队列中便可保持有序,如下图所示。
缺点:
消费顺序消息的并行度依赖于队列数量
队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题
遇到消息失败的消息,无法跳过,当前队列消费暂停
//producer.java SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);
//consumer.java consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(false); System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); this.consumeTimes.incrementAndGet(); if ((this.consumeTimes.get() % 2) == 0) { return ConsumeOrderlyStatus.SUCCESS; } else if ((this.consumeTimes.get() % 3) == 0) { return ConsumeOrderlyStatus.ROLLBACK; } else if ((this.consumeTimes.get() % 4) == 0) { return ConsumeOrderlyStatus.COMMIT; } else if ((this.consumeTimes.get() % 5) == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } });
3、消息重复性问题
RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。* 消费端处理消息的业务逻辑保持幂等性
* 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现
4、消息过滤机制
在发布/订阅消息模式中,在Broker端进行Message Tag比对,先遍历Consume Queue,如果存储的Message Tag与订阅的Message Tag不符合,则跳过,继续比对下一个,符合则传输给Consumer。注意:Message Tag是字符串形式,Consume Queue中存储的是其对应的hashcode,比对时也是比对hashcode。Consumer收到过滤后的消息后,同样也要执行在Broker端的操作,但是比对的是真实的Message Tag字符串,而不是Hashcode。
过滤过程中不会访问Commit Log数据,可以保证堆积情况下也能高效过滤
5、定时消息
应用场景,系统自动确认订单、关闭订单等等定时任务6、事务消息
发送方向 MQ 服务端发送消息;MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
发送方开始执行本地事务逻辑。
发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。
事务消息发送对应步骤1、2、3、4,事务消息回查对应步骤5、6、7。
四、消息发送实现细节
五、业界主流MQ对比
六、应用以及开发注意事项
常用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景1.不同应用之间服务调用,方式一般有中间件hsf、dubbo等服务、二方包依赖、http方式以及ws服务方式,内部应用之间涉及到的调用频繁的,比如业务配置参数,通过消息
2.订单系统、库存系统
3.秒杀服务
4.日志采集
开发注意事项:
一个应用尽可能用一个Topic,Topic通过console控制台申请;消息子类型用tags标识,可以自用定义,只有定义了tags,consumer端订阅消息时,才能根据tags在broker端对消息过滤
每个消息在业务层面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。
消息发送成功或者失败,要打印消息日志,务必要打印sendresult和key字段。
消息发送失败,在应用层面通过线程定时重发来处理
consumer端处理重复消息,应用层面要做到幂等处理,建议使用消息的业务层面的标识码keys字段去重
相关文章推荐
- RocketMQ原理解析-producer 4.发送分布式事物消息
- RocketMQ原理解析-producer 4.发送分布式事物消息
- 分布式消息队列RocketMQ&Kafka -- 消息的“顺序消费”-- 一个看似简单的复杂问题
- 分布式消息队列RocketMQ&Kafka -- 消息的“顺序消费”-- 一个看似简单的复杂问题
- 分布式消息队列RocketMQ&Kafka -- 消息的“顺序消费”
- 分布式开放消息系统RocketMQ的原理与实践(消息的顺序问题、重复问题、可靠消息/事务消息)
- 分布式消息队列RocketMQ源码分析之3 -- Consumer负载均衡机制 -- Rebalance
- 分布式消息队列RocketMQ源码分析之4 -- Consumer负载均衡与Kafka的Consumer负载均衡之不同点
- 分布式消息中间件rocketmq的原理与实践
- 分布式消息队列RocketMQ&Kafka -- 消息的“顺序消费”-- 一个看似简单的复杂问题
- 分布式消息队列RocketMQ源码分析之2 -- Broker与NameServer心跳机制
- 分布式消息队列RocketMQ&Kafka -- 消息的“顺序消费”-- 一个看似简单的复杂问题
- 分布式消息队列RocketMQ之Netty -- 1+N+M1+M2模型
- 分布式消息队列RocketMQ源码分析之1 -- Topic路由数据结构解析 -- topicRoute与topicPublishInfo与queueId
- 消息顺序和消息事务 - RocketMQ及分布式消息系统的原理以及重要问题解读
- 分布式消息队列RocketMQ源码分析之3 -- Consumer负载均衡机制 -- Rebalance
- 分布式消息队列RocketMQ之Netty -- 1+N+M1+M2模型
- (原创)Rocketmq分布式消息队列的部署与监控
- 分布式消息队列RocketMQ源码分析之1 -- Topic路由数据结构解析 -- topicRoute与topicPublishInfo与queueId
- 分布式消息队列RocketMQ源码分析之1 -- Topic路由数据结构解析 -- topicRoute与topicPublishInfo与queueId