您的位置:首页 > 其它

TimeCacheMap的解析与使用

2015-11-29 23:28 302 查看
在storm的开发中,经常有这样的业务场景,比如统计1分钟或者5分钟粒度的UV,我们可能会这样想设置一个set,其中用用户user_id与logtime组合起来作为key存入set,每次来一条用户消息先到set中查找,如果存在该条消息则丢弃,如果不存在向set add key;同时设置一个内存缓存map以logtime作为key,根据刚刚的set结果决定是否进行UV的+1操作;同时为了防止set过大开启一个线程每过一定的时间把无效的key进行删除。

其实在storm中,有一个TimeCacheMap的数据结构,可以解决这类问题,TimeCacheMap用于在内存中保存近期活跃的对象,它的实现非常高效,而且可以自动删除过期不再活跃的对象,同时由于其采用了分桶的策略来缩小锁的粒度,所以其有高效的并发读写特性。

(一)TimeCacheMap的实现原理

1.  底层数据结构

timecachemap底层为一个桶链表,也就是一个LinkList,其中链表中每一个元素是一个HashMap,用于保存Key、Value格式的数据。

private LinkedList<HashMap<K, V>> _buckets;

2. 构造函数

如下所示为timecachemap的构造函数,其中LinkList的默认大小为3,如果构造函数设置其链表的大小小于2,将会抛出IllegalArgumentException。

public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
if(numBuckets<2) {
throw new IllegalArgumentException("numBuckets must be >= 2");
}
_buckets = new LinkedList<HashMap<K, V>>();
for(int i=0; i<numBuckets; i++) {
_buckets.add(new HashMap<K, V>());
}
_callback = callback;
final long expirationMillis = expirationSecs * 1000L;
final long sleepTime = expirationMillis / (numBuckets-1);
_cleaner = new Thread(new Runnable() {
public void run() {
try {
while(true) {
Map<K, V> dead = null;
Time.sleep(sleepTime);
synchronized(_lock) {
dead = _buckets.removeLast();
_buckets.addFirst(new HashMap<K, V>());
}
if(_callback!=null) {
for(Entry<K, V> entry: dead.entrySet()) {
_callback.expire(entry.getKey(), entry.getValue());
}
}
}
} catch (InterruptedException ex) {
}
}
});
_cleaner.setDaemon(true);//作为守护线程运行,一旦主线程不在,这个线程自动结束</span>
_cleaner.start();
}


初始化指定链表的长度,即桶bucket的个数,每个bucket中放入一个空的HashMap。

_buckets = new LinkedList<HashMap<K, V>>();
for(int i=0; i<numBuckets; i++) {
_buckets.add(new HashMap<K, V>());
}

设置清理线程:

1)设置线程休眠,sleep时间为sleepTime(sleepTime=expirationMillis / (numBuckets-1)毫秒时间;

2)对_lock对象上锁,然后将LinkList中最后一个元素移除,同时在链表的头部加入一个一个空的HashMap,解除_lock对象;

备注:_lock为锁对象,为了保证操作的原子性

private final Object _lock = new Object();


3)如果设置了callback函数,则进行回调

while(true) {
Map<K, V> dead = null;
Time.sleep(sleepTime);
synchronized(_lock) {
dead = _buckets.removeLast();
_buckets.addFirst(new HashMap<K, V>());
}
if(_callback!=null) {
for(Entry<K, V> entry: dead.entrySet()) {
_callback.expire(entry.getKey(), entry.getValue());
}
}</span>




3. get方法

获取_lock对象,遍历桶链表,如果存在key则返回。

public V get(K key) {
synchronized(_lock) {
for(HashMap<K, V> bucket: _buckets) {
if(bucket.containsKey(key)) {
return bucket.get(key);
}
}
return null;
}
}


4. put方法

获取_lock对象,向桶链表中第一个桶中put数据,同时将桶链表中后续的桶中存在的相应key的数据删除。

public void put(K key, V value) {
synchronized(_lock) {
Iterator<HashMap<K, V>> it = _buckets.iterator();
HashMap<K, V> bucket = it.next();
bucket.put(key, value);
while(it.hasNext()) {
bucket = it.next();
bucket.remove(key);
}
}
}


5. remove方法

获取_lock对象,遍历桶链表,如果存在key,删除相应的记录并返回删除的记录,否则返回null.

public Object remove(K key) {
synchronized(_lock) {
for(HashMap<K, V> bucket: _buckets) {
if(bucket.containsKey(key)) {
return bucket.remove(key);
}
}
return null;
}
}


6. size方法

获取Timecachemap的大小,由于其是桶链表,所以需要确定每一个桶的大小。

public int size() {
synchronized(_lock) {
int size = 0;
for(HashMap<K, V> bucket: _buckets) {
size+=bucket.size();
}
return size;
}
}

7. timecachemap删除线程举例分析
在TimeCacheMap类的注释中有如下一段话,也就是说清理线程在expirationSecs和expirationSecs*(1+1/(numBuckets-1))之间清理过期消息,为啥这样讲?

/**
* Expires keys that have not been updated in the configured number of seconds.
* The algorithm used will take between expirationSecs and
* expirationSecs * (1 + 1 / (numBuckets-1)) to actually expire the message.
*/


再来看看清理线程,线程的sleep时间为sleepTime,sleepTime=expirationSecs/(numBuckets-1)

final long expirationMillis = expirationSecs * 1000L;
final long sleepTime = expirationMillis / (numBuckets-1);<pre name="code" class="java">      ......
Time.sleep(sleepTime);




来看一个例子说明一下:假设numBuckets=3,expirationSecs=2,先想timecachemap添加一条记录<001,001>,其状态如下

[{001,001},{},{}]
情况1:在刚刚put完数据之后,_cleaner线程刚刚清理完数据,此时需要等待expirationSecs/(numBuckets-1)=2/3-1=1s
1s之后的状态为:

[{},{001,001},{}]
再过一秒之后,timecachemap的状态为:

[{},{},{001,001}]
再过一秒之后,timecachemap的状态为:

[{},{},{}]


此时{001,001}从put进入timecachemap到移除总共花了3秒

3=expirationSecs /(numBuckets - 1)*numBuckets
=expirationSecs * ( 1 + 1 / (numBuckets - 1))
情况2:在刚刚put完数据之后,_cleaner线程立即清理数据,此时timecachemap中的状态迅速变为

[{},{001,001},{}]
再过一秒之后,timecachemap的状态为:

<pre name="code" class="java">[{},{},{001,001}]


再过一秒之后,timecachemap的状态为:
[{},{},{}]

此时{001,001}从put进入timecachemap到移除总共花了2秒

2=expirationSecs /(numBuckets - 1)*(numBuckets-1)
=expirationSecs


(二)TimeCacheMap的应用

以下是一个粗糙的简单设计:假设统计一分钟的uv,上游的bolt发送的为map格式,map中包含logtime(精确到分钟)与userid

private TimeCacheMap<String,String> timerCache=new TimeCacheMap<String, String>(1000,5);//过期时间设置为1000s
消息过来之后检测timcache是否存在该分钟内的userid,存在的话直接丢弃,不存在的话进行uv+1操作
String logtime =map.get("logtime");
String userid =map.get("userid");
if(!timerCache.containsKey(logtime+userid)){
timerCache.put(logtime+userid, "");
emit(map);
}else{
return;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: