您的位置:首页 > 数据库 > Mongodb

mongodb中mapreduce的使用以及使用pymongo调用mapreduce

2013-07-24 17:01 351 查看
先简单介绍一下map/reduce

map函数:接受一个键值对(key-value pair),产生一组中间键值对。MapReduce框架会将map函数产生的中间键值对里键相同的值传递给一个reduce函数。

reduce函数:接受一个键,以及相关的一组值,将这组值进行合并产生一组规模更小的值(通常只有一个或零个值)。

关于mapreduce的详细介绍可以看这一篇博客/article/7791596.html

mongodb直接调用mapreduce不是本文重点,因此引述网上一个十分经典的例子

往数据库中插入数据

首先我们通过以下命令创建两本书:

>book1 = {name :"Understanding JAVA",pages :100}

>book2 = {name :"Understanding JSON",pages :200}

然后将这两本书保持到名为 books 的集合中:

>db.books.save(book1)

>db.books.save(book2)

上述命令将在 library 数据库中创建一个名为 books 的集合(也就是SQL数据库中的表),下面命令将列出我们刚添加的两本书:

>db.books.find();

{"_id" :ObjectId("4f365b1ed6d9d6de7c7ae4b1"),"name" :"Understanding JAVA","pages" :100 }

{"_id" :ObjectId("4f365b28d6d9d6de7c7ae4b2"),"name" :"Understanding JSON","pages" :200 }

添加更多的记录:

>book = {name :"Understanding XML",pages :300}

>db.books.save(book)

>book = {name :"Understanding Web Services",pages :400}

>db.books.save(book)

>book = {name :"Understanding Axis2",pages :150}

>db.books.save(book)

6. 编写 Map 函数

接下来我们编写一个搜索功能,用来查找超过250页的图书:

1
>
var
map
=
function
()
{
2
var
category;
3
if
(
this
.pages
>= 250 )
4
category
=
'Big
Books'
;
5
else
6
category
=
"Small
Books"
;
7
emit(category,
{name:
this
.name});
8
};
所返回的结果:

{"Big Books",[{name:"Understanding XML"},{name :"Understanding Web Services"}]);

{"Small Books",[{name:"Understanding JAVA"},{name :"Understanding JSON"},{name:"Understanding Axis2"}]);

7. 编写 Reduce 函数

1
>
var
reduce
=
function
(key,
values) {
2
var
sum
= 0;
3
values.forEach(
function
(doc)
{
4
sum
+= 1;
5
});
6
return
{books:
sum};
7
};
8. 在 books 集合中运行 MapReduce

1
>
var
count
= db.books.mapReduce(map,reduce,{out:
"book_results"
});
2
>
db[count.result].find()
3
4
{
"_id"
:
"Big
Books"
,
"value"
:
{
"books"
:
2 } }
5
{
"_id"
:
"Small
Books"
,
"value"
:
{
"books"
:
3 } }
上述结果表明我们有两本大书和三本小书。
下面开始书说本文重点,使用pymongo调用mapreduce统计一个存着网络攻击信息数据库中,每秒每条链路的攻击量

首先引入要使用的库

import pymongo
from bson.code import Code


连接数据库

conn = pymongo.Connection("localhost",27017)
db = conn[DBNAME]
account = db[ACCOUNTNAME]


下面写map函数

mapper = Code("""
function(){
var time = parseInt(this.time);
switch(time){
"""+codestring+"""
}
}
""")
写map函数中codestring代码的生成

    for i in range(maxtime):
juge = str(i)
key = str(i+1)
codestring += "case {0}:emit({1}+':'+this.srcnode+'->'+this.dstnode,0); break;".format(juge,key)
codestring += "defualt:return;"


写reduce函数

    reducer = Code("""
function(key,values){
key = key.split(':');
return {"time":parseInt(key[0]),"package":values.length};
}
""")


执行mapreduce

result = account.map_reduce(mapper,reducer,"myresult",query = {})


打印result结果

for i in result.find():print i
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: