
Spring taskExecutor and Redis: optimizing redisMessageListenerContainer threads

2017-08-08 13:17
Background:

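While load testing the system, our test engineer found that the JVM was creating a large number of redisMessageListenerContainer threads, and after running for a while the process was killed.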
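One quick way to confirm this kind of thread growth from inside the JVM is to group the live threads by name. The sketch below is only an illustration, not part of the original investigation: the class `ThreadNameHistogram` and its methods are hypothetical names, and it assumes that thread names end in a numeric counter, as in `redisMessageListenerContainer-123`.

```java
import java.util.Map;
import java.util.TreeMap;

class ThreadNameHistogram {

    /** Strips a trailing "-<digits>" counter so "worker-17" is grouped under "worker". */
    static String groupKey(String threadName) {
        return threadName.replaceFirst("-\\d+$", "");
    }

    /** Counts the live threads of this JVM, grouped by name without the counter. */
    static Map<String, Integer> countByPrefix() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread thread : Thread.getAllStackTraces().keySet()) {
            counts.merge(groupKey(thread.getName()), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // One line per group: thread count, then the shared name prefix.
        countByPrefix().forEach((name, count) -> System.out.println(count + "\t" + name));
    }
}
```

A group whose count keeps climbing under load points at the component that is creating threads without reusing them.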





Reading the source:

Set a breakpoint in the RedisMessageListenerContainer class and step through its internals.

RedisMessageListenerContainer.java

    public void onMessage(Message message, byte[] pattern) {
        Collection<MessageListener> listeners = null;
        // if it's a pattern, disregard channel
        if (pattern != null && pattern.length > 0) {
            listeners = patternMapping.get(new ByteArrayWrapper(pattern));
        } else {
            pattern = null;
            // do channel matching first
            listeners = channelMapping.get(new ByteArrayWrapper(message.getChannel()));
        }
        if (!CollectionUtils.isEmpty(listeners)) {
            dispatchMessage(listeners, message, pattern);
        }
    }

}

private void dispatchMessage(Collection<MessageListener> listeners, final Message message, final byte[] pattern) {
    final byte[] source = (pattern != null ? pattern.clone() : message.getChannel());
    for (final MessageListener messageListener : listeners) {
        // Every listener call is handed to the task executor (a thread pool only if one was configured)
        taskExecutor.execute(new Runnable() {
            public void run() {
                processMessage(messageListener, message, source);
            }
        });
    }
}

public void afterPropertiesSet() {
    // If no executor was set, the default is a SimpleAsyncTaskExecutor: a simple
    // executor with no thread pooling, which starts a new thread for every task.
    if (taskExecutor == null) {
        manageExecutor = true;
        taskExecutor = createDefaultTaskExecutor();
    }
    if (subscriptionExecutor == null) {
        subscriptionExecutor = taskExecutor;
    }
    initialized = true;
}

protected TaskExecutor createDefaultTaskExecutor() {
    String threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
    return new SimpleAsyncTaskExecutor(threadNamePrefix);
}
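To see why an unpooled executor is a problem under load, here is a minimal sketch that uses only the JDK. It does not use Spring: the `threadPerTask` lambda is a stand-in that imitates the one-thread-per-task behavior of the default executor, and `ThreadPerTaskDemo`, `CountingThreadFactory` and the method names are hypothetical. It runs the same number of tasks on both kinds of executor and reports how many threads each one created.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadPerTaskDemo {

    /** Counts every thread it creates, so the two executors can be compared. */
    static final class CountingThreadFactory implements ThreadFactory {
        final AtomicInteger created = new AtomicInteger();

        @Override
        public Thread newThread(Runnable task) {
            Thread thread = new Thread(task, "demo-" + created.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        }
    }

    /** Submits the given number of trivial tasks and waits until all have run. */
    static void runTasks(Executor executor, int tasks) {
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(done::countDown);
        }
        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    /** Stand-in for an unpooled executor: a brand-new thread for every task. */
    static int threadsUsedWithoutPool(int tasks) {
        CountingThreadFactory factory = new CountingThreadFactory();
        Executor threadPerTask = task -> factory.newThread(task).start();
        runTasks(threadPerTask, tasks);
        return factory.created.get();
    }

    /** Same workload on a fixed pool: at most poolSize threads are created. */
    static int threadsUsedWithPool(int tasks, int poolSize) {
        CountingThreadFactory factory = new CountingThreadFactory();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize, factory);
        try {
            runTasks(pool, tasks);
        } finally {
            pool.shutdown();
        }
        return factory.created.get();
    }

    public static void main(String[] args) {
        System.out.println("unpooled threads: " + threadsUsedWithoutPool(1000));
        System.out.println("pooled threads:   " + threadsUsedWithPool(1000, 10));
    }
}
```

In the unpooled case the number of threads grows with the number of messages, which matches the thread explosion seen in the load test; with a pool it stays bounded by the pool size.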


RedisHttpSessionConfiguration.java

@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, RedisOperationsSessionRepository messageListener) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    if (this.redisTaskExecutor != null) {
        container.setTaskExecutor(this.redisTaskExecutor);
    }

    if (this.redisSubscriptionExecutor != null) {
        container.setSubscriptionExecutor(this.redisSubscriptionExecutor);
    }

    container.addMessageListener(messageListener, Arrays.asList(new PatternTopic[]{new PatternTopic("__keyevent@*:del"), new PatternTopic("__keyevent@*:expired")}));
    container.addMessageListener(messageListener, Arrays.asList(new PatternTopic[]{new PatternTopic(messageListener.getSessionCreatedChannelPrefix() + "*")}));
    return container;
}

@Qualifier("springSessionRedisTaskExecutor")
public void setRedisTaskExecutor(Executor redisTaskExecutor) {
    this.redisTaskExecutor = redisTaskExecutor;
}


Solution:

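Add a thread pool for Redis to the XML configuration, under the bean id springSessionRedisTaskExecutor that the setter above expects. With that in place the problem went away.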

<bean id="springSessionRedisTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <!-- Core pool size -->
    <property name="corePoolSize" value="10" />
    <!-- Maximum pool size -->
    <property name="maxPoolSize" value="300" />
    <!-- Maximum queue length -->
    <property name="queueCapacity" value="500" />
    <!-- Idle time, in seconds, that the pool allows its threads; the default is 60 -->
    <property name="keepAliveSeconds" value="60" />
</bean>
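How these three sizes interact is easy to get wrong, so here is a small JDK-only sketch of it. It rests on ThreadPoolTaskExecutor delegating to `java.util.concurrent.ThreadPoolExecutor`, and it uses tiny numbers in place of 10/300/500. `PoolSizingDemo`, `Snapshot` and `fill` are hypothetical names. It fills a pool with tasks that block and records how large the pool is at each stage.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingDemo {

    /** What was observed while filling the pool with blocking tasks. */
    static final class Snapshot {
        final int poolSizeWhenQueueFull;
        final int poolSizeAtMax;
        final boolean extraTaskRejected;

        Snapshot(int poolSizeWhenQueueFull, int poolSizeAtMax, boolean extraTaskRejected) {
            this.poolSizeWhenQueueFull = poolSizeWhenQueueFull;
            this.poolSizeAtMax = poolSizeAtMax;
            this.extraTaskRejected = extraTaskRejected;
        }
    }

    static Snapshot fill(int core, int max, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        // Each task parks until released, so threads and queue slots stay occupied.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            // Step 1: occupy the core threads, then fill the queue. The pool does not grow yet.
            for (int i = 0; i < core + queueCapacity; i++) {
                pool.execute(blocker);
            }
            int sizeWhenQueueFull = pool.getPoolSize();

            // Step 2: with the queue full, each further task starts an extra thread, up to max.
            for (int i = 0; i < max - core; i++) {
                pool.execute(blocker);
            }
            int sizeAtMax = pool.getPoolSize();

            // Step 3: queue full and max threads busy, so the default policy rejects the task.
            boolean rejected = false;
            try {
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
            return new Snapshot(sizeWhenQueueFull, sizeAtMax, rejected);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Snapshot s = fill(2, 4, 3);
        System.out.println("pool size with full queue: " + s.poolSizeWhenQueueFull);
        System.out.println("pool size at maximum:      " + s.poolSizeAtMax);
        System.out.println("extra task rejected:       " + s.extraTaskRejected);
    }
}
```

Applied to the configuration above, this suggests the pool stays at 10 threads until 500 messages are waiting in the queue, grows toward 300 only after that, and starts rejecting work once both limits are reached. Whether those limits fit a given workload is worth checking under the same load test.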