Map-Reduce to Aggregation Pipeline
Starting in version 4.4, MongoDB adds the $accumulator and
$function aggregation operators. These operators let users
define custom aggregation expressions. Using these
operators, map-reduce expressions can be approximately rewritten
as shown in the following table.
Note
Various map-reduce expressions can be rewritten using
aggregation pipeline operators, such as $group,
$merge, etc., without requiring custom functions.
For examples, see Map-Reduce Examples.
Map-Reduce to Aggregation Pipeline Translation Table
The table is only an approximate translation. For instance, the table
shows an approximate translation of mapFunction using the
$project stage.
However, the mapFunction logic may require additional stages, for example when the logic iterates over an array. In that case, the aggregation pipeline includes an $unwind stage and a $project stage.
The emits field in $project may be named something else. The field name emits was chosen for visual comparison.
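As a minimal sketch of the array case, assume a hypothetical document shape with an `items` array whose elements carry a `sku` field. The map function below emits once per array element, and the two stages that follow approximate it. The in-memory `emit` collector and the sample document are illustrative assumptions so the snippet can run outside MongoDB.

```javascript
// Hypothetical sample document; the `items` array and `sku` field are assumptions.
const sampleDoc = { _id: 1, items: [{ sku: "apples" }, { sku: "pears" }] };

// Map function that iterates over an array and emits one pair per element.
function mapFunction() {
  this.items.forEach(function (item) { emit(item.sku, 1); });
}

// Stand-in for the server-side emit(), used only to run the map function locally.
const emitted = [];
function emit(k, v) { emitted.push({ k: k, v: v }); }
mapFunction.call(sampleDoc);

// Approximate pipeline equivalent: $unwind outputs one document per array
// element, then $project shapes each one into an `emits` document.
// $literal is needed because a bare 1 in $project means "include this field".
const stages = [
  { $unwind: "$items" },
  { $project: { emits: { k: "$items.sku", v: { $literal: 1 } } } }
];
```

In mongosh, `stages` would be passed as part of the array given to `db.collection.aggregate()`.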
| Map-Reduce | Aggregation Pipeline |
|---|---|
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: <collection>
}
)
|
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k",
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emits.v" ],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" } }
} },
{ $out: <collection> }
] )
|
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { replace: <collection>, db:<db> }
}
)
|
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k",
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emits.v" ],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" } }
} },
{ $out: { db: <db>, coll: <collection> } }
] )
|
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { merge: <collection>, db: <db> }
}
)
|
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k",
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emits.v" ],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" } }
} },
{ $merge: {
into: { db: <db>, coll: <collection> },
on: "_id",
whenMatched: "replace",
whenNotMatched: "insert"
} },
] )
|
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { reduce: <collection>, db: <db> }
}
)
|
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k",
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emits.v" ],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" } }
} },
{ $merge: {
into: { db: <db>, coll: <collection> },
on: "_id",
whenMatched: [
{ $project: {
value: { $function: {
body: <reduceFunction>,
args: [
"$_id",
[ "$value", "$$new.value" ]
],
lang: "js"
} }
} }
],
whenNotMatched: "insert"
} },
] )
|
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { inline: 1 }
}
)
|
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k",
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emits.v" ],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" } }
} }
] )
|
Examples
Various map-reduce expressions can be rewritten using aggregation
pipeline operators, such as
$group, $merge, etc., without requiring custom
functions. However, for illustrative purposes, the following examples
provide both alternatives.
Example 1
The following map-reduce operation on the orders collection groups
by the cust_id, and calculates the sum of the price for each
cust_id:
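A minimal sketch of such an operation follows, assuming each order document has `cust_id` and `price` fields. The output collection name `map_reduce_example`, the sample orders, and the local `emit` stand-in are illustrative assumptions; the stand-in only lets the map and reduce functions run outside a server.

```javascript
// Map: emit the price keyed by customer.
const mapFunction1 = function () { emit(this.cust_id, this.price); };

// Reduce: sum all prices emitted for one cust_id.
const reduceFunction1 = function (keyCustId, valuesPrices) {
  return valuesPrices.reduce(function (sum, price) { return sum + price; }, 0);
};

// In mongosh (not executed here; the output collection name is hypothetical):
//   db.orders.mapReduce(mapFunction1, reduceFunction1, { out: "map_reduce_example" })

// Local stand-in for emit() that collects values per key.
const groups1 = new Map();
function emit(key, value) {
  if (!groups1.has(key)) groups1.set(key, []);
  groups1.get(key).push(value);
}

// Hypothetical sample orders, mapped and reduced in memory.
const sampleOrders1 = [
  { cust_id: "A", price: 25 },
  { cust_id: "B", price: 70 },
  { cust_id: "A", price: 50 }
];
sampleOrders1.forEach(function (doc) { mapFunction1.call(doc); });
const results1 = Array.from(groups1, function ([key, values]) {
  return { _id: key, value: reduceFunction1(key, values) };
});
```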
Alternative 1: (Recommended) You can rewrite the operation into an aggregation pipeline without translating the map-reduce function to equivalent pipeline stages:
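One way this could look, as a sketch that relies only on the built-in `$group` and `$sum` operators. The output collection name `agg_alternative_1` is an assumption made for illustration.

```javascript
// Group by cust_id and sum price with the built-in $sum accumulator.
// The output collection name "agg_alternative_1" is a hypothetical choice.
const pipeline1 = [
  { $group: { _id: "$cust_id", value: { $sum: "$price" } } },
  { $out: "agg_alternative_1" }
];

// In mongosh (not executed here): db.orders.aggregate(pipeline1)
```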
Alternative 2: (For illustrative purposes only) The
following aggregation pipeline provides a translation of the various
map-reduce functions, using $accumulator to define custom
functions:
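A sketch of such a pipeline is shown here. The helper names (`initSum`, `accumulateSum`, `mergeSum`) and the output field name `valuesPrices` are assumptions chosen for readability; only the `emit` field with `key` and `value`, the `$accumulator` operator, and the `agg_alternative_2` collection come from the description in this example.

```javascript
// Custom accumulator functions; together they mirror the reduce logic
// (summing the emitted prices).
const initSum = function () { return 0; };
const accumulateSum = function (state, value) { return state + value; };
const mergeSum = function (state1, state2) { return state1 + state2; };

const pipeline2 = [
  // Shape each order into an `emit` document, like the map function's emit().
  { $project: { emit: { key: "$cust_id", value: "$price" } } },
  // Group by the emitted key and combine the emitted values.
  { $group: {
      _id: "$emit.key",
      valuesPrices: { $accumulator: {
        init: initSum,
        accumulate: accumulateSum,
        accumulateArgs: ["$emit.value"],
        merge: mergeSum,
        lang: "js"
      } }
  } },
  { $out: "agg_alternative_2" }
];

// In mongosh (not executed here): db.orders.aggregate(pipeline2)
```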
First, the $project stage outputs documents with an emit field. The emit field is a document with the fields:
- key that contains the cust_id value for the document
- value that contains the price value for the document
Then, the $group uses the $accumulator operator to add the emitted values.
Finally, the $out writes the output to the collection agg_alternative_2. Alternatively, you could use $merge instead of $out.
Example 2
The following map-reduce operation on the orders collection
groups by the items.sku field and calculates the number of
orders and the total quantity ordered for each sku. The operation
then calculates the average quantity per order for each sku value
and merges the results into the output collection.
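A minimal sketch of such an operation follows, assuming each order has an `items` array whose elements carry `sku` and `qty` fields. The output collection name `map_reduce_example2` is hypothetical, and the date filter is assumed to match the one used in the pipeline translation of this example.

```javascript
// Map: emit one { count, qty } value per line item, keyed by sku.
const mapFunction2 = function () {
  for (const item of this.items) {
    emit(item.sku, { count: 1, qty: item.qty });
  }
};

// Reduce: add up the counts and quantities emitted for one sku.
const reduceFunction2 = function (keySKU, countObjVals) {
  const reducedVal = { count: 0, qty: 0 };
  for (const val of countObjVals) {
    reducedVal.count += val.count;
    reducedVal.qty += val.qty;
  }
  return reducedVal;
};

// Finalize: derive the average quantity per order.
const finalizeFunction2 = function (key, reducedVal) {
  reducedVal.avg = reducedVal.qty / reducedVal.count;
  return reducedVal;
};

// In mongosh (not executed here; "map_reduce_example2" is a hypothetical name):
//   db.orders.mapReduce(mapFunction2, reduceFunction2, {
//     out: { merge: "map_reduce_example2" },
//     query: { ord_date: { $gte: new Date("2020-03-01") } },
//     finalize: finalizeFunction2
//   })

// Local check of reduce + finalize on two emitted values for one sku.
const reduced2 = finalizeFunction2(
  "apples",
  reduceFunction2("apples", [{ count: 1, qty: 5 }, { count: 1, qty: 10 }])
);
```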
Alternative 1: (Recommended) You can rewrite the operation into an aggregation pipeline without translating the map-reduce function to equivalent pipeline stages:
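One possible shape for this pipeline, sketched with built-in operators only. The intermediate field name `orders_ids`, the output collection name `agg_alternative_3`, and the `ord_date` filter are assumptions; counting distinct order `_id` values with `$addToSet` and `$size` is one way to obtain the number of orders per sku.

```javascript
const pipeline3 = [
  // Assumed date filter; adjust or drop it to match the map-reduce query.
  { $match: { ord_date: { $gte: new Date("2020-03-01") } } },
  // One document per line item.
  { $unwind: "$items" },
  // Total quantity per sku, plus the set of distinct orders containing it.
  { $group: {
      _id: "$items.sku",
      qty: { $sum: "$items.qty" },
      orders_ids: { $addToSet: "$_id" }
  } },
  // Number of orders, total quantity, and average quantity per order.
  { $project: {
      value: {
        count: { $size: "$orders_ids" },
        qty: "$qty",
        avg: { $divide: ["$qty", { $size: "$orders_ids" }] }
      }
  } },
  // Hypothetical output collection; replace matches, insert the rest.
  { $merge: {
      into: "agg_alternative_3",
      on: "_id",
      whenMatched: "replace",
      whenNotMatched: "insert"
  } }
];

// In mongosh (not executed here): db.orders.aggregate(pipeline3)
```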
Alternative 2: (For illustrative purposes only) The following
aggregation pipeline provides a translation of the various
map-reduce functions, using $accumulator to define custom
functions:
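A sketch of such a pipeline is shown here. The helper names (`initState`, `accumulateItem`, `mergeStates`, `finalizeAvg`) and the output field name `value` are assumptions for illustration; the stage order, the `emit` field, and the `agg_alternative_4` collection follow the description in this example.

```javascript
// Custom accumulator functions mirroring the reduce and finalize logic.
const initState = function () { return { count: 0, qty: 0 }; };
const accumulateItem = function (state, value) {
  return { count: state.count + value.count, qty: state.qty + value.qty };
};
const mergeStates = function (state1, state2) {
  return { count: state1.count + state2.count, qty: state1.qty + state2.qty };
};
const finalizeAvg = function (state) {
  return { count: state.count, qty: state.qty, avg: state.qty / state.count };
};

const pipeline4 = [
  { $match: { ord_date: { $gte: new Date("2020-03-01") } } },
  { $unwind: "$items" },
  // $literal keeps the 1 from being read as a field-inclusion flag.
  { $project: { emit: {
      key: "$items.sku",
      value: { count: { $literal: 1 }, qty: "$items.qty" }
  } } },
  { $group: {
      _id: "$emit.key",
      value: { $accumulator: {
        init: initState,
        accumulate: accumulateItem,
        accumulateArgs: ["$emit.value"],
        merge: mergeStates,
        finalize: finalizeAvg,
        lang: "js"
      } }
  } },
  { $merge: {
      into: "agg_alternative_4",
      on: "_id",
      whenMatched: "replace",
      whenNotMatched: "insert"
  } }
];

// In mongosh (not executed here): db.orders.aggregate(pipeline4)
```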
The $match stage selects only those documents with ord_date greater than or equal to new Date("2020-03-01").
The $unwind stage breaks down the document by the items array field to output a document for each array element.
The $project stage outputs documents with an emit field. The emit field is a document with the fields:
- key that contains the items.sku value
- value that contains a document with the qty value and a count value
The $group uses the $accumulator operator to add the emitted count and qty and calculate the avg field.
Finally, the $merge writes the output to the collection agg_alternative_4. If an existing document has the same key _id as the new result, the operation overwrites the existing document. If there is no existing document with the same key, the operation inserts the document.