您的位置:首页 > 编程语言 > Java开发

Java并发编程最佳实例详解系列

2018-04-26 20:22 375 查看

Java并发编程最佳实例详解系列:

Java并发编程(一)线程定义、状态和属性

Java并发编程(一)线程定义、状态和属性

线程是指程序在执行过程中,能够执行程序代码的一个执行单元。在java语言中,线程有四种状态:运行 、就绪、挂起和结束。 进程是指一段正在执行的程序。而线程有时也被成为轻量级的进程,他是程序执行的最小单元,一个进程可以拥有多个线程,各个线程之间共享程序的内功空间(代码段、数据段和堆空间)及一些进程级的资源(例如打开的文件),但是各个线程都拥有自己的棧空间。

一 、线程和进程

1. 什么是线程和进程的区别:
线程是指程序在执行过程中,能够执行程序代码的一个执行单元。在java语言中,线程有四种状态:运行 、就绪、挂起和结束。
进程是指一段正在执行的程序。而线程有时也被成为轻量级的进程,他是程序执行的最小单元,一个进程可以拥有多个线程,各个线程之间共享程序的内功空间(代码段、数据段和堆空间)及一些进程级的资源(例如打开的文件),但是各个线程都拥有自己的棧空间。
2. 为何要使用多进程
在操作系统级别上来看主要有以下几个方面:

  • 使用多线程可以减少程序的响应时间,如果某个操作和耗时,或者陷入长时间的等待,此时程序讲不会响应鼠标和键盘等的操作,使用多线程后可以把这个耗时的线程分配到一个单独的线程去执行,从而使程序具备了更好的交互性。
  • 与进程相比,线程创建和切换开销更小,同时多线程在数据共享方面效率非常高。
  • 多CPU或者多核计算机本身就具备执行多线程的能力,如果使用单个进程,将无法重复利用计算机资源,造成资源的巨大浪费。在多CPU计算机使用多线程能提高CPU的利用率。
  • 使用多线程能简化程序的结构,使程序便于理解和维护

二、创建线程

多线程的实现一般有以下三种方法其中前两种为最常用的方法:
1. 继承Thread类,重写run()方法
Thread本质上也是实现了Runnable接口的一个实例。需要注意的是调用start()方法后并不是是立即的执行多线程的代码,而是使该线程变为可运行态,什么时候运行多线程代码是由操作系统决定的。
以下是主要步骤:
(1)定义Thread类的子类,并重写该类的run方法,该run方法的方法体就代表了线程要完成的任务。因此把run()方法称为执行体。
(2)创建Thread子类的实例,即创建了线程对象。
(3)调用线程对象的start()方法来启动该线程。

public class TestThread extends Thread{
public void run() {
System.out.println(“Hello World”);
}
public static void main(String[] args) {
Thread mThread = new TestThread();
mThread.start();
}
}

2. 实现Runnable接口,并实现该接口的run()方法
以下是主要步骤:
(1)自定义类并实现Runnable接口,实现run()方法。
(2)创建Thread子类的实例,用实现Runnable接口的对象作为参数实例化该Thread对象。
(3)调用Thread的start()方法来启动该线程。

public class TestRunnable implements Runnable {
public void run() {
System.out.println(“Hello World”);
}
}

public class TestRunnable {
public static void main(String[] args) {
TestRunnable mTestRunnable = new TestRunnable();
Thread mThread = new Thread(mTestRunnable);
mThread.start();
}
}

3. 实现Callable接口,重写call()方法
Callable接口实际是属于Executor框架中的功能类,Callable接口与Runnable接口的功能类似,但提供了比Runnable更强大的功能,主要表现为以下的3点:
(1)Callable可以在任务接受后提供一个返回值,Runnable无法提供这个功能。
(2)Callable中的call()方法可以抛出异常,而Runnable的run()方法不能抛出异常。
(3)运行Callable可以拿到一个Future对象,Future对象表示异步计算的结果,他提供了检查计算是否完成的方法。由于线程属于异步计算模型,因此无法从别的线程中得到函数的返回值,在这种情况下就可以使用Future来监视目标线程调用call()方法的情况,但调用Future的get()方法以获取结果时,当前线程就会阻塞,直到call()方法的返回结果。

