MongoDB学习(五):聚合、管道与MapReduce
目录
聚合是MongoDB的高级查询框架,实际上在MySQL等关系数据库中,也有GROUP BY这样的类似功能。其主要作用是,从多个文档中提取、转换和整合数据,形成新的信息,可以用来发现文档间的一些关系,或者挖掘单个文档不具备的信息。例如,春节快到了,如果一家商店的店长想统计每月销售额、每种商品销售额、整年销售额,就必须以时间或商品ID作为分组条件进行统计。MongoDB提供了聚合和MapReduce两种工具,聚合要简单些。
一.聚合&管道
管道是计算机领域一个很普遍的概念,指的是对于一系列操作,前一个操作的结果通过管道输送给后一个操作,作为其输入。一个典型例子是linux的管道,通过配合grep、awk等工具,可以很方便的从命令行输出中提取出需要的信息。
1.操作
MongoDB有如下管道操作:
- $project:可以用来修改文档的结构,类似于SQL的select 可以用来对域进行重命名
- 可以用来增加成员域
- 可以删除指定域
- 对于时间:
$dayOfYear: 返回该日期是这一年的第几天(全年 366 天)。
- $sum:计算总和
聚合管道的使用形式为:
db.collection_name.aggregate(pipeline,options)
有如下选项:
- explain:打印执行计划,布尔值
- allowDiskUse:使用磁盘暂存中间结果,布尔值,主要是为了解决中间数据太大(100MB以上)导致的报错
- cursor:用于逐个获取管道结果,避免超出16MB限制
2.例子
下面举一些例子,先展示下user集合的数据,一共6条:
[code]> db.user.find() { "_id" : ObjectId("5c3eef6d7da85af675c7c107"), "name" : "zhangsan", "sex" : "man", "age" : 21, "hobby" : "programming" } { "_id" : ObjectId("5c3eefee7da85af675c7c108"), "name" : "lisi", "sex" : "woman", "age" : 16, "hobby" : "music" } { "_id" : ObjectId("5c3ef0037da85af675c7c109"), "name" : "wangwu", "sex" : "man", "age" : 18, "hobby" : "read book" } { "_id" : ObjectId("5c3f1885cce0b679769390fa"), "name" : "lucy", "age" : 22, "hobby" : "movie", "sex" : "woman" } { "_id" : ObjectId("5c3f1bbfcce0b67976939109"), "name" : "tom", "age" : 2 } { "_id" : ObjectId("5c3f22a87da85af675c7c10a"), "name" : 10 }
1)首先统计每种性别的数量:
[code]> db.user.aggregate({$group:{_id:'$sex',count:{$sum:1}}}) { "_id" : null, "count" : 2 } { "_id" : "woman", "count" : 2 } { "_id" : "man", "count" : 2 }
_id就是用来指定分组依据,这里是sex域,注意需要带上美元符号$对于集合内每一个文档,如果sex字段相同,则分入同一组,反之亦然;$sum操作符的值(即1)代表每个组中每有一个文档,统计结果的"count"字段的值就加1。这个查询可以翻译成:select count(*) from user group by sex。可以看到,对于不存在的字段,也会作为null值参与统计。
2)假如只想统计有sex字段的文档,就可以用上$match操作符:
[code]> db.user.aggregate({$match:{sex:{$exists:1}}},{$group:{_id:'$sex',count:{$sum:1}} }) { "_id" : "woman", "count" : 2 } { "_id" : "man", "count" : 2 }
可以看到,$match的用法和find()函数很像。另外,当aggregate方法中存在多个操作时(即管道有多个环节),按照从左到右顺序执行。
3)MongoDB为aggregate提供了一个forEach()方法,可接受一个JavaScript函数,并继续处理管道内的数据:
[code]> db.user.aggregate({$match:{sex:{$exists:1}}},{$group:{_id:'$sex',count:{$sum:1}} }).forEach(function(doc){if(doc.count%2 == 0) db.result.insert(doc);}) > db.result.find() { "_id" : "woman", "count" : 2 } { "_id" : "man", "count" : 2 }
这里使用的函数是,如果管道内的结果的count字段能整除2,则插入到result集合中,由于两个结果的count都是2,均满足条件,因此都插入了。
4)上例和以下语句作用相同:
[code]> db.result.drop() true > db.user.aggregate({$match:{sex:{$exists:1}}},{$group:{_id:'$sex',count:{$sum:1}} },{$match:{count:{$mod:[2,0]}}},{$out:"result"}) > db.result.find() { "_id" : "woman", "count" : 2 } { "_id" : "man", "count" : 2 }
在这个管道中,首先过滤了没有sex域的文档,然后根据sex域的值进行分组,之后取count字段的值可以整除2的分组结果写入result集合。
5)如果不想看到_id字段,就可以用上$project了:
[code]> db.user.aggregate({$match:{sex:{$exists:1}}},{$group:{_id:'$sex',count:{$sum:1}} },{$match:{count:{$mod:[2,0]}}},{$project:{_id:0}}) { "count" : 2 } { "count" : 2 }
6)$project还可以用来改变字段名:
[code]> db.user.aggregate({$match:{sex:{$exists:1}}},{$group:{_id:'$sex',count:{$sum:1}} },{$match:{count:{$mod:[2,0]}}},{$project:{"性别":"$_id","人数":"$count"}}) { "_id" : "woman", "性别" : "woman", "人数" : 2 } { "_id" : "man", "性别" : "man", "人数" : 2 }
显然,_id字段不能被改名,对其使用$project相当于把该域的值复制一份再改名
由于$project具有投影的能力,因此也可以使用一些操作符,来实现诸如大小写转换、数学运算等操作:
- 字符串类: $concat:将给定的若干字符串连接为一个字符串
- $strcasecmp:字符串比较(大小写不敏感)
- $sunstr:对给定字符串进行裁剪
- $toLower、$toUpper:全小写/全大写
- $add、$substract、$multiply、$divide:加减乘除
- $and:逻辑和,$not:逻辑非,$or:逻辑或
- $setEquals:比较两个集合是否相同(包含相同元素)
- $meta:返回一些全文搜索的相关信息
3.提高管道性能
管道虽然好用,但是以下因素会对其性能产生影响:
- 文档数量和文档的体积:应当尽可能小
- 索引:可以大大加速管道操作,不过只能用于$match和$sort
- 分片:如果集合进行了分片,那么只有$match和$project会在各个分片上运行,其他操作只会在主分片运行,一旦使用过其他操作,那么在此之后,管道只会在主分片上运行
例如:对$match和$group分别explain如下(需要先建立索引)
$match:
[code]> db.user.aggregate([{$match:{sex:{$exists:1}}}],{explain:true}) { …… "indexName" : "sex_1", …… }
可以看到,这里使用了索引
$group:
[code]> db.user.aggregate([{$group:{_id:'$sex',count:{$sum:1}}}],{explain:true}) { "stages" : [ { "$cursor" : { "query" : { }, "fields" : { "sex" : 1, "_id" : 0 }, "queryPlanner" : { "plannerVersion" : 1, "namespace" : "test.user", "indexFilterSet" : false, "parsedQuery" : { }, "winningPlan" : { "stage" : "COLLSCAN", "direction" : "forward" }, "rejectedPlans" : [ ] } } }, { "$group" : { "_id" : "$sex", "count" : { "$sum" : { "$const" : 1 } } } } ], "ok" : 1 }
并没有使用索引。在数据量小、管道短的情况下,感觉上没有什么差距,不过一旦任务复杂起来,差距就会很明显了。因此,在使用管道前,最好对相关域建立索引,或者建立专用的索引域。
二.Map/Reduce
MapReduce是MongoDB另一个数据处理工具,来源是Google的论文,主要思想是 分治-聚合 ,即将大的任务分割为小的任务并行处理,然后将结果聚合在一起。主要用在分布式、大数据条件下。
函数原型为:
db.collection_name.mapReduce(map,reduce,option)
map、reduce是两个函数。map函数生成键值对序列,使用emit返回,作为 reduce 函数参数;reduce函数将key-values变成key-value,也就是把values数组变成一个单一的值value。
option:
- out 统计结果存放集合 (不指定则使用临时集合,在客户端断开后自动删除)。
- query 一个筛选条件,只有满足条件的文档才会调用map函数。(query。limit,sort可以随意组合)
- sort 和limit结合的sort排序参数(也是在发往map函数前给文档排序),可以优化分组机制
- limit 发往map函数的文档数量的上限(要是没有limit,单独使用sort的用处不大)
示例如下:
[code]> map = function(){emit(this.sex,1)} function (){emit(this.sex,1)} > reduce = function(k,v){return Array.sum(v)} function (k,v){return Array.sum(v)} > db.user.mapReduce(map,reduce,{query:{sex:{$exists:1}},out:"result"}) { "result" : "result", "timeMillis" : 115, "counts" : { "input" : 4, "emit" : 4, "reduce" : 2, "output" : 2 }, "ok" : 1 } > db.result.find() { "_id" : "man", "value" : 2 } { "_id" : "woman", "value" : 2 }
效果和聚合一节中第二个例子一样
- 学习MongoDB--(6-2):聚合(MapReduce使用)
- 学习MongoDB--(6-2):聚合(MapReduce使用)
- MongoDB 聚合管道(Aggregation Pipeline)
- 学习MongoDB 十: MongoDB聚合(Map-Reduce)(二)
- Mongodb中数据聚合之聚合管道aggregate
- MongoDB 聚合管道(二)(Aggregation Pipeline)
- MongoDB小结31 - 聚合管道【$skip】
- MongoDB 聚合管道(Aggregation Pipeline)
- MongoDB小结34 - 聚合管道【$group】
- mongodb学习3---mongo的MapReduce
- MongoDB 聚合管道(一)(Aggregation Pipeline)
- MongoDB小结32 - 聚合管道【$skip】
- mongodb 学习笔记五 MapReduce
- 学习MongoDB--(6-1):聚合(初级聚合函数使用)
- MongoDB小结27 - 聚合管道【$project】
- MongoDB 聚合管道(一)(Aggregation Pipeline)
- MongoDB管道聚合各阶段
- 【翻译】MongoDB指南/聚合——聚合管道
- MongoDB小结35 - 聚合管道【$sort】
- MongoDB聚合操作 (group, aggregate, mapReduce操作)