您的位置:首页 > 产品设计 > UI/UE

Java编程思想笔记-并发之DelayQueue和PriorityBlockingQueue

2017-08-12 18:02 477 查看
DelayQueue是无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,队头对象是延迟到期时间最长的那个(即最早到期的元素)。如果没有任何延迟到期,就不会有头元素,并且poll()将返回null。

以下两个示例都是典型的“队列元素、生产者、消费者”示例

示例:

class DelayedTask implements Runnable ,Delayed{
private static int counter=0;
private final int id=counter++;
//延迟的时间
private final int delta;
//执行的时间
private final long trigger;
protected static List<DelayedTask>queue=new ArrayList<DelayedTask>();
public DelayedTask(int delayInMilliseconds){
delta=delayInMilliseconds;
trigger=System.nanoTime()+TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS);
queue.add(this);
}
@Override
public int compareTo(Delayed o) {
DelayedTask that=(DelayedTask)o;
if (trigger<that.trigger) {
return -1;
}
if (trigger>that.trigger) {
return 1;
}
return 0;
}

//告知延迟到期还有多长时间
@Override
public long getDelay(TimeUnit unit) {
//将时间转换为调用者希望的时间单位
return unit.convert(trigger-System.nanoTime(), TimeUnit.NANOSECONDS);
}

@Override
public void run() {
System.out.println(this+" ");
}

@Override
public String toString() {
return String.format("[%1$-4d]", delta) +
" Task " + id;
}

public String summary(){
return "("+id+":"+delta+")";
}

public long getTrigger() {
return trigger;
}

//作为延迟队列最后一个执行的任务,并关闭线程池
public static class Endsentinel extends DelayedTask{
private ExecutorService exec;
public Endsentinel(int delayInMilliseconds,ExecutorService exec) {
super(delayInMilliseconds);
this.exec=exec;
}
@Override
public void run(){
for (DelayedTask pt : DelayedTask.queue) {
System.out.println(pt.summary()+" ");
}
System.out.println();
System.out.println(this+" Calling shutdownNow()");
exec.shutdownNow();
}
}
}

/**
 * Consumer that repeatedly takes expired tasks from a {@code DelayQueue} and
 * runs each one on the calling thread until that thread is interrupted.
 */
class DelayedTaskConsumer implements Runnable {
    private final DelayQueue<DelayedTask> queue;

    /**
     * @param queue the delay queue to drain
     */
    public DelayedTaskConsumer(DelayQueue<DelayedTask> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // take() blocks until the head task's delay has expired.
                queue.take().run();
            }
        } catch (InterruptedException e) {
            // Interruption is the normal way to stop this consumer; restore the
            // flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
        System.out.println("Finished DelayedTaskConsumer");
    }
}

/**
 * Demo entry point: fills a delay queue with twenty randomly delayed tasks
 * plus a closing sentinel, then starts a single consumer on a cached pool.
 */
public class DelayQueueDemo {
    public static void main(String[] args) {
        final int taskCount = 20;
        final int maxDelayMillis = 5000;

        // Fixed seed keeps the generated delays the same on every run.
        Random delays = new Random(47);
        ExecutorService pool = Executors.newCachedThreadPool();
        DelayQueue<DelayedTask> pending = new DelayQueue<DelayedTask>();

        int remaining = taskCount;
        while (remaining-- > 0) {
            pending.put(new DelayedTask(delays.nextInt(maxDelayMillis)));
        }

        // The sentinel uses the upper bound of the random delays.
        pending.add(new DelayedTask.Endsentinel(maxDelayMillis, pool));
        pool.execute(new DelayedTaskConsumer(pending));
    }
}


PriorityBlockingQueue示例

/**
 * A runnable task with an integer priority, ordered so that a larger priority
 * value sorts first (suitable for a {@code PriorityBlockingQueue}).
 *
 * <p>Each instance receives a sequential id from a static counter and records
 * itself in the static {@link #sequece} list at construction time. Neither the
 * counter nor the list is synchronized.
 */
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {

    // Per-instance generator with a fixed seed: every task therefore draws the
    // same first value when it computes its sleep time in run().
    private Random random = new Random(47);
    private static int counter = 0;
    private final int id = counter++;
    private final int priority;
    /** Every task constructed so far, in creation order. */
    protected static List<PrioritizedTask> sequece = new ArrayList<PrioritizedTask>();

    /**
     * @param priority the priority value; larger values sort earlier
     */
    public PrioritizedTask(int priority) {
        sequece.add(this);
        this.priority = priority;
    }

    /**
     * Reverse numeric order on priority.
     *
     * @return -1 when this task has the larger priority, 1 when it has the
     *         smaller one, 0 when both are equal
     */
    @Override
    public int compareTo(PrioritizedTask o) {
        if (priority > o.priority) {
            return -1;
        }
        if (priority < o.priority) {
            return 1;
        }
        return 0;
    }

    @Override
    public String toString() {
        return String.format("[%1$-3d]", priority) + " Task " + id;
    }

    /**
     * @return a compact {@code (id:priority)} description of this task
     */
    public String summary() {
        return "(" + id + ":" + priority + ")";
    }

    /** Sleeps for up to 250 ms, then prints this task. */
    @Override
    public void run() {
        try {
            TimeUnit.MILLISECONDS.sleep(random.nextInt(250));
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the loop that invoked this task;
            // swallowing it here would hide the request to stop.
            Thread.currentThread().interrupt();
        }
        System.out.println(this);
    }

    /**
     * Task with priority -1 that prints a summary of every task created (five
     * per group) and then shuts the executor down.
     */
    public static class EndSentinel extends PrioritizedTask {
        private ExecutorService exec;

        /**
         * @param exec executor to stop when the sentinel runs
         */
        public EndSentinel(ExecutorService exec) {
            super(-1);
            // Without this assignment run() fails with a NullPointerException.
            this.exec = exec;
        }

        @Override
        public void run() {
            int count = 0;
            for (PrioritizedTask prioritizedTask : PrioritizedTask.sequece) {
                System.out.println(prioritizedTask.summary());
                if (++count % 5 == 0) {
                    System.out.println();
                }
            }
            System.out.println();
            System.out.println(this + "Calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}
// Producer
/**
 * Fills a queue with prioritized tasks in three phases: twenty tasks with
 * random priorities 0-9, ten tasks of priority 10 added 250 ms apart, then ten
 * tasks with priorities 0-9 followed by the end sentinel.
 */
class PrioritizedTaskProducer implements Runnable {
    private Random rand = new Random(47);
    private Queue<Runnable> queue;
    private ExecutorService exec;

    /**
     * @param q the queue to fill
     * @param e the executor handed to the end sentinel
     */
    public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) {
        this.queue = q;
        this.exec = e;
    }

    @Override
    public void run() {
        // Phase 1: a burst of random priorities.
        for (int i = 0; i < 20; i++) {
            queue.add(new PrioritizedTask(rand.nextInt(10)));
            Thread.yield();
        }
        try {
            // Phase 2: priority-10 tasks trickled in every 250 ms.
            for (int i = 0; i < 10; i++) {
                TimeUnit.MILLISECONDS.sleep(250);
                queue.add(new PrioritizedTask(10));
            }
            // Phase 3: priorities 0..9, added lowest first.
            for (int i = 0; i < 10; i++) {
                queue.add(new PrioritizedTask(i));
            }
            queue.add(new PrioritizedTask.EndSentinel(exec));
        } catch (InterruptedException e) {
            // Stop producing, but keep the interrupt status set for the caller.
            Thread.currentThread().interrupt();
        }
        System.out.println("Finshed PrioritizedTask Producer");
    }
}
// Consumer
/**
 * Repeatedly takes the head task from a {@code PriorityBlockingQueue} and runs
 * it on the calling thread until that thread is interrupted.
 */
class PrioritizedTaskConsumer implements Runnable {
    PriorityBlockingQueue<Runnable> q;

    /**
     * @param queue the priority queue to drain
     */
    public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> queue) {
        this.q = queue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // take() blocks while the queue is empty.
                q.take().run();
            }
        } catch (InterruptedException e) {
            // Interruption is the normal way to stop this consumer; restore the
            // flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
        System.out.println("Finished PrioritizedTaskConsumer");
    }
}

public class PriorityBlockingQueueTest {
public static void main(String[] args) {
Random rand=new Random(47);
ExecutorService exec=Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable>queue=new PriorityBlockingQueue<Runnable>();
exec.execute(new PrioritizedTaskProducer(queue, exec));
exec.execute(new PrioritizedTaskConsumer(queue));
System.out.println("done");
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: