
MongoDB MapReduce documentation and examples

2012-04-28 13:59
Source:

http://www.mongodb.org/display/DOCS/MapReduce


Overview

Map/reduce is invoked via a database command.
Typically the database creates a collection to hold the output of the operation. The map and reduce functions
are written in JavaScript and execute on the server.

Command syntax:

db.runCommand(
{ mapreduce : <collection>,
map : <mapfunction>,
reduce : <reducefunction>
[, query : <query filter object>]
[, sort : <sorts the input objects using this key. Useful for optimization, like sorting by the emit key for fewer reduces>]
[, limit : <number of objects to return from collection, not supported with sharding>]
[, out : <see output options below>]
[, keeptemp: <true|false>]
[, finalize : <finalizefunction>]
[, scope : <object where fields go into javascript global scope >]
[, jsMode : true]
[, verbose : true]
}
);


* finalize - function to apply to all the results when finished.

* keeptemp - if true, the generated collection is not treated as temporary. Defaults to false. When out is
specified, the collection is automatically made permanent. (MongoDB <= 1.6)

* scope - can pass in variables that can be accessed from the map, reduce, and finalize functions. Note that updates to scope variables' values are not shared among shard members, so in a sharded
cluster you should treat scope variables as global constants.

* verbose - provide statistics on job execution time.


Incremental Map-reduce

If the data set over which you'd like to perform map-reduce aggregations is constantly growing, then you may want to take advantage of incremental map-reduce. This prevents you from having to aggregate over the entire data set each time you want to see your
results.

To perform incremental map-reduce, take the following steps:

1. First, run a map-reduce job over an existing collection, and output the data to its own output collection.

2. When you have more data to process, run a second map-reduce job, but use the query option to filter the documents so that only new documents are included.

3. Use the reduce output option. This will use your reduce function to merge the new data into the existing output collection.
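The three steps above can be sketched in plain JavaScript (outside the mongo shell) to show how the reduce output mode folds new results into an existing output collection. The document shape, field names, and the in-memory "collection" here are illustrative assumptions, not part of the MongoDB API:

```javascript
// Plain-JS sketch of incremental map-reduce; the mongo shell equivalent
// would pass { out: { reduce: "coll" } } and a date-filtering query.
function mapDoc(doc, emit) { emit(doc.user, { count: 1 }); }
function reduceFn(key, values) {
  var r = { count: 0 };
  values.forEach(function (v) { r.count += v.count; });
  return r;
}

// Run one map-reduce pass over `docs` and merge into `outColl` (a plain object).
function mapReduceInto(docs, outColl) {
  var grouped = {};
  docs.forEach(function (doc) {
    mapDoc(doc, function (key, value) {
      (grouped[key] = grouped[key] || []).push(value);
    });
  });
  Object.keys(grouped).forEach(function (key) {
    var fresh = reduceFn(key, grouped[key]);
    // "reduce" output mode: re-reduce the new result against the existing value.
    outColl[key] = (key in outColl) ? reduceFn(key, [outColl[key], fresh]) : fresh;
  });
  return outColl;
}

var out = {};
// Step 1: initial job over the existing data.
mapReduceInto([{ user: "a", day: 1 }, { user: "b", day: 1 }, { user: "a", day: 1 }], out);
// Steps 2-3: later job over only the new documents (as if query matched day 2),
// merged via the reduce output mode.
mapReduceInto([{ user: "a", day: 2 }], out);
// out is now { a: { count: 3 }, b: { count: 1 } }
```

Because the merge step is just another call to the reduce function, this only works if the reduce function's output has the same shape as the values it consumes.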


Output options

pre-v1.8: If you do not specify a value for out, then the results will be placed
into a temporary collection whose name will be given in the command's output (see below). Otherwise, you can specify the name of a collection for the out option and the results
will be placed there.

v1.8+: the output options have changed. Map-reduce no longer generates temporary collections (thus, keepTemp has
been removed). Now, you must always supply a value for out. The out directives are:

"collectionName" - By default the output will be of type "replace".

{ replace : "collectionName" } - the output will be inserted into a collection which will atomically replace any existing collection with the same name.

{ merge : "collectionName" } - This option will merge new data into the old output collection. In other words, if the same key exists in both the result set and the old
collection, the new value will overwrite the old one.

{ reduce : "collectionName" } - If a document exists for a given key in both the result set and the old collection, then a reduce operation (using the specified reduce function)
will be performed on the two values and the result will be written to the output collection. If a finalize function was provided, it will be run after the reduce as well.

{ inline : 1} - With this option, no collection will be created, and the whole map-reduce operation will happen in RAM. Also, the results of the map-reduce will be returned
within the result object. Note that this option is possible only when the result set fits within the 16MB limit of a single document. In v2.0, this is your only available
option on a replica set secondary.

For example:

db.users.mapReduce(map, reduce, {out: { inline : 1}});


Additional options within out objects are:

"db" - the db name to output to.

out : {replace : "collectionName", db : "otherDB"}


{ sharded : 1} - MongoDB 1.9+ If true and combined with an output mode that writes
to a collection, the output collection will be sharded using the _id field. See details in the sharding section.

Note: the order of the fields in the out parameter matters.


Result object

{
  [results : <document_array>,]
  [result : <collection_name> | {db: <db>, collection: <collection_name>},]
  timeMillis : <job_time>,
  counts : {
    input : <number of objects scanned>,
    emit : <number of times emit was called>,
    output : <number of items in output collection>
  },
  ok : <1_if_ok>
  [, err : <errmsg_if_error>]
}


Either the result or the results field will be present depending on your output type.
The results element is only present if the inline output option was used. The value of the results element
is an array of embedded documents containing the results. If you chose any other output type the result field will be a string with the name of the collection holding the
results, or an embedded document containing the db and collection if you chose to output to another db.

A command helper is available in the MongoDB shell :

db.collection.mapReduce(mapfunction,reducefunction[,options]);


map, reduce, and finalize functions
are written in JavaScript.


Map Function

The map function references the variable this to inspect the current object under consideration.
A map function calls emit(key, value) any number of times to feed data to the reducer. In most cases you will emit once per input document, but in some cases, such as counting
tags, a given document may have one, many, or even zero tags. Each emit is limited to 50% of the maximum document size (e.g. 4MB for 1.6.x and 8MB for 1.8.x).

function map(void) -> void



Reduce Function

When you run a map/reduce, the reduce function will receive an array of emitted values and reduce them to a single value. Because the reduce function might be invoked more
than once for the same key, the structure of the object returned by the reduce function must be identical to the structure of the map function's emitted value. We can clarify
this with a simple example.

Suppose we're iterating over a collection of documents that represent user comments. A sample document might look like this:

{ username: "jones",
likes: 20,
text: "Hello world!"
}


We want to use map/reduce to count the total number of comments per user and aggregate the total number of "likes" received across all of a user's comments. To do this, we'd first write a map function
like this one:

function() {
emit( this.username, {count: 1, likes: this.likes} );
}


This essentially says that we'll be grouping by username and aggregating using an object with fields for count and likes.

When map/reduce is actually run, an array of values for each username will be sent to the reduce function. That's why the reduce function
is always written to process an array of values. Here's the appropriate function for this example:

function(key, values) {
var result = {count: 0, likes: 0};

values.forEach(function(value) {
result.count += value.count;
result.likes += value.likes;
});

return result;
}


Notice that the result document has the same structure as the documents emitted by the map function. This is important because, when the reduce function
is run against a given key, it's not guaranteed to process every single value for that key (or username). In fact, the reduce function may have to run more than once. For
example, while processing the comments collection, the map function might encounter ten comments from the user "jones." It then sends those comments' data to be reduced,
and this results in the following aggregate object:

{ count: 10, likes: 247 }


Later, the map function encounters one more comment document by "jones." When this happens, the values in the extra comment must be reduced against the already existing aggregate
value. If the new emitted document looks like this:

{ count: 1, likes: 5 }


Then the reduce function will be invoked in this way:

reduce("jones", [ {count: 10, likes: 247}, { count: 1, likes: 5} ] )


And the resulting document will be a simple combination (or reduction) of those values:

{ count: 11, likes: 252 }


So long as you understand that the reduce function might be invoked more than once for the same key, it's easy to see why this function must return a value whose structure
matches the map function's emitted value.
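The re-reduce described above can be run directly as plain JavaScript (outside the shell) to confirm the combination shown; the function below is the reduce function from this example, renamed only so it can stand alone:

```javascript
// The comment-counting reduce function from the example, runnable as plain JS.
function reduceComments(key, values) {
  var result = { count: 0, likes: 0 };
  values.forEach(function (value) {
    result.count += value.count;
    result.likes += value.likes;
  });
  return result;
}

// Re-reduce the previous aggregate for "jones" with the newly emitted value:
var combined = reduceComments("jones", [
  { count: 10, likes: 247 },
  { count: 1, likes: 5 }
]);
// combined is { count: 11, likes: 252 }
```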


A more technical explanation

function reduce(key, array_of_value) -> value


OR

function reduce(key_obj, [value_obj, value_obj, ...]) -> value_obj


The map/reduce engine may invoke reduce functions iteratively; thus, these functions must be idempotent. That is, the following must hold for your reduce function:

for all k,vals : reduce( k, [reduce(k,vals)] ) == reduce(k,vals)


This also means the following is true:

reduce( k, [A, B] ) == reduce( k, [B, A] )


If you need to perform an operation only once, use a finalize function.
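Both identities can be checked mechanically. This plain-JavaScript sketch uses a simple summing reduce (an illustrative stand-in for any well-formed reduce function) and evaluates both sides of each identity:

```javascript
// Check the re-reduce identities that a reduce function must satisfy (plain JS).
function reduceFn(key, values) {
  var r = { count: 0 };
  values.forEach(function (v) { r.count += v.count; });
  return r;
}

var k = "jones";
var vals = [{ count: 3 }, { count: 4 }, { count: 5 }];

// Idempotence under re-reduce: reduce(k, [reduce(k, vals)]) == reduce(k, vals)
var once = reduceFn(k, vals);
var twice = reduceFn(k, [reduceFn(k, vals)]);

// Order independence: reduce(k, [A, B]) == reduce(k, [B, A])
var ab = reduceFn(k, [{ count: 1 }, { count: 2 }]);
var ba = reduceFn(k, [{ count: 2 }, { count: 1 }]);
```

A reduce that depended on the order or grouping of its input (for example, one that subtracted or took the first element) would fail one of these checks and produce different answers on different runs.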


The output of the map function's emit (the second argument) and the value returned by reduce should be the same format to make iterative reduce possible. If not, there will be weird bugs that are hard to debug.

Currently, the return value from a reduce function cannot be an array (it's typically an object or a number).


Finalize Function

A finalize function may be run after reduction. Such a function is optional and is not necessary for many map/reduce cases. The finalize function takes a key and a value,
and returns a finalized value.

function finalize(key, value) -> final_value


Your reduce function may be called multiple times for the same object. Use finalize when something should only be done a single time at the end; for example calculating an average.
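Averaging is the classic case: dividing inside reduce would give wrong answers whenever reduce runs more than once, so instead you carry a running total and count through reduce and divide exactly once in finalize. A plain-JavaScript sketch of that pattern (field names are illustrative):

```javascript
// Compute an average with map/reduce-style functions: sum in reduce, divide in finalize.
function reduceFn(key, values) {
  var r = { total: 0, count: 0 };
  values.forEach(function (v) {
    r.total += v.total;
    r.count += v.count;
  });
  return r;
}

function finalizeFn(key, value) {
  // Runs exactly once per key, after all reducing is done.
  value.avg = value.count > 0 ? value.total / value.count : 0;
  return value;
}

// Simulate a re-reduce: merge a partial aggregate with a new value, then finalize.
var partial = reduceFn("a", [{ total: 95, count: 1 }, { total: 105, count: 1 }]);
var merged = reduceFn("a", [partial, { total: 100, count: 1 }]);
var finalValue = finalizeFn("a", merged);
// finalValue is { total: 300, count: 3, avg: 100 }
```

Had reduceFn divided total by count itself, the second (re-reduce) pass would have averaged an already-averaged value and produced the wrong result.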


jsMode flag

v2.0+

Normally, map/reduce execution follows the steps:

convert from BSON to JS, execute map, convert from JS to BSON

convert from BSON to JS, execute reduce, convert from JS to BSON

Thus it requires several translations but it can handle very large datasets during mapping by using a temporary collection.

It is possible to make the execution stay in JS by using {jsMode: true} which performs the following steps:

convert from BSON to JS, execute map

execute reduce, convert from JS to BSON

The execution time may be significantly reduced.

Note that jsMode is limited by the JS heap size and a maximum of 500,000 unique keys. Consequently it is not suitable for large jobs, in which case mongod may revert to regular mode.


Sharded Environments

There are 2 aspects of sharding with Map/Reduce, input and output.


Sharded input

If the input collection is sharded, MongoS will automatically dispatch the map/reduce job to each of the shards, to be executed in parallel. There is no special option required. MongoS will then wait for the jobs on all shards to finish.


Sharded output

By default the output collection will not be sharded. The process is:

1. MongoS dispatches a map/reduce finish job to the shard that will store the target collection.

2. That mongod will pull results from all other shards, run a final reduce/finalize, and write to the output.

If using the "sharded" option in the "out" object, the output will be sharded using "_id" as the shard key. The process is:

1. MongoS pulls the results from each shard, doing a merge sort to get them ordered.

2. On the fly, it performs reduce/finalize as needed, then writes the result to the output collection in sharded mode.

Notes about sharded output:

* Though MongoS does some processing, only a small amount of memory is required even for large datasets.

* There is currently a limitation in that shard chunks do not get automatically split and migrated during insertion. Some manual commands may be required until the chunks are granular and balanced.

* The limit option is not supported.


Examples


Shell Example 1

The following example assumes we have an events collection with objects of the form:

{ time : <time>, user_id : <userid>, type : <type>, ... }


We then use MapReduce to extract all users who have had at least one event of type "sale":

> m = function() { emit(this.user_id, 1); }
> r = function(k,vals) { return 1; }
> res = db.events.mapReduce(m, r, { query : {type:'sale'} });
> // or in v1.8+:
> // res = db.events.mapReduce(m, r, { query : {type:'sale'}, out : 'example1' });
> db[res.result].find().limit(2)
{ "_id" : 8321073716060 , "value" : 1 }
{ "_id" : 7921232311289 , "value" : 1 }


If we also wanted to output the number of times the user had experienced the event in question, we could modify the reduce function like so:

> r = function(k,vals) {
...     var sum=0;
...     for(var i in vals) sum += vals[i];
...     return sum;
... }


Note, here, that we cannot simply return vals.length, as the reduce may be called multiple times.
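The pitfall can be demonstrated directly in plain JavaScript: a reduce that returns vals.length undercounts as soon as it is re-reduced, while the summing version from the example stays correct. The two-batch split below is an illustrative way of forcing a re-reduce:

```javascript
// Why returning vals.length is wrong: it breaks under re-reduce (plain JS sketch).
function badReduce(k, vals) { return vals.length; }
function goodReduce(k, vals) {
  var sum = 0;
  for (var i = 0; i < vals.length; i++) sum += vals[i];
  return sum;
}

// Five emits of 1 for one key, processed in two batches, then re-reduced:
var batch1 = [1, 1, 1];
var batch2 = [1, 1];

var bad = badReduce("k", [badReduce("k", batch1), badReduce("k", batch2)]);
// bad is 2, not 5: the partial counts were thrown away.

var good = goodReduce("k", [goodReduce("k", batch1), goodReduce("k", batch2)]);
// good is 5: the partial sums carry through the re-reduce.
```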


Shell Example 2

$ ./mongo
> db.things.insert( { _id : 1, tags : ['dog', 'cat'] } );
> db.things.insert( { _id : 2, tags : ['cat'] } );
> db.things.insert( { _id : 3, tags : ['mouse', 'cat', 'dog'] } );
> db.things.insert( { _id : 4, tags : []  } );

> // map function
> m = function(){
...    this.tags.forEach(
...        function(z){
...            emit( z , { count : 1 } );
...        }
...    );
...};

> // reduce function
> r = function( key , values ){
...    var total = 0;
...    for ( var i=0; i<values.length; i++ )
...        total += values[i].count;
...    return { count : total };
...};

> res = db.things.mapReduce(m, r, { out : "myoutput" } );
> res
{
  "result" : "myoutput",
  "timeMillis" : 12,
  "counts" : {
    "input" : 4,
    "emit" : 6,
    "output" : 3
  },
  "ok" : 1
}
> db.myoutput.find()
{"_id" : "cat" , "value" : {"count" : 3}}
{"_id" : "dog" , "value" : {"count" : 2}}
{"_id" : "mouse" , "value" : {"count" : 1}}

> db.myoutput.drop()




Mongo Shell Script with Incremental Map-Reduce and Finalize

This example is a JavaScript script file. The map-reduce can be run repeatedly on different dates to incrementally augment the result. The finalize option computes averages.

The output of commands and the queries themselves are saved to variables so that they can be examined after the sample script is run via the load() command in the shell.

// work in the map-reduce example db
db = db.getSiblingDB("mrex");

// clean out from previous runs of this sample -- you wouldn't do this in production
db.session.drop();
db.session_stat.drop();

// simulate saving records that log the lengths of user sessions in seconds
db.session.save({userid:"a", ts: ISODate('2011-11-03 14:17:00'), length: 95});
db.session.save({userid:"b", ts: ISODate('2011-11-03 14:23:00'), length: 110});
db.session.save({userid:"c", ts: ISODate('2011-11-03 15:02:00'), length: 120});
db.session.save({userid:"d", ts: ISODate('2011-11-03 16:45:00'), length: 45});

db.session.save({userid:"a", ts: ISODate('2011-11-04 11:05:00'), length: 105});
db.session.save({userid:"b", ts: ISODate('2011-11-04 13:14:00'), length: 120});
db.session.save({userid:"c", ts: ISODate('2011-11-04 17:00:00'), length: 130});
db.session.save({userid:"d", ts: ISODate('2011-11-04 15:37:00'), length: 65});

/*
For each user, count up the number of sessions, and figure out the average
session length.

Note that to be able to find the average session length, we need to keep a
total of all the session lengths, and then divide at the end.

We're also going to set this up so that we can repeat the process to get
incremental results over time.
*/

function mapf()
{
    emit(this.userid,
         {userid: this.userid, total_time: this.length, count: 1, avg_time: 0});
}

function reducef(key, values)
{
    var r = {userid: key, total_time: 0, count: 0, avg_time: 0};
    values.forEach(function(v)
    {
        r.total_time += v.total_time;
        r.count += v.count;
    });
    return r;
}

function finalizef(key, value)
{
    if (value.count > 0)
        value.avg_time = value.total_time / value.count;

    return value;
}

/*
Here's the initial run.

The query isn't technically necessary, but is included here to demonstrate
how this is the same map-reduce command that will be issued later to do
incremental adjustment of the computed values.  The query is assumed to run
once a day at midnight.
*/
var mrcom1 = db.runCommand( { mapreduce:"session",
map:mapf,
reduce:reducef,
query: {ts: {$gt:ISODate('2011-11-03 00:00:00')}},
out: { reduce: "session_stat" },
finalize:finalizef
});

function saveresults(a)
{
    /* append everything from the cursor to the argument array */
    var statcurs = db.session_stat.find();
    while (statcurs.hasNext())
        a.push(statcurs.next());
}

/* save the results into mrres1 */
var mrres1 = [];
saveresults(mrres1);

/* add more session records (the next day) */
db.session.save({userid:"a", ts: ISODate('2011-11-05 14:17:00'), length: 100});
db.session.save({userid:"b", ts: ISODate('2011-11-05 14:23:00'), length: 115});
db.session.save({userid:"c", ts: ISODate('2011-11-05 15:02:00'), length: 125});
db.session.save({userid:"d", ts: ISODate('2011-11-05 16:45:00'), length: 55});

/*
Run map reduce again.

This time, the query date is the next midnight, simulating a daily job that
is used to update the values in session_stat.  This can be repeated daily
(or on other periods, with suitable adjustments to the time).
*/
var mrcom2 = db.runCommand( { mapreduce:"session",
map:mapf,
reduce:reducef,
query: {ts: {$gt:ISODate('2011-11-05 00:00:00')}},
out: { reduce: "session_stat" },
finalize:finalizef
});

/* save the results into mrres2 */
var mrres2 = [];
saveresults(mrres2);





Note on Permanent Collections

Even when a permanent collection name is specified, a temporary collection name will be used during processing. At map/reduce completion, the temporary collection will be renamed to the permanent name atomically. Thus, one can perform a map/reduce job periodically
with the same target collection name without worrying about a temporary state of incomplete data. This is very useful when generating statistical output collections on a regular basis.


Parallelism

See info on Concurrency


Presentations

Map/reduce, geospatial indexing, and other cool features - Kristina Chodorow
at MongoSF (April 2010)


Troubleshooting

See Troubleshooting MapReduce