您的位置:首页 > 编程语言 > Java开发

Java 原子操作类详解(AtomicInteger、AtomicIntegerArray等)

2016-08-22 18:38 405 查看
当程序更新一个变量时,如果多线程同时更新这个变量,可能得到期望之外的值,比如变量i=1,A线程更新i+1,B线程也更新i+1,经过两个线程操作之后可能i不等于3,而是等于2。因为A和B线程在更新变量i的时候拿到的i都是1,这就是线程不安全的更新操作,通常我们会使用synchronized来解决这个问题,synchronized会保证多线程不会同时更新变量i。

import java.util.concurrent.CountDownLatch;

public class UnSafeAdd {
private static int threadCount=10;
private static CountDownLatch countDown=new CountDownLatch(threadCount);
private static int count=0;
private static class Counter implements Runnable{
@Override
public void run() {
for(int i=0;i<1000;i++){
count++;//非原子操作
}
countDown.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] threads=new Thread[threadCount];
for(int i=0;i<threadCount;i++){
threads[i]=new Thread(new Counter());
}
for(int i=0;i<threadCount;i++){
threads[i].start();;
}
countDown.await();
System.out.println(count);
}输出:
8968

import java.util.concurrent.CountDownLatch;

public class SafeAddWithSyn {
private static int threadCount=10;
private static CountDownLatch countDown=new CountDownLatch(threadCount);
private static int count=0;
synchronized private static void addCount(){//同步方法
count++;
}
private static class Counter implements Runnable{
@Override
public void run() {
for(int i=0;i<1000;i++){
addCount();
}
countDown.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] threads=new Thread[threadCount];
for(int i=0;i<threadCount;i++){
threads[i]=new Thread(new Counter());
}
for(int i=0;i<threadCount;i++){
threads[i].start();;
}
countDown.await();
System.out.println(count);
}
}
输出:
10000

而Java从JDK 1.5开始提供了java.util.concurrent.atomic包(以下简称Atomic包),这个包中的原子操作类提供了一种用法简单、性能高效、线程安全地更新一个变量的方式。因为变量的类型有很多种,所以在Atomic包里一共提供了13个类,属于4种类型的原子更新方式,分别是原子更新基本类型、原子更新数组、原子更新引用和原子更新属性(字段)。Atomic包里的类基本都是使用Unsafe实现的包装类。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class SafeAddWithAtomicInteger {
private static int threadCount=10;
private static CountDownLatch countDown=new CountDownLatch(threadCount);
private static AtomicInteger count=new AtomicInteger(0);//原子操作类
private static class Counter implements Runnable{
@Override
public void run() {
for(int i=0;i<1000;i++){
count.addAndGet(1);
}
countDown.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] threads=new Thread[threadCount];
for(int i=0;i<threadCount;i++){
threads[i]=new Thread(new Counter());
}
for(int i=0;i<threadCount;i++){
threads[i].start();;
}
countDown.await();
System.out.println(count.get());
}
}输出:
10000

原子更新基本类型

使用原子的方式更新基本类型,Atomic包提供了以下3个类。
AtomicBoolean:原子更新布尔类型。
AtomicInteger:原子更新整型。
AtomicLong:原子更新长整型。

以上3个类提供的方法几乎一模一样,所以本节仅以AtomicInteger为例进行讲解,AtomicInteger的源码如下:

public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;

// setup to use Unsafe.compareAndSwapInt for updates 使用Unsafe类的CAS操作实现更新
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
......
//volatile保证可见性
private volatile int value;
//构造器
public AtomicInteger(int initialValue) {
value = initialValue;
}
public AtomicInteger() {
}
......

//CAS更新
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
......
/*以下方法都使用循环CAS更新数据*/
<pre name="code" class="java"> public final int getAndSet(int newValue) {//以原子方式设置新值
for (;;) {
int current = get();
if (compareAndSet(current, newValue))
return current;
}
}  public final int getAndIncrement() {//以原子方式自增 for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return current; } } ......

//以原子方式将输入值与当前值相加并返回结果 public final int addAndGet(int delta) { for (;;) {//循环 int current = get(); int next = current + delta; if (compareAndSet(current, next))//CAS return next; } } ......}


原子更新数组

通过原子的方式更新数组里的某个元素,Atomic包提供了以3类
AtomicIntegerArray:原子更新整型数组里的元素。
AtomicLongArray:原子更新长整型数组里的元素。
AtomicReferenceArray:原子更新引用类型数组里的元素。

import java.util.concurrent.CountDownLatch;

public class AtomicIntegerArrayTest {
private static int threadCount=1000;
private static CountDownLatch countDown=new CountDownLatch(threadCount);
static int[] values=new int[10];
private static class Counter implements Runnable{
@Override
public void run() {
for(int i=0;i<100;i++){
for(int j=0;j<10;j++){//所有元素+1
values[j]++;
}
}
countDown.countDown();
}
}
public static void main(String[] args) throws InterruptedException{
Thread[] threads=new Thread[threadCount];
for(int i=0;i<threadCount;i++){
threads[i]=new Thread(new Counter());
}
for(int i=0;i<threadCount;i++){
threads[i].start();;
}
countDown.await();
for(int i=0;i<10;i++){
System.out.print(values[i]+" ");
}
}
}输出:
99997 99996 99997 99997 99996 99996 99996 99996 99996 99996

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicIntegerArray;

public class AtomicIntegerArrayTest {
private static int threadCount=1000;
private static CountDownLatch countDown=new CountDownLatch(threadCount);
static int[] values=new int[10];
static AtomicIntegerArray ai=new AtomicIntegerArray(values);
private static class Counter implements Runnable{
@Override
public void run() {
for(int i=0;i<100;i++){
for(int j=0;j<10;j++){//所有元素+1
ai.getAndIncrement(j);
}
}
countDown.countDown();
}
}
public static void main(String[] args) throws InterruptedException{
Thread[] threads=new Thread[threadCount];
for(int i=0;i<threadCount;i++){
threads[i]=new Thread(new Counter());
}
for(int i=0;i<threadCount;i++){
threads[i].start();;
}
countDown.await();
for(int i=0;i<10;i++){
System.out.print(ai.get(i)+" ");
}
System.out.println();
for(int i=0;i<10;i++){
System.out.print(values[i]+" ");
}
}
}输出:
100000 100000 100000 100000 100000 100000 100000 100000 100000 100000

0 0 0 0 0 0 0 0 0 0

需要注意的是,数组value通过构造方法传递进去,然后AtomicIntegerArray会将当前数组复制一份,所以当AtomicIntegerArray对内部的数组元素进行修改时,不会影响传入的数组。

//An {@code int} array in which elements may be updated atomically.
public class AtomicIntegerArray implements java.io.Serializable {
......
private final int[] array;//存储数据的int数组

......
//构造器
public AtomicIntegerArray(int length) {
array = new int[length];
}
public AtomicIntegerArray(int[] array) {
// Visibility guaranteed by final field guarantees
this.array = array.clone();//这里会复制传入的int数组,因此不会改变原来的int数组
}

public final int length() {
return array.length;
}
......
}

原子更新引用

原子更新基本类型的AtomicInteger,只能更新一个变量,如果要原子更新多个变量,就需要使用这个原子更新引用类型提供的类。Atomic包提供了以下3个类。
AtomicReference:原子更新引用类型。
AtomicReferenceFieldUpdater:原子更新引用类型里的字段。
AtomicMarkableReference:原子更新带有标记位的引用类型。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class Test {
private static int threadCount=10;
private static CountDownLatch countDown=new CountDownLatch(threadCount);
public static AtomicReference<User> atomicUserRef = new AtomicReference<User>();
private static class ReferenceUpdater implements Runnable{
User user;
public ReferenceUpdater(User user){
this.user=user;
}
@Override
public void run() {
for(int i=0;i<1000;i++){
User oldUser=atomicUserRef.get();
atomicUserRef.compareAndSet(oldUser, user);
Thread.yield();
}
countDown.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] threads=new Thread[threadCount];
for(int i=0;i<threadCount;i++){
threads[i]=new Thread(new ReferenceUpdater(new User("name"+i,i)));
}
for(int i=0;i<threadCount;i++){
threads[i].start();;
}
countDown.await();
System.out.println(atomicUserRef.get().getName());
System.out.println(atomicUserRef.get().getOld());
}

static class User {
private String name;
private int old;

public User(String name, int old) {
this.name = name;
this.old = old;
}

public String getName() {
return name;
}

public int getOld() {
return old;
}
}
}
输出:

name1

1

每次输出结果都不确定,10种情况都有可能,但是name属性和age属性是匹配的。

/**An object reference that may be updated atomically.*/
public class AtomicReference<V> implements java.io.Serializable {
<pre name="code" class="java"> ......
private static final Unsafe unsafe = Unsafe.getUnsafe();//用于CAS更新 ...... private volatile V value;//引用,volatile保证可见性 public AtomicReference(V initialValue) { value = initialValue; } public AtomicReference()
{ } //利用Unsafe类执行CAS操作 public final boolean compareAndSet(V expect, V update) { return unsafe.compareAndSwapObject(this, valueOffset, expect, update); } ......

//循环CAS public final V getAndSet(V newValue) { while (true) { V x = get(); if (compareAndSet(x, newValue)) return x; } }

......}


原子更新字段

如果需原子地更新某个类里的某个字段时,就需要使用原子更新字段类,Atomic包提供了以下3个类进行原子字段更新。
AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
AtomicLongFieldUpdater:原子更新长整型字段的更新器。
AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于原子的更新数据和数据的版本号,可以解决使用CAS进行原子更新时可能出现的ABA问题。

要想原子地更新字段类需要两步。第一步,因为原子更新字段类都是抽象类,每次使用的时候必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。第二步,更新类的字段(属性)必须使用public volatile修饰符。

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class AtomicIntegerFieldUpdaterTest {
// 创建原子更新器,并设置需要更新的对象类和对象的属性
private static AtomicIntegerFieldUpdater<User> a =
AtomicIntegerFieldUpdater.newUpdater(User.class, "old");

public static void main(String[] args) throws InterruptedException {
// 设置柯南的年龄是10岁
User conan = new User("conan", 10);
// 柯南长了一岁,但是仍然会输出旧的年龄
System.out.println(a.getAndIncrement(conan));
// 输出柯南现在的年龄
System.out.println(a.get(conan));
}

public static class User {
private String name;
public volatile int old;

public User(String name, int old) {
this.name = name;
this.old = old;
}

public String getName() {
return name;
}

public int getOld() {
return old;
}
}
}输出:
10

11

内容源自:

《Java并发编程的艺术》
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: