在 MongoDB 上使用 Map/Reduce 進行并行 "統計" 很容易。
db.runCommand(
{
mapreduce : <collection>,
map : <mapfunction>,
reduce : <reducefunction>
[, query : <query filter object>]
[, sort : <sort the query. useful optimization>] for
[, limit : <number of objects to from collection>] return
[, out : <output-collection name>]
[, keeptemp: < | >] true false
[, finalize : <finalizefunction>]
[, scope : <object where fields go into javascript global scope >]
[, verbose : ] true
});
參數說明:
mapreduce: 要操作的目標集合。
map: 映射函數 (生成鍵值對序列,作為 reduce 函數參數)。
reduce: 統計函數。
query: 目標記錄過濾。
sort: 目標記錄排序。
limit: 限制目標記錄數量。
out: 統計結果存放集合 (不指定則使用臨時集合,在客戶端斷開后自動刪除)。
keeptemp: 是否保留臨時集合。
finalize: 最終處理函數 (對 reduce 返回結果進行最終整理后存入結果集合)。
scope: 向 map、reduce、finalize 導入外部變量。
verbose: 顯示詳細的時間統計信息。
官方文檔有幾句話很重要:
map/reduce is invoked via a database. The database creates a temporary collection to hold output of the operation. The collection is command cleaned up when the client connection closes, or when explicitly dropped. Alternatively, one can specify a permanent output collection name. map and reduce functions are written in JavaScript and execute on the server.
In sharded environments, data processing of map/reduce operations runs in parallel on all shards.
MapReduce jobs on a single mongod process are single threaded. This is due to a design limitation in current JavaScript engines. We are looking into alternatives to solve this issue, but for now if you want to parallelize your MapReduce jobs, you will need to either use sharding or do the aggregation client-side in your code.
先準備點簡單的數據練練手。
> for (var i = 0; i < 1000; i++) {
... var u = { name : "user" + i, age : i % 40 + 1, sex : i % 2 };
... db.users.insert(u);
... }
> db.users.ensureIndex({name:1})
> db.users.ensureIndex({age:1})
> db.users.count()
1000
> db.users.find().limit(10)
{ "_id" : ObjectId("4c89bf5b24280691541787b8"), "name" : "user0", "age" : 1, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787b9"), "name" : "user1", "age" : 2, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787ba"), "name" : "user2", "age" : 3, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bb"), "name" : "user3", "age" : 4, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787bc"), "name" : "user4", "age" : 5, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bd"), "name" : "user5", "age" : 6, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787be"), "name" : "user6", "age" : 7, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bf"), "name" : "user7", "age" : 8, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787c0"), "name" : "user8", "age" : 9, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787c1"), "name" : "user9", "age" : 10, "sex" : 1 }
1. Map
Map 函數必須調用 emit(key, value) 返回鍵值對,使用 this 訪問當前待處理的 Document。
> m = function() { emit(this.age, 1) }
function () {
emit(this.age, 1);
}
value 可以使用 JSON Object 傳遞 (支持多個屬性值)。
例如:
emit(this.age, {count:1})
2. Reduce
Reduce 函數接收的參數類似 Group 效果,將 Map 返回的鍵值序列組合成 { key, [value1, value2, value3, value...] } 傳遞給 reduce。
> r = function(key, values) {
... var x = 0;
... values.forEach(function(v) { x += v });
... return x;
... }
function (key, values) {
var x = 0;
values.forEach(function (v) {x += v;});
return x;
}
Reduce 函數對這些 values 進行 "統計" 操作,返回結果可以使用 JSON Object。
3. Result
我們不必使用 runCommand,改用 db.<collection>.mapReduce() 更方便一些。
> res = db.users.mapReduce(m, r)
{
"result" : "tmp.mr.mapreduce_1284097299_10",
"timeMillis" : 156,
"counts" : {
"input" : 1000,
"emit" : 1000,
"output" : 40
},
"ok" : 1,
}
> db[res.result].find()
{ "_id" : 1, "value" : 25 }
{ "_id" : 2, "value" : 25 }
{ "_id" : 3, "value" : 25 }
{ "_id" : 4, "value" : 25 }
{ "_id" : 5, "value" : 25 }
{ "_id" : 6, "value" : 25 }
{ "_id" : 7, "value" : 25 }
{ "_id" : 8, "value" : 25 }
{ "_id" : 9, "value" : 25 }
{ "_id" : 10, "value" : 25 }
{ "_id" : 11, "value" : 25 }
{ "_id" : 12, "value" : 25 }
{ "_id" : 13, "value" : 25 }
{ "_id" : 14, "value" : 25 }
{ "_id" : 15, "value" : 25 }
{ "_id" : 16, "value" : 25 }
{ "_id" : 17, "value" : 25 }
{ "_id" : 18, "value" : 25 }
{ "_id" : 19, "value" : 25 }
{ "_id" : 20, "value" : 25 }
has more
mapReduce() 將結果存儲在 "tmp.mr.mapreduce_1284097299_10" 臨時集合中。
4. Finalize
利用 finalize() 我們可以對 reduce() 的結果做進一步處理。
> f = function(key, value) { return {age:key, count:value}; }
function (key, value) {
return {age:key, count:value};
}
> res = db.users.mapReduce(m, r, {finalize:f})
{
"result" : "tmp.mr.mapreduce_1284098036_26",
"timeMillis" : 158,
"counts" : {
"input" : 1000,
"emit" : 1000,
"output" : 40
},
"ok" : 1,
}
> db[res.result].find()
{ "_id" : 1, "value" : { "age" : 1, "count" : 25 } }
{ "_id" : 2, "value" : { "age" : 2, "count" : 25 } }
{ "_id" : 3, "value" : { "age" : 3, "count" : 25 } }
{ "_id" : 4, "value" : { "age" : 4, "count" : 25 } }
{ "_id" : 5, "value" : { "age" : 5, "count" : 25 } }
{ "_id" : 6, "value" : { "age" : 6, "count" : 25 } }
{ "_id" : 7, "value" : { "age" : 7, "count" : 25 } }
{ "_id" : 8, "value" : { "age" : 8, "count" : 25 } }
{ "_id" : 9, "value" : { "age" : 9, "count" : 25 } }
{ "_id" : 10, "value" : { "age" : 10, "count" : 25 } }
{ "_id" : 11, "value" : { "age" : 11, "count" : 25 } }
{ "_id" : 12, "value" : { "age" : 12, "count" : 25 } }
{ "_id" : 13, "value" : { "age" : 13, "count" : 25 } }
{ "_id" : 14, "value" : { "age" : 14, "count" : 25 } }
{ "_id" : 15, "value" : { "age" : 15, "count" : 25 } }
{ "_id" : 16, "value" : { "age" : 16, "count" : 25 } }
{ "_id" : 17, "value" : { "age" : 17, "count" : 25 } }
{ "_id" : 18, "value" : { "age" : 18, "count" : 25 } }
{ "_id" : 19, "value" : { "age" : 19, "count" : 25 } }
{ "_id" : 20, "value" : { "age" : 20, "count" : 25 } }
has more
5. Options
我們還可以添加更多的控制細節。
> res = db.users.mapReduce(m, r, {query:{age:{$lt:10}}, sort:{name:1}, limit:5})
{
"result" : "tmp.mr.mapreduce_1284097888_25",
"timeMillis" : 20,
"counts" : {
"input" : 5,
"emit" : 5,
"output" : 3
},
"ok" : 1,
}
> db[res.result].find()
{ "_id" : 1, "value" : 2 }
{ "_id" : 2, "value" : 2 }
{ "_id" : 3, "value" : 1 }
6. Example
MapReduce 的作用不僅僅是 "統計",我們可以直接用這種在服務器端高速并發執行機制批量修改數據。
> m = function() { emit(this._id, this) }
function () {
emit(this._id, this);
}
> r = function(key, values) {
... update = function(v) {
... db.users.update({_id:key}, {$inc:{age:1}}, false, false);
... }
... values.forEach(update);
... return key;
... }
function (key, values) {
update = function (v) {db.users.update({_id:key}, {$inc:{age:1}}, false, false);};
values.forEach(update);
return key;
}
> db.users.find().limit(10)
{ "_id" : ObjectId("4c89bf5b24280691541787b8"), "name" : "user0", "age" : 1, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787b9"), "name" : "user1", "age" : 2, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787ba"), "name" : "user2", "age" : 3, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bb"), "name" : "user3", "age" : 4, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787bc"), "name" : "user4", "age" : 5, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bd"), "name" : "user5", "age" : 6, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787be"), "name" : "user6", "age" : 7, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bf"), "name" : "user7", "age" : 8, "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787c0"), "name" : "user8", "age" : 9, "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787c1"), "name" : "user9", "age" : 10, "sex" : 1 }
> res = db.users.mapReduce(m, r, {limit:10})
{
"result" : "tmp.mr.mapreduce_1284098486_27",
"timeMillis" : 28,
"counts" : {
"input" : 10,
"emit" : 10,
"output" : 10
},
"ok" : 1,
}
> db.users.find().limit(10)
{ "_id" : ObjectId("4c89bf5b24280691541787b8"), "age" : 2, "name" : "user0", "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787b9"), "age" : 3, "name" : "user1", "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787ba"), "age" : 4, "name" : "user2", "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bb"), "age" : 5, "name" : "user3", "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787bc"), "age" : 6, "name" : "user4", "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bd"), "age" : 7, "name" : "user5", "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787be"), "age" : 8, "name" : "user6", "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787bf"), "age" : 9, "name" : "user7", "sex" : 1 }
{ "_id" : ObjectId("4c89bf5b24280691541787c0"), "age" : 10, "name" : "user8", "sex" : 0 }
{ "_id" : ObjectId("4c89bf5b24280691541787c1"), "age" : 11, "name" : "user9", "sex" : 1 }
7. PyMongo
最后當然得在 Python 調用一下。
In [1]: from pymongo import *
In [2]: conn = Connection()
In [3]: db = conn.test
In [4]: m = "function() { emit(this.age, 1); }"
In [5]: r = "function(key, values) { var x = 0; values.forEach(function(v){ x += v }); return x; }"
In [6]: res = db.users.map_reduce(m, r, True)
In [7]: for k in db[res["result"]].find(): print k
....:
{u'_id': 1.0, u'value': 24.0}
{u'_id': 2.0, u'value': 25.0}
{u'_id': 3.0, u'value': 25.0}
{u'_id': 4.0, u'value': 25.0}
{u'_id': 5.0, u'value': 25.0}
{u'_id': 6.0, u'value': 25.0}
{u'_id': 7.0, u'value': 25.0}
{u'_id': 8.0, u'value': 25.0}
{u'_id': 9.0, u'value': 25.0}
{u'_id': 10.0, u'value': 25.0}
{u'_id': 11.0, u'value': 26.0}
{u'_id': 12.0, u'value': 25.0}
{u'_id': 13.0, u'value': 25.0}
{u'_id': 14.0, u'value': 25.0}
{u'_id': 15.0, u'value': 25.0}
{u'_id': 16.0, u'value': 25.0}
{u'_id': 17.0, u'value': 25.0}
{u'_id': 18.0, u'value': 25.0}
{u'_id': 19.0, u'value': 25.0}
{u'_id': 20.0, u'value': 25.0}
{u'_id': 21.0, u'value': 25.0}
{u'_id': 22.0, u'value': 25.0}
{u'_id': 23.0, u'value': 25.0}
{u'_id': 24.0, u'value': 25.0}
{u'_id': 25.0, u'value': 25.0}
{u'_id': 26.0, u'value': 25.0}
{u'_id': 27.0, u'value': 25.0}
{u'_id': 28.0, u'value': 25.0}
{u'_id': 29.0, u'value': 25.0}
{u'_id': 30.0, u'value': 25.0}
{u'_id': 31.0, u'value': 25.0}
{u'_id': 32.0, u'value': 25.0}
{u'_id': 33.0, u'value': 25.0}
{u'_id': 34.0, u'value': 25.0}
{u'_id': 35.0, u'value': 25.0}
{u'_id': 36.0, u'value': 25.0}
{u'_id': 37.0, u'value': 25.0}
{u'_id': 38.0, u'value': 25.0}
{u'_id': 39.0, u'value': 25.0}
{u'_id': 40.0, u'value': 25.0}
附加參數也很容易。
In [10]: res = db.users.map_reduce(m, r, True, limit=10)
In [11]: res
Out[11]:
{u'counts': {u'emit': 10, u'input': 10, u'output': 10},
u'ok': 1.0,
u'result': u'tmp.mr.mapreduce_1284099468_31',
u'timeMillis': 20}
In [12]: for k in db[res["result"]].find(): print k
....:
{u'_id': 2.0, u'value': 1.0}
{u'_id': 3.0, u'value': 1.0}
{u'_id': 4.0, u'value': 1.0}
{u'_id': 5.0, u'value': 1.0}
{u'_id': 6.0, u'value': 1.0}
{u'_id': 7.0, u'value': 1.0}
{u'_id': 8.0, u'value': 1.0}
{u'_id': 9.0, u'value': 1.0}
{u'_id': 10.0, u'value': 1.0}
{u'_id': 11.0, u'value': 1.0}
In [13]: res = db.users.map_reduce(m, r, True, query={"age":{"$lt":20}})
In [14]: res
Out[14]:
{u'counts': {u'emit': 475, u'input': 475, u'output': 19},
u'ok': 1.0,
u'result': u'tmp.mr.mapreduce_1284099533_33',
u'timeMillis': 77}
In [15]: for k in db[res["result"]].find(): print k
....:
{u'_id': 1.0, u'value': 24.0}
{u'_id': 2.0, u'value': 25.0}
{u'_id': 3.0, u'value': 25.0}
{u'_id': 4.0, u'value': 25.0}
{u'_id': 5.0, u'value': 25.0}
{u'_id': 6.0, u'value': 25.0}
{u'_id': 7.0, u'value': 25.0}
{u'_id': 8.0, u'value': 25.0}
{u'_id': 9.0, u'value': 25.0}
{u'_id': 10.0, u'value': 25.0}
{u'_id': 11.0, u'value': 26.0}
{u'_id': 12.0, u'value': 25.0}
{u'_id': 13.0, u'value': 25.0}
{u'_id': 14.0, u'value': 25.0}
{u'_id': 15.0, u'value': 25.0}
{u'_id': 16.0, u'value': 25.0}
{u'_id': 17.0, u'value': 25.0}
{u'_id': 18.0, u'value': 25.0}
{u'_id': 19.0, u'value': 25.0}
更多細節請參照官方文檔