您的位置:首页 > 其它

一种自动定时更新缓存值的缓存实现

2012-10-29 10:47 246 查看
       前不久,在项目中碰到这样的一个需求:要求在服务初始化的时候把数据库持久化的一些DO对象存入到缓存中,然后每隔指定的时间间隔刷新缓存。同时后台可以让维护人员根据需要手动清除缓存和刷新缓存。由于此需求所应用的缓存规模较小,所以当初就不太想用一些memcached等的缓存数据库,觉得有些过重了。类似的可作缓存使用的mongodb等nosql数据库也是由于附加的应用成本太高被排除。后面自己琢磨着是否针对这个具体的小需求写一个缓存实现。由于是在同一JVM里面,性能问题不是太大的瓶颈,主要是在并发方面考虑了一下,后面参考了jdk5并发包中的ConcurrentHashMap的思想,自己实现了一个简单的缓存。记录一下。

 

package com.lee.framework.common.cache;

import java.util.concurrent.locks.ReentrantLock;

/**
* A cache implementation can update the cached value automatically which is thread safe.
* you can specified a cache timeout time and a instance of {@link Updater}
* to determine the update action.<br/><br/>
*
* <var>cacheTimeout</var> determines the cached value whether expired or not.<br/>
* <var>updater</var> provides a {@link Updater} implementation to complete the update
* action.<br/><br/>
* besides, there have some customized arguments to satisfy the performance requirement, like follows:<br/><br/>
* <var>initialCapacity</var> provides a way to specify cache initial capacity to avoid frequently capacity
* expansion.<br/>
* <var>concurrencyLevel</var> provides a way to specify the estimated number of concurrently operation
* threads for concurrent performance.<br/><br/>
*
* <b>note:</b> when you get the cached value from this cache by a <var>key</var>, if detect the mapped cache
* value expired, it first acquires a lock, then invoke <var>updater</var> to update the cached value. if this
* operation spend too much time, performance is very poor, so you must avoid it.<br/><br/>
*
* <b>note:</b> when you get the cached value from this cache by a <var>key</var>, if no cache entry mapped the
* <var>key</var> was found, this cache didn't use the <var>updater</var> to automatically update cached value.
* so you must explicitly invoke {@link #set(Object, Object)} to add a cached value mapped for <var>key</var> first,
* then this cache can automatically update it.
*
* @author lee
*
*/
public class AutoUpdateCache {

/** The default initial capacity for this cache **/
private static final int DEFAULT_INITIAL_CAPACITY = 0x10;

/** The default load factor for this cache **/
private static final float DEFAULT_LOAD_FACTOR = 0.75f;

/** The default concurrency level for this cache **/
private static final int DEFAULT_CONCURRENCY_LEVEL = 0x10;

/** The maximum capacity used to ensure that entries are indexable by int type **/
private static final int MAX_CAPACITY = (int) ((1L << 31) - 1);

/** The maximum number of segment permitted for this cache **/
private static final int MAX_SEGMENTS = 1 << 16;

/** encode a <code>null</code> cached value **/
private static final Object NULL = new Object();

/**
* Mask value for indexing into segments.
* The upper bits of a key's hash code are used to choose the segment.
*/
final int segmentMask;

/** Shift value for indexing within segments */
final int segmentShift;

/** each element is a hash table to store the cached value **/
final Segment[] segments;

/** cached value effective time interval **/
final long cacheTimeInterval;

/** updater for cached value **/
final Updater updater;

/** copy from jdk(ConcurrentHashMap)
* Applies a supplemental hash function to a given hashCode, which
* defends against poor quality hash functions.
*/
private static int hash(int h) {
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h <<  15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h <<   3);
h ^= (h >>>  6);
h += (h <<   2) + (h << 14);
return h ^ (h >>> 16);
}

/** copy from jdk(ConcurrentHashMap)
* Returns the segment that should be used for key with given hash
* @param hash the hash code for the key
* @return the segment
*/
final Segment segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}

static final class Entry {
final Object key;
final int hash;
final Entry next;
long preOperationTime;
Object value;

/** use current system time for cache value's effective time **/
Entry(Object key, int hash, Object value, Entry next) {
this(key, hash, value, System.currentTimeMillis(), next);
}

Entry(Object key, int hash, Object value, long preOperationTime, Entry next) {
this.key = key;
this.hash = hash;
this.next = next;
this.preOperationTime = preOperationTime;
this.value = value;
}
}

final class Segment extends ReentrantLock {
private static final long serialVersionUID = 4301265348067449004L;

int threshold;
volatile int count;
volatile Entry[] values;

Segment(int initialCapacity) {
threshold = (int) (initialCapacity * DEFAULT_LOAD_FACTOR);
values = new Entry[initialCapacity];
}

Entry head(int hash) {
Entry[] oldValues = values;
return oldValues[hash & (oldValues.length - 1)];
}

/** encode <code>null</code> to {@link AutoUpdateCache#NULL} Object **/
Object encode(Object value) {
if(value == null) {
value = NULL;
}
return value;
}

/** decode {@link AutoUpdateCache#NULL} Object to <code>null</code> **/
Object decode(Object value) {
if(value == NULL) {
value = null;
}
return value;
}

/**
* return the cached value mapped the <code>key</code> and <code>hash</code>.
* if no mapping, return <code>null</code>; if mapping exists and cached value didn't expired,
* return the cached value; otherwise, invoke the {@link AutoUpdateCache#updater} to update
* the value and return it.
* @param key
* @param hash
* @return
*/
Object get(Object key, int hash) {
if(count != 0) {
Entry e = head(hash);
while(e != null) {
if(e.hash == hash && e.key.equals(key)) {
long effectiveTimePoint = System.currentTimeMillis() - cacheTimeInterval;

Object value = null;
lock();
try {
if(e.preOperationTime <= effectiveTimePoint) {
e.value = encode(updater.update(e.key));
e.preOperationTime = System.currentTimeMillis();
}else {
value = e.value;
}
}catch(Throwable t) {
/* catch exception from updater, don't update the cache.
* so you may get the expired cached value.
*/
}finally {
unlock();
}

return decode(value);
}
}
}

return null;
}

/**
* set the cached value mapped the <code>key</code> and <code>hash</code> to <code>newValue</code>.
* if no mapping, add a new cache entry which has <code>key</code>, <code>hash</code> and <code>newValue</code>;
* otherwise, replace the old cached value.
* <b>note:</b> at the same time, cached value effective time has been updated.<br/>
* @param key
* @param hash
* @param newValue
bce0
*/
void set(Object key, int hash, Object newValue) {
lock();
try {
Entry[] oldValues = values;
int index = hash & (oldValues.length - 1);
Entry e = oldValues[index];
for(; e != null && (e.hash != hash || !e.key.equals(key)); e = e.next);
if(e != null) {
e.value = encode(newValue);	// update value
e.preOperationTime = System.currentTimeMillis();	// expand the effective time
}else {	// put a new cache entry
int c = count;
if(c++ > threshold) {
rehash();
}
Entry head = head(hash);
oldValues[index] = new Entry(key, hash, newValue, head);
count = c;
}
}finally {
unlock();
}
}

/**
* while cache entry number extends the {@link #threshold}, expand the cache capacity.
* this method must run with a lock to keep the data consistency.
**/
void rehash() {
Entry[] oldValues = values;
int oldCapacity = oldValues.length;
if(oldCapacity >= MAX_CAPACITY) { return; }

/*
* copy from jdk(ConcurrentHashMap)
* Reclassify nodes in each list to new Map
*/
int newCapacity = oldCapacity << 1;
if(newCapacity > MAX_CAPACITY) {
newCapacity = MAX_CAPACITY;
}
Entry[] newValues = new Entry[newCapacity];
threshold = (int) (newCapacity * DEFAULT_LOAD_FACTOR);
int indexMask = newCapacity - 1;
for(int i=0; i<oldCapacity; i++) {
/* We need to guarantee that any existing reads of old Map can
* proceed. So we cannot yet null out each bin.
*/
Entry e = oldValues[i];
if(e != null) {
Entry next = e.next;
int index = e.hash & indexMask;

if(next == null) {
newValues[index] = e;
}else {
// Reuse trailing consecutive sequence at same slot
Entry lastSeq = e;
int lastSeqIndex = index;
for(Entry entry = next; entry != null; entry = entry.next) {
int idx = entry.hash & indexMask;
if(idx != lastSeqIndex) {
lastSeq = entry;
lastSeqIndex = idx;
}
}
newValues[lastSeqIndex] = lastSeq;

// Clone all remaining nodes
for(Entry entry = e; entry != lastSeq; entry = entry.next) {
int idx = entry.hash & indexMask;
Entry temp = newValues[idx];
newValues[idx] = new Entry(entry.key, entry.hash, entry.value, temp);
}
}
}
}

values = newValues;
}

void remove(Object key, int hash) {
lock();
try {
int c = count - 1;
Entry[] oldValues = values;
int index = hash & (oldValues.length - 1);
Entry head = oldValues[index];
Entry e = head;
for(; e != null && (e.hash != hash || !e.key.equals(key)); e = e.next);
if(e != null) {	// remove the cache entry
Entry newHead = e.next;
for(Entry p = head; p != e; p = p.next) {
newHead = new Entry(p.key, p.hash, p.value, newHead);
}
oldValues[index] = newHead;
count = c;
}
}finally {
unlock();
}
}

void removeAll() {
lock();
try {
Entry[] oldValues = values;
for(int i=0; i<oldValues.length; i++) {
oldValues[i] = null;
}
count = 0;
}finally {
unlock();
}
}
}

/* ----------------------- exposed method ------------------------- */
/**
* Creates a cache with specified cache timeout, updater and with default initial capacity(16)
* and concurrencyLevel(). the factor used to control resizing is 0.75f by default, which
* function liked the {@code loadFactor} of {@link java.util.HashMap}.
* @param cacheTimeout	cache value expired time, unit: milliseconds
* @param updater		a inner updater for update cached value while it is expired
* @throws IllegalArgumentException if {@code cacheTimeout <= 0} or updater is {@code null}
*/
public AutoUpdateCache(long cacheTimeout, Updater updater) {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL, cacheTimeout, updater);
}

/**
* Creates a cache with specified initial capacity, cache timeout, updater and default
* concurrency level(). the factor used to control resizing is 0.75f by default, which function
* liked the {@code loadFactor} of {@link java.util.HashMap}.
* @param initialCapacity	cache initial capacity
* @param cacheTimeout		cache value expired time, unit: milliseconds
* @param updater			a inner updater for update cached value while it is expired
* @throws IllegalArgumentException if {@code initialCapacity <= 0} or {@code cacheTimeout <= 0}
* 		   or updater is {@code null}
*/
public AutoUpdateCache(int initialCapacity, long cacheTimeout, Updater updater) {
this(initialCapacity, DEFAULT_CONCURRENCY_LEVEL, cacheTimeout, updater);
}

/**
* Creates a cache with specified initial capacity, concurrency level, cache timeout and updater.
* the factor used to control resizing is 0.75f by default, which function liked the {@code loadFactor}
* of {@link java.util.HashMap}.
* @param initialCapacity	cache initial capacity
* @param concurrencyLevel	the estimated number of concurrently operation threads
* @param cacheTimeout		cache value expired time, unit: milliseconds
* @param updater			a inner updater for update cached value while it is expired
* @throws IllegalArgumentException if {@code initialCapacity <= 0} or {@code concurrencyLevel <= 0}
* 		   or {@code cacheTimeout <= 0} or updater is {@code null}
*/
public AutoUpdateCache(int initialCapacity, int concurrencyLevel, long cacheTimeout, Updater updater) {
if(initialCapacity <= 0 || concurrencyLevel <= 0 || cacheTimeout <= 0
|| updater == null) {
throw new IllegalArgumentException();
}

if(concurrencyLevel > MAX_SEGMENTS) {
concurrencyLevel = MAX_SEGMENTS;
}

int shift = 0;
int size = 1;
while(size < concurrencyLevel) {
++shift;
size <<= 1;
}
segmentShift = 32 - shift;
segmentMask = size - 1;
segments = new Segment[size];

if(initialCapacity > MAX_CAPACITY) {
initialCapacity = MAX_CAPACITY;
}
int c = initialCapacity / size;
if(c * size < initialCapacity) {
++c;
}

int segmentCapacity = 1;
for(; segmentCapacity < c; segmentCapacity <<= 1);
for(int i=0; i<segments.length; i++) {
segments[i] = new Segment(segmentCapacity);
}

this.cacheTimeInterval = cacheTimeout;
this.updater = updater;
}

/**
* get the cached value mapped the {@code key}.
* note: while you get the cached value by the {@code key}, if the cached value isn't expires, return it;
* otherwise, use the {@link Updater} to acquire the new updated value and return. the {@link Updater} may
* return a <code>null</code> value, it it illegal. if {@link Updater} throws a exception when acquire the
* new updated value, this method will return a expired cached value.
* <br/>
* if the cache don't have a value mapped the {@code key}, it will return {@code null}.
* @param key
* @return
*/
public Object get(Object key) {
if(key == null) {
throw new NullPointerException("key is null");
}

int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}

/**
* An explicit approach to update the value mapped the <code>key</code> in the cache.
* if no mapping for the <code>key</code>, add the <code>value</code> mapped the <code>key</code>.
* As you invoke this method, the cached value's expired time will be extended.
* @param key
* @param value		new value mapped the <code>null</code>. it permits <code>null</code> value.
*/
public void set(Object key, Object value) {
if(key == null || value == null) {
throw new NullPointerException("key or value is null");
}

int hash = hash(key.hashCode());
segmentFor(hash).set(key, hash, value);
}

/**
* remove a cached value mapped <code>key</code>. if no mapping, do nothing.
* @param key
*/
public void remove(Object key) {
if(key == null) {
throw new NullPointerException("key is null");
}

int hash = hash(key.hashCode());
segmentFor(hash).remove(key, hash);
}

/**
* remove all the cached value.
*/
public void removeAll() {
for(int i=0; i<segments.length; i++) {
segments[i].removeAll();
}
}

/**
* An implicit updater provides a approach to update the value mapped the {@code key} in the cache.<br/>
* <b>note:</b> while {@link AutoUpdateCache#get(Object)} detect the cached value expired, it acquires a lock,
* then invoke this updater to update the cached value. if this operation spend a lot of time, performance is very
* slow, so you must avoid it.
*/
public interface Updater {

/**
* cached value update approach
* @param key	key of cached value
* @return		new value	if it is <code>null<code>, means that a <code>null<code> cached value mapped for <code>key</code>
* @throws Exception	if this method throws an exception, cache update the value failed, may return the expired value.
*/
public Object update(Object key) throws Exception;
}
}

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