JDK源码——java.util.concurrent(六)
2017-05-09 20:37
393 查看
测试代码:
https://github.com/kevindai007/springboot_houseSearch/tree/master/src/test/java/com/kevindai/juc
这里能看到CyclicBarrier会让调用await()的线程等待,把CyclicBarrier的资源获取完之后,所有的线程一起运行.
下面咱们一起看看其源码.先看看构造函数
再来看看await()方法
CyclicBarrier的主要方法到这里就分析完了,主要是用了ReentrantLock+Condition+int count组成,没用 AQS;
注意与CountDownLatch的区别:
CountDownLatch是等待所有线程运行完成之后,然后去运行另外一个(或一组)线程;而CyclicBarrier则是一组线程相互等待,当所有线程准备完毕之后,这组线程一起执行,且可以在等待完成后执行一个屏障命令
CountDownLatch只能使用一次,而CyclicBarrier正常结束后调用nextGeneration初始化可以重复使用
昨天电脑上装了换了JDK8,然后发现JDK8中ConcurrenHashMap的代码真的是复杂到家了,果断换成JDK7来研究
先看看看一些重要的属性
这是主要字段,基本能够理解,下面咱们从构造方法开始开看ConcurrentHashMap的具体流程
构造方法中主要进行了参数校验,确认了segment的数量和大小,并初始化S0,下面看看put方法
这时咱们来看看Segment是怎么实现的
可以看到Segment继承了ReentrantLock,因此在put方法中能保证线程安全.通过Segment中比较重要的方法基本就是这些,但其中还有很多看不懂的地方,会继续努力的
https://github.com/kevindai007/springboot_houseSearch/tree/master/src/test/java/com/kevindai/juc
CyclicBarrier
咱们首先通过一个demo来了解CyclicBarrier的用法和特点public class CyclicBarrierTest { public static void main(String[] args) { final CyclicBarrier cyclicBarrier = new CyclicBarrier(10); for (int i = 0; i < 11; i++) { Runnable run = new Runnable() { @Override public void run() { System.out.println("线程开始" + Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程开始启动!" + Thread.currentThread().getName()); } }; Thread thread = new Thread(run,"Thread" + i); thread.start(); } } }
这里能看到CyclicBarrier会让调用await()的线程等待,把CyclicBarrier的资源获取完之后,所有的线程一起运行.
下面咱们一起看看其源码.先看看构造函数
public CyclicBarrier(int parties) { this(parties, null); } //传入可获取的资源数及资源被获取完时执行的命令 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
再来看看await()方法
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { //CyclicBarrier可重复使用,用于做判断是否在同一个条件中 final Generation g = generation; //为true表示已经被打破,抛异常 if (g.broken) throw new BrokenBarrierException(); //如果线程被中断,那么打破屏障,唤醒屏障前的其他等待线程,抛出异常 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //剩余资源数减一 int index = --count; //如果最后一个资源被获取,那么执行barrierCommand,然后唤醒所有线程 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //唤醒屏障前其他等待线程,重置count和new generation nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } //如果还有资源可以被其他线程获取,那么自旋等待 for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //如果await的线程被中断,检查下generation if (g == generation && ! g.broken) { //处于当前generation并且屏障没有被打破,那就打破屏障 breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } //当资源被减完时调用此方法,让所有等待线程继续执行,重置count,设置一个新的Generation private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } //打破屏障 private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
CyclicBarrier的主要方法到这里就分析完了,主要是用了ReentrantLock+Condition+int count组成,没用 AQS;
注意与CountDownLatch的区别:
CountDownLatch是等待所有线程运行完成之后,然后去运行另外一个(或一组)线程;而CyclicBarrier则是一组线程相互等待,当所有线程准备完毕之后,这组线程一起执行,且可以在等待完成后执行一个屏障命令
CountDownLatch只能使用一次,而CyclicBarrier正常结束后调用nextGeneration初始化可以重复使用
ConcurrentHashMap
说到ConcurrentHashMap不得不先说说HashMap,还好原来分析过HashMap的源码,大家看这里,咱们直接看是看源码吧(这个不做demo了,这就是一个线程安全的HashMap用法也基本相似)昨天电脑上装了换了JDK8,然后发现JDK8中ConcurrenHashMap的代码真的是复杂到家了,果断换成JDK7来研究
先看看看一些重要的属性
//默认初始大小 static final int DEFAULT_INITIAL_CAPACITY = 16; //负载因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; //segment的个数 static final int DEFAULT_CONCURRENCY_LEVEL = 16; //最大容量 static final int MAXIMUM_CAPACITY = 1 << 30; //segment中table的最小容量 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //最大segent数量 static final int MAX_SEGMENTS = 1 << 16; static final int RETRIES_BEFORE_LOCK = 2; final int segmentMask; final int segmentShift; final Segment<K,V>[] segments;
这是主要字段,基本能够理解,下面咱们从构造方法开始开看ConcurrentHashMap的具体流程
public ConcurrentHashMap() { this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { //参数校验 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); //设置最大segment数量 if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1;//segment数量 while (ssize < concurrencyLevel) { ++sshift; //ssize向左位移一位 ssize <<= 1; } this.segmentShift = 32 - sshift;//segment的偏移量 this.segmentMask = ssize - 1;//segment掩码值 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY;//2,segment大小 while (cap < c)//这里保证每个segment的大小为2的倍数 cap <<= 1; Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];//用ssize初始化segments数组 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
构造方法中主要进行了参数校验,确认了segment的数量和大小,并初始化S0,下面看看put方法
public V put(K key, V value) { Segment<K,V> s; //ConcurrentHashMap value不能为Null if (value == null) throw new NullPointerException(); //取key的hashcode再来一次hash,2次hash打撒分布,避免冲突 int hash = hash(key); //计算要存入的segment的下标 int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment //只初始化了s0,这里确保segment存在 s = ensureSegment(j); return s.put(key, hash, value, false);//掉segment的put } //因为只初始化了S0,所以要保证当存放位置不为S0时segment不为空 private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; //计算偏移量 Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment<K,V> proto = ss[0]; //s0不为空null,所以一些参数直接从s0获取 int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); //构造segment里面的table HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); //自旋+cas保证存储位置一定设置成功 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
这时咱们来看看Segment是怎么实现的
static final class Segment<K,V> extends ReentrantLock implements Serializable { //segment中的table transient volatile HashEntry<K,V>[] table; //链表长度,即元素数量 transient int count; //修改次数 transient int modCount; //极限值,当table中包含的HashEntry元素的个数超过此值时,触发table的再散列 transient int threshold; //加载因子 final float loadFactor; Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } //Concurrent的put操作,其实就是找到相应的Sement然后调用此put方法 final V put(K key, int hash, V value, boolean onlyIfAbsent) { //首先尝试加锁,加锁失败则调用scanAndLockForPut自旋加锁 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; //在table中查找key对应的位置 int index = (tab.length - 1) & hash; //获取第一个节点 HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { //节点存在就检查是否存在相同的key,如果存在则覆盖值 if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else {//不存在就新建一个 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; //超过长度则rehash if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; } private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1; //新table大小 threshold = (int)(newCapacity * loadFactor); //新的极限值 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; //创建新的table数组 int sizeMask = newCapacity - 1; //计算具体位置时用,跟hashmap计算方式一样 for (int i = 0; i < oldCapacity ; i++) { //循环oldtable HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null) // 只有一个节点,直接移过去 newTable[idx] = e; else { // 节点重用 HashEntry<K,V> lastRun = e; int lastIdx = idx; //下面2个for循环的逻辑是lastRun,last从next节点往后移,最后lastRun指向最后一个转移到新table的index不变的节点 //比较乱,画图走几遍,意思就是说假如原来的table[1]有10个节点,然后不停计算节点在newtable的位置,很可能从第四个节点的时候开始, //后面的所有节点在newtable中的存储位置都一样了,那么我newtable只要把第4个节点直接放过去就行,然后从链表头开始处理其他节点, //就不用把所有节点都新建一遍了 for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; //直接lastRun设置到newtable // 复制其他节点 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // 把新节点加入到newtable node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; } /** * 自旋尝试加锁,不成功扫描对应位置的链表,如果链表中key不存在就创建一个node,达到最大次数后就阻塞加锁,如果key存在返回的null * 处理过程中其他线程改变了链表结构,那就重头再来 */ private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { if (e == null) { //基本就是查找key不存在就创建一个,存在就trylock一直到次数限制,再不行就阻塞加锁 if (node == null) node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { //超过最大尝试次数,那么就lock阻塞,单核1,多核64 lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { //隔一次检查一遍尝试的时候发现链表的首节点变化了,也就是有别的线程操作了,那就重来 e = first = f; // re-traverse if entry changed retries = -1; } } return node; } }
可以看到Segment继承了ReentrantLock,因此在put方法中能保证线程安全.通过Segment中比较重要的方法基本就是这些,但其中还有很多看不懂的地方,会继续努力的
相关文章推荐
- JDK源码——java.util.concurrent(二)
- JDK源码——java.util.concurrent(三)
- JDK源码——java.util.concurrent(七)
- JDK源码(线程池ThreadPoolExecutor)——java.util.concurrent(九)
- Jdk源码阅读之Java.util.concurrent
- JDK源码(FutureTask)——java.util.concurrent(十)
- Java 并发工具包-java.util.concurrent-源码jdk1.7全面解析
- JDK源码——java.util.concurrent(八)
- JDK源码——java.util.concurrent(四)
- JDK源码——java.util.concurrent
- JDK源码——java.util.concurrent(五)
- 《java.util.concurrent 包源码阅读》02 关于java.util.concurrent.atomic包
- java类库的阅读笔记_jdk1.7.0_40_java.util.concurrent.locks.LockSupport
- java类库的阅读笔记_jdk1.7.0_40_java.util.concurrent.ConcurrentHashMap
- Java concurrent Framework并发容器之ConcurrentHashMap(Doug Lea 非JDK版)源码分析
- 《java.util.concurrent 包源码阅读》06 ArrayBlockingQueue
- 《java.util.concurrent 包源码阅读》05 BlockingQueue
- 【jdk源码解析三】java.util.Hashtable
- JDK源码分析——Java.util.Vector的浅析
- 关于JDK中的java.util.concurrent.atomic