您的位置:首页 > 产品设计 > UI/UE

精巧好用的DelayQueue

2014-02-16 13:04 288 查看



我们谈一下实际的场景吧。我们在开发中,有如下场景

a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。

b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。

c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。

一种笨笨的办法就是,使用一个后台线程,遍历所有对象,挨个检查。这种笨笨的办法简单好用,但是对象数量过多时,可能存在性能问题,检查间隔时间不好设置,间隔时间过大,影响精确度,多小则存在效率问题。而且做不到按超时的时间顺序处理。

这场景,使用DelayQueue最适合了。

DelayQueue是java.util.concurrent中提供的一个很有意思的类。很巧妙,非常棒!但是java doc和Java SE 5.0的source中都没有提供Sample。我最初在阅读ScheduledThreadPoolExecutor源码时,发现DelayQueue的妙用。随后在实际工作中,应用在session超时管理,网络应答通讯协议的请求超时处理。

本文将会对DelayQueue做一个介绍,然后列举应用场景。并且提供一个Delayed接口的实现和Sample代码。

DelayQueue是一个BlockingQueue,其特化的参数是Delayed。(不了解BlockingQueue的同学,先去了解BlockingQueue再看本文)

Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。DelayQueue内部是使用PriorityQueue实现的。

DelayQueue = BlockingQueue + PriorityQueue + Delayed

DelayQueue的关键元素BlockingQueue、PriorityQueue、Delayed。可以这么说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。

他们的基本定义如下

public interface Comparable<T> {

public int compareTo(T o);

}

public interface Delayed extends Comparable<Delayed> {

long getDelay(TimeUnit unit);

}

public class DelayQueue<E extends Delayed> implements BlockingQueue<E> {

private final PriorityQueue<E> q = new PriorityQueue<E>();

}

DelayQueue内部的实现使用了一个优先队列。当调用DelayQueue的offer方法时,把Delayed对象加入到优先队列q中。如下:

public boolean offer(E e) {

final ReentrantLock lock = this.lock;

lock.lock();

try {

E first = q.peek();

q.offer(e);

if (first == null || e.compareTo(first) < 0)

available.signalAll();

return true;

} finally {

lock.unlock();

}

}

DelayQueue的take方法,把优先队列q的first拿出来(peek),如果没有达到延时阀值,则进行await处理。如下:

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

for (;;) {

E first = q.peek();

if (first == null) {

available.await();

} else {

long delay = first.getDelay(TimeUnit.NANOSECONDS);

if (delay > 0) {

long tl = available.awaitNanos(delay);

} else {

E x = q.poll();

assert x != null;

if (q.size() != 0)

available.signalAll(); // wake up other takers

return x;

}

}

}

} finally {

lock.unlock();

}

}

-------------------

以下是Sample,是一个缓存的简单实现。共包括三个类Pair、DelayItem、Cache。如下:

public class Pair<K, V> {

public K first;

public V second;

public Pair() {}

public Pair(K first, V second) {

this.first = first;

this.second = second;

}

}

--------------

以下是Delayed的实现

import java.util.concurrent.Delayed;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicLong;

public class DelayItem<T> implements Delayed {

/** Base of nanosecond timings, to avoid wrapping */

private static final long NANO_ORIGIN = System.nanoTime();

/**

* Returns nanosecond time offset by origin

*/

final static long now() {

return System.nanoTime() - NANO_ORIGIN;

}

/**

* Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied

* entries.

*/

private static final AtomicLong sequencer = new AtomicLong(0);

/** Sequence number to break ties FIFO */

private final long sequenceNumber;

/** The time the task is enabled to execute in nanoTime units */

private final long time;

private final T item;

public DelayItem(T submit, long timeout) {

this.time = now() + timeout;

this.item = submit;

this.sequenceNumber = sequencer.getAndIncrement();

}

public T getItem() {

return this.item;

}

public long getDelay(TimeUnit unit) {

long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);

return d;

}

public int compareTo(Delayed other) {

if (other == this) // compare zero ONLY if same object

return 0;

if (other instanceof DelayItem) {

DelayItem x = (DelayItem) other;

long diff = time - x.time;

if (diff < 0)

return -1;

else if (diff > 0)

return 1;

else if (sequenceNumber < x.sequenceNumber)

return -1;

else

return 1;

}

long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));

return (d == 0) ? 0 : ((d < 0) ? -1 : 1);

}

}

以下是Cache的实现,包括了put和get方法,还包括了可执行的main函数。

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.ConcurrentMap;

import java.util.concurrent.DelayQueue;

import java.util.concurrent.TimeUnit;

import java.util.logging.Level;

import java.util.logging.Logger;

public class Cache<K, V> {

private static final Logger LOG = Logger.getLogger(Cache.class.getName());

private ConcurrentMap<K, V> cacheObjMap = new ConcurrentHashMap<K, V>();

private DelayQueue<DelayItem<Pair<K, V>>> q = new DelayQueue<DelayItem<Pair<K, V>>>();

private Thread daemonThread;

public Cache() {

Runnable daemonTask = new Runnable() {

public void run() {

daemonCheck();

}

};

daemonThread = new Thread(daemonTask);

daemonThread.setDaemon(true);

daemonThread.setName("Cache Daemon");

daemonThread.start();

}

private void daemonCheck() {

if (LOG.isLoggable(Level.INFO))

LOG.info("cache service started.");

for (;;) {

try {

DelayItem<Pair<K, V>> delayItem = q.take();

if (delayItem != null) {

// 超时对象处理

Pair<K, V> pair = delayItem.getItem();

cacheObjMap.remove(pair.first, pair.second); // compare and remove

}

} catch (InterruptedException e) {

if (LOG.isLoggable(Level.SEVERE))

LOG.log(Level.SEVERE, e.getMessage(), e);

break;

}

}

if (LOG.isLoggable(Level.INFO))

LOG.info("cache service stopped.");

}

// 添加缓存对象

public void put(K key, V value, long time, TimeUnit unit) {

V oldValue = cacheObjMap.put(key, value);

if (oldValue != null)

q.remove(key);

long nanoTime = TimeUnit.NANOSECONDS.convert(time, unit);

q.put(new DelayItem<Pair<K, V>>(new Pair<K, V>(key, value), nanoTime));

}

public V get(K key) {

return cacheObjMap.get(key);

}

// 测试入口函数

public static void main(String[] args) throws Exception {

Cache<Integer, String> cache = new Cache<Integer, String>();

cache.put(1, "aaaa", 3, TimeUnit.SECONDS);

Thread.sleep(1000 * 2);

{

String str = cache.get(1);

System.out.println(str);

}

Thread.sleep(1000 * 2);

{

String str = cache.get(1);

System.out.println(str);

}

}

}

运行Sample,main函数执行的结果是输出两行,第一行为aaa,第二行为null。
转自:/article/4729485.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: