您的位置:首页 > 产品设计 > UI/UE

Java并发-类库新组件 - PriorityBlockingQueue 理解

2016-02-26 19:56 429 查看
直接上代码进行理解:

package com.xyw.concurrent.blog;

import java.util.concurrent.*;
import java.util.*;

/*
* 优先队列, 任务按照优先级顺序从队列中出现的任务, PrioritizedTask 被赋予一个优先级数 以此来提供这种顺序
*/
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask>{//能放进队列的任务是实现了 Comparable 的情况
private Random rand  = new Random(47);
private static int counter = 0;
private final int id  = counter ++; // 任务的唯一标识
private final int priority;
protected static List<PrioritizedTask> sequence =
new ArrayList<PrioritizedTask>();
public PrioritizedTask(int priority){
this.priority = priority;
sequence.add(this); // 用于添加创建的顺序的情况
}
public int compareTo(PrioritizedTask arg){ // 实现 comparable 的固定的接口
return priority < arg.priority ? 1: (priority > arg.priority) ? -1 : 0;
}
public void run(){
try{
TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
}catch(InterruptedException e){

}
System.out.println(this);
}
public String toString(){
return String.format("[%1$-3d]", priority) + "Task" + id;
}
public String summary(){
return "(" + id + ":" + priority + ")";
}
public static class EndSentinel extends PrioritizedTask{ // 内部类,用于终结的任务
private ExecutorService exec;
public EndSentinel(ExecutorService e){
super(-1);
exec = e ;
}
public void run(){
int count  = 0;
for(PrioritizedTask pt : sequence){
System.out.println(pt.summary());
if(++count % 5 == 0){
System.out.println();
}
}
System.out.println();
System.out.println(this + "Calling shutdownNow()");
exec.shutdownNow();
}
}
}

class PrioritizedTaskProducer implements Runnable{
private Random rand = new Random(47);
private Queue<Runnable> queue;
private ExecutorService exec;
public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e){
queue = q;
exec	 = e ;
}
public void run(){
for(int i  =0; i < 20; i++){
queue.add(new PrioritizedTask(rand.nextInt(10)));// 这里面加入的是一个线程的对象
//Thread.yield(); 这个地方出现了情况,如果加入了Thread.yield(); 这个函数,那么会在第一个出现非优先队列的情况
}
try{
for(int i = 0; i < 10; i++){
TimeUnit.MILLISECONDS.sleep(250);
queue.add(new PrioritizedTask(10));
}
for(int i = 0; i < 10; i++){
queue.add(new PrioritizedTask(i));
}
queue.add(new PrioritizedTask.EndSentinel(exec));
}catch(InterruptedException e){

}
System.out.println("Finished PrioritizedTaskProducer");
}
}

class PrioritizedTaskConsumer implements Runnable{
private PriorityBlockingQueue<Runnable> q; // 使用了 PriorityBlockingQueue 这个队列的情况
public PrioritizedTaskConsumer( PriorityBlockingQueue<Runnable> q){
this.q= q;
}
public void run(){
try{
while(!Thread.interrupted()){
q.take().run();
}
}catch(InterruptedException	e){

}
System.out.println("Finished PrioritizedTaskConsumer");
}
}

public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws Exception{
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable>	queue = new PriorityBlockingQueue<Runnable>();
exec.execute(new PrioritizedTaskProducer(queue, exec)); // 任务的生产者
exec.execute(new PrioritizedTaskConsumer(queue)); // 任务的消费者
}
}


运行结果,注意 (注释之处的 Thread.yield() )这个加与不加与运行的结果有关

[9 ]Task5

[9 ]Task13

[9 ]Task14

[8 ]Task10

[8 ]Task15

[8 ]Task16

[8 ]Task0

[8 ]Task19

[8 ]Task11

[8 ]Task6

[7 ]Task9

[5 ]Task1

[3 ]Task2

[2 ]Task8

[1 ]Task12

[1 ]Task17

[1 ]Task4

[1 ]Task3

[0 ]Task7

[0 ]Task18

[10 ]Task20

[10 ]Task21

[10 ]Task22

[10 ]Task23

[10 ]Task24

[10 ]Task25

[10 ]Task26

[10 ]Task27

[10 ]Task28

Finished PrioritizedTaskProducer

[10 ]Task29

[9 ]Task39

[8 ]Task38

[7 ]Task37

[6 ]Task36

[5 ]Task35

[4 ]Task34

[3 ]Task33

[2 ]Task32

[1 ]Task31

[0 ]Task30

(0:8)

(1:5)

(2:3)

(3:1)

(4:1)

(5:9)

(6:8)

(7:0)

(8:2)

(9:7)

(10:8)

(11:8)

(12:1)

(13:9)

(14:9)

(15:8)

(16:8)

(17:1)

(18:0)

(19:8)

(20:10)

(21:10)

(22:10)

(23:10)

(24:10)

(25:10)

(26:10)

(27:10)

(28:10)

(29:10)

(30:0)

(31:1)

(32:2)

(33:3)

(34:4)

(35:5)

(36:6)

(37:7)

(38:8)

(39:9)

(40:-1)

[-1 ]Task40Calling shutdownNow()

Finished PrioritizedTaskConsumer
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: