您的位置:首页 > 产品设计 > UI/UE

第六章 Java并发容器和框架(ConcurrentHashMap,ConcurrentLinkedQueue,BlockingQueue,Fork Join)

2017-07-17 16:50 483 查看
1.ConcurrentHashMap

    1)HashMap多线程put死循环
    HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。
    
1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30
package
 
com.tlk.chapter6;

 
import
 
java.util.HashMap;

import
 
java.util.UUID;

 
/**

 
* 在多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%

 
* @author tanlk

 
* @date 2017年5月17日下午7:39:18

 
*/

public
 
class
 
HashMapTest {

    
public
 
static
 
void
 
main(String[] args) 
throws
 
InterruptedException {

        
final
 
HashMap<String, String> map = 
new
 
HashMap<String, String>(
2
);

        
Thread t = 
new
 
Thread(
new
 
Runnable() {

            
@Override

            
public
 
void
 
run() {

                
for
 
(
int
 
i = 
0
; i < 
10000
; i++) {

                    
new
 
Thread(
new
 
Runnable() {

                        
@Override

                        
public
 
void
 
run() {

                            
map.put(UUID.randomUUID().toString(), 
""
);

                        
}

                    
}, 
"ftf"
 
+ i).start();

                
}

            
}

        
}, 
"ftf"
);

        
t.start();

        
t.join();
//等待该线程终止。终止才会执行main结束

    
}

}


    2)HashTable效率低下
    3)ConcurrentHashMap的锁分段技术可有效提升并发访问率
   ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁    (ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个
ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap类似,是一种数组和链表结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。

2. ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法(即CAS算法)来实现

3.BlockingQueue(阻塞队列)

    ·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
    ·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
    ·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
    ·DelayQueue:一个使用优先级队列实现的无界阻塞队列。
            ·缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从                              
              DelayQueue中获取元素时,表示缓存有效期到了。
            ·定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始                                      执行,比如TimerQueue就是使用DelayQueue实现的
    ·SynchronousQueue:一个不存储元素的阻塞队列。
    ·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
    ·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

    阻塞队列的实现原理:使用通知模式实现。
     以ArrayBlockingQueue为例,其使用了Condition来实现
    
1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43
private
 
final
 
Condition notFull;

    
private
 
final
 
Condition notEmpty;

 
    
public
 
ArrayBlockingQueue(
int
 
capacity, 
boolean
 
fair) {

    
// 省略其他代码

        
notEmpty = lock.newCondition();

        
notFull = lock.newCondition();

    
}

 
    
public
 
void
 
put(E e) 
throws
 
InterruptedException {

        
checkNotNull(e);

        
final
 
ReentrantLock lock = 
this
.lock;

        
lock.lockInterruptibly();

        
try
 
{

            
while
 
(count == items.length)

                
notFull.await();

            
insert(e);

        
finally
 
{

            
lock.unlock();

        
}

    
}

 
    
p
ublic 
E take() 
throws
 
InterruptedException {

        
final
 
ReentrantLock lock = 
this
.lock;

        
lock.lockInterruptibly();

        
try
 
{

            
while
 
(count == 
0
)

                
notEmpty.await();

            
return
 
extract();

        
finally
 
{

            
lock.unlock();

        
}

    
}

 
    
private
 
void
 
insert(E x) {

        
items[putIndex] = x;

        
putIndex = inc(putIndex);

        
++count;

        
notEmpty.signal();

    
}


ConditionObject中await()实现为:
1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18
 
public
 
final
 
void
 
await() 
throws
 
InterruptedException {

            
if
 
(Thread.interrupted())

                
throw
 
new
 
InterruptedException();

            
Node node = addConditionWaiter();

            
int
 
savedState = fullyRelease(node);

            
int
 
interruptMode = 
0
;

            
while
 
(!isOnSyncQueue(node)) {

                
LockSupport.park(
this
);

                
if
 
((interruptMode = checkInterruptWhileWaiting(node)) != 
0
)

                    
break
;

            
}

            
if
 
(acquireQueued(node, savedState) && interruptMode != THROW_IE)

                
interruptMode = REINTERRUPT;

            
if
 
(node.nextWaiter != 
null
// clean up if cancelled

                
unlinkCancelledWaiters();

            
if
 
(interruptMode != 
0
)

                
reportInterruptAfterWait(interruptMode);

        
}


        当往队列里插入一个元素时,如果队列不可用,那么阻塞生产者主要通过
LockSupport.park(this)来实现。

1

2

3

4

5

6
  
public
 
static
 
void
 
park(Object blocker) {

        
Thread t = Thread.currentThread();

        
setBlocker(t, blocker);

        
UNSAFE.park(
false
, 0L);

        
setBlocker(t, 
null
);

    
}


4.Fork/Join框架

    Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

    1)工作窃取算法



    2)Fork/Join框架的设计

步骤1分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小。
步骤2 执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据

Fork/Join使用两个类来完成以上两件事情。
①ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况下,我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类。
        ·RecursiveAction:用于没有返回结果的任务。
        ·RecursiveTask:用于有返回结果的任务。
②ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

斐波那契数列Java实现
递归实现:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24
package
 
com.tlk.chapter6;

 
import
 
java.util.concurrent.TimeUnit;

 
public
 
class
 
FibCalculate {

    
public
 
static
 
long
 
Calculate(
long
 
n){

        
if
(n < 
0
) {  

            
return
 
0
;  

        
}  

        
if
(n == 
0
 
|| n == 
1
) {  

            
return
 
n;

        
}
else
 
{

            
return
 
Calculate(n-
1
)+Calculate(n-
2
);

        
}

    
}

    
 
    
public
 
static
 
void
 
main(String[] args) {

        
long
 
n=
50
;

        
long
 
begin = System.nanoTime();

        
long
 
f = FibCalculate.Calculate(n);

        
long
 
end = System.nanoTime();

        
System.out.println(
"第"
 
+ n + 
"个斐波那契数是"
 
+ f + 
", 耗时"
 
+ TimeUnit.NANOSECONDS.toMillis(end - begin) + 
"毫秒"
);

    
}

}


50
个斐波那契数是
12586269025
, 耗时
80536
毫秒


Fork/Join 实现

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50
package
 
com.tlk.chapter6;

 
import
 
java.util.concurrent.ForkJoinPool;

import
 
java.util.concurrent.RecursiveTask;

import
 
java.util.concurrent.TimeUnit;

 
/**

 
* forkJoin实现斐波那契数列

 
* @author tanlk

 
* @date 2017年5月23日下午5:43:59

 
*/

public
 
class
 
FibTask 
extends
 
RecursiveTask<Long> {

    
long
 
n;

 
    
public
 
FibTask(
long
 
n) {

        
this
.n = n;

    
}

 
    
public
 
Long compute() {

        
if
 
(n <= 
10
) { 
// 小于10不再分解

            
return
 
FibTask.calc(n);

        
}

        
FibTask f1 = 
new
 
FibTask(n - 
1
); 
// 分解出计算n-1斐波那契数的子任务

        
f1.fork(); 
// 由ForkJoinPool分配线程执行子任务

        
FibTask f2 = 
new
 
FibTask(n - 
2
); 
// 分解出计算n-2斐波那契数的子任务

        
return
 
f2.compute() + f1.join();

    
}

 
    
public
 
static
 
long
 
calc(
long
 
n) {

        
if
 
(n < 
0
) {

            
return
 
0
;

        
}

        
if
 
(n == 
0
 
|| n == 
1
) {

            
return
 
n;

        
else
 
{

            
return
 
calc(n - 
1
) + calc(n - 
2
);

        
}

    
}

 
    
public
 
static
 
void
 
main(String[] args) {

        
long
 
n = 
50
;

        
long
 
begin = System.nanoTime();

        
FibTask FibTask = 
new
 
FibTask(n);

        
ForkJoinPool pool = 
new
 
ForkJoinPool();

        
long
 
f = pool.invoke(FibTask);

        
long
 
end = System.nanoTime();

        
System.out.println(
"第"
 
+ n + 
"个斐波那契数是"
 
+ f + 
", 耗时"
 
+ TimeUnit.NANOSECONDS.toMillis(end - begin) + 
"毫秒"
);

    
}

 
}


结果如下:

50
个斐波那契数是
12586269025
, 耗时
45037
毫秒


Fork/Join实现耗时有一定的提升。

非递归的方式实现,耗时0秒,递归代码是简单,但是不一定最优

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17
//非递归的方式改写,过程中每个数只计算一次。

    
public
 
static
 
long
 
calcWithoutRecursion(
long
 
n) {  

        
if
(n < 
0
)  

            
return
 
0
;  

        
if
(n == 
0
 
|| n == 
1
) {  

            
return
 
n;  

        
}  

        
long
 
fib = 
0
;  

        
long
 
fibOne = 
1
;  

        
long
 
fibTwo = 
1
;  

        
for
(
long
 
i = 
2
; i < n; i++) {  

            
fib = fibOne + fibTwo;  

            
fibTwo = fibOne;  

            
fibOne = fib;  

        
}  

        
return
 
fib;  

    
}
 


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Java juc