您的位置:首页 > 编程语言 > Java开发

Java 多线程编程-并发编程

2018-02-28 16:02 531 查看
Java 多线程编程-并发编程

Table of Contents Java 多线程编程-并发编程并发基础类之任务类Callable 接口Future 接口FutureTask 类 构造方法run()方法get()方法并发基础类之线程相关类ThreadFactory 接口ThreadGroup 类Executors.DefaultThreadFactory 类并发基础类之调度器Executor 接口ExecutorService 接口ScheduledExecutorService 接口AbstractExecutorService 抽象类ThreadPoolExecutor 类内部类 WorkerAtomicInteger Int 类型变量 ct1构造方法Excute() 方法线程池的管理ThreadPoolExecutor 使用教程

对于初学者来说,多线程就是 new 一个 Thread(),然后设置一个 Runnable,调用 Thread.start()方法启动线程。可是这种方式可能存在以下问题

子线程不可控制,不可取消(可以自行设置标志位,结束run方法),不可返回结果

创建新的线程,并且启动线程,需要抢占资源,可能会超过线程数,同时空闲的线程,没有被重新利用。

为此,这里会介绍两个内容点

并发编程基础:Callable 相关内容

多线程框架: Executor 相关内容

其中 Executor 的相关类图如下:

细节的介绍,可以看后面的文章。

并发基础类之任务类

Callable 接口

Callable 接口代码如下:

public interface Callable<V> {
/**执行方法,其中V表示返回的结果
*/
V call() throws Exception;
}


和 Runnable 类似,只不过提供了一个 call()方法,可以返回执行的结果,其泛型参数 表示结果类型。

Future 接口

Future 的代码如下:

public interface Future<V> {

/**
* Attempts to cancel execution of this task.  This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run.  If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
* 取消任务,如果任务已经完成,已经被取消或者不可被取消,则取消失败。boolean 值参数表示是否可以取消      * 执行中的任务。
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* Returns {@code true} if this task was cancelled before it completed
* normally. 任务是否已经被取消
*/
boolean isCancelled();

/**
* Returns {@code true} if this task completed. 任务是否已经被完成
*/
boolean isDone();

/**
* Waits if necessary for the computation to complete, and then
* retrieves its result. 获取任务结果,如果任务没有完成,get 方法会阻塞
*
*/
V get() throws InterruptedException, ExecutionException;

/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
* 等同于 get()只不过,声明了最长阻塞时间,不会无限阻塞
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}


总的来说, Future 接口, 提供了以下能力

获取任务结果(可阻塞)

取消任务,获取任务的简单状态(是否完成,是否被取消等)

这个是异步回调的基础接口,需要好好理解,但是实际上,我们并不会直接用它,而是实现它的子类 FutureTask ,具体的介绍,往后看。

FutureTask 类

FutureTask 类实现 RunnableFuture 接口,而 RunnableFuture接口的实现如下:

public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}


从这个角度看,我们可以知道 RunableFuture 实现的规范如下:

run() 方法,子任务执行的方法

get() 方法,获取子任务执行结果的能力

从 Future 结果继承的,取消子任务,获取子任务简单状态的方法

而 FutureTask 是 RunnableFuture 的具体实现类,根据官方的描述, FutureTask 是可取消的异步操作,提供了 Future 接口的具体实现,实现了开始和取消一个操作的,查询操作是否完成,获取操作结果的方法。它可以包裹一个 Callable 或者 Runnable 可以被 Executor 执行和调度。

我们来看下关键代码

public class FutureTask<V> implements RunnableFuture<V> {
/**
* 表示该任务的执行状态,只可能为以下四种情况
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL 正常完成的流程 new -> completing -> normal
* NEW -> COMPLETING -> EXCEPTIONAL 异常退出的流程 new -> completing -> exceptional
* NEW -> CANCELLED 取消的流程 new -> cancelled
* NEW -> INTERRUPTING -> INTERRUPTED 被中断的流程 new -> interrupting -> interrupted
*/
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;
/** The underlying callable; nulled out after running  */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() 工作线程,用于执行 Callable */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

// 两个构造方法是一样的,如果传递的是 Runnable 则会被封装成 Callable
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;       // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;       // ensure visibility of callable
}
}


构造方法

构造方法可以传递一个 Callable 或者 Runnable,但是实际上传递 Runnable 也好,最终还是通过 Excutors 工具类,将 Runnable 封装成 Callable,这里的话,会对 state 进行赋值,赋值为 new。

run()方法

public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}


首先会判断任务的 State,如果 State 不为 new 则直接返回,然后执行 Callable 类型的类变量 callable 的call()方法,依旧构造方法里面传入的 Callable 对象。途中会捕获异常,如果发生异常,则将 result 置为null,并且设置 Exception,最后如果执行执行,则将 call()方法返回的结果,通过set()方法传递给outcome变量,最终通过get()等方法,获取的就是这个 outcome 泛型对象的值。

get()方法

get()方法用于获取任务执行的结果,如果状态还处于未完成的话,则改方法进行阻塞。

public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}


这里的关键方法是调用了 awaitDone() 方法,顾名思义,就是一个阻塞方法,其代码如下:

private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
long startTime = 0L;    // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {//进入一个无限循环中
int s = state;
if (s > COMPLETING) {//判断任务是否被完成,取消,或者中断
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)//如果任务执行中,则 yeid 线程,yeid 不同于 sleep 和waite,仅仅
//在线程资源紧张的情况下,会暂停线程
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();//对 q 进行初始化,仅仅在 timed 传参为 true 的时候
}
else if (!queued)//加入队列,并且将 q.next 赋值给 wauters
queued = U.compareAndSwapObject(this, WAITERS,
q.next = waiters, q);
else if (timed) {//以下代码是计算阻塞时间的代码
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)//LockSupport 是一个用来创建锁和其它同步类的基本线程元素类
LockSupport.parkNanos(this, parkNanos);//该线程会阻塞特定的事件
}
else
LockSupport.park(this);
}
}


waitDone()方法就是将当前线程加入等待队列(WaitNode 持有当前线程),然后调用 LockSupport的park()方法,将线程阻塞,等待执行完成或者异常。那么这里的等待,又是在哪里被唤醒的呢?在 run()方法里,调用了 set()方法,最终调用了 finishCompletion()方法。

public void run() {
..................
if (ran)
set(result);
.................
}
protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
U.putOrderedInt(this, STATE, NORMAL); // final state
finishCompletion();
}
}
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);//这里对进行 unPark(),接触线程阻塞
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

done();

callable = null;        // to reduce footprint
}


FutureTask 中的同步操作,并没有使用锁机制,而是使用了 LockSupport 阻塞和唤起线程,锁操作的话,会额外的耗时。

总的来说,FuterTask 提供了,任务执行的方法,获取任务结果的方法,以及取消的机制,能够实现同步编程的基本操作。

并发基础类之线程相关类

ThreadFactory 接口

总是通过硬编码 new 一个 Thread是不优雅的,所以 java 提供了一个接口,用于实现new Thread这样一个操作,这样的话,你就可以生产出一些你需要的特殊 Thrad,例如达成某种任务的 Thread,高优先级的 Thread 等等。

public interface ThreadFactory {
Thread newThread(Runnable r);
}


总结:工厂模式,往往是为了减少业务耦合。

ThreadGroup 类

ThreadGroup 类,这个类主要是方便线程群的管理,统一设置线程的一些属性。

例如从 setMaxPriority() 方法说起,这个方法统一设置了线程群里面的优先级:

public final void setMaxPriority(int pri) {
int ngroupsSnapshot;
ThreadGroup[] groupsSnapshot;
synchronized (this) {
checkAccess();
// Android changed: Clamp to MIN_PRIORITY, MAX_PRIORITY.
// if (pri < Thread.MIN_PRIORITY || pri > Thread.MAX_PRIORITY) {
//     return;
// }
if (pri < Thread.MIN_PRIORITY) {
pri = Thread.MIN_PRIORITY;
}
if (pri > Thread.MAX_PRIORITY) {
pri = Thread.MAX_PRIORITY;
}

maxPriority = (parent != null) ? Math.min(pri, parent.maxPriority) : pri;
ngroupsSnapshot = ngroups;
if (groups != null) {
groupsSnapshot = Arrays.copyOf(groups, ngroupsSnapshot);
} else {
groupsSnapshot = null;
}
}
for (int i = 0 ; i < ngroupsSnapshot ; i++) {
groupsSnapshot[i].setMaxPriority(pri);
}
}


这个方法主要是遍历类Thread 数组变量 ngroups,然后逐一给线程设置优先级。

总结:ThreadGroup 类,提供了线程组这样一个概念,这个组里面拥有的是一些属性相同的线程。至于实际用途,视开发场景而言。

Executors.DefaultThreadFactory 类

创建一个 ThreadPoolExecutor 如果不传递 ThreadFactory 的话,则会使用 DefaultThreadFactory。

private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}


构造方法里面会通过 Thread.currentThread.getThreadGroup()创建一个 ThreadGroup,然后在 newThread() 方法里面,会给新加入的 Thread,设置 ThreadGroup,这样通过,这个 ThreadFactory 产生的 Thread,就具有一些统一的属性了。

题外话:ThreadGroup 里面使用了数组作为存储的数据结构,看起来好像不太好扩展,而且从数组不可变的角度来考虑,似乎无法理解这一点。

并发基础类之调度器

为什么需要调度器,我们考虑一些情况,如果一个应用进程里面,野蛮的 new 一些 Thread()出来,并且调用了线程的 start(),从操作系统角度考虑,这样肯定会造成资源竞争的,会带来以下一些缺点

资源竞争,重要的线程,可能被低优先级线程阻塞。

资源浪费,闲置的线程,无法被回收利用。

从 Android 的角度考虑, liunx 限制了最大线程数目,超过线程数目,会导致 OOM。

所以,需要线程调度器这样一个管理者的角色。Java从 jdk 1.5 开始在 java.util.concurrent 包里面提供了一系列相关的接口和类,实现这个能力。

先看下类图结构。

Executor接口 最基本的接口,实现了线程池的 excute()行为

ExecutorService接口 继承了 Executor 接口,实现了 shutdown(),submit()方法,扩展了停止任务,提交任务的能力

AbstractExecutorService抽象类 实现了 ExecutorService 接口里面的大部分方法

ThreadPoolExecutor 类 继承 AbstractExecutorService ,实现了线程池

ScheduledExecutorService接口 继承ExecutorService接口,提供了周期调度的能力

ScheduledThreadPoolExecutor类,周期调度的线程池

Executos 类,线程池的静态工厂方法,提供了四种便捷的线程池

Executor 接口

最基础的是 Executor 接口

public interface Executor {
/**
* Executes the given command at some time in the future.  The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
*/
void execute(Runnable command);
}


这个接口类,提供了 excute()方法,可以执行提交一个任务,也就是 执行 Runnable 对象的 run()方法。

ExecutorService 接口

继承 Executor 接口,实现了一些拓展,较为基本的线程池接接口。

public interface ExecutorService extends Executor {

/**停止一次之前提交的任务
*/
void shutdown();

/**
* 停止所以正在执行的列表
* @return list of tasks that never commenced execution
*/
List<Runnable> shutdownNow();

/**
* Returns {@code true} if this executor has been shut down.
*
* @return {@code true} if this executor has been shut down
*/
boolean isShutdown();

/**
* 是否所以任务都被停止了,如果是,则返回 true
* @return {@code true} if all tasks have completed following shut down
*/
boolean isTerminated();

/**
* Blocks until all tasks have completed execution after a shutdown
* request, or the timeout occurs, or the current thread is
* interrupted, whichever happens first.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return {@code true} if this executor terminated and
*         {@code false} if the timeout elapsed before termination
* @throws InterruptedException if interrupted while waiting
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
* Future's {@code get} method will return the task's result upon
* successful completion.
* 提交一个 Callable,返回值为 Future,Future 的get()方法会在任务成功完成后,返回结果
* <p>
* If you would like to immediately block waiting
* for a task, you can use constructions of the form
* {@code result = exec.submit(aCallable).get();}
*
* <p>Note: The {@link Executors} class includes a set of methods
* that can convert some other common closure-like objects,
* for example, {@link java.security.PrivilegedAction} to
* {@link Callable} form so they can be submitted.
*
* @param task the task to submit
* @param <T> the type of the task's result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
*         scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Callable<T> task);

/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return the given result upon successful completion.
*
* @param task the task to submit
* @param result the result to return
* @param <T> the type of the result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
*         scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Runnable task, T result);

/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
*         scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<?> submit(Runnable task);

/**
* Executes the given tasks, returning a list of Futures holding
* their status and results when all complete.
* {@link Future#isDone} is {@code true} for each
* element of the returned list.
* Note that a <em>completed</em> task could have
* terminated either normally or by throwing an exception.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param <T> the type of the values returned from the tasks
* @return a list of Futures representing the tasks, in the same
*         sequential order as produced by the iterator for the
*         given task list, each of which has completed
* @throws InterruptedException if interrupted while waiting, in
*         which case unfinished tasks are cancelled
* @throws NullPointerException if tasks or any of its elements are {@code null}
* @throws RejectedExecutionException if any task cannot be
*         scheduled for execution
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

/**
* Executes the given tasks, returning a list of Futures holding
* their status and results
* when all complete or the timeout expires, whichever happens first.
* {@link Future#isDone} is {@code true} for each
* element of the returned list.
* Upon return, tasks that have not completed are cancelled.
* Note that a <em>completed</em> task could have
* terminated either normally or by throwing an exception.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @param <T> the type of the values returned from the tasks
* @return a list of Futures representing the tasks, in the same
*         sequential order as produced by the iterator for the
*         given task list. If the operation did not time out,
*         each task will have completed. If it did time out, some
*         of these tasks will not have completed.
* @throws InterruptedException if interrupted while waiting, in
*         which case unfinished tasks are cancelled
* @throws NullPointerException if tasks, any of its elements, or
*         unit are {@code null}
* @throws RejectedExecutionException if any task cannot be scheduled
*         for execution
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

/**
* Executes the given tasks, returning the result
* of one that has completed successfully (i.e., without throwing
* an exception), if any do. Upon normal or exceptional return,
* tasks that have not completed are cancelled.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param <T> the type of the values returned from the tasks
* @return the result returned by one of the tasks
* @throws InterruptedException if interrupted while waiting
* @throws NullPointerException if tasks or any element task
*         subject to execution is {@code null}
* @throws IllegalArgumentException if tasks is empty
* @throws ExecutionException if no task successfully completes
* @throws RejectedExecutionException if tasks cannot be scheduled
*         for execution
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

/**
* Executes the given tasks, returning the result
* of one that has completed successfully (i.e., without throwing
* an exception), if any do before the given timeout elapses.
* Upon normal or exceptional return, tasks that have not
* completed are cancelled.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @param <T> the type of the values returned from the tasks
* @return the result returned by one of the tasks
* @throws InterruptedException if interrupted while waiting
* @throws NullPointerException if tasks, or unit, or any element
*         task subject to execution is {@code null}
* @throws TimeoutException if the given timeout elapses before
*         any task successfully completes
* @throws ExecutionException if no task successfully completes
* @throws RejectedExecutionException if tasks cannot be scheduled
*         for execution
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}


void shutdown()

停止任务,但是不会等待之前提交任务完成之后,再停止

List showdownNow()

停止只在执行和等待执行的任务,并且返回等待执行的任务列表

Future submit(Callable task)

提交一个 Callable 类型的任务,返回一个可获取结果的 Future 对象,Future.get()方法会返回结果,直到任务完成。

Future
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息