您的位置:首页 > 编程语言 > Java开发

jdk 源码分析(11)java ArrayBlockingQueue 缓存队列分析

2017-08-03 22:29 926 查看
队列没有什么,如图(图片都是网上找的),分为头和尾,都是FIFO(先进先出),用数组和链表都能存储数据,数组当poll 数据后,需要整体移位(当然循环数组也是可以不移位的。),链表就方便很多。插入时在头部添加一个,删除是在尾部直接删除,



一般的队列基本操作就是,添加,删除,

添加:如果队列满了,就直接返回线程满了,如果没满肯定就直接插入,

提取数据:如果有数据,立即返回,如果没有,也直接返回null。

如果队列作为多个线程的共享,当数据满了的时候,数据不能丢失,所以必须等待,等待到队列被其他线程使用,同理,线程提取的时候,也必须等待,

这样就队列里需要唤醒工具。

查看源代码:定义了重入锁,并且定义两个condition

final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;


初始化需要指定队列长度:

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull =  lock.newCondition();
}


里面定义了两个非阻塞的插入和提取:

插入:当满了时候i直接返回false ,不做其他操作。

public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}


提取:当空时,直接返回null。

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}


阻塞插入:当数据满了的时候,notFull进入等待。

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}


阻塞提取:同样,空的时候进入等待。

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}


那什么时候唤醒呢?????

notEmpty 到插入数据的时候唤醒。

private void enqueue(E x) {

// assert lock.getHoldCount() == 1;

// assert items[putIndex] == null;

final Object[] items = this.items;

items[putIndex] = x;

if (++putIndex == items.length)

putIndex = 0;

count++;

        //有数据进去了,不再空了,可以唤醒了

notEmpty.signal();

}


private E dequeue() {

// assert lock.getHoldCount() == 1;

// assert items[takeIndex] != null;

final Object[] items = this.items;

@SuppressWarnings("unchecked")

E x = (E) items[takeIndex];

items[takeIndex] = null;

if (++takeIndex == items.length)

takeIndex = 0;

count--;

if (itrs != null)

itrs.elementDequeued();

        //有数据提取了,不再是满的了

notFull.signal();

return x;

}


过程很简单。另外还有一种操作。

如果没有数据,会等待一会,如果一会还是没有数据,就返回空。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {

long nanos = unit.toNanos(timeout);

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

while (count == 0) {

if (nanos <= 0)

return null;

    //等待定时的时间长度,如果时间过了,还是没有数据,就返回空

nanos = notEmpty.awaitNanos(nanos);

}

return dequeue();

} finally {

lock.unlock();

}

}


好了缓存队列很简单,如果不去看源代码觉得很高深,其实里面什么都没有。。。对吧 

其实我们可以用同步+wait + notify 也可以自定义一个缓存队列,还有就是现在开发中其实大部分时候会用工具类。比如订阅者和发布者模式,队列。比如redis 比如RabbitMq 等。hbase 用 distropter 等作为队列。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  jdk 源代码