您的位置:首页 > 产品设计 > UI/UE

利用DelayQueue实现延时消息队列(简易版MQ)

2017-06-15 14:54 656 查看

1.上文

关于阻塞队列的介绍:http://blog.csdn.net/caicongyang/article/details/50649897

2.需求

 延迟消息队列:

1)2个小时后给用户发送短信。

2)15分钟后关闭网络连接。

3)2分钟后再次尝试回调。

3.案例demo

Message.Java

[java] view
plain copy

package com.ccy.concurrent;  

  

import java.util.concurrent.Delayed;  

import java.util.concurrent.TimeUnit;  

  

/** 

 * <p>  

 * Title: Message.java  

 * Package com.ccy.concurrent  

 * </p> 

 * <p> 

 * Description: 延迟执行的消息 

 * <p> 

 * @author Tom.Cai 

 * @created 2016-2-10 下午7:39:48  

 * @version V1.0  

 * 

 */  

public class Message implements Delayed{  

        private String id;  

        private String name;  

        private long activeTime;//执行时间     

          

        public Message(){  

              

        }  

          

        public Message(String id, String name,long activeTime) {  

            super();  

            this.id = id;  

            this.name = name;  

            this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime, TimeUnit.MILLISECONDS) + System.nanoTime();  

        }  

        public String getId() {  

            return id;  

        }  

        public void setId(String id) {  

            this.id = id;  

        }  

        public String getName() {  

            return name;  

        }  

        public void setName(String name) {  

            this.name = name;  

        }  

  

        @Override  

        public int compareTo(Delayed delayed) {  

            Message msg = (Message)delayed;  

            return Integer.valueOf(this.id)>Integer.valueOf(msg.id)?1:( Integer.valueOf(this.id)<Integer.valueOf(msg.id)?-1:0);  

        }  

  

        @Override  

        public long getDelay(TimeUnit unit) {  

             return unit.convert(this.activeTime - System.nanoTime(), TimeUnit.NANOSECONDS);   

        }  

          

          

          

}  

Producer.javaProducer

[java] view
plain copy

package com.ccy.concurrent;  

  

import java.util.concurrent.DelayQueue;  

  

public class Producer implements Runnable{  

    private DelayQueue<Message> queue;  

      

    public Producer(DelayQueue<Message> queue){  

        this.queue = queue;  

    }  

  

    @Override  

    public void run() {  

        //5秒后发送消息  

        Message m2 = new Message("2","Tom",5000);  

        queue.offer(m2);  

        System.out.println("消息生产者往消息队列放置消息:"+m2.getId()+":"+m2.getName());  

        //3秒后发送消息  

        Message m1 = new Message("1","Tom",3000);  

        queue.offer(m1);  

        System.out.println("消息生产者往消息队列放置消息:"+m1.getId()+":"+m1.getName());  

          

    }  

      

  

}  

Consumer.java

[java] view
plain copy

package com.ccy.concurrent;  

  

import java.util.concurrent.DelayQueue;  

  

public class Consumer implements Runnable{  

    private DelayQueue<Message> queue;  

      

    public Consumer(DelayQueue<Message> queue){  

        this.queue = queue;  

    }  

      

    @Override  

    public void run() {  

        while(true){  

            try {  

                Message take = queue.take();  

                System.out.println("消息需求者获取消息:"+take.getId()+":"+take.getName());  

            } catch (InterruptedException e) {  

                e.printStackTrace();  

            }  

        }  

    }  

  

}  

DelayQueueTest.java

[java] view
plain copy

package com.ccy.concurrent;  

  

import java.util.concurrent.DelayQueue;  

  

  

public class DelayQueueTest {  

    public static void main(String[] args) {  

        DelayQueue<Message> queue = new DelayQueue<Message>();  

        new Thread(new Producer(queue)).start();  

        new Thread(new Consumer(queue)).start();  

    }  

}  

4.效果



5.其他学习资源

http://zhangyp.net/rabbitmq-delayqueue/
...

6.后记

作者将学习Apache的ActiveMQ消息总线,其可以支持定时、延迟投递、重复投递和Cron调度。

更多多线程精彩内容请继续关注我的博客:http://blog.csdn.net/caicongyang
记录与分享,你我共成长 -from caicongyang
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: