我如何(在MongoDB)结合数据从多个集合到一个集合?
我可以使用地图减少,如果是,然后如何?
我非常感谢一些例子,因为我是一个新手。
我如何(在MongoDB)结合数据从多个集合到一个集合?
我可以使用地图减少,如果是,然后如何?
我非常感谢一些例子,因为我是一个新手。
当前回答
Mongorestore有这样一个特性,即在数据库中已经存在的数据之上追加数据,所以这个行为可以用于组合两个集合:
mongodump文物 collection2.rename(文物) mongorestore
还没有尝试过,但它可能比map/reduce方法执行得更快。
其他回答
是的,你可以,拿我今天写的这个效用函数来说
function shangMergeCol() {
tcol= db.getCollection(arguments[0]);
for (var i=1; i<arguments.length; i++){
scol= db.getCollection(arguments[i]);
scol.find().forEach(
function (d) {
tcol.insert(d);
}
)
}
}
你可以传递给这个函数任意数量的集合,第一个将是目标集合。其余所有集合都是要传输到目标集合的源。
Mongorestore有这样一个特性,即在数据库中已经存在的数据之上追加数据,所以这个行为可以用于组合两个集合:
mongodump文物 collection2.rename(文物) mongorestore
还没有尝试过,但它可能比map/reduce方法执行得更快。
从Mongo 4.4开始,我们可以通过将新的$unionWith聚合阶段与$group的新的$accumulator操作符耦合在聚合管道中实现这种连接:
// > db.users.find()
// [{ user: 1, name: "x" }, { user: 2, name: "y" }]
// > db.books.find()
// [{ user: 1, book: "a" }, { user: 1, book: "b" }, { user: 2, book: "c" }]
// > db.movies.find()
// [{ user: 1, movie: "g" }, { user: 2, movie: "h" }, { user: 2, movie: "i" }]
db.users.aggregate([
{ $unionWith: "books" },
{ $unionWith: "movies" },
{ $group: {
_id: "$user",
user: {
$accumulator: {
accumulateArgs: ["$name", "$book", "$movie"],
init: function() { return { books: [], movies: [] } },
accumulate: function(user, name, book, movie) {
if (name) user.name = name;
if (book) user.books.push(book);
if (movie) user.movies.push(movie);
return user;
},
merge: function(userV1, userV2) {
if (userV2.name) userV1.name = userV2.name;
userV1.books.concat(userV2.books);
userV1.movies.concat(userV2.movies);
return userV1;
},
lang: "js"
}
}
}}
])
// { _id: 1, user: { books: ["a", "b"], movies: ["g"], name: "x" } }
// { _id: 2, user: { books: ["c"], movies: ["h", "i"], name: "y" } }
$unionWith combines records from the given collection within documents already in the aggregation pipeline. After the 2 union stages, we thus have all users, books and movies records within the pipeline. We then $group records by $user and accumulate items using the $accumulator operator allowing custom accumulations of documents as they get grouped: the fields we're interested in accumulating are defined with accumulateArgs. init defines the state that will be accumulated as we group elements. the accumulate function allows performing a custom action with a record being grouped in order to build the accumulated state. For instance, if the item being grouped has the book field defined, then we update the books part of the state. merge is used to merge two internal states. It's only used for aggregations running on sharded clusters or when the operation exceeds memory limits.
代码片段。礼貌-关于堆栈溢出的多个帖子,包括这一篇。
db.cust.drop();
db.zip.drop();
db.cust.insert({cust_id:1, zip_id: 101});
db.cust.insert({cust_id:2, zip_id: 101});
db.cust.insert({cust_id:3, zip_id: 101});
db.cust.insert({cust_id:4, zip_id: 102});
db.cust.insert({cust_id:5, zip_id: 102});
db.zip.insert({zip_id:101, zip_cd:'AAA'});
db.zip.insert({zip_id:102, zip_cd:'BBB'});
db.zip.insert({zip_id:103, zip_cd:'CCC'});
mapCust = function() {
var values = {
cust_id: this.cust_id
};
emit(this.zip_id, values);
};
mapZip = function() {
var values = {
zip_cd: this.zip_cd
};
emit(this.zip_id, values);
};
reduceCustZip = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
if ("cust_id" in value) {
if (!("cust_ids" in result)) {
result.cust_ids = [];
}
result.cust_ids.push(value);
} else {
for (field in value) {
if (value.hasOwnProperty(field) ) {
result[field] = value[field];
}
};
}
});
return result;
};
db.cust_zip.drop();
db.cust.mapReduce(mapCust, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.zip.mapReduce(mapZip, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.cust_zip.find();
mapCZ = function() {
var that = this;
if ("cust_ids" in this.value) {
this.value.cust_ids.forEach(function(value) {
emit(value.cust_id, {
zip_id: that._id,
zip_cd: that.value.zip_cd
});
});
}
};
reduceCZ = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
for (field in value) {
if (value.hasOwnProperty(field)) {
result[field] = value[field];
}
}
});
return result;
};
db.cust_zip_joined.drop();
db.cust_zip.mapReduce(mapCZ, reduceCZ, {"out": "cust_zip_joined"});
db.cust_zip_joined.find().pretty();
var flattenMRCollection=function(dbName,collectionName) {
var collection=db.getSiblingDB(dbName)[collectionName];
var i=0;
var bulk=collection.initializeUnorderedBulkOp();
collection.find({ value: { $exists: true } }).addOption(16).forEach(function(result) {
print((++i));
//collection.update({_id: result._id},result.value);
bulk.find({_id: result._id}).replaceOne(result.value);
if(i%1000==0)
{
print("Executing bulk...");
bulk.execute();
bulk=collection.initializeUnorderedBulkOp();
}
});
bulk.execute();
};
flattenMRCollection("mydb","cust_zip_joined");
db.cust_zip_joined.find().pretty();
非常基本的$lookup示例。
db.getCollection('users').aggregate([
{
$lookup: {
from: "userinfo",
localField: "userId",
foreignField: "userId",
as: "userInfoData"
}
},
{
$lookup: {
from: "userrole",
localField: "userId",
foreignField: "userId",
as: "userRoleData"
}
},
{ $unwind: { path: "$userInfoData", preserveNullAndEmptyArrays: true }},
{ $unwind: { path: "$userRoleData", preserveNullAndEmptyArrays: true }}
])
这里用到了
{ $unwind: { path: "$userInfoData", preserveNullAndEmptyArrays: true }},
{ $unwind: { path: "$userRoleData", preserveNullAndEmptyArrays: true }}
而不是
{ $unwind:"$userRoleData"}
{ $unwind:"$userRoleData"}
因为{$unwind:"$userRoleData"}如果在$lookup中没有找到匹配的记录,将返回空或0结果。