您的位置:首页 > 其它

ActiveMQ 生产者流量控制

2015-01-08 00:00 423 查看
摘要: Producer Flow Control 翻译

生产者流量控制(Producer Flow Control)
ActiveMQ 4.X 对流量的控制,主要是对底层TCP进行流量控制。由于流量控制是对底层TCP连接进行限制,所以可能会导致消费者线程被挂起。这个策略非常的高效,但是,当同一个TCP连接中有多个生产者和消费者时,可能会导致死锁!
ActiveMQ 5.0之后,我们可以单独对每个生产者进行流量控制,而不是对底层TCP连接进行限制,这样就不会导致整个TCP连接被挂起。“流量控制”指的是,当目标(destination)的内存、消息缓冲区的临时空间、消息持久化的空间 达到阈值时,Broker会减慢生产者的消息流。这时候,生产者有两种状态:一是等待资源的释放;一是抛出 JMSException 异常。流量控制后生产者的状态,可以在 <systemUsage> 中配置。当达到 memoryLmit 或 <systemUsage> 配置的阈值时,生产者默认是等待资源释放的。
生产者使用同步方式发送消息,ActiveMQ 默认生产者使用流量控制。通常来说,同步的方式都是用在发送persistent消息,除非你把 useAsyncSend 设为true(即异步发送)。
生产者使用异步发送消息,通常是发送 non-persistent 消息,这时候客户端是不接受服务器的任何回应的。如果你希望异步方式也使用流量控制,那你需要设置 ProducerWindowSize。
ActiveMQConnectionFactory connctionFactory = ...
connctionFactory.setProducerWindowSize(1024000);


ProducerWindowSize是一个生产者在等到确认消息之前,可以发送给代理的数据的最大byte数,这个确认消息用来告诉生产者,代理已经收到先前发送的消息了。
或者,如果你要发送非持久化的消息(该消息默认是异步发送的),并且想要得到队列或者主题的内存限制是否达到,你只需将连接工厂配置为“alwaysSyncSend”。虽然这样会变得稍微慢一点,但是这将保证当出现内存问题时,你的消息生产者能够及时得到通知。
你也可以强制禁止生成者使用流量控制,通过把 producerFlowControl 设为 false 实现。配置如下:
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="FOO.>" producerFlowControl="false"/>
</policyEntries>
</policyMap>
</destinationPolicy>


需要注意一个问题,自 ActiveMQ 5.0 之后默认使用 Store Based Cursors 的内存模式,non-persistent 消息会保存到缓冲文件中,这时候你会发现队列的内存永远不会用完,所以流量控制会不生效。如果你希望把消息全部放到内存(即不使用缓存文件),你可以把内存模式设为 <vmQueueCursor>,这样流量控制就会生效。配置如下:
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
</policyEntry>


上面配置表示:所有的 non-persistent 消息都放在内存中,每个队列的内存为 1MB,当队列内存使用达到 1MB 时,就会进行流量控制。
生成者流量控制的原理
当你发送一个 persistent 消息到 Broker 时,便会等待 Broker 的响应。 Broker 处理完消息后,给生成者发送一个 ProducerAck 消息。它表示前一个消息已经处理完,生产者现在可以发送下一个消息了。如果进行了流量控制,生成者可能在等待 ProducerAck 消息,也可能收到一个 ProducerAck,但它表明生产者需要抛出 JMSException。
优势
为了避免 Broker 出现奔溃的情况,生产者必须等待 ProducerAck 消息,之后才可以继续发送下一个消息。当然,若消费者处理太慢,Broker 也会阻塞整个连接,虽然这样会导致消息发送延迟。
客户端异常配置
当资源不足时,生产者调用send()方法发送消息,生产者要么阻塞(默认为阻塞),要么抛出异常。若想客户端抛出异常,可以把 sendFailIfNoSpace 设为 true,则客户端会抛出 javax.jms.ResourceAllocationException。配置如下:
<systemUsage>
<systemUsage sendFailIfNoSpace="true">
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
</systemUsage>
</systemUsage>


从 5.3.1 版本开始引入了 sendFailIfNoSpaceAfterTimeout 属性。当资源不足时,该熟悉导致send() 方法等待一段时间(单位:毫秒),在这段时间之前,资源得到满足,则 send() 方法执行成功;否则,客户端抛出异常。配置如下:
<systemUsage>
<systemUsage sendFailIfNoSpaceAfterTimeout="3000"> /* 等待 3 秒 */
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
</systemUsage>
</systemUsage>


禁用流量控制
禁用流量控制有两种方法:

将 producerFlowControl 设为 false。如上文所示。

使用消息游标。

System usage
你也可以使用 <SystemUsage> 属性配置生产者的流量控制。配置如下:
<systemUsage>
<systemUsage>
<memoryUsage>     /* 配置内存 */
<memoryUsage limit="64 mb" />
</memoryUsage>
<storeUsage>      /* 配置持久化空间 */
<storeUsage limit="100 gb" />
</storeUsage>
<tempUsage>       /* 配置缓存区空间 */
<tempUsage limit="10 gb" />
</tempUsage>
</systemUsage>
</systemUsage>


你可以为非持久化的消息(NON_PERSISTENT)设置内存限制,为持久化消息(PERSISTENT)设置磁盘空间,以及为缓存区设置磁盘空间,代理将在减慢生产者之前使用这些空间。使用上述的默认设置,代理将会一直阻塞send()方法的调用,直至一些消息被消费,并且代理有了可用空间。默认值如上例所述,你可能需要根据你的环境增加这些值。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  ActiveMQ 流量控制