您的位置:首页 > 其它

高并发学习笔记(三)

2019-03-22 18:50 10 查看
一、同步的相关知识 1. volatile的关键字     volatile用于修饰变量,表示变量的可见性。在CPU执行计算的过程中,并不是直接使用内存中的数据,而是将计算过程中需要的数据复制一份加载到CPU计算的高速缓存中,CPU进行计算时直接从高速缓存中读取和写入数据,当运算结束时,再将高速缓存中的数据刷新到内存中(CPU计算中断时也有可能刷新)。这个特性导致在线程运行过程中,如果变量被其他线程修改,可能造成内存和高速缓存中数据不一致的情况出现,从而导致结果出错。而volatile修饰的变量是线程可见的,当JVM碰见volatile修饰的变量时,会通知CPU在计算过程中,使用该变量参与计算时都要去内存中检查变量数据是否发生改变,而不是一直使用高速缓存中的数据。但是,volatile只是通知CPU检查内存数据,并不能保证变量在多线程中同步。
/**
* 验证volatile的可见性
* Created by bzhang on 2019/3/14.
*/
public class VolatileTest {
      /*volatile*/ int i = 1;

      public void m1(){
            System.out.println(Thread.currentThread().getName()+":start");
            while (i>0){
            }
            System.out.println(Thread.currentThread().getName()+":end");
      }

      public static void main(String[] args) {
            VolatileTest test = new VolatileTest();
            Thread thread = new Thread(new Runnable() {
                  @Override
                  public void run() {
                        test.m1();
                  }
            });
            thread.start();
            try {
                  TimeUnit.SECONDS.sleep(1);
                  test.i = -1;
                  thread.join();
                  System.out.println("main end");
            } catch (InterruptedException e) {
                  e.printStackTrace();
            }
      }
}

//变量i没有volatile修饰的结果:
Thread-0:start

//变量i有volatile修饰的结果:
Thread-0:start
Thread-0:end
main end
  2.原子同步数据类型     普通的变量在多线程环境下是不具备原子性的,为了方便开发Java为我们提供了java.util.concurrent.atomic包。该包提供了若干个原子类型,这些类中的每一个方法都保证了原子操作。在多线程环境下访问原子类型对象中的方法,不会出现同步问题。  
AtomicBoolean 可以用原子方式更新的 boolean 值
AtomicInteger 可以用原子方式更新的 int 值
AtomicIntegerArray 可以用原子方式更新其元素的 int 数组。
AtomicIntegerFieldUpdater<T> 基于反射的实用工具,可以对指定类的指定 volatile int 字段进行原子更新。此类用于原子数据结构,该结构中同一节点的几个字段都独立受原子更新控制。
AtomicLong 可以用原子方式更新的 long 值
AtomicLongArray 可以用原子方式更新其元素的 long 数组。
AtomicLongFieldUpdater<T> 基于反射的实用工具,可以对指定类的指定 volatile long 字段进行原子更新。此类用于原子数据结构,该结构中同一节点的几个字段都独立受原子更新控制。
AtomicMarkableReference<V> AtomicMarkableReference 维护带有标记位的对象引用,可以原子方式对其进行更新。
AtomicReference<V> 可以用原子方式更新的对象引用。
AtomicReferenceArray<E> 可以用原子方式更新其元素的对象引用数组。
AtomicReferenceFieldUpdater<T,V> 基于反射的实用工具,可以对指定类的指定 volatile 字段进行原子更新。该类用于原子数据结构,该结构中同一节点的几个引用字段都独立受原子更新控制
AtomicStampedReference<V> AtomicStampedReference 维护带有整数“标志”的对象引用,可以用原子方式对其进行更新。
    下面以AtomicInteger为例,了解原子类型。
/**
* AtomicInteger使用示例
* Created by bzhang on 2019/3/14.
*/
public class AtomicClassTest {
      private AtomicInteger integer = new AtomicInteger();
      private int k;

      public void add(){
            for (int i=0;i<10000;i++){
                  integer.incrementAndGet();    //以原子的方式+1
                   //k++
            }
      }

      public int get(){
            return integer.get();
            //return k;
      }

      public static void main(String[] args) {
            AtomicClassTest test = new AtomicClassTest();
            new Thread(new Runnable() {
                  @Override
                  public void run() {
                        for (int i =0 ;i<10;i++){
                              test.add();
                        }
                  }
            }).start();
            new Thread(new Runnable() {
                  @Override
                  public void run() {
                        for (int i =0 ;i<5;i++){
                              test.add();
                        }
                  }
            }).start();
            try {
                    //睡眠等待自增结束,不等待得到的结果可能不正确,原因是AtomicInteger中只保证单个方法的原子性,并不保证多个方法之间的原子操作
                  TimeUnit.SECONDS.sleep(2);    
                  System.out.println(test.get());
            } catch (InterruptedException e) {
                  e.printStackTrace();
            }
      }
}

//使用变量integer的结果:
150000
//使用变量k的结果:
100193

 

    可以看到并没使用锁来进行同步操作,依然保证了结果的正确性。来看看AtomicInteger源码中是如何实现的:
public class AtomicInteger extends Number implements java.io.Serializable {
    private static final Unsafe unsafe = Unsafe.getUnsafe();    //Unsafe是JDK内部的工具类,主要实现了平台相关的操作,没事不要用它,不安全
    private static final long valueOffset;

    static {
        try {
            //返回指定静态field的内存地址偏移量,即返回value的内存地址偏移量
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;    //实际存储数据的变量

        //构造初始value为initialValue的AtomicInteger对象
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

        //构造初始value为0的AtomicInteger对象
    public AtomicInteger() {
    }

        //返回value的值,该方法不具备原子操作
    public final int get() {
        return value;
    }

        //设置value的值为newValue
    public final void set(int newValue) {
        value = newValue;
    }

        //最后设置为给定的newValue值
    public final void lazySet(int newValue) {
        unsafe.putOrderedInt(this, valueOffset, newValue);
    }

        //以原子方式设置新值,并返回旧值
    public final int getAndSet(int newValue) {
        return unsafe.getAndSetInt(this, valueOffset, newValue);
    }

        //如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
        
        //如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。但可能失败(jdk8中根本看不出和上面一个有什么区别!沃德尼玛)
    public final boolean weakCompareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

        //以原子方式将当前值+1,返回旧值
    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }

        //以原子方式将当前值-1,返回旧值
    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this, valueOffset, -1);
    }

        //以原子方式将给定值与当前值相加,返回旧值
    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }

        //以原子方式将当前值+1,返回新值
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

        ////以原子方式将当前值-1,返回新值
    public final int decrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
    }

        //以原子方式将给定值与当前值相加,返回新值
    public final int addAndGet(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
    }

        //以给定的函数返回值更新原有的旧值,返回旧值。IntUnaryOperator是java.util.function包下的接口,是为lambda表达式的运用而新增的
    public final int getAndUpdate(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return prev;
    }

        //以给定的函数返回值更新原有的旧值,返回新值
    public final int updateAndGet(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return next;
    }

        //将旧值与给定的x值经过函数运行后得到的值更新替换旧值,并返回旧值
    public final int getAndAccumulate(int x,
                                      IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return prev;
    }

        //将旧值与给定的x值经过函数运行后得到的值更新替换旧值,并返回新值
    public final int accumulateAndGet(int x,
                                      IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return next;
    }

}

//unsafe类中的部分方法
public final native boolean compareAndSwapInt(Object o, long offset,int expected,int x);

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!compareAndSwapInt(o, offset, v, v + delta));
    return v;
}

public final int getAndSetInt(Object o, long offset, int newValue) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!compareAndSwapInt(o, offset, v, newValue));
    return v;
}
           可以看出AtomicInteger类中实现原子操作的方法都是通过unsafe对象的compareAndSwapInt方法实现,该方法是个本地方法,是借助CAS原理实现原子操作。CAS即比较交换法,其原理是进行运算前将当前值的内存地址与给定值的内存地址相比较,只有当前值内存地址和给定值的内存地址相等时,才更新为新值,否则更新失败重新获取当前值内存地址为给定值内存地址后继续比较换算直到更新成功为止,CAS是一种乐观锁的实现。
  • 乐观锁:  假设不会发生并发冲突,只在最后更新共享资源的时候判断一下在此期间有没有别的线程修改了这个共享资源。如果发生冲突就重试,直到没有冲突,更新成功为止。
  • 悲观锁:  假定会发生并发冲突,即共享资源会被某个线程更改。所以当某个线程获取共享资源时,会阻止别的线程获取共享资源。也称独占锁或者互斥锁,synchronized同步锁就是一种悲观锁。
    CAS虽然可以不使用锁就能使多线程环境下达到同步的效果,且能大大提升效率。但它并不是完美无缺的,依然存在着缺陷:ABA问题、循环时间过长开销大以及只能保证一个共享资源的原子操作。
  • ABA问题:CAS在更新值之前要检查当前值是否改变,若没有变才会更新,但若是出现当前值由A变为B,又变回A,这时CAS会认为当前值没有变化,但实际上已经发生了改变(解决办法:可以多带个版本号或是时间戳区分开)。
  • 循环时间过长,开销大:CAS长时间循环判断,不能成功更新,会给CPU带来巨大的执行开销。
  • 只能保证一个共享资源的原子操作:当需要对多个共享资源都实现原子操作时,这时候就使用锁了。
  3.wait及notify方法     wait和notify的方法时Object类下的方法,wait方法让持有该锁对象的线程进入_WaitSet队列中等待被唤醒,notify和notifyAll方法则是唤醒_WaitSet队列中的一个(随机)或所有的等待线程。典型应用是在生产者消费者模式的使用。
/**
* 使用wait和notify实现生产者消费者模式
* Created by bzhang on 2019/3/15.
*/
public class WaitTest {
      public static void main(String[] args) {
            Pool pool = new Pool();
            Object lock = new Object();   //锁对象
            //生产者线程
            new Thread(new Runnable() {
                  @Override
                  public void run() {
                        int i=0;
                        while (true){
                              synchronized (lock){
                                    if (pool.size()>=5){
                                          try {
                                                lock.wait();      //库存满了,不在生产
                                          } catch (InterruptedException e) {
                                                e.printStackTrace();
                                          }
                                    }else {
                                          pool.add(new Object());       //生产产品
                                          try {
                                                System.out.println(Thread.currentThread().getName()+"生产了"+ ++i +"个对象——"+
                                                        "pool中还有"+pool.size()+"个对象");
                                                TimeUnit.SECONDS.sleep(2);
                                          } catch (InterruptedException e) {
                                                e.printStackTrace();
                                          }
                                    }
                                    lock.notifyAll();//唤醒消费者线程
                              }
                        }
                  }
            }).start();
            //消费者线程
            new Thread(new Runnable() {
                  @Override
                  public void run() {
                        int i = 0;
                        while (true){
                              synchronized (lock){
                                    if (pool.size()==0){
                                          try {
                                                lock.wait();      //没有产品可消费,进入等待
                                          } catch (InterruptedException e) {
                                                e.printStackTrace();
                                          }
                                    }else {
                                          pool.get();       //消费产品
                                          try {
                                                System.out.println(Thread.currentThread().getName()+"消费了"+ ++i +"个对象——"+
                                                        "pool中还有"+pool.size()+"个对象");
                                                TimeUnit.SECONDS.sleep(1);
                                          } catch (InterruptedException e) {
                                                e.printStackTrace();
                                          }
                                    }
                                    lock.notifyAll();//唤醒生产者线程
                              }
                        }
                  }
            }).start();
      }
}

/**
* 产品对象池
*/
class Pool{
      Queue<Object> queue = new LinkedList<>();
      public void add(Object object){
            queue.offer(object);
      }

      public Object get(){
            return queue.poll();
      }

      public int size(){
            return queue.size();
      }
}
      要是用wait和notify的方法是有前提的 当前线程必须获其对象的锁,否则会抛出IllegalMonitorStateException异常,所以这两个方法必须在同步代码里面调用。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  CAS