您的位置:首页 > 其它

RocketMQ原理解析-consumer 3.长轮询

2017-07-13 15:40 766 查看
Rocketmq的消息是由consumer端主动到broker拉取的, consumer向broker发送拉消息请求, PullMessageService服务通过一个线程将阻塞队列LinkedBlockingQueue<PullRequest>中的PullRequest到broker拉取消息

         DefaultMQPushConsumerImpl的pullMessage(pullRequest)方法执行向broker拉消息动作

1.      获取ProcessQueue判读是否drop的, drop为true返回

2.      给ProcessQueue设置拉消息时间戳

3.      流量控制,正在消费队列中消息(未被消费的)超过阀值,稍后在执行拉消息

4.      流量控制,正在消费队列中消息的跨度超过阀值(默认2000),稍后在消费

5.      根据topic获取订阅关系

6.      构建拉消息回调对象PullBack, 从broker拉取消息(异步拉取)返回结果是回调

7.      从内存中获取commitOffsetValue  //TODO 这个值跟pullRequest.getNextOffset区别

8.      构建sysFlag   pull接口用到的flag

9.      调底层通信层向broker发送拉消息请求

如果master压力过大,会建议去slave拉取消息

如果是到broker拉取消息清楚实时提交标记位,因为slave不允许实时提交消费进度,可以定时提交 

//TODO 关于master拉消息实时提交指的是什么?

10.  拉到消息后回调PullCallback

处理broker返回结果pullResult

           更新从哪个broker(master 还是slave)拉取消息

           反序列化消息

           消息过滤

           消息中放入队列最大最小offset,方便应用来感知消息堆积度

将消息加入正在处理队列ProcessQueue

将消息提交到消费消息服务ConsumeMessageService

流控处理, 如果pullInterval参数大于0 (拉消息间隔,如果为了降低拉取速度,可以设置大于0的值),延迟再执行拉消息,  如果pullInterval为0立刻在执行拉消息动作

序列图

1.      向broker发送长轮询请求



2.   Broker接收长轮询请求



3.      Consumer接收broker响应



长轮询活动图:



一张图画不下,再来一张



4000
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: