MongoDB MapReduce documentation and examples
2012-04-28 13:59
Source: http://www.mongodb.org/display/DOCS/MapReduce
Overview
map/reduce is invoked via a database command. Typically the database creates a collection to hold output of the operation. map and reduce functions
are written in JavaScript and execute on the server.
Command syntax:
db.runCommand(
  { mapreduce : <collection>,
    map : <mapfunction>,
    reduce : <reducefunction>
    [, query : <query filter object>]
    [, sort : <sorts the input objects using this key. Useful for optimization, like sorting by the emit key for fewer reduces>]
    [, limit : <number of objects to return from collection, not supported with sharding>]
    [, out : <see output options below>]
    [, keeptemp : <true|false>]
    [, finalize : <finalizefunction>]
    [, scope : <object where fields go into javascript global scope>]
    [, jsMode : true]
    [, verbose : true]
  }
);
finalize - function to apply to all the results when finished
keeptemp - if true, the generated collection is not treated as temporary. Defaults to false. When out is
specified, the collection is automatically made permanent. (MongoDB <=1.6)
scope - can pass in variables that can be accessed from map/reduce/finalize. Note that updates to scope variables' values are not shared among shard members, so in a sharded
cluster you should treat scope variables as global constants. (See example mr5.)
verbose - provide statistics on job execution time
Incremental Map-reduce
If the data set over which you'd like to perform map-reduce aggregations is constantly growing, then you may want to take advantage of incremental map-reduce. This prevents you from having to aggregate over the entire data set each time you want to see your results.
To perform incremental map-reduce, take the following steps:
1. First, run a map-reduce job over an existing collection, and output the data to its own output collection.
2. When you have more data to process, run a second map-reduce job, but use the query option to filter the documents to include only new documents.
3. Use the reduce output option. This will use your reduce function to merge the new data into the existing output collection.
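The merge behavior of the reduce output option can be sketched in plain JavaScript (runnable outside the mongo shell; the collection is simulated with an object keyed by _id, and the field names are illustrative):

```javascript
// Re-reduce semantics of out : { reduce : "..." }: when a key from the new
// batch already exists in the output collection, the old value and the new
// value are folded together with the user's reduce function.

function reducef(key, values) {
  // same shape in and out: { count: ... }
  var r = { count: 0 };
  values.forEach(function (v) { r.count += v.count; });
  return r;
}

// existing output collection from the first (full) map-reduce run
var outColl = { jones: { count: 10 } };

// values produced by the second run, which covered only the new documents
var newBatch = { jones: { count: 1 }, smith: { count: 2 } };

for (var key in newBatch) {
  if (outColl.hasOwnProperty(key)) {
    // key seen before: re-reduce old aggregate with the new value
    outColl[key] = reducef(key, [outColl[key], newBatch[key]]);
  } else {
    // brand-new key: insert as-is
    outColl[key] = newBatch[key];
  }
}
// outColl is now { jones: { count: 11 }, smith: { count: 2 } }
```

Because the old aggregate is fed back through the same reduce function, this only works when reduce's output has the same shape as its input values — the same constraint discussed under the reduce function below.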
Output options
pre-v1.8: If you do not specify a value for out, then the results will be placed into a temporary collection whose name will be given in the command's output (see below). Otherwise, you can specify the name of a collection for the out option and the results
will be placed there.
v1.8+: the output options have changed. Map-reduce no longer generates temporary collections (thus, keepTemp has
been removed). Now, you must always supply a value for out. The out directives are:
"collectionName" - By default the output will be of type "replace".
{ replace : "collectionName" } - the output will be inserted into a collection which will atomically replace any existing collection with the same name.
{ merge : "collectionName" } - This option will merge new data into the old output collection. In other words, if the same key exists in both the result set and the old
collection, the new key will overwrite the old one.
{ reduce : "collectionName" } - If documents exist for a given key in the result set and in the old collection, then a reduce operation (using the specified reduce function)
will be performed on the two values and the result will be written to the output collection. If a finalize function was provided, this will be run after the reduce as well.
{ inline : 1} - With this option, no collection will be created, and the whole map-reduce operation will happen in RAM. Also, the results of the map-reduce will be returned
within the result object. Note that this option is possible only when the result set fits within the 16MB limit of a single document. In v2.0, this is your only available
option on a replica set secondary.
For example:
db.users.mapReduce(map, reduce, {out: { inline : 1}});
Additional options within out objects are:
"db" - the db name to output to.
out : {replace : "collectionName", db : "otherDB"}
{ sharded : 1} - MongoDB 1.9+ If true and combined with an output mode that writes
to a collection, the output collection will be sharded using the _id field. See details in the sharding section.
Note: the order of the objects in the out parameter matters.
Result object
{
  [results : <document_array>,]
  [result : <collection_name> | {db: <db>, collection: <collection_name>},]
  timeMillis : <job_time>,
  counts : {
    input : <number of objects scanned>,
    emit : <number of times emit was called>,
    output : <number of items in output collection>
  },
  ok : <1_if_ok>
  [, err : <errmsg_if_error>]
}
Either the result or the results field will be present depending on your output type.
The results element is only present if the inline output option was used. The value of the results element
is an array of embedded documents containing the results. If you chose any other output type the result field will be a string with the name of the collection holding the
results, or an embedded document containing the db and collection if you chose to output to another db.
A command helper is available in the MongoDB shell:
db.collection.mapReduce(mapfunction,reducefunction[,options]);
map, reduce, and finalize functions
are written in JavaScript.
Map Function
The map function references the variable this to inspect the current object under consideration. A map function calls emit(key,value) any number of times to feed data to the reducer. In most cases you will emit once per input document, but in some cases such as counting
tags, a given document may have one, many, or even zero tags. Each emit is limited to 50% of the maximum document size (e.g. 4MB for 1.6.x and 8MB for 1.8.x).
function map(void) -> void
Reduce Function
When you run a map/reduce, the reduce function will receive an array of emitted values and reduce them to a single value. Because the reduce function might be invoked more than once for the same key, the structure of the object returned by the reduce function must be identical to the structure of the map function's emitted value. We can clarify
this with a simple example.
Suppose we're iterating over a collection of documents that represent user comments. A sample document might look like this:
{ username: "jones", likes: 20, text: "Hello world!" }
We want to use map/reduce to count the total number of comments per user and aggregate the total number of "likes" received across all of a user's comments. To do this, we'd first write a map function
like this one:
function() { emit( this.username, {count: 1, likes: this.likes} ); }
This essentially says that we'll be grouping by username and aggregating using an object with fields for count and likes.
When map/reduce is actually run, an array of values for each username will be sent to the reduce function. That's why the reduce function
is always written to process an array of values. Here's the appropriate function for this example:
function(key, values) {
    var result = {count: 0, likes: 0};
    values.forEach(function(value) {
        result.count += value.count;
        result.likes += value.likes;
    });
    return result;
}
Notice that the result document has the same structure as the documents emitted by the map function. This is important because, when the reduce function
is run against a given key, it's not guaranteed to process every single value for that key (or username). In fact, the reduce function may have to run more than once. For
example, while processing the comments collection, the map function might encounter ten comments from the user "jones." It then sends those comments' data to be reduced,
and this results in the following aggregate object:
{ count: 10, likes: 247 }
Later, the map function encounters one more comment document by "jones." When this happens, the values in the extra comment must be reduced against the already existing aggregate
value. If the new emitted document looks like this:
{ count: 1, likes: 5 }
Then the reduce function will be invoked in this way:
reduce("jones", [ {count: 10, likes: 247}, { count: 1, likes: 5} ] )
And the resulting document will be a simple combination (or reduction) of those values:
{ count: 11, likes: 252 }
So long as you understand that the reduce function might be invoked more than once for the same key, it's easy to see why this function must return a value whose structure
matches the map function's emitted value.
A more technical explanation
function reduce(key, array_of_value) -> value
OR
function reduce(key_obj, [value_obj, value_obj, ...]) -> value_obj
The map/reduce engine may invoke reduce functions iteratively; thus, these functions must be idempotent. That is, the following must hold for your reduce function:
for all k,vals : reduce( k, [reduce(k,vals)] ) == reduce(k,vals)
This also means the following is true:
reduce( k, [A, B] ) == reduce( k, [B, A] )
If you need to perform an operation only once, use a finalize function.
The output of the map function's emit (the second argument) and the value returned by reduce should be in the same format to make iterative reduce possible. If not, there will be subtle bugs that are hard to debug.
Currently, the return value from a reduce function cannot be an array (it's typically an object or a number).
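The re-reduce identity above can be checked directly for the comment-counting reducer from the earlier example (plain JavaScript, runnable outside the shell):

```javascript
// Verifying reduce(k, [reduce(k, vals)]) == reduce(k, vals) for the
// comment-counting reducer used in the text.

function reduce(key, values) {
  var result = { count: 0, likes: 0 };
  values.forEach(function (value) {
    result.count += value.count;
    result.likes += value.likes;
  });
  return result;
}

var vals = [{ count: 10, likes: 247 }, { count: 1, likes: 5 }];

var once  = reduce("jones", vals);                    // single pass
var twice = reduce("jones", [reduce("jones", vals)]); // partial result re-reduced

// both are { count: 11, likes: 252 }
```

The identity holds precisely because the reducer's return value has the same shape as its inputs and addition is associative and commutative; a reducer that, say, returned vals.length would fail this check.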
Finalize Function
A finalize function may be run after reduction. Such a function is optional and is not necessary for many map/reduce cases. The finalize function takes a key and a value, and returns a finalized value.
function finalize(key, value) -> final_value
Your reduce function may be called multiple times for the same object. Use finalize when something should only be done a single time at the end; for example calculating an average.
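The average case can be sketched in plain JavaScript (field names are illustrative): reduce only accumulates a re-reducible {total, count} pair, and finalize performs the division exactly once.

```javascript
// Why an average belongs in finalize: reduce may run several times per key,
// so it must only accumulate; dividing inside reduce would corrupt later
// re-reduce passes.

function reduce(key, values) {
  var r = { total: 0, count: 0 };
  values.forEach(function (v) {
    r.total += v.total;
    r.count += v.count;
  });
  return r; // same shape as the emitted values, so re-reduce stays safe
}

function finalize(key, value) {
  return value.count > 0 ? value.total / value.count : 0;
}

// simulate two reduce passes for the same key, then one finalize
var partial = reduce("a", [{ total: 95, count: 1 }, { total: 105, count: 1 }]);
var avg = finalize("a", reduce("a", [partial, { total: 100, count: 1 }]));
// avg is 100: (95 + 105 + 100) / 3
```

The incremental map-reduce script in the examples section below uses exactly this total/count/finalize pattern.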
jsMode flag
v2.0+. Normally, map/reduce execution follows these steps:
convert from BSON to JS, execute map, convert from JS to BSON
convert from BSON to JS, execute reduce, convert from JS to BSON
Thus it requires several translations but it can handle very large datasets during mapping by using a temporary collection.
It is possible to make the execution stay in JS by using {jsMode: true} which performs the following steps:
convert from BSON to JS, execute map
execute reduce, convert from JS to BSON
The execution time may be significantly reduced. Note, however, that jsMode is limited by the JS heap size and by a maximum of 500k unique keys; consequently it is not suitable for large jobs, in which case mongo may revert to regular mode.
Sharded Environments
There are two aspects of sharding with Map/Reduce: input and output.
Sharded input
If the input collection is sharded, MongoS will automatically dispatch the map/reduce job to each of the shards, to be executed in parallel. No special option is required. MongoS will then wait for the jobs on all shards to finish.
Sharded output
By default the output collection will not be sharded. The process is:
MongoS dispatches a map/reduce finish job to the shard that will store the target collection.
That mongod will pull results from all other shards, run a final reduce/finalize, and write to the output.
If using the "sharded" option in the "out" object, the output will be sharded using "_id" as the shard key.
The process is:
MongoS pulls the results from each shard, doing a merge sort to get them ordered.
on the fly, it performs reduce/finalize as needed, then writes the result to the output collection in sharded mode.
Notes about sharded output:
though MongoS does some processing, only a small amount of memory is required even for large datasets.
there is currently a limitation in that shard chunks do not get automatically split and migrated during insertion. Some manual commands may be required until the chunks are granular and balanced.
the limit option is not supported.
Examples
Shell Example 1
The following example assumes we have an events collection with objects of the form:
{ time : <time>, user_id : <userid>, type : <type>, ... }
We then use MapReduce to extract all users who have had at least one event of type "sale":
> m = function() { emit(this.user_id, 1); }
> r = function(k,vals) { return 1; }
> res = db.events.mapReduce(m, r, { query : {type:'sale'} });
> // or in v1.8+:
> // res = db.events.mapReduce(m, r, { query : {type:'sale'}, out : 'example1' });
> db[res.result].find().limit(2)
{ "_id" : 8321073716060 , "value" : 1 }
{ "_id" : 7921232311289 , "value" : 1 }
If we also wanted to output the number of times the user had experienced the event in question, we could modify the reduce function like so:
> r = function(k,vals) {
...     var sum=0;
...     for(var i in vals) sum += vals[i];
...     return sum;
... }
Note, here, that we cannot simply return vals.length, as the reduce may be called multiple times.
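This pitfall is easy to demonstrate in plain JavaScript (runnable outside the shell): when reduce is split into two passes, a vals.length reducer counts partial aggregates instead of documents, while a summing reducer remains correct.

```javascript
// Why returning vals.length is wrong: reduce can be invoked on partial
// results, so a second pass would count aggregates, not original emits.

function badReduce(k, vals) { return vals.length; }

function goodReduce(k, vals) {
  var sum = 0;
  for (var i = 0; i < vals.length; i++) sum += vals[i];
  return sum;
}

// three emits of 1 for the same key, split across two reduce passes
var bad  = badReduce("u",  [badReduce("u",  [1, 1]), 1]); // 2, not 3
var good = goodReduce("u", [goodReduce("u", [1, 1]), 1]); // 3, as expected
```

The summing reducer satisfies the re-reduce identity from the section above; the length-based one does not.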
Shell Example 2
$ ./mongo
> db.things.insert( { _id : 1, tags : ['dog', 'cat'] } );
> db.things.insert( { _id : 2, tags : ['cat'] } );
> db.things.insert( { _id : 3, tags : ['mouse', 'cat', 'dog'] } );
> db.things.insert( { _id : 4, tags : [] } );
> // map function
> m = function(){
...     this.tags.forEach(
...         function(z){
...             emit( z , { count : 1 } );
...         }
...     );
... };
> // reduce function
> r = function( key , values ){
...     var total = 0;
...     for ( var i=0; i<values.length; i++ )
...         total += values[i].count;
...     return { count : total };
... };
> res = db.things.mapReduce(m, r, { out : "myoutput" } );
> res
{
    "result" : "myoutput",
    "timeMillis" : 12,
    "counts" : {
        "input" : 4,
        "emit" : 6,
        "output" : 3
    },
    "ok" : 1
}
> db.myoutput.find()
{"_id" : "cat" , "value" : {"count" : 3}}
{"_id" : "dog" , "value" : {"count" : 2}}
{"_id" : "mouse" , "value" : {"count" : 1}}
> db.myoutput.drop()
* gist
Mongo Shell Script with Incremental Map-Reduce and Finalize
This example is a JavaScript script file. The map-reduce can be run repeatedly on different dates to incrementally augment the result. The finalize option computes averages. The output of commands and the queries themselves are saved to variables so that they can be examined after the sample script is run via the load() command in the shell.
// work in the map-reduce example db
db = db.getSiblingDB("mrex");

// clean out from previous runs of this sample -- you wouldn't do this in production
db.session.drop();
db.session_stat.drop();

// simulate saving records that log the lengths of user sessions in seconds
db.session.save({userid:"a", ts: ISODate('2011-11-03 14:17:00'), length: 95});
db.session.save({userid:"b", ts: ISODate('2011-11-03 14:23:00'), length: 110});
db.session.save({userid:"c", ts: ISODate('2011-11-03 15:02:00'), length: 120});
db.session.save({userid:"d", ts: ISODate('2011-11-03 16:45:00'), length: 45});
db.session.save({userid:"a", ts: ISODate('2011-11-04 11:05:00'), length: 105});
db.session.save({userid:"b", ts: ISODate('2011-11-04 13:14:00'), length: 120});
db.session.save({userid:"c", ts: ISODate('2011-11-04 17:00:00'), length: 130});
db.session.save({userid:"d", ts: ISODate('2011-11-04 15:37:00'), length: 65});

/* For each user, count up the number of sessions, and figure out the average
   session length. Note that to be able to find the average session length,
   we need to keep a total of all the session lengths, and then divide at the
   end. We're also going to set this up so that we can repeat the process to
   get incremental results over time. */
function mapf() {
    emit(this.userid,
         {userid:this.userid, total_time:this.length, count:1, avg_time:0});
}

function reducef(key, values) {
    var r = {userid:key, total_time:0, count:0, avg_time:0};
    values.forEach(function(v) {
        r.total_time += v.total_time;
        r.count += v.count;
    });
    return r;
}

function finalizef(key, value) {
    if (value.count > 0)
        value.avg_time = value.total_time / value.count;
    return value;
}

/* Here's the initial run. The query isn't technically necessary, but is
   included here to demonstrate how this is the same map-reduce command that
   will be issued later to do incremental adjustment of the computed values.
   The query is assumed to run once a day at midnight. */
var mrcom1 = db.runCommand( { mapreduce:"session",
                              map:mapf,
                              reduce:reducef,
                              query: {ts: {$gt:ISODate('2011-11-03 00:00:00')}},
                              out: { reduce: "session_stat" },
                              finalize:finalizef });

function saveresults(a) {
    /* append everything from the cursor to the argument array */
    var statcurs = db.session_stat.find();
    while(statcurs.hasNext())
        a.push(statcurs.next());
}

/* save the results into mrres1 */
var mrres1 = [];
saveresults(mrres1);

/* add more session records (the next day) */
db.session.save({userid:"a", ts: ISODate('2011-11-05 14:17:00'), length: 100});
db.session.save({userid:"b", ts: ISODate('2011-11-05 14:23:00'), length: 115});
db.session.save({userid:"c", ts: ISODate('2011-11-05 15:02:00'), length: 125});
db.session.save({userid:"d", ts: ISODate('2011-11-05 16:45:00'), length: 55});

/* Run map reduce again. This time, the query date is the next midnight,
   simulating a daily job that is used to update the values in session_stat.
   This can be repeated daily (or on other periods, with suitable adjustments
   to the time). */
var mrcom2 = db.runCommand( { mapreduce:"session",
                              map:mapf,
                              reduce:reducef,
                              query: {ts: {$gt:ISODate('2011-11-05 00:00:00')}},
                              out: { reduce: "session_stat" },
                              finalize:finalizef });

/* save the results into mrres2 */
var mrres2 = [];
saveresults(mrres2);
More Examples
example mr1
Finalize example: example mr2
Note on Permanent Collections
Even when a permanent collection name is specified, a temporary collection name will be used during processing. At map/reduce completion, the temporary collection will be renamed to the permanent name atomically. Thus, one can perform a map/reduce job periodically with the same target collection name without worrying about a temporary state of incomplete data. This is very useful when generating statistical output collections on a regular basis.
Parallelism
See info on Concurrency
Presentations
Map/reduce, geospatial indexing, and other cool features - Kristina Chodorow at MongoSF (April 2010)
Troubleshooting
See Troubleshooting MapReduce