第六章 Java并发容器和框架(ConcurrentHashMap,ConcurrentLinkedQueue,BlockingQueue,Fork Join)
2017-07-17 16:50
483 查看
1.ConcurrentHashMap
1)HashMap多线程put死循环
HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。
2)HashTable效率低下
3)ConcurrentHashMap的锁分段技术可有效提升并发访问率
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁 (ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个
ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap类似,是一种数组和链表结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。
2. ConcurrentLinkedQueue
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法(即CAS算法)来实现
3.BlockingQueue(阻塞队列)
·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
·DelayQueue:一个使用优先级队列实现的无界阻塞队列。
·缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从
DelayQueue中获取元素时,表示缓存有效期到了。
·定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始 执行,比如TimerQueue就是使用DelayQueue实现的
·SynchronousQueue:一个不存储元素的阻塞队列。
·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
阻塞队列的实现原理:使用通知模式实现。
以ArrayBlockingQueue为例,其使用了Condition来实现
ConditionObject中await()实现为:
当往队列里插入一个元素时,如果队列不可用,那么阻塞生产者主要通过
LockSupport.park(this)来实现。
4.Fork/Join框架
Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
1)工作窃取算法
2)Fork/Join框架的设计
步骤1分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小。
步骤2 执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据
Fork/Join使用两个类来完成以上两件事情。
①ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况下,我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类。
·RecursiveAction:用于没有返回结果的任务。
·RecursiveTask:用于有返回结果的任务。
②ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
斐波那契数列Java实现
递归实现:
Fork/Join 实现
结果如下:
Fork/Join实现耗时有一定的提升。
非递归的方式实现,耗时0秒,递归代码是简单,但是不一定最优
1)HashMap多线程put死循环
HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | package com.tlk.chapter6; import java.util.HashMap; import java.util.UUID; /** * 在多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100% * @author tanlk * @date 2017年5月17日下午7:39:18 */ public class HashMapTest { public static void main(String[] args) throws InterruptedException { final HashMap<String, String> map = new HashMap<String, String>( 2 ); Thread t = new Thread( new Runnable() { @Override public void run() { for ( int i = 0 ; i < 10000 ; i++) { new Thread( new Runnable() { @Override public void run() { map.put(UUID.randomUUID().toString(), "" ); } }, "ftf" + i).start(); } } }, "ftf" ); t.start(); t.join(); //等待该线程终止。终止才会执行main结束 } } |
3)ConcurrentHashMap的锁分段技术可有效提升并发访问率
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁 (ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个
ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap类似,是一种数组和链表结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。
2. ConcurrentLinkedQueue
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法(即CAS算法)来实现
3.BlockingQueue(阻塞队列)
·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
·DelayQueue:一个使用优先级队列实现的无界阻塞队列。
·缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从
DelayQueue中获取元素时,表示缓存有效期到了。
·定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始 执行,比如TimerQueue就是使用DelayQueue实现的
·SynchronousQueue:一个不存储元素的阻塞队列。
·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
阻塞队列的实现原理:使用通知模式实现。
以ArrayBlockingQueue为例,其使用了Condition来实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | private final Condition notFull; private final Condition notEmpty; public ArrayBlockingQueue( int capacity, boolean fair) { // 省略其他代码 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return extract(); } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park( this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } |
LockSupport.park(this)来实现。
1 2 3 4 5 6 | public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park( false , 0L); setBlocker(t, null ); } |
Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
1)工作窃取算法
2)Fork/Join框架的设计
步骤1分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小。
步骤2 执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据
Fork/Join使用两个类来完成以上两件事情。
①ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况下,我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类。
·RecursiveAction:用于没有返回结果的任务。
·RecursiveTask:用于有返回结果的任务。
②ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
斐波那契数列Java实现
递归实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | package com.tlk.chapter6; import java.util.concurrent.TimeUnit; public class FibCalculate { public static long Calculate( long n){ if (n < 0 ) { return 0 ; } if (n == 0 || n == 1 ) { return n; } else { return Calculate(n- 1 )+Calculate(n- 2 ); } } public static void main(String[] args) { long n= 50 ; long begin = System.nanoTime(); long f = FibCalculate.Calculate(n); long end = System.nanoTime(); System.out.println( "第" + n + "个斐波那契数是" + f + ", 耗时" + TimeUnit.NANOSECONDS.toMillis(end - begin) + "毫秒" ); } } |
第 50 个斐波那契数是 12586269025 , 耗时 80536 毫秒 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | package com.tlk.chapter6; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; import java.util.concurrent.TimeUnit; /** * forkJoin实现斐波那契数列 * @author tanlk * @date 2017年5月23日下午5:43:59 */ public class FibTask extends RecursiveTask<Long> { long n; public FibTask( long n) { this .n = n; } public Long compute() { if (n <= 10 ) { // 小于10不再分解 return FibTask.calc(n); } FibTask f1 = new FibTask(n - 1 ); // 分解出计算n-1斐波那契数的子任务 f1.fork(); // 由ForkJoinPool分配线程执行子任务 FibTask f2 = new FibTask(n - 2 ); // 分解出计算n-2斐波那契数的子任务 return f2.compute() + f1.join(); } public static long calc( long n) { if (n < 0 ) { return 0 ; } if (n == 0 || n == 1 ) { return n; } else { return calc(n - 1 ) + calc(n - 2 ); } } public static void main(String[] args) { long n = 50 ; long begin = System.nanoTime(); FibTask FibTask = new FibTask(n); ForkJoinPool pool = new ForkJoinPool(); long f = pool.invoke(FibTask); long end = System.nanoTime(); System.out.println( "第" + n + "个斐波那契数是" + f + ", 耗时" + TimeUnit.NANOSECONDS.toMillis(end - begin) + "毫秒" ); } } |
第 50 个斐波那契数是 12586269025 , 耗时 45037 毫秒 |
非递归的方式实现,耗时0秒,递归代码是简单,但是不一定最优
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | //非递归的方式改写,过程中每个数只计算一次。 public static long calcWithoutRecursion( long n) { if (n < 0 ) return 0 ; if (n == 0 || n == 1 ) { return n; } long fib = 0 ; long fibOne = 1 ; long fibTwo = 1 ; for ( long i = 2 ; i < n; i++) { fib = fibOne + fibTwo; fibTwo = fibOne; fibOne = fib; } return fib; } |
相关文章推荐
- Java 并发容器和框架--ConcurrentLinkedQueue
- Java并发容器ConcurrentHashMap、ConcurrentLinkedQueue、BlockingQueue等
- 【死磕Java并发】-----J.U.C之Java并发容器:ConcurrentLinkedQueue
- 深入浅出 Java Concurrency (20): 并发容器 part 5 ConcurrentLinkedQueue
- Java:concurrent包下面的Collection接口框架图( CopyOnWriteArraySet, CopyOnWriteArrayList,ConcurrentLinkedQueue,BlockingQueue)
- Java concurrent Framework并发容器之ConcurrentLinkedQueue(1.6)源码分析 ??
- java挑战高并发(14): LinkedBlockingQueue和ConcurrentLinkedQueue的区别及用法
- Java并发容器之ConcurrentLinkedQueue
- Java并发包——Blockingqueue,ConcurrentLinkedQueue,Executors
- Java并发容器——ConcurrentLinkedQueue
- Java并发容器之非阻塞队列ConcurrentLinkedQueue
- Java并发容器:ConcurrentLinkedQueue
- java 非阻塞算法在并发容器中的实现(ConcurrentLinkedQueue源码)
- Java并发包——Blockingqueue,ConcurrentLinkedQueue,Executors
- java并发编程的艺术笔记第六章——java并发容器和框架
- java-并发集合-并发队列 ConcurrentLinkedQueue 演示
- 深入剖析java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue
- java多线程-专题-聊聊并发(六)ConcurrentLinkedQueue的实现原理分析
- Java并发编程与技术内幕:ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue源码解析
- Java并发包探秘 (一) ConcurrentLinkedQueue