您的位置:首页 > 编程语言 > Java开发

java并发(七、高级并发对象)

2017-07-20 15:17 239 查看
到目前,这课程从一开就集中在java
平台的部分低级API
。这些API
适合于适合于非常基础的任务,但是高级构建需要更高级的任务。特别是在今天充分利用多核心系统上的并发应用。

在这一章,我们将考虑5.0
版本的java
平台引进的一些高级并发特点的。大多数特性是在java.util.concurrent
包下实现的。在Java
集合框架中也有新的并发数据结构。



Lock
对象支持的锁方案简化了并发应用。



Executors
定义了一个创建和管理线程的高级API
。Executor
实现在java.util.concurrent
中,提供了大规模应用的线程管理。



Concurrent collections
使管理大数据集合变得简单,能有效同步的需要。



Atomic variables
具有减少并发需要的功能,能避免内存不一致错误。



ThreadLocalRandom
(JDK 7
)提供了多线程伪随机数。


(一)

锁对象

同步代码依赖于一个简单的可重入锁。这种锁使用简单,但有很多限制。Java.util.concurrent.locks
包提供了更复杂的锁方案。我们不会详细看这个包,而是关注最基础的接口Lock


   

锁对象的工作和同步代码的隐式锁很像。使用隐式锁,只能一个线程一次能够拥一个锁。锁对象也支持wait/notify
机制,通过他的Condition
对象。

   

锁对象比隐式锁最大的好处是能够收回获取锁的尝试。tryLock
方法能够实现当锁不能立即使用或者超时时收回。lockInterruptibly
方法是在锁被获取之前,如果另一个线程发送一个中断时收回。

   

让我们使用锁对象来解决Liveness
中的死锁问题。Alphonse
和Gaston
已经训练他们自己去注意鞠躬。我们做出了这个改善的模型,通过要求我们的Friend
对象在继续鞠躬之前必须先获得所有参与者的锁。Safelock
是一个改善模型的源代码。为了展示这个方案的作用,我们假设Alphonse
和Gaston
沉醉于能够安全鞠躬的新发现,以至于不停的鞠躬。

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

import java.util.Random;

public class Safelock {

    
static class Friend {

        
private final String name;

        
private final Lock lock = new ReentrantLock();

        
public Friend(String name) {

             
this.name = name;

        
}

        
public String getName() {

             
return this.name;

        
}

        
public boolean impendingBow(Friend bower) {

             
Boolean myLock = false;

             
Boolean yourLock = false;

             
try {

                  
myLock = lock.tryLock();

                  
yourLock = bower.lock.tryLock();

             
} finally {

                  
if (! (myLock && yourLock)) {

                      
if (myLock) {

                           
lock.unlock();

                      
}

                      
if (yourLock) {

                           
bower.lock.unlock();

                      
}

                  
}

             
}

             
return myLock && yourLock;

        
}

        
public void bow(Friend bower) {

         
4000
    
if (impendingBow(bower)) {

                  
try {

                      
System.out.format("%s: %s has"+ " bowed to me!%n", this.name, bower.getName());

                      
bower.bowBack(this);

                  
} finally {

                      
lock.unlock();

                      
bower.lock.unlock();

                  
}

             
} else {

                  
System.out.format("%s: %s started"+ " to bow to me, but saw that"

                      
+ " I was already bowing to"+ " him.%n",

                      
this.name, bower.getName());

             
}

        
}

        
public void bowBack(Friend bower) {

             
System.out.format("%s: %s has" +" bowed back to me!%n",

                  
this.name, bower.getName());

        
}

    
}

    
static class BowLoop implements Runnable {

        
private Friend bower;

        
private Friend bowee;

        
public BowLoop(Friend bower, Friend bowee) {

             
this.bower = bower;

             
this.bowee = bowee;

        
}

        
public void run() {

             
Random random = new Random();

             
for (;;) {

                  
try {

                      
Thread.sleep(random.nextInt(10));

                  
} catch (InterruptedException e) {}

                  
bowee.bow(bower);

             
}

        
}

    
}

    
public static void main(String[] args) {

        
final Friend alphonse =new Friend("Alphonse");

        
final Friend gaston =new Friend("Gaston");

        
new Thread(new BowLoop(alphonse, gaston)).start();

        
new Thread(new BowLoop(gaston, alphonse)).start();

    
}

}


(二)

Executors

在之前提供的所有例子中,都和新线程和任务之间有关,或者是Runnable
的对象,或者是Thread
对象。这只适用于小的应用,但不适合大规模应用,把线程管理和创建和应用分离式有意义的。这样方法封装在叫做执行器的对象里。接下俩详细介绍执行器。

·        

Executor
接口定义了三种执行器对象类型。

·        

Thread Pools
是最常有的一种实现。

·        

Fork/Join
是一个有利于多处理器的框架(JDK7



1.

Executor

接口

java.util.concurrent
包定义了三种执行器接口:

·        

Executor,
一个简单的支持启动新任务的接口。

·        

ExecutorService
是Executor
的子接口,提供了管理个人任务和执行器生存周期的附加功能。

·        

ScheduledExecutorService
,一个ExecutorService
的子接口,支持任务执行周期的and/or
功能。

典型的,传给executor
对的参数是三种接口类型的一种,而不是executor
类型。


1)

Executor

接口

Executor
接口提供一个单独方法,execute
,给一个普通的线程创建方案提供顺序替换。如果r
是一个Runnable
的对象,e
是Executor
对象。

   

(new Thread(r)).start();

替换成e.execute(r);

然而,execute
没有什么特殊的地方。一个创建新线程和立即启动它的低级方案。依赖于Executor
的实现,execute
做了同样的事情,但是,它很可能使用已经存在的工作线程去运行r
,或者将r
放到等待队列里,直到工作线程可用。(我们将在Thread Pools
中描述工作线程)。

   
java.util.concurrent
中执行器的实现,重复利用了高级的ExecutorService
和ScheduleExecutorService
接口,索然他们也是基于Executor
接口的。


2)

ExecutorService

接口

ExecutorService
接口相似地实现execute
方法,但是有更有用的提交方法。像execute
一样,接收一个Runable
对象,但是也接收Callable
对象,它可以允许线程有返回值。它会返回Future
对象,用来取Callable
的返回值和管理Callable
和Runnable
任务的状态。

ExecuteService
也提供了提交Callable
对象的大集合方法,最后,ExecutorService
提供一些执行器关闭的管理方法。支持立即停止,任务正确处理中断。


3)

ScheduleExecutorService

接口

ScheduleExecutorService
接口为它的父接口ExecutorService
补充了计划,可以执行一个延迟来执行Runnable
或者Callable
任务。还有,这个接口定义了scheduleAtFixedRate
和scheduleWithFixedDelay
,可以按照定义间隔重复执行指定任务。


2.

线程池

Java.util.concurrent
中的大多数执行器实现使用由工作线程组成的线程池。这种线程分别来自于Runnable
和Callable
,他们经常执行联合任务。

   

使用工作线程,减小了线程创建的开销。线程对象大量内存,正在大规模应用中,分配和销毁线程对象会消耗大量的内存管理开销。

一个普遍使用的线程池是固定线程池(fixed thread pool
)。这种类型的线程池有运行固定的线程数;如果正在使用的线程由于某种原因终止,会自动替换成一个新的线程。任务经过一个内存队列提交,当任务数大于线程数,队列保存多出的任务。

固定线程池的一个重要优势是使用它是优雅降低(degrade gracefully
)。要理解它,想象一下web
服务器每个HTTP
请求使用一个单独的线程。如果应用简单的为每个HTTP
请求创建一个线程,系统接受的线程数比它能够处理的多,这样系统将停止响应所有请求,当这些线程的开销超出了系统处理能力。给线程的创建加个限制,应用不会立即处理所有请求,但是会再能力范围内立即处理。

·        

使用创建固定线程池创建执行器的简单方法是调用在java.util.concurrent.Executors
中的newFixedThreadPool
工厂方法,这个类也提供下列工厂方法:

·        

NewCachedThreadPool
的方法创建一个可扩展线程池的执行器。它适合于启动很多短声明周期任务的应用。

·        

NewSingleThreadExecutor
方法创建单线程执行器。

·        

还有几个工厂方法是上面执行器的ScheduledExecutorService
版本。

如果上面的工厂方法不能满足你的需要,java.util.concurrent.ThreadPoolExecutor
或者java.util.concurrent.ThreadPoolExecutor
将给你提供附加功能。


3.

Fork/Join

Java SE 7
中的新特性,fork/join
框架是一个ExecutorService
接口的实现,帮你利用多处理器(系统)。它可以强行进入较小的递归快。目的是使
11240
用所有可用的处理能力去增加你的应用的性能。

和任何ExecutorService
一样,fork/join
把任务分配给线程池里的工作线程。Fork/join
框架的不同是使用工作-
抢断(work-stealing
)算法。工作线程可以抢断其他正在工作线程的任务。

Fork/join
框架的核心是ForkJoinPool
类,AbstractExecutorService
的一个扩展。ForkJoinPool
实现了work-stealing
算法能够执行ForkJoinTasks



1)
 

基本应用

Fork/join
的使用很简单。第一步是写代码执行一个工作的一部分,你的代码可能如下:

if (my portion of the work is small enough)

do the work directly

else

split my work into two pieces

invoke the two pieces and wait for the results

 

包装这段代码做为ForkJoinTask
的子类,通常做为更专业的类型RecursiceTask(
返回结果)
或者RecursiveAction


你的ForkJoinTask
准备好之后,创建所有工程完成的表示和把他传给ForkJoinPool
实例的invoke
方法


2)

模糊到清晰

为了帮助你理解Fork/join
框架如何工作,看一个简单的例子。假设你想让一个图像模糊。整数数组代表原始图像,一个整数代表一个像素的颜色值。被模糊之后的图像也是同样大小的整数数组。

通过一个改变一个像素来完整模糊。每个像素变成它周围像素的平均值(红,绿,蓝部分被平均),结构放到目的数组中。这是一个可能的实现。

public class ForkBlur extends RecursiveAction {

    
private int[] mSource;

    
private int mStart;

    
private int mLength;

    
private int[] mDestination;

    
// Processing window size, should be odd.

    
private int mBlurWidth = 15;

    
public ForkBlur(int[] src, int start, int length, int[] dst) {

        
mSource = src;

        
mStart = start;

        
mLength = length;

        
mDestination = dst;

    
}

    
protected void computeDirectly() {

        
int sidePixels = (mBlurWidth - 1) / 2;

        
for (int index = mStart; index < mStart + mLength; index++) {

             
// Calculate average.

             
float rt = 0, gt = 0, bt = 0;

             
for (int mi = -sidePixels; mi <= sidePixels; mi++) {

                  
int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1);

                  
int pixel = mSource[mindex];

                  
rt += (float)((pixel & 0x00ff0000) >> 16) / mBlurWidth;

                  
gt += (float)((pixel & 0x0000ff00) >> 
8) / mBlurWidth;

                  
bt += (float)((pixel & 0x000000ff) >> 
0) / mBlurWidth;

             
}

             
// Re-assemble destination pixel.

             
int dpixel = (0xff000000   
) | (((int)rt) << 16) | (((int)gt) << 
8) | (((int)bt) << 
0);

             
mDestination[index] = dpixel;

        
}

    
}

 
...

现在,你实现了抽象compute()
方法,或者直接执行blur
或者分成两个小的任务。一个简单的数组长度阀值,决定直接执行还是分段执行。

protected static int sThreshold = 100000;

protected void compute() {

    
if (mLength < sThreshold) {

        
computeDirectly();

        
return;

    
}

    
int split = mLength / 2;

    
invokeAll(new ForkBlur(mSource, mStart, split, mDestination),

    
new ForkBlur(mSource, mStart + split, mLength - split, mDestination))
;

}

如果前面的方法在RecursiveAction
类的子类里,设置在ForkJoinPool
中运行时简单的。

创建一个代表所有工作完成的任务。

// source image pixels are in src

// destination image pixels are in dst

ForkBlur fb = new ForkBlur(src, 0, src.length, dst);

Create the
ForkJoinPool
that will run the task.

ForkJoinPool pool = new ForkJoinPool();

Run the task.

pool.invoke(fb);

完整的代码,包括在windows
中展示原图片到目标图片的扩展代码,看ForkBlur
类。


(三)

并发集合

Java.util.concurrent
包包括一些附加java
框架集合。大多数可以通过接口分类:

·        

BlockingQueue
定义了一个先进先出数据结构,当你向一个慢队列中添加或者检索一个空队列,它会阻塞或者超时



·        

ConcurrentMap
是java.util.Map
的子接口,定义了很有用的原子操作。这些操作移除或替换一个key-vlaue
,如果key
存在,或者添加一个key-value
如果key
不存在。标记这些操作是原子的,避免使用同步。ConcurrentMap
的标准多用途实现是ConcurrentHashMap
,是HashMap
的并发模式。

·        

ConcurrentNavigableMap
是ConcurrentMap
的子接口,它支持近似匹配。ConcurrentNavigableMap
的标准一般用途实现是ConcurrentSkipListMap
,它是TreeMap
的并发模式。

所有这些结合避免内存不一致,通过在添加元素操作和以后的移除或者访问操作之间建立happens-before
关系。


(四)

原子变量

java.util.concurrent.atomic

包定义了对单个变量的原子操作。所有类有get
和set
方法,像对volatile
变量读写一样工作。就是说,set
操作有happens-before
关系和后面的对同变量的get
操作。原子的compareAndSet
方法也有内存一致功能,就像使用简单的原子算法、应用于integer
原子变量的方法。

为了看这个包的使用,让我们回到最初我们演示线程冲突的Counter
类:

class Counter {

   
private int c = 0;

   
public void increment() {

       
c++;

   
}

   
public void decrement() {

  
     
c--;

   
}

   
public int value() {

       
return c;

   
}

}

使Counter
避免线程冲突的一个方法是,给它的方法加同步,看
SynchronizedCounter



 

class SynchronizedCounter {

   
private int c = 0;

   
public synchronized void increment() {

       
c++;

   
}

   
public synchronized void decrement() {

       
c--;

   
}

   
public synchronized int value() {

       
return c;

   
}

}

这个简单的类,同步是可接受的解决办法。但是对于更复杂的类,我们想要避免同步带来的对活跃性的影响。将域替换成AtomicInteger
,不用同步来避免线程冲突,正如
AtomicCounter



import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounter {

   
private AtomicInteger c = new AtomicInteger(0);

   
public void increment() {

       
c.incrementAndGet();

   
}

   
public void decrement() {

       
c.decrementAndGet();

   
}

   
public int value() {

       
return c.get();

   
}

}


(五)

并发随机数

在JDK7
中,java.util.concurrent
包括一个方便的类,ThreadLocalRandom
,应用于多线程或者ForkJoinTasks
中使用随机数的工程。

   

对于并发访问,用ThreadLocalRandom
代替Math.random()
会减少竞争,性能更好。

你要做的只是调用
ThreadLocalRandom.current()


,然后调用它的方法返回随机数,这是一个列子:

int r = ThreadLocalRandom.current() .nextInt(4, 77);

 
 
 
进一步阅读

《Concurrent Programming in Java : Design Principles and Pattern (2nd Edition)
》, Doug Lea.
一个权威专家,
也是java
平台并发框架的架构师。

《Java Concurrency in Practice
》 ,Brian Goetz, Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes, and Doug Lea.
一个适合初学者的指导手册。

《Effective Java Programming Language Guide (2nd Edition)
》,Joshua Bloch. Though this is a general programming guide
尽管这是一个程序设计入门,
但是它每章都含有并发程序的最好的例子。

《Concurrency: State Models & Java Programs (2nd Edition)
》, by Jeff Magee and Jeff Kramer.
通过建模和实际例子介绍并发程序设计

Java Concurrent Animated

:
展示并发特点的动画片。

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: