您的位置:首页 > 编程语言 > Java开发

java线程池在web项目中应用

2016-01-08 15:57 477 查看

线程池

JANUARY 8,2016

遇到了问题

依稀还记得是15年初的时候,一些业务数据发生了错误,经过仔细分析,认为是重复发起了请求引起的,经过多方确认任务重复请求不是认为操作,分析程序发现程序也不会无缘无故发起二次请求。因为这个情况只发生过一次两次,再加上仔细检查代码任务不肯能发生的事一般会推给操作人误操作,所以问题就这么搁置了。

再后来因为操作越来越频繁上面的情况越来越多,然后才意识到这个问题的严重性,以至于到处google baidu 论坛提问,当时找到了遇到相同问题的贴子浏览器在一定超时后会自动再次请求,然后又把问题推给浏览器(哈哈,反正当时不认为程序有问题)

再后来有机会了解了Nginx 其中Nginx配置重复提交现象我从前台提交一个get请求。后台处理了两次!顿时引起了我的注意。再继续了解到F5也有tiemout属性配置,然后再通过后台数据重复提交的间隔大概都是5分钟,联想到timeout属性配置是300秒才意识到是执行的业务方法时间过长,浏览器得不到响应F5自动触发了二次请求。

解决问题

上面说了,引起重复提交是由于方法执行时间过长导致F5自动触发二次请求。
解决方案1:使方法执行时间缩短(肯定不可能做到,执行方法时间不可估计)
解决方案2:使长时间执行的方法新建一个线程,如果页面请求直接告诉前端页面说明方法执行中,剩下的交给后台线程去执行(最后使用的)。

再次遇到问题

由于执行时间长的方法都新启动线程去执行,方法多而导致新开的线程多,最后服务器会由线程池满了而崩溃,服务器崩溃我也跟着崩溃

了解线程池

使用线程池的好处

1,减少在创建和销毁线程上所花的时间及系统资源开销
2,减少系统创建大量线程而导致消耗完系统内存及“过渡切换”
3,池中的线程数量是固定的,如果需要执行大量线程方法超过线程池数量会由排队策略决定线程的执行过程

新建线程池

package net.uni.ap.thread;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 处理内容:线程池工厂类
*/
public class ThreadPoolExecutorFactory {
/**
* corePoolSize 池中所保存的线程数,包括空闲线程。
*/
private static final int corePoolSize = 40;
/**
* maximumPoolSize - 池中允许的最大线程数(采用LinkedBlockingQueue时没有作用)。
*/
private static final int maximumPoolSize = 40;
/**
* keepAliveTime -当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间,线程池维护线程所允许的空闲时间
*/
private static final int keepAliveTime = 60;

/**
* 执行前用于保持任务的队列(缓冲队列)
*/
private static final int capacity = 300;

/**
* 线程池对象
*/
private static ThreadPoolExecutor threadPoolExecutor = null;

//构造方法私有化
private ThreadPoolExecutorFactory(){}

public static ThreadPoolExecutor getThreadPoolExecutor(){
if(null == threadPoolExecutor){
ThreadPoolExecutor t;
synchronized (ThreadPoolExecutor.class) {
t = threadPoolExecutor;
if(null == t){
synchronized (ThreadPoolExecutor.class) {
t = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),new ThreadPoolExecutor.DiscardOldestPolicy());
}
threadPoolExecutor = t;
}
}
}
return threadPoolExecutor;
}
}


执行过程是:
1)当池子大小小于corePoolSize就新建线程,并处理请求

2)当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理

3)当workQueue放不下新入的任务时,新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理

4)另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁

Executors工具

创建线程池另一种方法就是使用:Executors.newFixedThreadPool(int)这个方法,因为它既可以限制数量,而且线程用完后不会一直被cache住;那么就通过它来看看源码,回过头来再看其他构造方法的区别:

<span style="font-size:10px;">public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}</span>
其实你可以自己new一个ThreadPoolExecutor,来达到自己的参数可控的程度,例如,可以将LinkedBlockingQueue换成其它的(如:SynchronousQueue),只是可读性会降低,这里只是使用了一种设计模式。我们现在来看看ThreadPoolExecutor的源码是怎么样的。这里来看下构造方法中对那些属性做了赋值

源码1

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
这里你可以看到最终赋值的过程,可以先大概知道下参数的意思:
corePoolSize:核心运行的poolSize,也就是当超过这个范围的时候,就需要将新的Thread放入到等待队列中了;
maximumPoolSize:一般你用不到,当大于了这个值就会将Thread由一个丢弃处理机制来处理,但是当你发生:newFixedThreadPool的时候,corePoolSize和maximumPoolSize是一样的,而corePoolSize是先执行的,所以他会先被放入等待队列,而不会执行到下面的丢弃处理中,看了后面的代码你就知道了。workQueue:等待队列,当达到corePoolSize的时候,就向该等待队列放入线程信息(默认为一个LinkedBlockingQueue),运行中的队列属性为:workers,为一个HashSet;内部被包装了一层,后面会看到这部分代码。
keepAliveTime:默认都是0,当线程没有任务处理后,保持多长时间,cachedPoolSize是默认60s,不推荐使用。threadFactory:是构造Thread的方法,你可以自己去包装和传递,主要实现newThread方法即可;handler:也就是参数maximumPoolSize达到后丢弃处理的方法,java提供了5种丢弃处理的方法,当然你也可以自己弄,主要是要实现接口:RejectedExecutionHandler中的方法:public
void rejectedExecution(Runnabler, ThreadPoolExecutor e)java默认的是使用:AbortPolicy,他的作用是当出现这中情况的时候会抛出一个异常;其余的还包含:
1、CallerRunsPolicy:如果发现线程池还在运行,就直接运行这个线程
2、DiscardOldestPolicy:在线程池的等待队列中,将头取出一个抛弃,然后将当前线程放进去。
3、DiscardPolicy:什么也不做
4、AbortPolicy:java默认,抛出一个异常:RejectedExecutionException。
通常你得到线程池后,会调用其中的:submit方法或execute方法去操作;其实你会发现,submit方法最终会调用execute方法来进行操作,只是他提供了一个Future来托管返回值的处理而已,当你调用需要有返回值的信息时,你用它来处理是比较好的;这个Future会包装对Callable信息,并定义一个Sync对象(),当你发生读取返回值的操作的时候,会通过Sync对象进入锁,直到有返回值的数据通知,具体细节先不要看太多,继续向下:来看看execute最为核心的方法吧:

源码2

//corePoolSize 核心线程池数量(在线程数量小于corePoolSize时,新增加执行任务时会优先
//创建新的线程执行该任务,在=核心线程池数量时,会将任务加入BlockingQueue阻塞队列,当队
//列满时会又创建新的线程执行该任务,直到等于maximumPoolSize,当更进一步的时候将会执
//行reject(command))
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
//队列满且线程池调用了shutdown后,还在调用execute方法
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}


附录

package net.uni.ap.thread;

/**
*
* 处理内容:线程处理类
* @version: 1.0
* @see:net.uni.ap.thread.IThreadPoolExecutorHandler.java
* @date:2015-5-13
* @author:梅海波
*/
public interface IThreadPoolExecutorHandler extends Runnable{

}
package net.uni.ap.thread;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThreadHandler<T> extends ThreadHandlerAbstract{
private static final Logger logger = LoggerFactory.getLogger(ThreadHandler.class);
protected T t;
protected Class<T> modelClass;
protected String method = "";

@SuppressWarnings("unchecked")
public ThreadHandler(Integer threadCount,T t){
this.t = t;
modelClass = (Class<T>) t.getClass();
if(null != threadCount){
super.countDownLatch = new CountDownLatch(threadCount);
}
}

@Override
public void run() {
try{
Method[] methods = this.modelClass.getMethods();

for (Method method : methods) {
if(method.getName().equals(this.method)){
method.invoke(t);
}
}
if(null != super.countDownLatch){
super.countDownLatch.countDown();
}
if(null != ThreadPoolExecutorFactory.getThreadPoolExecutor().getQueue() && (ThreadPoolExecutorFactory.getThreadPoolExecutor().getQueue().size() < 20
|| ThreadPoolExecutorFactory.getThreadPoolExecutor().getQueue().size() == 0 )){
ThreadPoolExecutorFactory.getThreadPoolExecutor().setCorePoolSize(20);
}else{
ThreadPoolExecutorFactory.getThreadPoolExecutor().setCorePoolSize(40);
}
}catch (Exception e) {
e.printStackTrace();
logger.error("线程池处理异常 方法:" + this.method,e);
}

}

@Override
public void execute(IThreadPoolExecutorHandler threadPoolExecutorHandler,String method)throws Exception{
this.method = method;

try {
ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorFactory.getThreadPoolExecutor();
threadPoolExecutor.execute(threadPoolExecutorHandler);
} catch (Exception e) {
e.printStackTrace();
logger.error("线程池处理异常 execute 方法:" + this.method,e);
throw new Exception(e.getMessage(),e);
}

}

@Override
public void await() throws Exception {
try {
if(super.countDownLatch != null){
countDownLatch.await();
}
} catch (Exception e) {
e.printStackTrace();
logger.error("线程池处理异常 await 方法:" + this.method,e);
throw new Exception(e.getMessage(),e);
}

}

@Override
public void shutdown() throws Exception {
try {
ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorFactory.getThreadPoolExecutor();
threadPoolExecutor.shutdown();
} catch (Exception e) {
e.printStackTrace();
logger.error("线程池处理异常 shutdown 方法:" + this.method,e);
throw new Exception(e.getMessage(),e);
}

}

public int getPoolSize(){
ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorFactory.getThreadPoolExecutor();
return threadPoolExecutor.getPoolSize();

}

/**
* 获取线程池队列状态数量
* @return
* @方法说明
* @date 2015-8-26
* @author 梅海波
*/
public int getQueueSize(){
ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorFactory.getThreadPoolExecutor();
return threadPoolExecutor.getQueue().size();
}

public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorFactory.getThreadPoolExecutor();
for (int i = 0; i < 10000; i++) {
final int index = i;
threadPoolExecutor.execute(
new Runnable() {
@Override
public void run() {
try {
System.out.println(index + "队列数:" + ThreadPoolExecutorFactory.getThreadPoolExecutor().getQueue().size());
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

}
}

}


package net.uni.ap.thread;

import java.util.concurrent.CountDownLatch;

public abstract class ThreadHandlerAbstract implements IThreadPoolExecutorHandler {

/**
* 一个任务分多个线程处理时使用
*/
protected CountDownLatch countDownLatch = null;

public ThreadHandlerAbstract(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}

public ThreadHandlerAbstract(){}

public abstract void execute(IThreadPoolExecutorHandler threadPoolExecutorHandler,String method)throws Exception;

public abstract void await()throws Exception;

public abstract void shutdown()throws Exception;
}


内容暂时写这么多,后期有时间再继续补充

作者:梅海波
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: