您的位置:首页 > 数据库 > Mongodb

如何将 MongoDB MapReduce 速度提升 20 倍

2013-11-02 19:58 288 查看
分析在MongoDB中正成为越来越重要的话题,因为它在越来越多的大型项目中使用。人们厌倦了使用不同的软件来做分析(包括Hadoop),它们显然需要传输大量开销的数据。

MongoDB提供了两种内置分析数据的方法:Map Reduce和Aggregation框架。MR非常灵活,很容易部署。它通过分区工作良好,并允许大量输出。MR在MongoDB v2.4中,通过使用JavaScript引擎把Spider Monkey替换成V8,性能提升很多。老板抱怨它太慢了,尤其是和Agg框架(使用C++)相比。让我们看看能否从中榨出点果汁

练习

让我们插入1千万条文档,每个文档包含一个从0到1000000的整数。这意味着平均有10个文档会具有相同的值。

>for (var i = 0; i < 10000000; ++i){db.uniques.insert({dim0:Math.floor(Math.random()*1000000) });}
>db.uniques.findOne()
{"_id" :ObjectId("51d3c386acd412e22c188dec"), "dim0" :570859 }
>db.uniques.ensureIndex({dim0:1})
>db.uniques.stats()
{
"ns" :"test.uniques",
"count" :10000000,
"size" :360000052,
"avgObjSize" :36.0000052,
"storageSize" :582864896,
"numExtents" :18,
"nindexes" :2,
"lastExtentSize" :153874432,
"paddingFactor" :1,
"systemFlags" :1,
"userFlags" :0,
"totalIndexSize" :576040080,
"indexSizes" :{
"_id_" :324456384,
"dim0_1" :251583696
},
"ok" :1
}

从这其中,我们想要计算出现的不同值的个数。可以用下列MR任务轻松完成这个工作:

>db.runCommand(

{mapreduce:"uniques",map:function () {emit(this.dim0, 1); },reduce:function (key, values) {return Array.sum(values); },out:"mrout" })

{

"result" :"mrout","timeMillis" :1161960,"counts" :{

"input" :10000000,"emit" :10000000,"reduce" :1059138,"output" :999961

},"ok" :1

}

正如你在输出内容中看到的,这耗费了大概1200秒(在EC2 M3实例上进行的测试)。有1千万个map,1百万个reduce,输出了999961个文档。结果就像下面这样:

>db.mrout.find()

{"_id" :1, "value" :10 }

{"_id" :2, "value" :5 }

{"_id" :3, "value" :6 }

{"_id" :4, "value" :10 }

{"_id" :5, "value" :9 }

{"_id" :6, "value" :12 }

{"_id" :7, "value" :5 }

{"_id" :8, "value" :16 }

{"_id" :9, "value" :10 }

{"_id" :10, "value" :13 }

...

使用排序

我在上一篇博文中提到了在MR中使用排序多么有益。这个特性很少被理解。在这个例子中,处理未排序的输入意味着MR引擎将得到随机顺序的值,在RAM中根本无法reduce。相反,它将不得不把所有文章写入一个临时收集的磁盘,然后按顺序读取并reduce。让我们看看使用排序是否有助:

>db.runCommand(

{mapreduce:"uniques",map:function () {emit(this.dim0, 1); },reduce:function (key, values) {return Array.sum(values); },out:"mrout",sort:{dim0:1}})

{

"result" :"mrout","timeMillis" :192589,"counts" :{

"input" :10000000,"emit" :10000000,"reduce" :1000372,"output" :999961

},"ok" :1

}

确实大有助益!我们下降到192秒,已经提升了6倍。reduce的数量基本相同,但现在它们在写入磁盘前,可以在RAM内完成。

使用多线程

MongoDB对单独的MR作业并不使用多线程——它仅仅对多作业使用多线程。但通过多核CPU,在单个服务器使用Hadoop风格来并行作业非常有优势。我们需要做的是把输入分成几块,通过各个块来加速一个MR作业。也许数据集有简单的方法来分割,但其他使用splitVector命令(不明确)可以使你很快的找到分割点:

>db.runCommand({splitVector:"test.uniques", keyPattern:{dim0:1}, maxChunkSizeBytes:32000000})

{

"timeMillis" :6006,"splitKeys" :[

{

"dim0" :18171

},{

"dim0" :36378

},{

"dim0" :54528

},{

"dim0" :72717

},…

{

"dim0" :963598

},{

"dim0" :981805

}

],"ok" :1

}

这个命令在超过1千万个文档中找到分割点仅仅需要花费5秒,很快!那么现在我们仅仅需要一个方法来创建多个MR作业。从一个应用服务器,使用多线程和为MR命令使用$gt/$It查询
相当简单。通过shell,你可以使用ScopedThread,使用方法如下:

>var t =new ScopedThread(mapred, 963598, 981805)

>t.start()

>t.join()

现在我们把一些快速运行的js代码放在一起,它们会产生4个线程(或者更多的线程),执行后呈现出下面的结果:

>
var

res = db.runCommand({splitVector:
"test.uniques"
, keyPattern:{dim0:1}, maxChunkSizeBytes:32 *1024 * 1024 })
02
>
var

keys = res.splitKeys
03
>keys.length 
04
39
05
>
var

mapred =
function
(min, max) {
06
return
db.runCommand({mapreduce:
"uniques"
,
07
map:
function

() {emit(
this
.dim0, 1); },
08
reduce:
function

(key, values) {
return

Array.sum(values); },
09
out:
"mrout"

+ min,
10
sort:{dim0:1},
11
query:{dim0:{$gte:min, $lt:max }}}) }
12
>
var

numThreads = 4
13
>
var

inc = Math.floor(keys.length / numThreads) + 1

14
>threads = []; 
for

(
var
i = 0; i < numThreads; ++i) {
var
min = (i == 0) ? 0 :keys[i * inc].dim0;
var

max = (i * inc + inc >= keys.length) ? MaxKey :keys[i * inc + inc].dim0 ; print(
"min:"

+ min +
" max:"

+ max);
var

t =
new
ScopedThread(mapred, min, max); threads.push(t); t.start() }
15
min:0 max:274736
16
min:274736 max:524997
17
min:524997 max:775025
18
min:775025 max:{
"$maxKey"

:1 }
19
connecting to:test
20
connecting to:test
21
connecting to:test
22
connecting to:test
23
>
for

(
var
i
in
threads) {
var
t =threads[i]; t.join(); printjson(t.returnData()); }
24
{
25
"result"

:
"mrout0"
,
26
"timeMillis"

:205790, 
27
"counts"

:{
28
"input"

:2750002, 
29
"emit"

:2750002, 
30
"reduce"

:274828, 
31
"output"

:274723 
32
},
33
"ok"

:1 
34
}
35
{
36
"result"

:
"mrout274736"
,
37
"timeMillis"

:189868, 
38
"counts"

:{
39
"input"

:2500013, 
40
"emit"

:2500013, 
41
"reduce"

:250364, 
42
"output"

:250255 
43
},
44
"ok"

:1 
45
}
46
{
47
"result"

:
"mrout524997"
,
48
"timeMillis"

:191449, 
49
"counts"

:{
50
"input"

:2500014, 
51
"emit"

:2500014, 
52
"reduce"

:250120, 
53
"output"

:250019 
54
},
55
"ok"

:1
56
}
57
{
58
"result"

:
"mrout775025"
,
59
"timeMillis"

:184945, 
60
"counts"

:{
61
"input"

:2249971, 
62
"emit"

:2249971, 
63
"reduce"

:225057, 
64
"output"

:224964 
65
},
66
"ok"

:1 
67
}
68
 
"ok"

:1 
69
}
70
{
71
"result"

:
"mrout775025"
,
72
"timeMillis"

:184945, 
73
"counts"

:{
74
"input"

:2249971, 
75
"emit"

:2249971, 
76
"reduce"

:225057, 
77
"output"

:224964 
78
},
79
"ok"

:1 
80
}
第一个线程时间确实超过了其他的线程,但是平均每个线程仍然用了大约190s的时间.这意味着并没有一个线程快!这有点奇怪,自从用了‘top’,在某种程度上,你可以看到所有的内核运行情况。

使用多数据库

问题是在多线程之间会有很多锁竞争。在上锁时,MR并不是那么无私的(它每1000次读操作就会产生一次锁定),而且MR任务还会执行许多写操作,导致线程最终都会在等待另一个线程。由于每个MongoDB数据库都有私有锁,让我们尝试为每一个线程使用一个不同的输出数据库:

>
var

mapred =
function
(min, max) {
02
return
db.runCommand({mapreduce:
"uniques"
,
03
map:
function

() {emit(
this
.dim0, 1); },
04
reduce:
function

(key, values) {
return

Array.sum(values); },
05
out:{replace:
"mrout"

+ min, db:
"mrdb"

+ min },
06
sort:{dim0:1},
07
query:{dim0:{$gte:min, $lt:max }}}) }
08
>threads = []; 
for

(
var
i = 0; i < numThreads; ++i) {
var
min = (i == 0) ? 0 :keys[i * inc].dim0;
var

max = (i * inc + inc >= keys.length) ? MaxKey :keys[i * inc + inc].dim0 ; print(
"min:"

+ min +
" max:"

+ max);
var

t =
new
ScopedThread(mapred, min, max); threads.push(t); t.start() }
09
min:0 max:274736
10
min:274736 max:524997
11
min:524997 max:775025
12
min:775025 max:{
"$maxKey"

:1 }
13
connecting to:test
14
connecting to:test
15
connecting to:test
16
connecting to:test
17
>
for

(
var
i
in
threads) {
var
t =threads[i]; t.join(); printjson(t.returnData()); }
18
...
19
{
20
"result"

:{
21
"db"

:
"mrdb274736"
,
22
"collection"

:
"mrout274736"
23
},
24
"timeMillis"

:105821, 
25
"counts"

:{
26
"input"

:2500013, 
27
"emit"

:2500013, 
28
"reduce"

:250364, 
29
"output"

:250255 
30
},
31
"ok"

:1 
32
}
33
...
这才像话!我们现在降到了100秒,这意味着相比一个线程而言已经提升了2倍。还算差强人意吧。现在我们只有4个核所以只快了2倍,要是在8核CPU上将会快4倍,以此类推。

使用纯JavaScript模式

当把输入数据拆分到不同线程上去的时候,发生了一些有趣的事情:每个线程现在有大约250000个不同的值来输出,而不是1百万。这意味着我们可以使用“纯JS模式”,它可以通过使用jsMode:true来开启。开启后,MongoDB在处理时将不会把对象在JS和BSON之间来回翻译,相反,它使用一个限额500000个key的内部JS字典来化简所有对象。让我们看看这是否有用:

>
var

mapred =
function
(min, max) {
02
return
db.runCommand({mapreduce:
"uniques"
,
03
map:
function

() {emit(
this
.dim0, 1); },
04
reduce:
function

(key, values) {
return

Array.sum(values); },
05
out:{replace:
"mrout"

+ min, db:
"mrdb"

+ min },
06
sort:{dim0:1},
07
query:{dim0:{$gte:min, $lt:max }},
08
jsMode:
true

}) }
09
>threads = []; 
for

(
var
i = 0; i < numThreads; ++i) {
var
min = (i == 0) ? 0 :keys[i * inc].dim0;
var

max = (i * inc + inc >= keys.length) ? MaxKey :keys[i * inc + inc].dim0 ; print(
"min:"

+ min +
" max:"

+ max);
var

t =
new
ScopedThread(mapred, min, max); threads.push(t); t.start() }
10
min:0 max:274736
11
min:274736 max:524997
12
min:524997 max:775025
13
min:775025 max:{
"$maxKey"

:1 }
14
connecting to:test
15
connecting to:test
16
connecting to:test
17
connecting to:test
18
>
for

(
var
i
in
threads) {
var
t =threads[i]; t.join(); printjson(t.returnData()); }
19
...
20
{
21
"result"

:{
22
"db"

:
"mrdb274736"
,
23
"collection"

:
"mrout274736"
24
},
25
"timeMillis"

:70507, 
26
"counts"

:{
27
"input"

:2500013, 
28
"emit"

:2500013, 
29
"reduce"

:250156, 
30
"output"

:250255 
31
},
32
"ok"

:1 
33
}
34
...
现在我们降到了70秒,就搞定了任务!jsMode真心有用,尤其是当对象有很多字段的时候。这里只有一个数字字段就已经下降了30%。

MongoDB在2.6版本上的改进

在很早的2.6版本中,在任何的js函数调用的时候,我们就通过一段代码设置一个可选参数”args“。这种做法并不标准,不在使用。但是它确有留下来的原因(查看 SERVER-4654)。让我们从Git资源库中导入MongoDB,编译并运行进行测试:

...
02
{
03
"result"

:{
04
"db"

:
"mrdb274736"
,
05
"collection"

:
"mrout274736"
06
},
07
"timeMillis"

:62785, 
08
"counts"

:{
09
"input"

:2500013, 
10
"emit"

:2500013, 
11
"reduce"

:250156, 
12
"output"

:250255 
13
},
14
"ok"

:1 
15
}
16
...
这是明显的提高了3倍的运行速度,时间降低到了60s,大约10-15%。这种变化也提高了整体JS引擎的堆消耗。

结语

回顾一下,对于同一个MR作业,我们开始时花费1200秒,最后花费60秒,提升了20倍!这项提高应该对大部分应用都有效,即使有些trick不太理想(例如,使用多种输出dbs/collections)。至少这能提供给人们思路,如何加速他们的MR作业,希望这些特征在将来会更加易于使用。接下来的将使得‘splitVector’命令更加有用,这张将在同一个数据库中提升多MR作业。干杯!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: