TimeCacheMap的解析与使用
2015-11-29 23:28
302 查看
在storm的开发中,经常有这样的业务场景,比如统计1分钟或者5分钟粒度的UV,我们可能会这样想设置一个set,其中用用户user_id与logtime组合起来作为key存入set,每次来一条用户消息先到set中查找,如果存在该条消息则丢弃,如果不存在向set add key;同时设置一个内存缓存map以logtime作为key,根据刚刚的set结果决定是否进行UV的+1操作;同时为了防止set过大开启一个线程每过一定的时间把无效的key进行删除。
其实在storm中,有一个TimeCacheMap的数据结构,可以解决这类问题,TimeCacheMap用于在内存中保存近期活跃的对象,它的实现非常高效,而且可以自动删除过期不再活跃的对象,同时由于其采用了分桶的策略来缩小锁的粒度,所以其有高效的并发读写特性。
(一)TimeCacheMap的实现原理
1. 底层数据结构
timecachemap底层为一个桶链表,也就是一个LinkList,其中链表中每一个元素是一个HashMap,用于保存Key、Value格式的数据。
2. 构造函数
如下所示为timecachemap的构造函数,其中LinkList的默认大小为3,如果构造函数设置其链表的大小小于2,将会抛出IllegalArgumentException。
初始化指定链表的长度,即桶bucket的个数,每个bucket中放入一个空的HashMap。
设置清理线程:
1)设置线程休眠,sleep时间为sleepTime(sleepTime=expirationMillis / (numBuckets-1)毫秒时间;
2)对_lock对象上锁,然后将LinkList中最后一个元素移除,同时在链表的头部加入一个一个空的HashMap,解除_lock对象;
备注:_lock为锁对象,为了保证操作的原子性
3)如果设置了callback函数,则进行回调
3. get方法
获取_lock对象,遍历桶链表,如果存在key则返回。
4. put方法
获取_lock对象,向桶链表中第一个桶中put数据,同时将桶链表中后续的桶中存在的相应key的数据删除。
5. remove方法
获取_lock对象,遍历桶链表,如果存在key,删除相应的记录并返回删除的记录,否则返回null.
6. size方法
获取Timecachemap的大小,由于其是桶链表,所以需要确定每一个桶的大小。
7. timecachemap删除线程举例分析
在TimeCacheMap类的注释中有如下一段话,也就是说清理线程在expirationSecs和expirationSecs*(1+1/(numBuckets-1))之间清理过期消息,为啥这样讲?
再来看看清理线程,线程的sleep时间为sleepTime,sleepTime=expirationSecs/(numBuckets-1)
来看一个例子说明一下:假设numBuckets=3,expirationSecs=2,先想timecachemap添加一条记录<001,001>,其状态如下
1s之后的状态为:
此时{001,001}从put进入timecachemap到移除总共花了3秒
再过一秒之后,timecachemap的状态为:
此时{001,001}从put进入timecachemap到移除总共花了2秒
(二)TimeCacheMap的应用
以下是一个粗糙的简单设计:假设统计一分钟的uv,上游的bolt发送的为map格式,map中包含logtime(精确到分钟)与userid
其实在storm中,有一个TimeCacheMap的数据结构,可以解决这类问题,TimeCacheMap用于在内存中保存近期活跃的对象,它的实现非常高效,而且可以自动删除过期不再活跃的对象,同时由于其采用了分桶的策略来缩小锁的粒度,所以其有高效的并发读写特性。
(一)TimeCacheMap的实现原理
1. 底层数据结构
timecachemap底层为一个桶链表,也就是一个LinkList,其中链表中每一个元素是一个HashMap,用于保存Key、Value格式的数据。
private LinkedList<HashMap<K, V>> _buckets;
2. 构造函数
如下所示为timecachemap的构造函数,其中LinkList的默认大小为3,如果构造函数设置其链表的大小小于2,将会抛出IllegalArgumentException。
public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) { if(numBuckets<2) { throw new IllegalArgumentException("numBuckets must be >= 2"); } _buckets = new LinkedList<HashMap<K, V>>(); for(int i=0; i<numBuckets; i++) { _buckets.add(new HashMap<K, V>()); } _callback = callback; final long expirationMillis = expirationSecs * 1000L; final long sleepTime = expirationMillis / (numBuckets-1); _cleaner = new Thread(new Runnable() { public void run() { try { while(true) { Map<K, V> dead = null; Time.sleep(sleepTime); synchronized(_lock) { dead = _buckets.removeLast(); _buckets.addFirst(new HashMap<K, V>()); } if(_callback!=null) { for(Entry<K, V> entry: dead.entrySet()) { _callback.expire(entry.getKey(), entry.getValue()); } } } } catch (InterruptedException ex) { } } }); _cleaner.setDaemon(true);//作为守护线程运行,一旦主线程不在,这个线程自动结束</span> _cleaner.start(); }
初始化指定链表的长度,即桶bucket的个数,每个bucket中放入一个空的HashMap。
_buckets = new LinkedList<HashMap<K, V>>(); for(int i=0; i<numBuckets; i++) { _buckets.add(new HashMap<K, V>()); }
设置清理线程:
1)设置线程休眠,sleep时间为sleepTime(sleepTime=expirationMillis / (numBuckets-1)毫秒时间;
2)对_lock对象上锁,然后将LinkList中最后一个元素移除,同时在链表的头部加入一个一个空的HashMap,解除_lock对象;
备注:_lock为锁对象,为了保证操作的原子性
private final Object _lock = new Object();
3)如果设置了callback函数,则进行回调
while(true) { Map<K, V> dead = null; Time.sleep(sleepTime); synchronized(_lock) { dead = _buckets.removeLast(); _buckets.addFirst(new HashMap<K, V>()); } if(_callback!=null) { for(Entry<K, V> entry: dead.entrySet()) { _callback.expire(entry.getKey(), entry.getValue()); } }</span>
3. get方法
获取_lock对象,遍历桶链表,如果存在key则返回。
public V get(K key) { synchronized(_lock) { for(HashMap<K, V> bucket: _buckets) { if(bucket.containsKey(key)) { return bucket.get(key); } } return null; } }
4. put方法
获取_lock对象,向桶链表中第一个桶中put数据,同时将桶链表中后续的桶中存在的相应key的数据删除。
public void put(K key, V value) { synchronized(_lock) { Iterator<HashMap<K, V>> it = _buckets.iterator(); HashMap<K, V> bucket = it.next(); bucket.put(key, value); while(it.hasNext()) { bucket = it.next(); bucket.remove(key); } } }
5. remove方法
获取_lock对象,遍历桶链表,如果存在key,删除相应的记录并返回删除的记录,否则返回null.
public Object remove(K key) { synchronized(_lock) { for(HashMap<K, V> bucket: _buckets) { if(bucket.containsKey(key)) { return bucket.remove(key); } } return null; } }
6. size方法
获取Timecachemap的大小,由于其是桶链表,所以需要确定每一个桶的大小。
public int size() { synchronized(_lock) { int size = 0; for(HashMap<K, V> bucket: _buckets) { size+=bucket.size(); } return size; } }
7. timecachemap删除线程举例分析
在TimeCacheMap类的注释中有如下一段话,也就是说清理线程在expirationSecs和expirationSecs*(1+1/(numBuckets-1))之间清理过期消息,为啥这样讲?
/** * Expires keys that have not been updated in the configured number of seconds. * The algorithm used will take between expirationSecs and * expirationSecs * (1 + 1 / (numBuckets-1)) to actually expire the message. */
再来看看清理线程,线程的sleep时间为sleepTime,sleepTime=expirationSecs/(numBuckets-1)
final long expirationMillis = expirationSecs * 1000L; final long sleepTime = expirationMillis / (numBuckets-1);<pre name="code" class="java"> ......
Time.sleep(sleepTime);
来看一个例子说明一下:假设numBuckets=3,expirationSecs=2,先想timecachemap添加一条记录<001,001>,其状态如下
[{001,001},{},{}]情况1:在刚刚put完数据之后,_cleaner线程刚刚清理完数据,此时需要等待expirationSecs/(numBuckets-1)=2/3-1=1s
1s之后的状态为:
[{},{001,001},{}]再过一秒之后,timecachemap的状态为:
[{},{},{001,001}]再过一秒之后,timecachemap的状态为:
[{},{},{}]
此时{001,001}从put进入timecachemap到移除总共花了3秒
3=expirationSecs /(numBuckets - 1)*numBuckets =expirationSecs * ( 1 + 1 / (numBuckets - 1))情况2:在刚刚put完数据之后,_cleaner线程立即清理数据,此时timecachemap中的状态迅速变为
[{},{001,001},{}]再过一秒之后,timecachemap的状态为:
<pre name="code" class="java">[{},{},{001,001}]
再过一秒之后,timecachemap的状态为:
[{},{},{}]
此时{001,001}从put进入timecachemap到移除总共花了2秒
2=expirationSecs /(numBuckets - 1)*(numBuckets-1) =expirationSecs
(二)TimeCacheMap的应用
以下是一个粗糙的简单设计:假设统计一分钟的uv,上游的bolt发送的为map格式,map中包含logtime(精确到分钟)与userid
private TimeCacheMap<String,String> timerCache=new TimeCacheMap<String, String>(1000,5);//过期时间设置为1000s消息过来之后检测timcache是否存在该分钟内的userid,存在的话直接丢弃,不存在的话进行uv+1操作
String logtime =map.get("logtime"); String userid =map.get("userid"); if(!timerCache.containsKey(logtime+userid)){ timerCache.put(logtime+userid, ""); emit(map); }else{ return; }
相关文章推荐
- python协程
- linux-文件系统管理09-盘符漂移问题
- hdu-- 计算机学院大学生程序设计竞赛(2015’11)
- Ubuntu 14.04 Web服务器Tomcat安装测试
- 对象的上转型对象,接口回调
- 121 js中(function(){…})()立即执行函数写法理解
- Java多态性理解
- ios 在appdelegate里添加UINavigationController
- 自定义控件-ViewPager篇
- [软件渲染器入门]四-额外章节,使用技巧和并行处理来提高性能
- 判断两个数组是不是有相同元素
- 关于前端写代码之前应该做到的事情
- Linux之最简字符驱动的编码模型
- XP支持AHCI硬盘工作模式
- 读《工业4.0时代,怎样为孩子筹备未来的教育》有感
- 011自定义适配器 Adapter
- 杭电2015’11校赛 1008游乐场
- 练习
- 表设计
- english