java编程思想笔记-并发之DelayQueue和PriorityBlockingQueue
2017-08-12 18:02
477 查看
DelayQueue是无界的BlockingQueue,用于防止实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走,这种队列是有序的且队头对象的延迟到期的时间越长。如果没有任何的延迟到期就不会有头元素,并且poll()将返回null
以下两个示例都是典型的队列元素,生产者,消费者示例
示例:
PriorityBlockingQueue示例
以下两个示例都是典型的队列元素,生产者,消费者示例
示例:
class DelayedTask implements Runnable ,Delayed{ private static int counter=0; private final int id=counter++; //延迟的时间 private final int delta; //执行的时间 private final long trigger; protected static List<DelayedTask>queue=new ArrayList<DelayedTask>(); public DelayedTask(int delayInMilliseconds){ delta=delayInMilliseconds; trigger=System.nanoTime()+TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS); queue.add(this); } @Override public int compareTo(Delayed o) { DelayedTask that=(DelayedTask)o; if (trigger<that.trigger) { return -1; } if (trigger>that.trigger) { return 1; } return 0; } //告知延迟到期还有多长时间 @Override public long getDelay(TimeUnit unit) { //将时间转换为调用者希望的时间单位 return unit.convert(trigger-System.nanoTime(), TimeUnit.NANOSECONDS); } @Override public void run() { System.out.println(this+" "); } @Override public String toString() { return String.format("[%1$-4d]", delta) + " Task " + id; } public String summary(){ return "("+id+":"+delta+")"; } public long getTrigger() { return trigger; } //作为延迟队列最后一个执行的任务,并关闭线程池 public static class Endsentinel extends DelayedTask{ private ExecutorService exec; public Endsentinel(int delayInMilliseconds,ExecutorService exec) { super(delayInMilliseconds); this.exec=exec; } @Override public void run(){ for (DelayedTask pt : DelayedTask.queue) { System.out.println(pt.summary()+" "); } System.out.println(); System.out.println(this+" Calling shutdownNow()"); exec.shutdownNow(); } } } class DelayedTaskConsumer implements Runnable{ private DelayQueue<DelayedTask>queue; public DelayedTaskConsumer(DelayQueue<DelayedTask> queue){ this.queue=queue; } @Override public void run() { try { while (!Thread.interrupted()) { queue.take().run(); } } catch (InterruptedException e) { // TODO: handle exception } System.out.println("Finished DelayedTaskConsumer"); } } public class DelayQueueDemo { public static void main(String[] args) { Random rand=new Random(47); ExecutorService exec=Executors.newCachedThreadPool(); DelayQueue<DelayedTask>queue=new DelayQueue<DelayedTask>(); for (int i = 0; i < 20; i++) { int time=rand.nextInt(5000); queue.put(new DelayedTask(time)); } queue.add(new DelayedTask.Endsentinel(5000, exec)); exec.execute(new DelayedTaskConsumer(queue)); } }
PriorityBlockingQueue示例
class PrioritizedTask implements Runnable ,Comparable<PrioritizedTask>{ private Random random=new Random(47); private static int counter=0; private final int id=counter++; private final int priority; protected static List<PrioritizedTask>sequece=new ArrayList<PrioritizedTask>(); public PrioritizedTask(int priority){ sequece.add(this); this.priority=priority; } @Override public int compareTo(PrioritizedTask o) { return priority<o.priority?1:(priority>o.priority?-1:0); } @Override public String toString() { return String.format("[%1$-3d]", priority)+" Task "+id; } public String summary() { return "("+id+":"+priority+")"; } @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(random.nextInt(250)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(this); } public static class EndSentinel extends PrioritizedTask{ private ExecutorService exec; public EndSentinel(ExecutorService exec) { super(-1); } @Override public void run(){ int count=0; for (PrioritizedTask prioritizedTask : PrioritizedTask.sequece) { System.out.println(prioritizedTask.summary()); if (++count%5==0) { System.out.println(); } } System.out.println(); System.out.println(this+"Calling shutdownNow()"); exec.shutdownNow(); } } } //生产者 class PrioritizedTaskProducer implements Runnable{ private Random rand=new Random(47); private Queue<Runnable>queue; private ExecutorService exec; public PrioritizedTaskProducer(Queue<Runnable>q,ExecutorService e){ this.queue=q; this.exec=e; } @Override public void run() { for (int i = 0; i < 20; i++) { queue.add(new PrioritizedTask(rand.nextInt(10))); Thread.yield(); } try { //TimeUnit.SECONDS.sleep(1); for (int i = 0; i < 10; i++) { TimeUnit.MILLISECONDS.sleep(250); queue.add(new PrioritizedTask(10)); } for (int i = 0; i < 10; i++) { queue.add(new PrioritizedTask(i)); } queue.add(new PrioritizedTask.EndSentinel(exec)); } catch (InterruptedException e) { // TODO: handle exception } System.out.println("Finshed PrioritizedTask Producer"); } } //消费者 class PrioritizedTaskConsumer implements Runnable{ PriorityBlockingQueue<Runnable>q; public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable>queue){ this.q=queue; } @Override public void run() { try { while (!Thread.interrupted()) { q.take().run(); } } catch (InterruptedException e) { } System.out.println("Finished PrioritizedTaskConsumer"); } } public class PriorityBlockingQueueTest { public static void main(String[] args) { Random rand=new Random(47); ExecutorService exec=Executors.newCachedThreadPool(); PriorityBlockingQueue<Runnable>queue=new PriorityBlockingQueue<Runnable>(); exec.execute(new PrioritizedTaskProducer(queue, exec)); exec.execute(new PrioritizedTaskConsumer(queue)); System.out.println("done"); }
相关文章推荐
- Java并发之BlockingQueue 阻塞队列(ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue、PriorityBlockingQueue、SynchronousQueue)
- java 编程思想中的PriorityBlockingQueue 部分里对PrioritizedTask的排序没有显式调用collection的sort方法。而是通过take时做的这个动作
- 并发(java编程思想)笔记
- JDK并发工具类源码学习系列——PriorityBlockingQueue
- Java并发学习(二十四)-PriorityBlockingQueue分析
- Java多线程--并发中集合的使用PriorityBlockingQueue
- Java并发编程-31-阻塞式优先级列表-PriorityBlockingQueue
- java编程思想笔记-并发之死锁
- java编程思想笔记-并发之CyclicBarrier
- java编程思想笔记-并发之线程协作(二)
- java编程思想笔记-并发之线程协作(四)
- 并发队列-无界阻塞优先级队列PriorityBlockingQueue原理探究
- java编程思想笔记-并发之Synchronized嵌套调用
- 并发队列-无界阻塞优先级队列PriorityBlockingQueue原理探究
- Java并发学习笔记(八)-LinkedBlockingQueue
- JDK并发工具类源码学习系列——PriorityBlockingQueue
- Java并发学习笔记(七)-ArrayBlockingQueue
- 并发队列-无界阻塞优先级队列PriorityBlockingQueue原理探究
- 【死磕Java并发】-----J.U.C之阻塞队列:PriorityBlockingQueue
- java编程思想笔记-并发之后台线程