ActiveMQ 生产者流量控制
2015-01-08 00:00
423 查看
摘要: Producer Flow Control 翻译
生产者流量控制(Producer Flow Control)
ActiveMQ 4.X 对流量的控制,主要是对底层TCP进行流量控制。由于流量控制是对底层TCP连接进行限制,所以可能会导致消费者线程被挂起。这个策略非常的高效,但是,当同一个TCP连接中有多个生产者和消费者时,可能会导致死锁!
ActiveMQ 5.0之后,我们可以单独对每个生产者进行流量控制,而不是对底层TCP连接进行限制,这样就不会导致整个TCP连接被挂起。“流量控制”指的是,当目标(destination)的内存、消息缓冲区的临时空间、消息持久化的空间 达到阈值时,Broker会减慢生产者的消息流。这时候,生产者有两种状态:一是等待资源的释放;一是抛出 JMSException 异常。流量控制后生产者的状态,可以在 <systemUsage> 中配置。当达到 memoryLmit 或 <systemUsage> 配置的阈值时,生产者默认是等待资源释放的。
生产者使用同步方式发送消息,ActiveMQ 默认生产者使用流量控制。通常来说,同步的方式都是用在发送persistent消息,除非你把 useAsyncSend 设为true(即异步发送)。
生产者使用异步发送消息,通常是发送 non-persistent 消息,这时候客户端是不接受服务器的任何回应的。如果你希望异步方式也使用流量控制,那你需要设置 ProducerWindowSize。
ProducerWindowSize是一个生产者在等到确认消息之前,可以发送给代理的数据的最大byte数,这个确认消息用来告诉生产者,代理已经收到先前发送的消息了。
或者,如果你要发送非持久化的消息(该消息默认是异步发送的),并且想要得到队列或者主题的内存限制是否达到,你只需将连接工厂配置为“alwaysSyncSend”。虽然这样会变得稍微慢一点,但是这将保证当出现内存问题时,你的消息生产者能够及时得到通知。
你也可以强制禁止生成者使用流量控制,通过把 producerFlowControl 设为 false 实现。配置如下:
需要注意一个问题,自 ActiveMQ 5.0 之后默认使用 Store Based Cursors 的内存模式,non-persistent 消息会保存到缓冲文件中,这时候你会发现队列的内存永远不会用完,所以流量控制会不生效。如果你希望把消息全部放到内存(即不使用缓存文件),你可以把内存模式设为 <vmQueueCursor>,这样流量控制就会生效。配置如下:
上面配置表示:所有的 non-persistent 消息都放在内存中,每个队列的内存为 1MB,当队列内存使用达到 1MB 时,就会进行流量控制。
生成者流量控制的原理
当你发送一个 persistent 消息到 Broker 时,便会等待 Broker 的响应。 Broker 处理完消息后,给生成者发送一个 ProducerAck 消息。它表示前一个消息已经处理完,生产者现在可以发送下一个消息了。如果进行了流量控制,生成者可能在等待 ProducerAck 消息,也可能收到一个 ProducerAck,但它表明生产者需要抛出 JMSException。
优势
为了避免 Broker 出现奔溃的情况,生产者必须等待 ProducerAck 消息,之后才可以继续发送下一个消息。当然,若消费者处理太慢,Broker 也会阻塞整个连接,虽然这样会导致消息发送延迟。
客户端异常配置
当资源不足时,生产者调用send()方法发送消息,生产者要么阻塞(默认为阻塞),要么抛出异常。若想客户端抛出异常,可以把 sendFailIfNoSpace 设为 true,则客户端会抛出 javax.jms.ResourceAllocationException。配置如下:
从 5.3.1 版本开始引入了 sendFailIfNoSpaceAfterTimeout 属性。当资源不足时,该熟悉导致send() 方法等待一段时间(单位:毫秒),在这段时间之前,资源得到满足,则 send() 方法执行成功;否则,客户端抛出异常。配置如下:
禁用流量控制
禁用流量控制有两种方法:
将 producerFlowControl 设为 false。如上文所示。
使用消息游标。
System usage
你也可以使用 <SystemUsage> 属性配置生产者的流量控制。配置如下:
你可以为非持久化的消息(NON_PERSISTENT)设置内存限制,为持久化消息(PERSISTENT)设置磁盘空间,以及为缓存区设置磁盘空间,代理将在减慢生产者之前使用这些空间。使用上述的默认设置,代理将会一直阻塞send()方法的调用,直至一些消息被消费,并且代理有了可用空间。默认值如上例所述,你可能需要根据你的环境增加这些值。
生产者流量控制(Producer Flow Control)
ActiveMQ 4.X 对流量的控制,主要是对底层TCP进行流量控制。由于流量控制是对底层TCP连接进行限制,所以可能会导致消费者线程被挂起。这个策略非常的高效,但是,当同一个TCP连接中有多个生产者和消费者时,可能会导致死锁!
ActiveMQ 5.0之后,我们可以单独对每个生产者进行流量控制,而不是对底层TCP连接进行限制,这样就不会导致整个TCP连接被挂起。“流量控制”指的是,当目标(destination)的内存、消息缓冲区的临时空间、消息持久化的空间 达到阈值时,Broker会减慢生产者的消息流。这时候,生产者有两种状态:一是等待资源的释放;一是抛出 JMSException 异常。流量控制后生产者的状态,可以在 <systemUsage> 中配置。当达到 memoryLmit 或 <systemUsage> 配置的阈值时,生产者默认是等待资源释放的。
生产者使用同步方式发送消息,ActiveMQ 默认生产者使用流量控制。通常来说,同步的方式都是用在发送persistent消息,除非你把 useAsyncSend 设为true(即异步发送)。
生产者使用异步发送消息,通常是发送 non-persistent 消息,这时候客户端是不接受服务器的任何回应的。如果你希望异步方式也使用流量控制,那你需要设置 ProducerWindowSize。
ActiveMQConnectionFactory connctionFactory = ... connctionFactory.setProducerWindowSize(1024000);
ProducerWindowSize是一个生产者在等到确认消息之前,可以发送给代理的数据的最大byte数,这个确认消息用来告诉生产者,代理已经收到先前发送的消息了。
或者,如果你要发送非持久化的消息(该消息默认是异步发送的),并且想要得到队列或者主题的内存限制是否达到,你只需将连接工厂配置为“alwaysSyncSend”。虽然这样会变得稍微慢一点,但是这将保证当出现内存问题时,你的消息生产者能够及时得到通知。
你也可以强制禁止生成者使用流量控制,通过把 producerFlowControl 设为 false 实现。配置如下:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="FOO.>" producerFlowControl="false"/> </policyEntries> </policyMap> </destinationPolicy>
需要注意一个问题,自 ActiveMQ 5.0 之后默认使用 Store Based Cursors 的内存模式,non-persistent 消息会保存到缓冲文件中,这时候你会发现队列的内存永远不会用完,所以流量控制会不生效。如果你希望把消息全部放到内存(即不使用缓存文件),你可以把内存模式设为 <vmQueueCursor>,这样流量控制就会生效。配置如下:
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"> <pendingQueuePolicy> <vmQueueCursor/> </pendingQueuePolicy> </policyEntry>
上面配置表示:所有的 non-persistent 消息都放在内存中,每个队列的内存为 1MB,当队列内存使用达到 1MB 时,就会进行流量控制。
生成者流量控制的原理
当你发送一个 persistent 消息到 Broker 时,便会等待 Broker 的响应。 Broker 处理完消息后,给生成者发送一个 ProducerAck 消息。它表示前一个消息已经处理完,生产者现在可以发送下一个消息了。如果进行了流量控制,生成者可能在等待 ProducerAck 消息,也可能收到一个 ProducerAck,但它表明生产者需要抛出 JMSException。
优势
为了避免 Broker 出现奔溃的情况,生产者必须等待 ProducerAck 消息,之后才可以继续发送下一个消息。当然,若消费者处理太慢,Broker 也会阻塞整个连接,虽然这样会导致消息发送延迟。
客户端异常配置
当资源不足时,生产者调用send()方法发送消息,生产者要么阻塞(默认为阻塞),要么抛出异常。若想客户端抛出异常,可以把 sendFailIfNoSpace 设为 true,则客户端会抛出 javax.jms.ResourceAllocationException。配置如下:
<systemUsage> <systemUsage sendFailIfNoSpace="true"> <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> </systemUsage> </systemUsage>
从 5.3.1 版本开始引入了 sendFailIfNoSpaceAfterTimeout 属性。当资源不足时,该熟悉导致send() 方法等待一段时间(单位:毫秒),在这段时间之前,资源得到满足,则 send() 方法执行成功;否则,客户端抛出异常。配置如下:
<systemUsage> <systemUsage sendFailIfNoSpaceAfterTimeout="3000"> /* 等待 3 秒 */ <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> </systemUsage> </systemUsage>
禁用流量控制
禁用流量控制有两种方法:
将 producerFlowControl 设为 false。如上文所示。
使用消息游标。
System usage
你也可以使用 <SystemUsage> 属性配置生产者的流量控制。配置如下:
<systemUsage> <systemUsage> <memoryUsage> /* 配置内存 */ <memoryUsage limit="64 mb" /> </memoryUsage> <storeUsage> /* 配置持久化空间 */ <storeUsage limit="100 gb" /> </storeUsage> <tempUsage> /* 配置缓存区空间 */ <tempUsage limit="10 gb" /> </tempUsage> </systemUsage> </systemUsage>
你可以为非持久化的消息(NON_PERSISTENT)设置内存限制,为持久化消息(PERSISTENT)设置磁盘空间,以及为缓存区设置磁盘空间,代理将在减慢生产者之前使用这些空间。使用上述的默认设置,代理将会一直阻塞send()方法的调用,直至一些消息被消费,并且代理有了可用空间。默认值如上例所述,你可能需要根据你的环境增加这些值。
相关文章推荐
- 消息队列RabbitMQ和ActiveMQ的生产者流量控制
- ActiveMQ 生产者流量控制(Producer Flow Control)
- 消息队列RabbitMQ和ActiveMQ的生产者流量控制
- 消息队列RabbitMQ和ActiveMQ的生产者流量控制
- 消息队列RabbitMQ和ActiveMQ的生产者流量控制
- ActiveMq生产者流量控制(Producer Flow Control)
- 消息队列RabbitMQ和ActiveMQ的生产者流量控制
- 消息队列做生产者流量控制
- TCP/IP之TCP协议——流量控制(滑动窗口协议)
- 使用linux下的TC进行服务器流量控制
- 服务接口的流量控制策略之RateLimit
- TCP的流量控制
- TCP协议的滑动窗口协议以及流量控制
- 第二人生的源码分析(二十七)发送数据的流量控制
- 合理控制流量,避免网络拥塞,实现多链路自动切换解决方案为企业互联网应用保驾护航
- 使用 linux 下的 TC 进行服务器流量控制
- TC(HTB)+iptables作流量控制
- linux中用TC.来做流量控制 ZT
- 简单的activemq,生产者和消费者代码