java.util.concurrent 之Executors
2016-05-11 13:37
405 查看
java.util.concurrent.Executor,
java.util.concurrent.ExecutorService,
java.util.concurrent. Executors这三者均是 Java Executor 框架的一部分,用来提供线程池的功能。因为创建和管理线程非常心累,并且操作系统通常对线程数有限制,所以建议使用线程池来并发执行任务,而不是每次请求进来时创建一个线程。使用线程池不仅可以提高应用的响应时间,还可以避免
"java.lang.OutOfMemoryError: unable to create new native thread"之类的错误。
在 Java 1.5 时,开发者需要关心线程池的创建和管理,但在 Java 1.5 之后 Executor 框架提供了多种内置的线程池,例如:FixedThreadPool(包含固定数目的线程),CachedThreadPool(可根据需要创建新的线程)等等。
为什么要用线程池:
减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为因为消耗过多的内存
Excutors主要方法:
newCachedThreadPool() :创建一个
ExecutorService,该 ExecutorService 根据需要来创建线程,可以重复利用已存在的线程来执行任务。
newFixedThreadPool(int
numberOfThreads) :创建一个可重复使用的、固定线程数量的 ExecutorService。
newScheduledThreadPool(int
corePoolSize):根据时间计划,延迟给定时间后创建 ExecutorService(或者周期性地创建 ExecutorService)。
newSingleThreadExecutor():创建单个工作线程
ExecutorService。
newSingleThreadScheduledExecutor():根据时间计划延迟创建单个工作线程
ExecutorService(或者周期性的创建)。
newWorkStealingPool():创建一个拥有多个任务队列(以便减少连接数)的
ExecutorService。
Executor vs ExecutorService vs Executors
正如上面所说,这三者均是 Executor 框架中的一部分。Java 开发者很有必要学习和理解他们,以便更高效的使用 Java 提供的不同类型的线程池。总结一下这三者间的区别,以便大家更好的理解:Executor 和 ExecutorService 这两个接口主要的区别是:ExecutorService 接口继承了 Executor 接口,是 Executor 的子接口
Executor 和 ExecutorService 第二个区别是:Executor 接口定义了
execute()方法用来接收一个
Runnable接口的对象,而
ExecutorService 接口中的
submit()方法可以接受
Runnable和
Callable接口的对象。
Executor 和 ExecutorService 接口第三个区别是 Executor 中的
execute()方法不返回任何结果,而
ExecutorService 中的
submit()方法可以通过一个
Future 对象返回运算结果。
Executor 和 ExecutorService 接口第四个区别是除了允许客户端提交一个任务,ExecutorService 还提供用来控制线程池的方法。比如:调用
shutDown()方法终止线程池。可以通过 《Java
Concurrency in Practice》 一书了解更多关于关闭线程池和如何处理 pending 的任务的知识。
Executors 类提供工厂方法用来创建不同类型的线程池。比如:
newSingleThreadExecutor()创建一个只有一个线程的线程池,
newFixedThreadPool(int numOfThreads)来创建固定线程数的线程池,
newCachedThreadPool()可以根据需要创建新的线程,但如果已有线程是空闲的会重用已有线程。
Source Code Example:
package java.util.concurrent; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.security.AccessControlContext; import java.security.AccessController; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.security.PrivilegedActionException; import java.security.AccessControlException; import sun.security.util.SecurityConstants; public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); } public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } public static ExecutorService unconfigurableExecutorService(ExecutorService executor) { if (executor == null) throw new NullPointerException(); return new DelegatedExecutorService(executor); } public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) { if (executor == null) throw new NullPointerException(); return new DelegatedScheduledExecutorService(executor); } public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } public static ThreadFactory privilegedThreadFactory() { return new PrivilegedThreadFactory(); } public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } public static Callable<Object> callable(Runnable task) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<Object>(task, null); } public static Callable<Object> callable(final PrivilegedAction<?> action) { if (action == null) throw new NullPointerException(); return new Callable<Object>() { public Object call() { return action.run(); }}; } public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) { if (action == null) throw new NullPointerException(); return new Callable<Object>() { public Object call() throws Exception { return action.run(); }}; } public static <T> Callable<T> privilegedCallable(Callable<T> callable) { if (callable == null) throw new NullPointerException(); return new PrivilegedCallable<T>(callable); } public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) { if (callable == null) throw new NullPointerException(); return new PrivilegedCallableUsingCurrentClassLoader<T>(callable); } static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } } static final class PrivilegedCallable<T> implements Callable<T> { private final Callable<T> task; private final AccessControlContext acc; PrivilegedCallable(Callable<T> task) { this.task = task; this.acc = AccessController.getContext(); } public T call() throws Exception { try { return AccessController.doPrivileged( new PrivilegedExceptionAction<T>() { public T run() throws Exception { return task.call(); } }, acc); } catch (PrivilegedActionException e) { throw e.getException(); } } } static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> { private final Callable<T> task; private final AccessControlContext acc; private final ClassLoader ccl; PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) { SecurityManager sm = System.getSecurityManager(); if (sm != null) { // Calls to getContextClassLoader from this class // never trigger a security check, but we check // whether our callers have this permission anyways. sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION); // Whether setContextClassLoader turns out to be necessary // or not, we fail fast if permission is not available. sm.checkPermission(new RuntimePermission("setContextClassLoader")); } this.task = task; this.acc = AccessController.getContext(); this.ccl = Thread.currentThread().getContextClassLoader(); } public T call() throws Exception { try { return AccessController.doPrivileged( new PrivilegedExceptionAction<T>() { public T run() throws Exception { Thread t = Thread.currentThread(); ClassLoader cl = t.getContextClassLoader(); if (ccl == cl) { return task.call(); } else { t.setContextClassLoader(ccl); try { return task.call(); } finally { t.setContextClassLoader(cl); } } } }, acc); } catch (PrivilegedActionException e) { throw e.getException(); } } } static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } static class PrivilegedThreadFactory extends DefaultThreadFactory { private final AccessControlContext acc; private final ClassLoader ccl; PrivilegedThreadFactory() { super(); SecurityManager sm = System.getSecurityManager(); if (sm != null) { // Calls to getContextClassLoader from this class // never trigger a security check, but we check // whether our callers have this permission anyways. sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION); // Fail fast sm.checkPermission(new RuntimePermission("setContextClassLoader")); } this.acc = AccessController.getContext(); this.ccl = Thread.currentThread().getContextClassLoader(); } public Thread newThread(final Runnable r) { return super.newThread(new Runnable() { public void run() { AccessController.doPrivileged(new PrivilegedAction<Void>() { public Void run() { Thread.currentThread().setContextClassLoader(ccl); r.run(); return null; } }, acc); } }); } } static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future<?> submit(Runnable task) { return e.submit(task); } public <T> Future<T> submit(Callable<T> task) { return e.submit(task); } public <T> Future<T> submit(Runnable task, T result) { return e.submit(task, result); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return e.invokeAll(tasks); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } } static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } } static class DelegatedScheduledExecutorService extends DelegatedExecutorService implements ScheduledExecutorService { private final ScheduledExecutorService e; DelegatedScheduledExecutorService(ScheduledExecutorService executor) { super(executor); e = executor; } public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { return e.schedule(command, delay, unit); } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { return e.schedule(callable, delay, unit); } public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { return e.scheduleAtFixedRate(command, initialDelay, period, unit); } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { return e.scheduleWithFixedDelay(command, initialDelay, delay, unit); } } private Executors() {} }
package java.util.concurrent; public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command); }
package java.util.concurrent; import java.util.List; import java.util.Collection; public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
相关文章推荐
- Java实现插入排序
- 如何查看Java进程并获取进程ID?
- 解决MyEclipse10.7 building/deploy 到服务器的时候报java.lang.NullPointerException
- 用 Java 对 hbase 进行CRUD增删改查操作
- SpringMVC与mybatis整合
- Java LinkedHashMap工作原理及实现(一)
- Java 实现二维码及有Logo 的二维码(SpringMVC+Zxing+Jsp)
- 《java入门第一季》之正则表达式小案例
- 《java入门第一季》之正则表达式小案例
- 蓝桥杯 BEGIN_03 入门训练 圆的面积
- 蓝桥杯 BEGIN_02 入门训练 序列求和
- Java虚拟机------JVM
- Spring Remoting: Remote Method Invocation (RMI)--转
- 蓝桥杯 BEGIN_01 入门训练 A+B问题
- Java中的:Date类、Math类
- 《java入门第一季》之好玩的正则表达式
- 《java入门第一季》之好玩的正则表达式
- Spring Mvc数据的处理
- 【第六章】 AOP 之 6.3 基于Schema的AOP ——跟我学spring3
- JAVA Scanner类里next方法和nextLine方法的区别