如何将 MongoDB MapReduce 速度提升 20 倍
2013-11-02 19:58
288 查看
分析在MongoDB中正成为越来越重要的话题,因为它在越来越多的大型项目中使用。人们厌倦了使用不同的软件来做分析(包括Hadoop),它们显然需要传输大量开销的数据。
MongoDB提供了两种内置分析数据的方法:Map Reduce和Aggregation框架。MR非常灵活,很容易部署。它通过分区工作良好,并允许大量输出。MR在MongoDB v2.4中,通过使用JavaScript引擎把Spider Monkey替换成V8,性能提升很多。老板抱怨它太慢了,尤其是和Agg框架(使用C++)相比。让我们看看能否从中榨出点果汁
从这其中,我们想要计算出现的不同值的个数。可以用下列MR任务轻松完成这个工作:
>db.runCommand(
{mapreduce:"uniques",map:function () {emit(this.dim0, 1); },reduce:function (key, values) {return Array.sum(values); },out:"mrout" })
{
"result" :"mrout","timeMillis" :1161960,"counts" :{
"input" :10000000,"emit" :10000000,"reduce" :1059138,"output" :999961
},"ok" :1
}
正如你在输出内容中看到的,这耗费了大概1200秒(在EC2 M3实例上进行的测试)。有1千万个map,1百万个reduce,输出了999961个文档。结果就像下面这样:
>db.mrout.find()
{"_id" :1, "value" :10 }
{"_id" :2, "value" :5 }
{"_id" :3, "value" :6 }
{"_id" :4, "value" :10 }
{"_id" :5, "value" :9 }
{"_id" :6, "value" :12 }
{"_id" :7, "value" :5 }
{"_id" :8, "value" :16 }
{"_id" :9, "value" :10 }
{"_id" :10, "value" :13 }
...
>db.runCommand(
{mapreduce:"uniques",map:function () {emit(this.dim0, 1); },reduce:function (key, values) {return Array.sum(values); },out:"mrout",sort:{dim0:1}})
{
"result" :"mrout","timeMillis" :192589,"counts" :{
"input" :10000000,"emit" :10000000,"reduce" :1000372,"output" :999961
},"ok" :1
}
确实大有助益!我们下降到192秒,已经提升了6倍。reduce的数量基本相同,但现在它们在写入磁盘前,可以在RAM内完成。
>db.runCommand({splitVector:"test.uniques", keyPattern:{dim0:1}, maxChunkSizeBytes:32000000})
{
"timeMillis" :6006,"splitKeys" :[
{
"dim0" :18171
},{
"dim0" :36378
},{
"dim0" :54528
},{
"dim0" :72717
},…
{
"dim0" :963598
},{
"dim0" :981805
}
],"ok" :1
}
这个命令在超过1千万个文档中找到分割点仅仅需要花费5秒,很快!那么现在我们仅仅需要一个方法来创建多个MR作业。从一个应用服务器,使用多线程和为MR命令使用$gt/$It查询
相当简单。通过shell,你可以使用ScopedThread,使用方法如下:
>var t =new ScopedThread(mapred, 963598, 981805)
>t.start()
>t.join()
现在我们把一些快速运行的js代码放在一起,它们会产生4个线程(或者更多的线程),执行后呈现出下面的结果:
第一个线程时间确实超过了其他的线程,但是平均每个线程仍然用了大约190s的时间.这意味着并没有一个线程快!这有点奇怪,自从用了‘top’,在某种程度上,你可以看到所有的内核运行情况。
这才像话!我们现在降到了100秒,这意味着相比一个线程而言已经提升了2倍。还算差强人意吧。现在我们只有4个核所以只快了2倍,要是在8核CPU上将会快4倍,以此类推。
现在我们降到了70秒,就搞定了任务!jsMode真心有用,尤其是当对象有很多字段的时候。这里只有一个数字字段就已经下降了30%。
这是明显的提高了3倍的运行速度,时间降低到了60s,大约10-15%。这种变化也提高了整体JS引擎的堆消耗。
结语
回顾一下,对于同一个MR作业,我们开始时花费1200秒,最后花费60秒,提升了20倍!这项提高应该对大部分应用都有效,即使有些trick不太理想(例如,使用多种输出dbs/collections)。至少这能提供给人们思路,如何加速他们的MR作业,希望这些特征在将来会更加易于使用。接下来的票将使得‘splitVector’命令更加有用,这张票将在同一个数据库中提升多MR作业。干杯!
MongoDB提供了两种内置分析数据的方法:Map Reduce和Aggregation框架。MR非常灵活,很容易部署。它通过分区工作良好,并允许大量输出。MR在MongoDB v2.4中,通过使用JavaScript引擎把Spider Monkey替换成V8,性能提升很多。老板抱怨它太慢了,尤其是和Agg框架(使用C++)相比。让我们看看能否从中榨出点果汁
练习
让我们插入1千万条文档,每个文档包含一个从0到1000000的整数。这意味着平均有10个文档会具有相同的值。>for (var i = 0; i < 10000000; ++i){db.uniques.insert({dim0:Math.floor(Math.random()*1000000) });} >db.uniques.findOne() {"_id" :ObjectId("51d3c386acd412e22c188dec"), "dim0" :570859 } >db.uniques.ensureIndex({dim0:1}) >db.uniques.stats() { "ns" :"test.uniques", "count" :10000000, "size" :360000052, "avgObjSize" :36.0000052, "storageSize" :582864896, "numExtents" :18, "nindexes" :2, "lastExtentSize" :153874432, "paddingFactor" :1, "systemFlags" :1, "userFlags" :0, "totalIndexSize" :576040080, "indexSizes" :{ "_id_" :324456384, "dim0_1" :251583696 }, "ok" :1 }
从这其中,我们想要计算出现的不同值的个数。可以用下列MR任务轻松完成这个工作:
>db.runCommand(
{mapreduce:"uniques",map:function () {emit(this.dim0, 1); },reduce:function (key, values) {return Array.sum(values); },out:"mrout" })
{
"result" :"mrout","timeMillis" :1161960,"counts" :{
"input" :10000000,"emit" :10000000,"reduce" :1059138,"output" :999961
},"ok" :1
}
正如你在输出内容中看到的,这耗费了大概1200秒(在EC2 M3实例上进行的测试)。有1千万个map,1百万个reduce,输出了999961个文档。结果就像下面这样:
>db.mrout.find()
{"_id" :1, "value" :10 }
{"_id" :2, "value" :5 }
{"_id" :3, "value" :6 }
{"_id" :4, "value" :10 }
{"_id" :5, "value" :9 }
{"_id" :6, "value" :12 }
{"_id" :7, "value" :5 }
{"_id" :8, "value" :16 }
{"_id" :9, "value" :10 }
{"_id" :10, "value" :13 }
...
使用排序
我在上一篇博文中提到了在MR中使用排序多么有益。这个特性很少被理解。在这个例子中,处理未排序的输入意味着MR引擎将得到随机顺序的值,在RAM中根本无法reduce。相反,它将不得不把所有文章写入一个临时收集的磁盘,然后按顺序读取并reduce。让我们看看使用排序是否有助:>db.runCommand(
{mapreduce:"uniques",map:function () {emit(this.dim0, 1); },reduce:function (key, values) {return Array.sum(values); },out:"mrout",sort:{dim0:1}})
{
"result" :"mrout","timeMillis" :192589,"counts" :{
"input" :10000000,"emit" :10000000,"reduce" :1000372,"output" :999961
},"ok" :1
}
确实大有助益!我们下降到192秒,已经提升了6倍。reduce的数量基本相同,但现在它们在写入磁盘前,可以在RAM内完成。
使用多线程
MongoDB对单独的MR作业并不使用多线程——它仅仅对多作业使用多线程。但通过多核CPU,在单个服务器使用Hadoop风格来并行作业非常有优势。我们需要做的是把输入分成几块,通过各个块来加速一个MR作业。也许数据集有简单的方法来分割,但其他使用splitVector命令(不明确)可以使你很快的找到分割点:>db.runCommand({splitVector:"test.uniques", keyPattern:{dim0:1}, maxChunkSizeBytes:32000000})
{
"timeMillis" :6006,"splitKeys" :[
{
"dim0" :18171
},{
"dim0" :36378
},{
"dim0" :54528
},{
"dim0" :72717
},…
{
"dim0" :963598
},{
"dim0" :981805
}
],"ok" :1
}
这个命令在超过1千万个文档中找到分割点仅仅需要花费5秒,很快!那么现在我们仅仅需要一个方法来创建多个MR作业。从一个应用服务器,使用多线程和为MR命令使用$gt/$It查询
相当简单。通过shell,你可以使用ScopedThread,使用方法如下:
>var t =new ScopedThread(mapred, 963598, 981805)
>t.start()
>t.join()
现在我们把一些快速运行的js代码放在一起,它们会产生4个线程(或者更多的线程),执行后呈现出下面的结果:
> var res = db.runCommand({splitVector: "test.uniques" , keyPattern:{dim0:1}, maxChunkSizeBytes:32 *1024 * 1024 }) |
02 | > var keys = res.splitKeys |
03 | >keys.length |
04 | 39 |
05 | > var mapred = function (min, max) { |
06 | return db.runCommand({mapreduce: "uniques" , |
07 | map: function () {emit( this .dim0, 1); }, |
08 | reduce: function (key, values) { return Array.sum(values); }, |
09 | out: "mrout" + min, |
10 | sort:{dim0:1}, |
11 | query:{dim0:{$gte:min, $lt:max }}}) } |
12 | > var numThreads = 4 |
13 | > var inc = Math.floor(keys.length / numThreads) + 1 |
14 | >threads = []; for ( var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 :keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey :keys[i * inc + inc].dim0 ; print( "min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() } |
15 | min:0 max:274736 |
16 | min:274736 max:524997 |
17 | min:524997 max:775025 |
18 | min:775025 max:{ "$maxKey" :1 } |
19 | connecting to:test |
20 | connecting to:test |
21 | connecting to:test |
22 | connecting to:test |
23 | > for ( var i in threads) { var t =threads[i]; t.join(); printjson(t.returnData()); } |
24 | { |
25 | "result" : "mrout0" , |
26 | "timeMillis" :205790, |
27 | "counts" :{ |
28 | "input" :2750002, |
29 | "emit" :2750002, |
30 | "reduce" :274828, |
31 | "output" :274723 |
32 | }, |
33 | "ok" :1 |
34 | } |
35 | { |
36 | "result" : "mrout274736" , |
37 | "timeMillis" :189868, |
38 | "counts" :{ |
39 | "input" :2500013, |
40 | "emit" :2500013, |
41 | "reduce" :250364, |
42 | "output" :250255 |
43 | }, |
44 | "ok" :1 |
45 | } |
46 | { |
47 | "result" : "mrout524997" , |
48 | "timeMillis" :191449, |
49 | "counts" :{ |
50 | "input" :2500014, |
51 | "emit" :2500014, |
52 | "reduce" :250120, |
53 | "output" :250019 |
54 | }, |
55 | "ok"
|
56 | } |
57 | { |
58 | "result" : "mrout775025" , |
59 | "timeMillis" :184945, |
60 | "counts" :{ |
61 | "input" :2249971, |
62 | "emit" :2249971, |
63 | "reduce" :225057, |
64 | "output" :224964 |
65 | }, |
66 | "ok" :1 |
67 | } |
68 | "ok" :1 |
69 | } |
70 | { |
71 | "result" : "mrout775025" , |
72 | "timeMillis" :184945, |
73 | "counts" :{ |
74 | "input" :2249971, |
75 | "emit" :2249971, |
76 | "reduce" :225057, |
77 | "output" :224964 |
78 | }, |
79 | "ok" :1 |
80 | } |
使用多数据库
问题是在多线程之间会有很多锁竞争。在上锁时,MR并不是那么无私的(它每1000次读操作就会产生一次锁定),而且MR任务还会执行许多写操作,导致线程最终都会在等待另一个线程。由于每个MongoDB数据库都有私有锁,让我们尝试为每一个线程使用一个不同的输出数据库:> var mapred = function (min, max) { |
02 | return db.runCommand({mapreduce: "uniques" , |
03 | map: function () {emit( this .dim0, 1); }, |
04 | reduce: function (key, values) { return Array.sum(values); }, |
05 | out:{replace: "mrout" + min, db: "mrdb" + min }, |
06 | sort:{dim0:1}, |
07 | query:{dim0:{$gte:min, $lt:max }}}) } |
08 | >threads = []; for ( var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 :keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey :keys[i * inc + inc].dim0 ; print( "min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() } |
09 | min:0 max:274736 |
10 | min:274736 max:524997 |
11 | min:524997 max:775025 |
12 | min:775025 max:{ "$maxKey" :1 } |
13 | connecting to:test |
14 | connecting to:test |
15 | connecting to:test |
16 | connecting to:test |
17 | > for ( var i in threads) { var t =threads[i]; t.join(); printjson(t.returnData()); } |
18 | ... |
19 | { |
20 | "result" :{ |
21 | "db" : "mrdb274736" , |
22 | "collection" : "mrout274736" |
23 | }, |
24 | "timeMillis" :105821, |
25 | "counts" :{ |
26 | "input" :2500013, |
27 | "emit" :2500013, |
28 | "reduce" :250364, |
29 | "output" :250255 |
30 | }, |
31 | "ok" :1 |
32 | } |
33 | ... |
使用纯JavaScript模式
当把输入数据拆分到不同线程上去的时候,发生了一些有趣的事情:每个线程现在有大约250000个不同的值来输出,而不是1百万。这意味着我们可以使用“纯JS模式”,它可以通过使用jsMode:true来开启。开启后,MongoDB在处理时将不会把对象在JS和BSON之间来回翻译,相反,它使用一个限额500000个key的内部JS字典来化简所有对象。让我们看看这是否有用:> var mapred = function (min, max) { |
02 | return db.runCommand({mapreduce: "uniques" , |
03 | map: function () {emit( this .dim0, 1); }, |
04 | reduce: function (key, values) { return Array.sum(values); }, |
05 | out:{replace: "mrout" + min, db: "mrdb" + min }, |
06 | sort:{dim0:1}, |
07 | query:{dim0:{$gte:min, $lt:max }}, |
08 | jsMode: true }) } |
09 | >threads = []; for ( var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 :keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey :keys[i * inc + inc].dim0 ; print( "min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() } |
10 | min:0 max:274736 |
11 | min:274736 max:524997 |
12 | min:524997 max:775025 |
13 | min:775025 max:{ "$maxKey" :1 } |
14 | connecting to:test |
15 | connecting to:test |
16 | connecting to:test |
17 | connecting to:test |
18 | > for ( var i in threads) { var t =threads[i]; t.join(); printjson(t.returnData()); } |
19 | ... |
20 | { |
21 | "result" :{ |
22 | "db" : "mrdb274736" , |
23 | "collection" : "mrout274736" |
24 | }, |
25 | "timeMillis" :70507, |
26 | "counts" :{ |
27 | "input" :2500013, |
28 | "emit" :2500013, |
29 | "reduce" :250156, |
30 | "output" :250255 |
31 | }, |
32 | "ok" :1 |
33 | } |
34 | ... |
MongoDB在2.6版本上的改进
在很早的2.6版本中,在任何的js函数调用的时候,我们就通过一段代码设置一个可选参数”args“。这种做法并不标准,不在使用。但是它确有留下来的原因(查看 SERVER-4654)。让我们从Git资源库中导入MongoDB,编译并运行进行测试:... |
02 | { |
03 | "result" :{ |
04 | "db" : "mrdb274736" , |
05 | "collection" : "mrout274736" |
06 | }, |
07 | "timeMillis" :62785, |
08 | "counts" :{ |
09 | "input" :2500013, |
10 | "emit" :2500013, |
11 | "reduce" :250156, |
12 | "output" :250255 |
13 | }, |
14 | "ok" :1 |
15 | } |
16 | ... |
结语
回顾一下,对于同一个MR作业,我们开始时花费1200秒,最后花费60秒,提升了20倍!这项提高应该对大部分应用都有效,即使有些trick不太理想(例如,使用多种输出dbs/collections)。至少这能提供给人们思路,如何加速他们的MR作业,希望这些特征在将来会更加易于使用。接下来的票将使得‘splitVector’命令更加有用,这张票将在同一个数据库中提升多MR作业。干杯!
相关文章推荐
- 新技能Get:如何利用HTTP技术提升网页的加载速度
- 进阶篇:如何优化网站提升访问速度
- 如何通过预加载器提升网页加载速度
- 新手如何提升速度?
- 20 种提升网页速度的技巧
- 如何提升网站在移动端的打开速度
- 20 种提升网页速度的技巧
- 如何提升电脑的速度(五年时间收集各家精华,绝对史上最全)
- windows系统启动缓慢怎么办?如何提升开机速度设置图解
- 如何提升JavaScript的运行速度(DOM篇)
- 160830、如何运用最新的技术提升网页速度和性能
- Eclipse 中的多线程编译——如何成倍提升Gcc编译器的编译速度
- 如何提升Visual Studio 2010 的速度
- 如何提升JavaScript的运行速度(DOM篇)
- 如何把Docker镜像分发速度提升90%
- 我是如何让Eclipse的启动速度提升1.5秒的
- 20 种提升网页速度的技巧
- 如何提升JavaScript的运行速度(循环篇)
- 网站打开速度优化:如何提高网页加载速度,提升网页打开速度
- 如何快速提升windows7关机速度?win7关机速度提升小技巧