探索c#之storm的TimeCacheMap
2015-09-14 08:32
519 查看
阅读目录:
概述
算法介绍
清理线程
获取、插入、删除
总结
思考一下如果需要一个带过期淘汰的缓存容器,我们通常会使用定时器或线程去扫描容器,以便判断是否过期从而删除。但这样性能并不友好,在数据量较大时O(n)检查是一笔不小的开销,并且在大量过期数据删除时需要频繁对容器加锁,这会多少会影响到正常的数据读写删除。
Storm设计了一种比较高效的时间缓存容器TimeCacheMap,它的算法可以在某个时间周期内将数据批量删除,一次批量删除只需要加一次锁即可,并且其读写删除复杂度均为O(1)。
为了更详细的描述,用代码和例子介绍如下:
上面使用了k、v的形式作为缓存数据结构,每个Dictionary是一个桶,然后使用链表把多个桶存储起来。Obj是要锁的对象,NumBuckets是桶的数量,cleaner是清理线程。
在缓存初始化的时候,会实例三个空桶加入到buckets,清理线程开始启动循环检查,假设过期时间时30秒,桶的数量为3,当有新数据进来时,会全部加入到第一个桶中。
为了删除性能,清理线程会定期把整个桶给删除掉,一般我们会每次把链表中最后一个桶给清理掉,然后再加入一个新桶到链表头部。
这种情况下就不能按照缓存过期时间去触发线程清理了,因为有三个桶,如果每30秒触发线程清理掉最后一个桶,那么第三个桶要等到第90秒才开始清理,很明显这样是不合理的。 正确的应该是第30秒开始清理,这时就需要调整线程触发时间,比如调整成10秒,继续模拟下:
触发前1秒插入新数据到第一个桶,如果调整成10秒触发,等到触发删除这个桶时才过了20秒,跟缓存过期时间30秒不一致同样不合理,不管是1秒还是9秒都会导致提前删除数据,需要继续调整触发时间。
如上缓存提前删除不能允许的,但延迟删除一般是可以接受的,因此可以加入一些冗余时间来保证不会提前删除。 这里调整到15秒触发,触发前1秒插入的缓存桶正好在30秒后触发删除,达到不会提前删除的目的。
如上在触发前14秒插入数据,那就需要过了30秒+14秒才能删除。
根据上面的模拟,调整到15秒触发是一个比较合理的值,因此推出缓存最长过期时间的公式为:
如果过期时间是30秒,其最长删除时间是:
因此其过期时间范围即为expirationSecs到expirationSecs * (1 + 1 / (numBuckets-1))之间。
代码执行步骤:
初始化桶加入到链表
计算缓存数据最长过期时间,并作为线程休眠的时间。
线程触发时删除最后一个桶并加入新的桶
不断循环休眠触发触发
启动线程
整个桶的数据删除只需要加一次锁即可,保证其高效。
在插入时删除对应的key,保证不会有重复的key出现。
删除对应的key
完整代码中有容器Size、ContainsKey的实现,github-TimeCacheMap.c#。
在storm中,spout发射的消息和acker的消息即保存在各自的TimeCacheMap里,如果消息超时后会自动通知spout的fail方法。 在storm0.8后TimeCacheMap被弃用了,使用的是新的RotatingMap,但设计和实现基本没变,github-TimeCacheMap.java及github-RotatingMap.java。
概述
算法介绍
清理线程
获取、插入、删除
总结
概述
最近在看storm,发现其中的TimeCacheMap算法设计颇为高效,就简单分享介绍下。思考一下如果需要一个带过期淘汰的缓存容器,我们通常会使用定时器或线程去扫描容器,以便判断是否过期从而删除。但这样性能并不友好,在数据量较大时O(n)检查是一笔不小的开销,并且在大量过期数据删除时需要频繁对容器加锁,这会多少会影响到正常的数据读写删除。
Storm设计了一种比较高效的时间缓存容器TimeCacheMap,它的算法可以在某个时间周期内将数据批量删除,一次批量删除只需要加一次锁即可,并且其读写删除复杂度均为O(1)。
算法介绍
TimeCacheMap把要缓存的数据分拆存储到多个小容器内,这里称为桶。另外有个线程专门在一定时间内去扫描这些桶,一旦发现过期后就把整个桶的数据给删除掉。 其中第二步比较关键,它并不是传统意义上的去定时扫描,而是根据过期时间来触发,比如如果一个桶过期时间10s,那么这个线程就10秒触发一次把整个桶删除即可,当然多个桶的触发策略会有所不同,但思路是同一个。为了更详细的描述,用代码和例子介绍如下:
private LinkedList<Dictionary<K, V>> buckets; private readonly object Obj = new object(); private static readonly int NumBuckets = 3; private Thread cleaner;
上面使用了k、v的形式作为缓存数据结构,每个Dictionary是一个桶,然后使用链表把多个桶存储起来。Obj是要锁的对象,NumBuckets是桶的数量,cleaner是清理线程。
在缓存初始化的时候,会实例三个空桶加入到buckets,清理线程开始启动循环检查,假设过期时间时30秒,桶的数量为3,当有新数据进来时,会全部加入到第一个桶中。
为了删除性能,清理线程会定期把整个桶给删除掉,一般我们会每次把链表中最后一个桶给清理掉,然后再加入一个新桶到链表头部。
这种情况下就不能按照缓存过期时间去触发线程清理了,因为有三个桶,如果每30秒触发线程清理掉最后一个桶,那么第三个桶要等到第90秒才开始清理,很明显这样是不合理的。 正确的应该是第30秒开始清理,这时就需要调整线程触发时间,比如调整成10秒,继续模拟下:
触发前1秒插入新数据到第一个桶,如果调整成10秒触发,等到触发删除这个桶时才过了20秒,跟缓存过期时间30秒不一致同样不合理,不管是1秒还是9秒都会导致提前删除数据,需要继续调整触发时间。
如上缓存提前删除不能允许的,但延迟删除一般是可以接受的,因此可以加入一些冗余时间来保证不会提前删除。 这里调整到15秒触发,触发前1秒插入的缓存桶正好在30秒后触发删除,达到不会提前删除的目的。
如上在触发前14秒插入数据,那就需要过了30秒+14秒才能删除。
根据上面的模拟,调整到15秒触发是一个比较合理的值,因此推出缓存最长过期时间的公式为:
expirationSecs * (1 + 1 / (numBuckets-1))
如果过期时间是30秒,其最长删除时间是:
30*(1+1/(3-1))=30*(1+0.5)=45
因此其过期时间范围即为expirationSecs到expirationSecs * (1 + 1 / (numBuckets-1))之间。
清理线程
如上算法的介绍,我们在类型的构造函数中,实例化并启动清理线程:public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallBack ex) { if (numBuckets < 2) throw new ArgumentException("numBuckets must be >=2"); this.buckets = new LinkedList<Dictionary<K, V>>(); for (int i = 0; i < numBuckets; i++) buckets.AddFirst(new Dictionary<K, V>()); var expirationMillis = expirationSecs * 1000; var sleepTime = expirationMillis / (numBuckets - 1); cleaner = new Thread(() => { while (true) { Dictionary<K, V> dead = null; Thread.Sleep(sleepTime); lock (Obj) { dead = buckets.Last(); buckets.RemoveLast(); buckets.AddFirst(new Dictionary<K, V>()); } if (ex != null) ex(dead); } }); cleaner.IsBackground = true; cleaner.Start(); }
代码执行步骤:
初始化桶加入到链表
计算缓存数据最长过期时间,并作为线程休眠的时间。
线程触发时删除最后一个桶并加入新的桶
不断循环休眠触发触发
启动线程
整个桶的数据删除只需要加一次锁即可,保证其高效。
获取、插入、删除
遍历整个链表,查询到第一个满足key的立即返回,这需要保证不会有重复key。public V Get(K key) { lock (Obj) { foreach (var item in buckets) { if (item.ContainsKey(key)) return item[key]; } return default(V); } }
在插入时删除对应的key,保证不会有重复的key出现。
public void Put(K key, V value) { lock (Obj) { foreach (var item in buckets) { item.Remove(key); } buckets.First().Add(key, value); } }
删除对应的key
public void Remove(K key) { lock (Obj) { foreach (var item in buckets) { if (item.ContainsKey(key)) item.Remove(key); } } }
总结
在那些年我们一起追过的缓存写法(三)中有介绍过关于惰性删除及高效LRU算法优化缓存容器的过期,有兴趣的童鞋可以看看。完整代码中有容器Size、ContainsKey的实现,github-TimeCacheMap.c#。
在storm中,spout发射的消息和acker的消息即保存在各自的TimeCacheMap里,如果消息超时后会自动通知spout的fail方法。 在storm0.8后TimeCacheMap被弃用了,使用的是新的RotatingMap,但设计和实现基本没变,github-TimeCacheMap.java及github-RotatingMap.java。
相关文章推荐
- 群蚁算法理论与实践全攻略——旅行商等路径优化问题的新方法【附C#群蚁算法完整项目代码】
- C#中的try catch 和finally
- C#如何调用REST
- C#删除只读文件或文件夹(解决File.Delete无法删除文件)
- C#反射机制(转)
- 关于C#的静态类和静态构造函数
- C#实现所有经典排序算法
- c#配置文件的简单操作
- C#读写config配置文件
- 在制作水晶报表时遇到此错误未处理System.IO.FileNotFoundException HResult=-2147024894
- 读书心得20150913
- 巧用FileShare解决C#读写文件时文件正由另一进程使用的bug
- C#读取Word模板替换相应的字符串(标签)生成新的Word
- C#内存释放(垃圾回收)
- C#在Winform中改变Textbox高度三种方法
- c#开发安卓学习
- C#中DataGridView控件使用大全
- C# mvc中为Controller或Action添加定制特性实现登录验证
- C#跨线程调用
- C#~异步编程续~.net4.5主推的await&async应用(转)