public class TestCallable {
//创建线程类
public static class MyTestCallable implements Callable {
public String call() throws Exception {
retun “Hello World”;
}
}
public static void main(String[] args) {
MyTestCallable mMyTestCallable= new MyTestCallable();
ExecutorService mExecutorService = Executors.newSingleThreadPool();
Future mfuture = mExecutorService.submit(mMyTestCallable);
try {
//等待线程结束,并返回结果
System.out.println(mfuture.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}

上述程序的输出结果为:Hello World

在这三种方式中,一般推荐实现Runnable接口的方式,其原因是:首先,Thread类定义了多种方法可以被派生类使用重写,但是只有run()方法是必须被重写的,实现这个线程的主要功能,这也是实现Runnable接口需要的方法。其次,一个类应该在他们需要加强或者修改时才会被继承。因此如果没有必要重写Thread类的其他方法,那么在这种情况下最好是用实现Runnable接口的方式。

三、中断线程

当线程的run()方法执行方法体中的最后一条语句后,并经由执行return语句返回时,或者出现在方法中没有捕获的异常时线程将终止。在java早期版本中有一个stop方法,其他线程可以调用它终止线程,但是这个方法现在已经被弃用了。
interrupt方法可以用来请求终止线程,当一个线程调用interrupt方法时,线程的中断状态将被置位。这是每个线程都具有的boolean标志,每个线程都应该不时的检查这个标志,来判断线程是否被中断。
要想弄清线程是否被置位,可以调用Thread.currentThread().isInterrupted():

while(!Thread.currentThread().isInterrupted()){
do something
}

但是如果一个线程被阻塞,就无法检测中断状态。这是产生InterruptedException的地方。当一个被阻塞的线程(调用sleep或者wait)上调用interrupt方法。阻塞调用将会被InterruptedException中断。
如果每次迭代之后都调用sleep方法(或者其他可中断的方法),isInterrupted检测就没必要也没用处了,如果在中断状态被置位时调用sleep方法,它不会休眠反而会清除这一状态并抛出InterruptedException。所以如果在循环中调用sleep,不要去检测中断状态,只需捕获InterruptedException。
在很多发布的代码中会发现InterruptedException被抑制在很低的层次上:

void myTask(){
…
try{
sleep(50)
}catch(InterruptedException e){
…
}
}

不要这样做,如果不认为catch中做一处理有什么好处的话,有两种合理的选择:

  • 在catch中调用Thread.currentThread().interrup()来设置中断状态。调用者可以对其进行检测
  • 更好的选择用throw InterruptedException标记你的方法,不采用try语句块来捕获已成。这样调用者可以捕获这个异常:
void myTask()throw InterruptedException{
sleep(50)
}

四、线程的状态

(1). 新建状态(New):新创建了一个线程对象。
(2). 就绪状态(Runnable):线程对象创建后,其他线程调用了该对象的start()方法。该状态的线程位于可运行线程池中,变得可运行,等待获取CPU的使用权。
(3). 运行状态(Running):就绪状态的线程获取了CPU,执行程序代码。
(4). 阻塞状态(Blocked):阻塞状态是线程因为某种原因放弃CPU使用权,暂时停止运行。直到线程进入就绪状态,才有机会转到运行状态。阻塞的情况分三种:

  • 等待阻塞:运行的线程执行wait()方法,JVM会把该线程放入等待池中。

  • 同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入锁池中。

  • 其他阻塞:运行的线程执行sleep()或join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入就绪状态。

    (5). 死亡状态(Dead):线程执行完了或者因异常退出了run()方法,该线程结束生命周期。

五、线程的优先级和守护线程

1. 线程优先级
在java中,每一个线程有一个优先级,默认情况下,一个线程继承它父类的优先级。可以用setPriority方法提高或降低任何一个线程优先级。可以将优先级设置在MIN_PRIORITY(在Thread类定义为1)与MAX_PRIORITY(在Thread类定义为10)之间的任何值。线程的默认优先级为NORM_PRIORITY(在Thread类定义为5)。
尽量不要依赖优先级,如果确实要用,应该避免初学者常犯的一个错误。如果有几个高优先级的线程没有进入非活动状态,低优先级线程可能永远也不能执行。每当调度器决定运行一个新线程时,首先会在具有搞优先级的线程中进行选择,尽管这样会使低优先级的线程完全饿死。

2. 守护线程

调用setDaemon(true);将线程转换为守护线程。守护线程唯一的用途就是为其他线程提供服务。计时线程就是一个例子,他定时发送信号给其他线程或者清空过时的告诉缓存项的线程。当只剩下守护线程时,虚拟机就退出了,由于如果只剩下守护线程,就没必要继续运行程序了。
另外JVM的垃圾回收、内存管理等线程都是守护线程。还有就是在做数据库应用时候,使用的数据库连接池,连接池本身也包含着很多后台线程,监控连接个数、超时时间、状态等等。

Java并发编程(二)同步

线程是指程序在执行过程中,能够执行程序代码的一个执行单元。在java语言中,线程有四种状态:运行 、就绪、挂起和结束。 进程是指一段正在执行的程序。而线程有时也被成为轻量级的进程,他是程序执行的最小单元,一个进程可以拥有多个线程,各个线程之间共享程序的内功空间(代码段、数据段和堆空间)及一些进程级的资源(例如打开的文件),但是各个线程都拥有自己的棧空间。

如果你的java基础较弱,或者不大了解java多线程请先看这篇文章java多线程(一)线程定义、状态和属性

同步一直是java多线程的难点,在我们做android开发时也很少应用,但这并不是我们不熟悉同步的理由。希望这篇文章能使更多的人能够了解并且应用java的同步。
在多线程的应用中,两个或者两个以上的线程需要共享对同一个数据的存取。如果两个线程存取相同的对象,并且每一个线程都调用了修改该对象的方法,这种情况通常成为竞争条件。
竞争条件最容易理解的例子就是:比如火车卖票,火车票是一定的,但卖火车票的窗口到处都有,每个窗口就相当于一个线程,这么多的线程共用所有的火车票这个资源。并且无法保证其原子性,如果在一个时间点上,两个线程同时使用这个资源,那他们取出的火车票是一样的(座位号一样),这样就会给乘客造成麻烦。解决方法为,当一个线程要使用火车票这个资源时,我们就交给它一把锁,等它把事情做完后在把锁给另一个要用这个资源的线程。这样就不会出现上述情况。

1. 锁对象

synchronized关键字自动提供了锁以及相关的条件,大多数需要显式锁的情况使用synchronized非常的方便,但是等我们了解ReentrantLock类和条件对象时,我们能更好的理解synchronized关键字。ReentrantLock是JAVA SE 5.0引入的, 用ReentrantLock保护代码块的结构如下:

mLock.lock();
try{
…
}
finally{
mLock.unlock();
}

这一结构确保任何时刻只有一个线程进入临界区,一旦一个线程封锁了锁对象,其他任何线程都无法通过lock语句。当其他线程调用lock时,它们则被阻塞直到第一个线程释放锁对象。把解锁的操作放在finally中是十分必要的,如果在临界区发生了异常,锁是必须要释放的,否则其他线程将会永远阻塞。

2. 条件对象

进入临界区时,却发现在某一个条件满足之后,它才能执行。要使用一个条件对象来管理那些已经获得了一个锁但是却不能做有用工作的线程,条件对象又称作条件变量。
我们来看看下面的例子来看看为何需要条件对象

假设一个场景我们需要用银行转账,我们首先写了银行的类,它的构造函数需要传入账户数量和账户金额

public class Bank {
private double[] accounts;
private Lock bankLock;
public Bank(int n,double initialBalance){
accounts=new double
;
bankLock=new ReentrantLock();
for (int i=0;i<accounts.length;i++){
accounts[i]=initialBalance;
}
}
}

接下来我们要提款,写一个提款的方法,from是转账方,to是接收方,amount转账金额,结果我们发现转账方余额不足,如果有其他线程给这个转账方再存足够的钱就可以转账成功了,但是这个线程已经获取了锁,它具有排他性,别的线程也无法获取锁来进行存款操作,这就是我们需要引入条件对象的原因。

public void transfer(int from,int to,int amount){
bankLock.lock();
try{
while (accounts[from]<amount){
//wait
}
}finally {
bankLock.unlock();
}
}

一个锁对象拥有多个相关的条件对象,可以用newCondition方法获得一个条件对象,我们得到条件对象后调用await方法,当前线程就被阻塞了并放弃了锁

public class Bank {
private double[] accounts;
private Lock bankLock;
private Condition condition;
public Bank(int n,double initialBalance){
accounts=new double
;
bankLock=new ReentrantLock();
//得到条件对象
condition=bankLock.newCondition();
for (int i=0;i<accounts.length;i++){
accounts[i]=initialBalance;
}
}
public void transfer(int from,int to,int amount) throws InterruptedException {
bankLock.lock();
try{
while (accounts[from]<amount){
//阻塞当前线程,并放弃锁
condition.await();
}
}finally {
bankLock.unlock();
}
}
}

等待获得锁的线程和调用await方法的线程本质上是不同的,一旦一个线程调用的await方法,他就会进入该条件的等待集。当锁可用时,该线程不能马上解锁,相反他处于阻塞状态,直到另一个线程调用了同一个条件上的signalAll方法时为止。当另一个线程准备转账给我们此前的转账方时,只要调用condition.signalAll();该调用会重新激活因为这一条件而等待的所有线程。
当一个线程调用了await方法他没法重新激活自身,并寄希望于其他线程来调用signalAll方法来激活自身,如果没有其他线程来激活等待的线程,那么就会产生死锁现象,如果所有的其他线程都被阻塞,最后一个活动线程在解除其他线程阻塞状态前调用await,那么它也被阻塞,就没有任何线程可以解除其他线程的阻塞,程序就被挂起了。
那何时调用signalAll呢?正常来说应该是有利于等待线程的方向改变时来调用signalAll。在这个例子里就是,当一个账户余额发生变化时,等待的线程应该有机会检查余额。

public void transfer(int from,int to,int amount) throws InterruptedException {
bankLock.lock();
try{
while (accounts[from]<amount){
//阻塞当前线程,并放弃锁
condition.await();
}
//转账的操作
…
condition.signalAll();
}finally {
bankLock.unlock();
}
}

当调用signalAll方法时并不是立即激活一个等待线程,它仅仅解除了等待线程的阻塞,以便这些线程能够在当前线程退出同步方法后,通过竞争实现对对象的访问。还有一个方法是signal,它则是随机解除某个线程的阻塞,如果该线程仍然不能运行,那么则再次被阻塞,如果没有其他线程再次调用signal,那么系统就死锁了。

3. Synchronized关键字

Lock和Condition接口为程序设计人员提供了高度的锁定控制,然而大多数情况下,并不需要那样的控制,并且可以使用一种嵌入到java语言内部的机制。从Java1.0版开始,Java中的每一个对象都有一个内部锁。如果一个方法用synchronized关键字声明,那么对象的锁将保护整个方法。也就是说,要调用该方法,线程必须获得内部的对象锁。
换句话说,

public synchronized void method(){

}

等价于

public void method(){
this.lock.lock();
try{

}finally{
this.lock.unlock();
}

上面银行的例子,我们可以将Bank类的transfer方法声明为synchronized,而不是使用一个显示的锁。
内部对象锁只有一个相关条件,wait方法添加到一个线程到等待集中,notifyAll或者notify方法解除等待线程的阻塞状态。也就是说wait相当于调用condition.await(),notifyAll等价于condition.signalAll();

我们上面的例子transfer方法也可以这样写:

public synchronized void transfer(int from,int to,int amount)throws InterruptedException{
while (accounts[from]<amount) {
wait();
}
//转账的操作
…
notifyAll();
}

可以看到使用synchronized关键字来编写代码要简洁很多,当然要理解这一代码,你必须要了解每一个对象有一个内部锁,并且该锁有一个内部条件。由锁来管理那些试图进入synchronized方法的线程,由条件来管理那些调用wait的线程。

4. 同步阻塞

上面我们说过,每一个Java对象都有一个锁,线程可以调用同步方法来获得锁,还有另一种机制可以获得锁,通过进入一个同步阻塞,当线程进入如下形式的阻塞:

synchronized(obj){

}

于是他获得了obj的锁。再来看看Bank类

public class Bank {
private double[] accounts;
private Object lock=new Object();
public Bank(int n,double initialBalance){
accounts=new double
;
for (int i=0;i<accounts.length;i++){
accounts[i]=initialBalance;
}
}
public void transfer(int from,int to,int amount){
synchronized(lock){
//转账的操作
…
}
}
}

在此,lock对象创建仅仅是用来使用每个Java对象持有的锁。有时开发人员使用一个对象的锁来实现额外的原子操作,称为客户端锁定。例如Vector类,它的方法是同步的。现在假设在Vector中存储银行余额

public void transfer(Vector<Double>accounts,int from,int to,int amount){
accounts.set(from,accounts.get(from)-amount);
accounts.set(to,accounts.get(to)+amount;
}

Vecror类的get和set方法是同步的,但是这并未对我们有所帮助。在第一次对get调用完成以后,一个线程完全可能在transfer方法中被被剥夺运行权,于是另一个线程可能在相同的存储位置存入了不同的值,但是,我们可以截获这个锁

public void transfer(Vector<Double>accounts,int from,int to,int amount){
synchronized(accounts){
accounts.set(from,accounts.get(from)-amount);
accounts.set(to,accounts.get(to)+amount;
}
}

客户端锁定(同步代码块)是非常脆弱的,通常不推荐使用,一般实现同步最好用java.util.concurrent包下提供的类,比如阻塞队列。如果同步方法适合你的程序,那么请尽量的使用同步方法,他可以减少编写代码的数量,减少出错的几率,如果特别需要使用Lock/Condition结构提供的独有特性时,才使用Lock/Condition。

Java并发编程(三)volatile域

前言

有时仅仅为了读写一个或者两个实例域就使用同步的话,显得开销过大,volatile关键字为实例域的同步访问提供了免锁的机制。如果声明一个域为volatile,那么编译器和虚拟机就知道该域是可能被另一个线程并发更新的。再讲到volatile关键字之前我们需要了解一下内存模型的相关概念以及并发编程中的三个特性:原子性,可见性和有序性。

1. java内存模型与原子性,可见性和有序性

Java内存模型规定所有的变量都是存在主存当中,每个线程都有自己的工作内存。线程对变量的所有操作都必须在工作内存中进行,而不能直接对主存进行操作。并且每个线程不能访问其他线程的工作内存。
在java中,执行下面这个语句:

int i=3;

执行线程必须先在自己的工作线程中对变量i所在的缓存行进行赋值操作,然后再写入主存当中。而不是直接将数值3写入主存当中。
那么Java语言 本身对 原子性、可见性以及有序性提供了哪些保证呢?

原子性

对基本数据类型的变量的读取和赋值操作是原子性操作,即这些操作是不可被中断的,要么执行,要么不执行。
来看一下下面的代码:

x = 10; //语句1
y = x; //语句2
x++; //语句3
x = x + 1; //语句4

只有语句1是原子性操作,其他三个语句都不是原子性操作。
语句2实际上包含2个操作,它先要去读取x的值,再将x的值写入工作内存,虽然读取x的值以及 将x的值写入工作内存 这2个操作都是原子性操作,但是合起来就不是原子性操作了。
同样的,x++和 x = x+1包括3个操作:读取x的值,进行加1操作,写入新的值。
也就是说,只有简单的读取、赋值(而且必须是将数字赋值给某个变量,变量之间的相互赋值不是原子操作)才是原子操作。
java.util.concurrent.atomic包中有很多类使用了很高效的机器级指令(而不是使用锁)来保证其他操作的原子性。例如AtomicInteger类提供了方法incrementAndGet和decrementAndGet,它们分别以原子方式将一个整数自增和自减。可以安全地使用AtomicInteger类作为共享计数器而无需同步。
另外这个包还包含AtomicBoolean,AtomicLong和AtomicReference这些原子类仅供开发并发工具的系统程序员使用,应用程序员不应该使用这些类。

可见性

可见性,是指线程之间的可见性,一个线程修改的状态对另一个线程是可见的。也就是一个线程修改的结果。另一个线程马上就能看到。
当一个共享变量被volatile修饰时,它会保证修改的值会立即被更新到主存,所以对其他线程是可见的,当有其他线程需要读取时,它会去内存中读取新值。
而普通的共享变量不能保证可见性,因为普通共享变量被修改之后,什么时候被写入主存是不确定的,当其他线程去读取时,此时内存中可能还是原来的旧值,因此无法保证可见性。

有序性

在Java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。
可以通过volatile关键字来保证一定的“有序性”。另外可以通过synchronized和Lock来保证有序性,很显然,synchronized和Lock保证每个时刻是有一个线程执行同步代码,相当于是让线程顺序执行同步代码,自然就保证了有序性。

2. volatile关键字

一旦一个共享变量(类的成员变量、类的静态成员变量)被volatile修饰之后,那么就具备了两层语义:

  • 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
  • 禁止进行指令重排序。

先看一段代码,假如线程1先执行,线程2后执行:

//线程1
boolean stop = false;
while(!stop){
doSomething();
}

//线程2
stop = true;

很多人在中断线程时可能都会采用这种标记办法。但是事实上,这段代码会完全运行正确么?即一定会将线程中断么?不一定,也许在大多数时候,这个代码能够把线程中断,但是也有可能会导致无法中断线程(虽然这个可能性很小,但是只要一旦发生这种情况就会造成死循环了)。
为何有可能导致无法中断线程?每个线程在运行过程中都有自己的工作内存,那么线程1在运行的时候,会将stop变量的值拷贝一份放在自己的工作内存当中。那么当线程2更改了stop变量的值之后,但是还没来得及写入主存当中,线程2转去做其他事情了,那么线程1由于不知道线程2对stop变量的更改,因此还会一直循环下去。
但是用volatile修饰之后就变得不一样了:

  • 使用volatile关键字会强制将修改的值立即写入主存;
  • 使用volatile关键字的话,当线程2进行修改时,会导致线程1的工作内存中缓存变量stop的缓存行无效;
  • 由于线程1的工作内存中缓存变量stop的缓存行无效,所以线程1再次读取变量stop的值时会去主存读取。

volatile保证原子性吗?

我们知道volatile关键字保证了操作的可见性,但是volatile能保证对变量的操作是原子性吗?

public class Test {
public volatile int inc = 0;
public void increase() {
inc++;
}

public static void main(String[] args) {
final Test test = new Test();
for(int i=0;i<10;i++){
new Thread(){
public void run() {
for(int j=0;j<1000;j++)
test.increase();
};
}.start();
}
//保证前面的线程都执行完
while(Thread.activeCount()>1)
Thread.yield();
System.out.println(test.inc);
}
}

这段代码每次运行结果都不一致,都是一个小于10000的数字,在前面已经提到过,自增操作是不具备原子性的,它包括读取变量的原始值、进行加1操作、写入工作内存。那么就是说自增操作的三个子操作可能会分割开执行。
假如某个时刻变量inc的值为10,线程1对变量进行自增操作,线程1先读取了变量inc的原始值,然后线程1被阻塞了;然后线程2对变量进行自增操作,线程2也去读取变量inc的原始值,然后进行加1操作,并把11写入工作内存,最后写入主存。线程1接着进行加1操作,由于此前已经读取了inc的值,此时在线程1的工作内存中inc的值仍然为10,所以线程1对inc进行加1操作后inc的值为11,然后将11写入工作内存,最后写入主存。那么两个线程分别进行了一次自增操作后,inc只增加了1。
自增操作不是原子性操作,而且volatile也无法保证对变量的任何操作都是原子性的。

volatile能保证有序性吗?

在前面提到volatile关键字能禁止指令重排序,所以volatile能在一定程度上保证有序性。
volatile关键字禁止指令重排序有两层意思:

  • 当程序执行到volatile变量的读操作或者写操作时,在其前面的操作的更改肯定全部已经进行,且结果已经对后面的操作可见;在其后面的操作肯定还没有进行;
  • 在进行指令优化时,不能将在对volatile变量访问的语句放在其后面执行,也不能把volatile变量后面的语句放到其前面执行。

3. 正确使用volatile关键字

synchronized关键字是防止多个线程同时执行一段代码,那么就会很影响程序执行效率,而volatile关键字在某些情况下性能要优于synchronized,但是要注意volatile关键字是无法替代synchronized关键字的,因为volatile关键字无法保证操作的原子性。通常来说,使用volatile必须具备以下2个条件:

  • 对变量的写操作不依赖于当前值
  • 该变量没有包含在具有其他变量的不变式中

第一个条件就是不能是自增自减等操作,上文已经提到volatile不保证原子性。
第二个条件我们来举个例子它包含了一个不变式 :下界总是小于或等于上界

public class NumberRange {
private volatile int lower, upper;
public int getLower() { return lower; }
public int getUpper() { return upper; }
public void setLower(int value) {
if (value > upper)
throw new IllegalArgumentException(…);
lower = value;
}
public void setUpper(int value) {
if (value < lower)
throw new IllegalArgumentException(…);
upper = value;
}
}

这种方式限制了范围的状态变量,因此将 lower 和 upper 字段定义为 volatile 类型不能够充分实现类的线程安全,从而仍然需要使用同步。否则,如果凑巧两个线程在同一时间使用不一致的值执行 setLower 和 setUpper 的话,则会使范围处于不一致的状态。例如,如果初始状态是 (0, 5),同一时间内,线程 A 调用 setLower(4) 并且线程 B 调用 setUpper(3),显然这两个操作交叉存入的值是不符合条件的,那么两个线程都会通过用于保护不变式的检查,使得最后的范围值是 (4, 3),这显然是不对的。
其实就是要保证操作的原子性就可以使用volatile,使用volatile主要有两个场景:

状态标志

volatile boolean shutdownRequested;
…
public void shutdown()
{
shutdownRequested = true;
}
public void doWork() {
while (!shutdownRequested) {
// do stuff
}
}

很可能会从循环外部调用 shutdown() 方法 —— 即在另一个线程中 —— 因此,需要执行某种同步来确保正确实现 shutdownRequested 变量的可见性。然而,使用 synchronized 块编写循环要比使用volatile 状态标志编写麻烦很多。由于 volatile 简化了编码,并且状态标志并不依赖于程序内任何其他状态,因此此处非常适合使用 volatile。

双重检查模式 (DCL)

public class Singleton {
private volatile static Singleton instance = null;
public static Singleton getInstance() {
if (instance == null) {
synchronized(this) {
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}

在这里使用volatile会或多或少的影响性能,但考虑到程序的正确性,牺牲这点性能还是值得的。
DCL优点是资源利用率高,第一次执行getInstance时单例对象才被实例化,效率高。缺点是第一次加载时反应稍慢一些,在高并发环境下也有一定的缺陷,虽然发生的概率很小。
DCL虽然在一定程度解决了资源的消耗和多余的同步,线程安全等问题,但是他还是在某些情况会出现失效的问题,也就是DCL失效,在《java并发编程实践》一书建议用以下的代码(静态内部类单例模式)来替代DCL:

public class Singleton {
private Singleton(){
}
public static Singleton getInstance(){
return SingletonHolder.sInstance;
}
private static class SingletonHolder {
private static final Singleton sInstance = new Singleton();
}
}

关于双重检查可以查看https://www.geek-share.com/detail/2481044180.html

4. 总结

与锁相比,Volatile 变量是一种非常简单但同时又非常脆弱的同步机制,它在某些情况下将提供优于锁的性能和伸缩性。如果严格遵循 volatile 的使用条件即变量真正独立于其他变量和自己以前的值 ,在某些情况下可以使用 volatile 代替 synchronized 来简化代码。然而,使用 volatile 的代码往往比使用锁的代码更加容易出错。本文介绍了可以使用 volatile 代替 synchronized 的最常见的两种用例,其他的情况我们最好还是去使用synchronized 。

Java并发编程(四)Java内存模型

前言

此前我们讲到了线程、同步以及volatile关键字,对于Java的并发编程我们有必要了解下Java的内存模型,因为Java线程之间的通信对于工程师来言是完全透明的,内存可见性问题很容易使工程师们觉得困惑,这篇文章我们来主要的讲下Java内存模型的相关概念。

1.共享内存和消息传递

线程之间的通信机制有两种:共享内存和消息传递;在共享内存的并发模型里,线程之间共享程序的公共状态,线程之间通过写-读内存中的公共状态来隐式进行通信。在消息传递的并发模型里,线程之间没有公共状态,线程之间必须通过明确的发送消息来显式进行通信。
同步是指程序用于控制不同线程之间操作发生相对顺序的机制。在共享内存并发模型里,同步是显式进行的。工程师必须显式指定某个方法或某段代码需要在线程之间互斥执行。在消息传递的并发模型里,由于消息的发送必须在消息的接收之前,因此同步是隐式进行的。
Java的并发采用的是共享内存模型,Java线程之间的通信总是隐式进行,整个通信过程对工程师完全透明。

2.Java内存模型的抽象

在java中,所有实例域、静态域和数组元素存储在堆内存中,堆内存在线程之间共享(本文使用“共享变量”这个术语代指实例域,静态域和数组元素)。局部变量,方法定义参数和异常处理器参数不会在线程之间共享,它们不会有内存可见性问题,也不受内存模型的影响。
Java线程之间的通信由Java内存模型(本文简称为JMM)控制,JMM决定一个线程对共享变量的写入何时对另一个线程可见。从抽象的角度来看,JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存,本地内存中存储了该线程以读/写共享变量的副本。本地内存是JMM的一个抽象概念,并不真实存在。它涵盖了缓存,写缓冲区,寄存器以及其他的硬件和编译器优化。Java内存模型的抽象示意图如下:

从上图来看,线程A与线程B之间如要通信的话,必须要经历下面2个步骤:

  1. 线程A把本地内存A中更新过的共享变量刷新到主内存中去。
  2. 线程B到主内存中去读取线程A之前已更新过的共享变量。

3.从源代码到指令序列的重排序

在执行程序时为了提高性能,编译器和处理器常常会对指令做重排序。重排序分三种类型:

  1. 编译器优化的重排序。编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。
  2. 指令级并行的重排序。现代处理器采用了指令级并行技术来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。
  3. 内存系统的重排序。由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。
    从java源代码到最终实际执行的指令序列,会分别经历下面三种重排序:

上述的1属于编译器重排序,2和3属于处理器重排序。这些重排序都可能会导致多线程程序出现内存可见性问题。对于编译器,JMM的编译器重排序规则会禁止特定类型的编译器重排序(不是所有的编译器重排序都要禁止)。对于处理器重排序,JMM的处理器重排序规则会要求java编译器在生成指令序列时,插入特定类型的内存屏障指令,通过内存屏障指令来禁止特定类型的处理器重排序(不是所有的处理器重排序都要禁止)。
JMM属于语言级的内存模型,它确保在不同的编译器和不同的处理器平台之上,通过禁止特定类型的编译器重排序和处理器重排序,为程序员提供一致的内存可见性保证。

4.happens-before简介

happens-before是JMM最核心的概念,对于Java工程师来说,理解happens-before是理解JMM的关键。

JMM的设计意图

在设计JMM需要考虑两个关键因素:

  1. 工程师对内存模型的使用,希望内存模型易于理解和编程,工程师希望基于一个强内存模型来编写代码。
  2. 编译器和处理器对内存的实现,希望内存模型对他们的束缚越少越好,编译器和处理器希望实现一个弱内存模型。

这两个因素是互相矛盾的,所以JSR-133专家组设计时需要考虑到一个好的平衡点:一方面为工程师提供足够强的内存可见性,另一方面要对编译器和处理器的限制要尽量松些。

我们来举了例子:

int a=10; //A
int b=20; //B
int c=a*b; //C

上面是一个简单的乘法运算,并存在3个happens-before关系:

  1. A happens-before B
  2. B happens-before C
  3. A happens-before C

这三个happens-before关系中,2和3是必须的,但1是不必要的。因此,JMM把happens-before要求禁止的重排序分为两类:

  1. 会改变程序执行结果的重排序。
  2. 不会改变程序执行结果的重排序。

JMM对这两种不同性质的重排序,采取了不同的策略:

  1. 对于会改变程序执行结果的重排序,JMM要求编译器和处理器必须禁止这种重排序。
  2. 对于不会改变程序执行结果的重排序,JMM要求编译器和处理器不做要求,可以允许这种重排序。

happens-before的定义与规则

JSR-133使用happens-before的概念来指定两个操作之间的执行顺序,由于这两个操作可以在一个线程内,也可以在不同的线程之间。因此,JMM可以通过happens-before关系向工程师提供跨线程的内存可见性保证。

happens-before规则如下:

  1. 程序顺序规则:一个线程中的每个操作,happens- before 于该线程中的任意后续操作。
  2. 监视器锁规则:对一个监视器锁的解锁,happens- before 于随后对这个监视器锁的加锁。
  3. volatile变量规则:对一个volatile域的写,happens- before 于任意后续对这个volatile域的读。
  4. 传递性:如果A happens- before B,且B happens- before C,那么A happens- before
    C。

5.顺序一致性

顺序一致性内存模型是一个理论参考模型,在设计的时候,处理器的内存模型和编程语言的内存模型都会以顺序一致性内存模型为参考。

数据竞争与顺序一致性

当程序未正确同步时,就会存在数据竞争。数据竞争指的是:在一个线程中写一个变量,在另一个线程读同一个变量,而且写和读没有通过同步来排序。
当代码中包含数据竞争时,程序的执行往往产生违反直觉的结果。如果一个多线程程序能正确同步,这个程序将是一个没有数据竞争的程序。
JMM对正确同步的多线程程序的内存一致性做了如下保证:
如果程序是正确同步的,程序的执行将具有顺序一致性(sequentially consistent),即程序的执行结果与该程序在顺序一致性内存模型中的执行结果相同。这里的同步是指广义上的同步,包括对常用同步原语(synchronized,volatile和final)的正确使用。

顺序一致性模型

顺序一致性内存模型是一个被计算机科学家理想化了的理论参考模型,它为程序员提供了极强的内存可见性保证。顺序一致性内存模型有两大特性:

  1. 一个线程中的所有操作必须按照程序的顺序来执行。
  2. (不管程序是否同步)所有线程都只能看到一个单一的操作执行顺序。在顺序一致性内存模型中,每个操作都必须原子执行且立刻对所有线程可见。

顺序一致性内存模型为程序员提供的视图如下:

在概念上,顺序一致性模型有一个单一的全局内存,这个内存通过一个左右摆动的开关可以连接到任意一个线程。同时,每一个线程必须按程序的顺序来执行内存读/写操作。从上图我们可以看出,在任意时间点最多只能有一个线程可以连接到内存。当多个线程并发执行时,图中的开关装置能把所有线程的所有内存读/写操作串行化。

顺序一致性内存模型中的每个操作必须立即对任意线程可见,但是在JMM中就没有这个保证。未同步程序在JMM中不但整体的执行顺序是无序的,而且所有线程看到的操作执行顺序也可能不一致。比如,在当前线程把写过的数据缓存在本地内存中,且还没有刷新到主内存之前,这个写操作仅对当前线程可见;从其他线程的角度来观察,会认为这个写操作根本还没有被当前线程执行。只有当前线程把本地内存中写过的数据刷新到主内存之后,这个写操作才能对其他线程可见。在这种情况下,当前线程和其它线程看到的操作执行顺序将不一致。

同步程序的顺序一致性

我们接下来看看正确同步的程序如何具有顺序一致性。

class SynchronizedExample {
int a = 0;
boolean flag = false;

public synchronized void writer() {
a = 1;
flag = true;
}

public synchronized void reader() {
if (flag) {
int i = a;
……
}
}
}

上面示例代码中,假设A线程执行writer()方法后,B线程执行reader()方法。这是一个正确同步的多线程程序。根据JMM规范,该程序的执行结果将与该程序在顺序一致性模型中的执行结果相同。下面是该程序在两个内存模型中的执行时序对比图:

在顺序一致性模型中,所有操作完全按程序的顺序串行执行。而在JMM中,临界区内的代码可以重排序(但JMM不允许临界区内的代码“逸出”到临界区之外,那样会破坏监视器的语义)。JMM会在退出监视器和进入监视器这两个关键时间点做一些特别处理,使得线程在这两个时间点具有与顺序一致性模型相同的内存视图。虽然线程A在临界区内做了重排序,但由于监视器的互斥执行的特性,这里的线程B根本无法“观察”到线程A在临界区内的重排序。这种重排序既提高了执行效率,又没有改变程序的执行结果。
从这里我们可以看到JMM在具体实现上的基本方针:在不改变(正确同步的)程序执行结果的前提下,尽可能的为编译器和处理器的优化打开方便之门。

未同步程序的顺序一致性

JMM不保证未同步程序的执行结果与该程序在顺序一致性模型中的执行结果一致。因为未同步程序在顺序一致性模型中执行时,整体上是无序的,其执行结果无法预知。保证未同步程序在两个模型中的执行结果一致毫无意义。
和顺序一致性模型一样,未同步程序在JMM中的执行时,整体上也是无序的,其执行结果也无法预知。
同时,未同步程序在这两个模型中的执行特性有下面几个差异:

  1. 顺序一致性模型保证单线程内的操作会按程序的顺序执行,而JMM不保证单线程内的操作会按程序的顺序执行(比如上面正确同步的多线程程序在临界区内的重排序)。
  2. 顺序一致性模型保证所有线程只能看到一致的操作执行顺序,而JMM不保证所有线程能看到一致的操作执行顺序。
  3. JMM不保证对64位的long型和double型变量的读/写操作具有原子性,而顺序一致性模型保证对所有的内存读/写操作都具有原子性。

对于第三个差异:在一些32位的处理器上,如果要求对64位数据的读/写操作具有原子性,会有比较大的开销。为了照顾这种处理器,java语言规范鼓励但不强求JVM对64位的long型变量和double型变量的读/写具有原子性。当JVM在这种处理器上运行时,会把一个64位long/ double型变量的读/写操作拆分为两个32位的读/写操作来执行。这两个32位的读/写操作可能会被分配到不同的总线事务中执行,此时对这个64位变量的读/写将不具有原子性。
当单个内存操作不具有原子性,将可能会产生意想不到后果。请看下面示意图:

如上图所示,假设处理器A写一个long型变量,同时处理器B要读这个long型变量。处理器A中64位的写操作被拆分为两个32位的写操作,且这两个32位的写操作被分配到不同的写事务中执行。同时处理器B中64位的读操作被拆分为两个32位的读操作,且这两个32位的读操作被分配到同一个的读事务中执行。当处理器A和B按上图的时序来执行时,处理器B将看到仅仅被处理器A“写了一半“的无效值。

参考资料:
《Java并发编程的艺术》
深入理解Java内存模型(一)——基础

Java并发编程(五)ConcurrentHashMap的实现原理和源码分析

前言

在Java1.5中,并发编程大师Doug Lea给我们带来了concurrent包,而该包中提供的ConcurrentHashMap是线程安全并且高效的HashMap,本节我们就来研究下ConcurrentHashMap是如何保证线程安全的同时又能高效的操作。

1.为何用ConcurrentHashMap

在并发编程中使用HashMap可能会导致死循环,而使用线程安全的HashTable效率又低下。

线程不安全的HashMap

在多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap,如以下代码会导致死循环:

final HashMap<String, String> map = new HashMap<String, String>(2);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
map.put(UUID.randomUUID().toString(), “”);
}
}, “moon” + i).start();
}
}
}, “ftf”);
t.start();
t.join();

HashMap在并发执行put操作是会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成唤醒数据结构,Entry的next节点永远不为空,就会产生死循环。

效率低下的HashTable

HashTable使用synchronized来保证线程的安全,但是在线程竞争激烈的情况下HashTable的效率非常低下。当一个线程访问HashTable的同步方法,其他方法访问HashTable的同步方法时,会进入阻塞或者轮询状态。如果线程1使用put进行元素添加,线程2不但不能用put方法添加于元素同是也无法用get方法来获取元素,所以竞争越激烈效率越低。

ConcurrentHashMap的锁分段技术

HashTable容器在竞争激烈的并发环境效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁,假如容器有多把锁,每一把锁用于锁住容器中一部分数据,那么多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问率,这就是ConcurrentHashMap的锁分段技术。将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据的时候,其他段的数据也能被其他线程访问。

2.Java1.6的ConcurrentHashMap的结构

首先来看看 Java1.6中ConcurrentHashMap的类图:

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment内部维护了一个链表数组,每个HashEntry就是这个链表数组的元素, 每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

3.java1.8的ConcurrentHashMap源码分析

重要的内部类

从Java1.7 版本开始 ConcurrentHashMap 不再采用 Segment 实现,而是改用 Node,Node 是一个链表的结构,每个节点可以引用到下一个节点(next)。

  • Node类
    Node是最核心的内部类,包装了key-value键值对,所有插入ConcurrentHashMap的数据都包装在这里面。
    它与HashMap中的定义很相似,但是有一些差别它对value和next属性设置了volatile同步锁,它不允许调用setValue方法直接改变Node的value域,它增加了find方法辅助map.get()方法。
  • TreeNode类
    树节点类,另外一个核心的数据结构。 当链表长度过长的时候,会转换为TreeNode。
    但是与HashMap不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。
    而且TreeNode在ConcurrentHashMap继承自Node类,而并非HashMap中的集成自LinkedHashMap.Entry
  • TreeBin
    这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。
  • ForwardingNode
    一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张表。而且这个节点的key value next指针全部为null,它的hash值为-1.
    这里面定义的find的方法是从nextTable里进行查询节点,而不是以自身为头节点进行查找

构造函数

public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

Java1.8版本的 ConcurrentHashMap 在构造函数中不会初始化 Node 数组,而是第一次 put 操作的时候初始化。
整个 Map 第一次 put 的时候,map 中用于存放数据的 Node[] 还是null。

Unsafe与CAS

在ConcurrentHashMap中,大量使用了U.compareAndSwapXXX的方法,这个方法是利用一个CAS算法实现无锁化的修改值的操作,他可以大大降低锁代理的性能消耗。这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作。因为当前线程中的值已经不是最新的值,你的修改很可能会覆盖掉其他线程修改的结果。这一点与乐观锁,SVN的思想是比较类似的。
unsafe代码块控制了一些属性的修改工作,比如最常用的SIZECTL 。 在这一版本的concurrentHashMap中,大量应用来的CAS方法进行变量、属性的修改工作。 利用CAS进行无锁操作,可以大大提高性能。

初始化函数initTable

调用ConcurrentHashMap的构造方法仅仅是设置了一些参数而已,而整个table的初始化是在向ConcurrentHashMap中插入元素的时候发生的。如调用put、computeIfAbsent、compute、merge等方法的时候,调用时机是检查table==null。
初始化方法主要应用了关键属性sizeCtl 如果这个值 < 0,表示其他线程正在进行初始化,就放弃这个操作。
在这也可以看出ConcurrentHashMap的初始化只能由一个线程完成。如果获得了初始化权限,就用CAS方法将sizeCtl置为-1,防止其他线程进入。初始化数组后,将sizeCtl的值改为0.75*n

sizeCtl含义
1.负数代表正在进行初始化或扩容操作
2.-1代表正在初始化
3.-N 表示有N-1个线程正在进行扩容操作
4.正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,这一点类似于扩容阈值的概念。还后面可以看到,它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。

/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl表示有其他线程正在进行初始化操作,把线程挂起。对于table的初始化工作,只能有一个线程在进行。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, –1)) {//利用CAS方法把sizectl的值置为-1 表示本线程正在进行初始化
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings(“unchecked”)
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>
;
table = tab = nt;
sc = n – (n >>> 2);//相当于0.75*n 设置一个扩容的阈值
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

扩容方法transfer

支持多线程进行扩容操作,并没有加锁 ,这样做的目的不仅仅是为了满足concurrent的要求,而是希望利用并发处理去减少扩容带来的时间影响。
单线程扩容的大体思想就是遍历、复制的过程。首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素:

  1. 如果这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点;
  2. 如果这个位置是Node节点(fh>=0),如果它是一个链表的头节点,就构造一个反序链表,把他们分别放在nextTable的i和i+n的位置上
  3. 如果这个位置是TreeBin节点(fh<0),也做一个反序处理,并且判断是否需要untreefi,把处理的结果分别放在nextTable的i和i+n的位置上
  4. 遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍,完成扩容。

多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另一个线程看到forward,就向后继续遍历,再加上给节点上锁的机制,就完成了多线程的控制。这样交叉就完成了复制工作。而且还很好的解决了线程安全的问题。

/**
* 一个过渡的table表 只有在扩容的时候才会使用
*/
private transient volatile Node<K,V>[] nextTable;

/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings(“unchecked”)
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//构造一个nextTable对象 它的容量是原来的两倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//构造一个连节点指针 用于标志位
boolean advance = true;//并发扩容的关键属性 如果等于true 说明这个节点已经处理过
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//这个while循环体的作用就是在控制i– 通过i–可以依次遍历原hash表中的节点
while (advance) {
int nextIndex, nextBound;
if (–i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = –1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex – stride : 0))) {
bound = nextBound;
i = nextIndex – 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//如果所有的节点都已经完成复制工作 就把nextTable赋值给table 清空临时对象nextTable
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) – (n >>> 1);//扩容阈值设置为原来容量的1.5倍 依然相当于现在容量的0.75倍
return;
}
//利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc – 1)) {
if ((sc – 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果遍历到的节点为空 则放入ForwardingNode指针
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果遍历到ForwardingNode节点 说明这个点已经被处理过了 直接跳过 这里是控制并发扩容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//节点上锁
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//如果fh>=0 证明这是一个Node节点
if (fh >= 0) {
int runBit = fh & n;
//以下的部分在完成的工作是构造两个链表 一个是原链表 另一个是原链表的反序排列
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就可以执行i–操作
advance = true;
}
//对TreeBin对象进行处理 与上面的过程类似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//构造正序和反序两个链表
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果扩容后已经不再需要tree的结构 反向转换为链表结构
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就可以执行i–操作
advance = true;
}
}
}
}
}
}

put函数

put方法依然沿用HashMap的put方法的思想,根据hash值计算这个新插入的点在table中的位置i,如果i位置是空的,直接放进去,否则进行判断,如果i位置是树节点,按照树的方式插入新的节点,否则把i插入到链表的末尾。ConcurrentHashMap中依然沿用这个思想,有一个最重要的不同点就是ConcurrentHashMap不允许key或value为null值。另外由于涉及到多线程,put方法就要复杂一点。在多线程中可能有以下两个情况:

  1. 如果一个或多个线程正在对ConcurrentHashMap进行扩容操作,当前线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到,是因为transfer方法中在空结点上插入forward节点,如果检测到需要插入的位置被forward节点占有,就帮助进行扩容。
  2. 如果检测到要插入的节点是非空且不是forward节点,就对这个节点加锁,这样就保证了线程安全。尽管这个有一些影响效率,但是还是会比hashTable的synchronized要好得多。

整体流程就是首先定义不允许key或value为null的情况放入 对于每一个放入的值,首先利用spread方法对key的hashcode进行一次hash计算,由此来确定这个值在table中的位置。如果这个位置是空的,那么直接放入,而且不需要加锁操作。
如果这个位置存在结点,说明发生了hash碰撞,首先判断这个节点的类型。如果是链表节点(fh>0),则得到的结点就是hash值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到hash值与key值都与新加入节点是一致的情况,则只需要更新value值即可。否则依次向后遍历,直到链表尾插入这个结点。 如果加入这个节点以后链表长度大于8,就把这个链表转换成红黑树。如果这个节点的类型已经是树节点的话,直接调用树节点的插入方法进行插入新的值。

final V putVal(K key, V value, boolean onlyIfAbsent) {
//不允许 key或value为null
if (key == null || value == null) throw new NullPointerException();
//计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 第一次 put 操作的时候初始化,如果table为空的话,初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();

//根据hash值计算出在table里面的位置
else if ((f = tabAt(tab, i = (n – 1) & hash)) == null) {
// 根据对应的key hash 到具体的索引,如果该索引对应的 Node 为 null,则采用 CAS 操作更新整个 table
// 如果这个位置没有值 ,直接放进去,不需要加锁
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//当遇到表连接点时,需要进行整合表的操作
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 结点上锁,只是对链表头结点作锁操作
synchronized (f) {
if (tabAt(tab, i) == f) {
//fh > 0 说明这个节点是一个链表的节点 不是树的节点
if (fh >= 0) {
binCount = 1;
//在这里遍历链表所有的结点
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果hash值和key值相同 则修改对应结点的value值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//如果遍历到了最后一个结点,那么就证明新的节点需要插入 就把它插入在链表尾部
if ((e = e.next) == null) {
// 插入到链表尾
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果这个节点是树节点,就按照树的方式插入值
else if (f instanceof TreeBin) {
// 如果是红黑树结点,按照红黑树的插入
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果这个链表结点达到了临界值8,那么把这个链表转换成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//将当前ConcurrentHashMap的元素数量+1,table的扩容是在这里发生的
addCount(1L, binCount);
return null;
}

协助扩容函数helpTransfer

这是一个协助扩容的方法。这个方法被调用的时候,当前ConcurrentHashMap一定已经有了nextTable对象,首先拿到这个nextTable对象,调用上面讲到的transfer方法来进行扩容。

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);//计算一个操作校验码
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

红黑树转换

在putVal函数中,treeifyBin是在链表长度达到一定阈值(8)后转换成红黑树的函数。 但是并不是直接转换,而是进行一次容量判断,如果容量没有达到转换的要求,直接进行扩容操作并返回;如果满足条件才将链表的结构转换为TreeBin ,这与HashMap不同的是,它并没有把TreeNode直接放入红黑树,而是利用了TreeBin这个小容器来封装所有的TreeNode。

private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

get方法

给定一个key来确定value的时候,必须满足两个条件 key相同 hash值相同,对于节点可能在链表或树上的情况,需要分别去查找。

public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//计算hash值
int h = spread(key.hashCode());
//根据hash值确定节点位置
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n – 1) & h)) != null) {
//如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果eh<0 说明这个节点在树上 直接寻找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//否则遍历链表 找到对应的值并返回
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;}Java并发编程(六)阻塞队列
发表于 2016-07-31 |  分类于 Java |  297
前言在 Android多线程(一)线程池这篇文章时,当我们要创建ThreadPoolExecutor的时候需要传进来一个类型为BlockingQueue的参数,它就是阻塞队列,在这一篇文章里我们会介绍阻塞队列的定义、种类、实现原理以及应用。
1.什么是阻塞队列阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。BlockingQueue有两个常见阻塞场景当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。那么支持以上两种阻塞场景的队列我们称之为阻塞队列。BlockingQueue的核心方法放入数据:offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,
则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中
加入BlockingQueue,则返回失败。put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.获取数据:poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
取不到时返回null;poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
BlockingQueue有新的数据被加入;drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。插入和移除操作的4种处理方式抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue
full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。2.Java中的阻塞队列JDK7提供了7个阻塞队列,分别是:ArrayBlockingQueue :由数组结构组成的有界阻塞队列。LinkedBlockingQueue :由链表结构组成的有界阻塞队列。PriorityBlockingQueue :支持优先级排序的无界阻塞队列。DelayQueue:使用优先级队列实现的无界阻塞队列。SynchronousQueue:不存储元素的阻塞队列。LinkedTransferQueue:由链表结构组成的无界阻塞队列。LinkedBlockingDeque:由链表结构组成的双向阻塞队列。ArrayBlockingQueue用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列:
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);LinkedBlockingQueue基于链表的阻塞队列,同ArrayListBlockingQueue类似,此队列按照先进先出(FIFO)的原则对元素进行排序,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。
ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。PriorityBlockingQueue是一个支持优先级的无界队列。默认情况下元素采取自然顺序升序排列。可以自定义实现compareTo()方法来指定元素进行排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意哦的是不能保证同优先级元素的顺序。DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将DelayQueue运用在以下应用场景:缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。SynchronousQueue*是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另外一个线程使用,SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
transfer方法。如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。transfer方法的关键代码如下:
Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);第一行代码是试图把存放当前元素的s节点作为tail节点。第二行代码是让CPU自旋等待消费者消费元素。因为自旋会消耗CPU,所以自旋一定的次数后使用Thread.yield()方法来暂停当前正在执行的线程,并执行其他线程。tryTransfer方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。对于带有时间限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是Jdk的bug,使用时还是用带有First和Last后缀的方法更清楚。在初始化LinkedBlockingDeque时可以设置容量防止其过渡膨胀。另外双向阻塞队列可以运用在“工作窃取”模式中。3.阻塞队列的实现原理(JDK1.7)以ArrayBlockingQueue为例,我们先来看看代码:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

private static final long serialVersionUID = –817911632652898426L;
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
…省略
}从上面代码可以看出ArrayBlockingQueue是维护一个Object类型的数组,takeIndex和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数,lock则是一个可重入锁,notEmpty和notFull是等待条件。接下来我们看看关键方法put:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}从put方法的实现可以看出,它先获取了锁,并且获取的是可中断锁,然后判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待,当被其他线程唤醒时,通过enqueue(e)方法插入元素,最后解锁。
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}插入成功后,通过notEmpty唤醒正在等待取元素的线程。再来看看take方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}跟put方法实现类似,put方法等待的是notFull信号,而take方法等待的是notEmpty信号。在take方法中,如果可以取元素,则通过dequeue方法取得元素,下面是dequeue方法的实现:
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings(“unchecked”)
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count–;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}4.阻塞队列的使用场景除了线程池的实现使用阻塞队列之外,我们可以在生产者-消费者模式来使用阻塞队列,首先使用Object.wait()、Object.notify()和非阻塞队列实现生产者-消费者模式:
public class Test {
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
public static void main(String[] args) {
Test test = new Test();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
producer.start();
consumer.start();
}

class Consumer extends Thread{
@Override
public void run() {
while(true){
synchronized (queue) {
while(queue.size() == 0){
try {
System.out.println(“队列空,等待数据”);
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
queue.notify();
}
}
queue.poll(); //每次移走队首元素
queue.notify();
}
}
}
}

class Producer extends Thread{
@Override
public void run() {
while(true){
synchronized (queue) {
while(queue.size() == queueSize){
try {
System.out.println(“队列满,等待有空余空间”);
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
queue.notify();
}
}
queue.offer(1); //每次插入一个元素
queue.notify();
}
}
}
}
}下面是使用阻塞队列实现的生产者-消费者模式:
public class Test {
private int queueSize = 10;
private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
public static void main(String[] args) {
Test test = new Test();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
producer.start();
consumer.start();
}

class Consumer extends Thread{
@Override
public void run() {
while(true){
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class Producer extends Thread{
@Override
public void run() {
while(true){
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
} } }}}很显然使用阻塞队列实现不需要单独考虑同步和线程间通信的问题,实现起来很简单。Java并发编程(七)ConcurrentLinkedQueue的实现原理和源码分析
发表于 2016-08-14 |  分类于 Java |  185
前言我们要实现一个线程安全的队列有两种实现方式一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现,而非阻塞的实现方式则可以使用循环CAS的方式来实现,本节我们就来研究下ConcurrentLinkedQueue是如何保证线程安全的同时又能高效的操作的。
1.ConcurrentLinkedQueue的结构ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。基于CAS的“wait-free”(常规无等待)来实现,CAS并不是一个算法,它是一个CPU直接支持的硬件指令,这也就在一定程度上决定了它的平台相关性。当前常用的多线程同步机制可以分为下面三种类型:volatile 变量:轻量级多线程同步机制,不会引起上下文切换和线程调度。仅提供内存可见性保证,不提供原子性。CAS 原子指令:轻量级多线程同步机制,不会引起上下文切换和线程调度。它同时提供内存可见性和原子化更新保证。互斥锁:重量级多线程同步机制,可能会引起上下文切换和线程调度,它同时提供内存可见性和原子性。ConcurrentLinkedQueue 的非阻塞算法实现主要可概括为下面几点:使用 CAS 原子指令来处理对数据的并发访问,这是非阻塞算法得以实现的基础。head/tail 并非总是指向队列的头 / 尾节点,也就是说允许队列处于不一致状态。 这个特性把入队 /
出队时,原本需要一起原子化执行的两个步骤分离开来,从而缩小了入队 /
出队时需要原子化更新值的范围到唯一变量。这是非阻塞算法得以实现的关键。以批处理方式来更新head/tail,从整体上减少入队 / 出队操作的开销。
ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。2.入队列入队列就是将入队节点添加到队列的尾部,假设我们要在一个队列中依次插入4个节点,来看看下面的图来方便理解:
添加元素1。队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点。添加元素2。队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。添加元素3,设置tail节点的next节点为元素3节点。添加元素4,设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点。入队主要做两件事情,第一是将入队节点设置成当前队列尾节点的下一个节点。第二是更新tail节点,在入队列前如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点。上面的分析从单线程入队的角度来理解入队过程,但是多个线程同时进行入队情况就变得更加复杂,因为可能会出现其他线程插队的情况。如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操作,然后重新获取尾节点。让我们再通过源码来详细分析下它是如何使用CAS方式来入队的(JDK1.8):
public boolean offer(E e) {
checkNotNull(e);
//创建入队节点
final Node<E> newNode = new Node<E>(e);
//t为tail节点,p为尾节点,默认相等,采用失败即重试的方式,直到入队成功
for (Node<E> t = tail, p = t;;) {
//获得p的下一个节点
Node<E> q = p.next;
// 如果下一个节点是null,也就是p节点就是尾节点
if (q == null) {
//将入队节点newNode设置为当前队列尾节点p的next节点
if (p.casNext(null, newNode)) {
//判断tail节点是不是尾节点,也可以理解为如果插入结点后tail节点和p节点距离达到两个结点
if (p != t)
//如果tail不是尾节点则将入队节点设置为tail。
// 如果失败了,那么说明有其他线程已经把tail移动过
casTail(t, newNode);
return true;
}
}
// 如果p节点等于p的next节点,则说明p节点和q节点都为空,表示队列刚初始化,所以返回 head节点
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
//p有next节点,表示p的next节点是尾节点,则需要重新更新p后将它指向next节点
p = (p != t && t != (t = tail)) ? t : q;
}
}从源代码我们看出入队过程中主要做了三件事情,第一是定位出尾节点;第二个是使用CAS指令将入队节点设置成尾节点的next节点,如果不成功则重试;第三是重新定位tail节点。
从第一个if判断就来判定p有没有next节点如果没有则p是尾节点则将入队节点设置为p的next节点,同时如果tail节点不是尾节点则将入队节点设置为tail节点。如果p有next节点则p的next节点是尾节点,需要重新更新p后将它指向next节点。还有一种情况p等于p的next节点说明p节点和p的next节点都为空,表示这个队列刚初始化,正准备添加数据,所以需要返回head节点。3.出队列出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。让我们通过每个节点出队的快照来观察下head节点的变化。

从上图可知,并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。让我们再通过源码来深入分析下出队过程(JDK1.8):
public E poll() {
// 设置起始点
restartFromHead:
for (;;) {
//p表示head结点,需要出队的节点
for (Node<E> h = head, p = h, q;;) {
//获取p节点的元素
E item = p.item;
//如果p节点的元素不为空,使用CAS设置p节点引用的元素为null
if (item != null && p.casItem(item, null)) {

if (p != h) // hop two nodes at a time
//如果p节点不是head节点则更新head节点,也可以理解为删除该结点后检查head是否与头结点相差两个结点,如果是则更新head节点
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
//如果p节点的下一个节点为null,则说明这个队列为空,更新head结点
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
//结点出队失败,重新跳到restartFromHead来进行出队
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}更新head节点的updateHead方法:
final void updateHead(Node<E> h, Node<E> p)
{
// 如果两个结点不相同,尝试用CAS指令原子更新head指向新头节点
if (h != p && casHead(h, p))
//将旧的头结点指向自身以实现删除
h.lazySetNext(h);
}首先获取head节点的元素,并判断head节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将head节点的引用设置成null,如果CAS成功,则直接返回head节点的元素,如果CAS不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取head节点。如果p节点的下一个节点为null,则说明这个队列为空(此时队列没有元素,只有一个伪结点p),则更新head节点。4.队列判空有些人在判断队列是否为空时喜欢用queue.size()==0,让我们来看看size方法:
public int size()
{
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}可以看到这样在队列在结点较多时会依次遍历所有结点,这样的性能会有较大影响,因而可以考虑empty函数,它只要判断第一个结点(注意不一定是head指向的结点)。
public boolean isEmpty() {
return first() == null;
}
阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: