我如何(在MongoDB)结合数据从多个集合到一个集合?
我可以使用地图减少,如果是,然后如何?
我非常感谢一些例子,因为我是一个新手。
我如何(在MongoDB)结合数据从多个集合到一个集合?
我可以使用地图减少,如果是,然后如何?
我非常感谢一些例子,因为我是一个新手。
当前回答
Mongorestore有这样一个特性,即在数据库中已经存在的数据之上追加数据,所以这个行为可以用于组合两个集合:
mongodump文物 collection2.rename(文物) mongorestore
还没有尝试过,但它可能比map/reduce方法执行得更快。
其他回答
Mongorestore有这样一个特性,即在数据库中已经存在的数据之上追加数据,所以这个行为可以用于组合两个集合:
mongodump文物 collection2.rename(文物) mongorestore
还没有尝试过,但它可能比map/reduce方法执行得更快。
你必须在你的应用层做这些。如果您正在使用ORM,它可以使用注释(或类似的东西)来提取存在于其他集合中的引用。我只使用过Morphia, @Reference注释在查询时获取被引用的实体,因此我能够避免自己在代码中这样做。
如果mongodb没有批量插入,我们循环small_collection中的所有对象,并将它们逐个插入到big_collection中:
db.small_collection.find().forEach(function(obj){
db.big_collection.insert(obj)
});
虽然不能实时执行,但可以多次运行map-reduce,通过使用MongoDB 1.8+ map/reduce中的“reduce”out选项将数据合并在一起(参见http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-Outputoptions)。您需要在两个集合中都有一些可以用作_id的键。
例如,假设您有一个用户集合和一个评论集合,并且您希望有一个新的集合,其中包含每个评论的一些用户统计信息。
让我们说users集合有以下字段:
_id firstName 姓 国家 性别 年龄
然后comments集合有以下字段:
_id 用户标识 评论 创建
你可以这样做:
var mapUsers, mapComments, reduce;
db.users_comments.remove();
// setup sample data - wouldn't actually use this in production
db.users.remove();
db.comments.remove();
db.users.save({firstName:"Rich",lastName:"S",gender:"M",country:"CA",age:"18"});
db.users.save({firstName:"Rob",lastName:"M",gender:"M",country:"US",age:"25"});
db.users.save({firstName:"Sarah",lastName:"T",gender:"F",country:"US",age:"13"});
var users = db.users.find();
db.comments.save({userId: users[0]._id, "comment": "Hey, what's up?", created: new ISODate()});
db.comments.save({userId: users[1]._id, "comment": "Not much", created: new ISODate()});
db.comments.save({userId: users[0]._id, "comment": "Cool", created: new ISODate()});
// end sample data setup
mapUsers = function() {
var values = {
country: this.country,
gender: this.gender,
age: this.age
};
emit(this._id, values);
};
mapComments = function() {
var values = {
commentId: this._id,
comment: this.comment,
created: this.created
};
emit(this.userId, values);
};
reduce = function(k, values) {
var result = {}, commentFields = {
"commentId": '',
"comment": '',
"created": ''
};
values.forEach(function(value) {
var field;
if ("comment" in value) {
if (!("comments" in result)) {
result.comments = [];
}
result.comments.push(value);
} else if ("comments" in value) {
if (!("comments" in result)) {
result.comments = [];
}
result.comments.push.apply(result.comments, value.comments);
}
for (field in value) {
if (value.hasOwnProperty(field) && !(field in commentFields)) {
result[field] = value[field];
}
}
});
return result;
};
db.users.mapReduce(mapUsers, reduce, {"out": {"reduce": "users_comments"}});
db.comments.mapReduce(mapComments, reduce, {"out": {"reduce": "users_comments"}});
db.users_comments.find().pretty(); // see the resulting collection
此时,您将拥有一个名为users_comments的新集合,其中包含合并的数据,您现在可以使用它了。这些简化的集合都有_id,这是你在map函数中发出的键,然后所有的值都是value键内的子对象-这些值不在这些简化文档的顶层。
这是一个比较简单的例子。您可以重复使用更多的集合,只要您想继续构建减少的集合。您还可以在该过程中对数据进行总结和聚合。随着聚合和保存现有字段的逻辑变得更加复杂,您可能会定义多个reduce函数。
您还会注意到,现在每个用户都有一个文档,数组中包含该用户的所有评论。如果我们合并的数据是一对一的关系,而不是一对多的关系,它将是平坦的,你可以简单地使用这样的reduce函数:
reduce = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
for (field in value) {
if (value.hasOwnProperty(field)) {
result[field] = value[field];
}
}
});
return result;
};
如果你想平铺users_comments集合,所以每个注释只有一个文档,另外运行这个:
var map, reduce;
map = function() {
var debug = function(value) {
var field;
for (field in value) {
print(field + ": " + value[field]);
}
};
debug(this);
var that = this;
if ("comments" in this.value) {
this.value.comments.forEach(function(value) {
emit(value.commentId, {
userId: that._id,
country: that.value.country,
age: that.value.age,
comment: value.comment,
created: value.created,
});
});
}
};
reduce = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
for (field in value) {
if (value.hasOwnProperty(field)) {
result[field] = value[field];
}
}
});
return result;
};
db.users_comments.mapReduce(map, reduce, {"out": "comments_with_demographics"});
这个技巧绝对不应该在飞行中执行。它适用于定期更新合并数据的cron作业或类似的工作。您可能希望在新集合上运行ensureIndex,以确保对它执行的查询能够快速运行(请记住,您的数据仍然在值键中,因此如果您要在注释创建时间上索引comments_with_demographic,那么它将是db.comments_with_demographic .ensureIndex({"value.created": 1});
代码片段。礼貌-关于堆栈溢出的多个帖子,包括这一篇。
db.cust.drop();
db.zip.drop();
db.cust.insert({cust_id:1, zip_id: 101});
db.cust.insert({cust_id:2, zip_id: 101});
db.cust.insert({cust_id:3, zip_id: 101});
db.cust.insert({cust_id:4, zip_id: 102});
db.cust.insert({cust_id:5, zip_id: 102});
db.zip.insert({zip_id:101, zip_cd:'AAA'});
db.zip.insert({zip_id:102, zip_cd:'BBB'});
db.zip.insert({zip_id:103, zip_cd:'CCC'});
mapCust = function() {
var values = {
cust_id: this.cust_id
};
emit(this.zip_id, values);
};
mapZip = function() {
var values = {
zip_cd: this.zip_cd
};
emit(this.zip_id, values);
};
reduceCustZip = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
if ("cust_id" in value) {
if (!("cust_ids" in result)) {
result.cust_ids = [];
}
result.cust_ids.push(value);
} else {
for (field in value) {
if (value.hasOwnProperty(field) ) {
result[field] = value[field];
}
};
}
});
return result;
};
db.cust_zip.drop();
db.cust.mapReduce(mapCust, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.zip.mapReduce(mapZip, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.cust_zip.find();
mapCZ = function() {
var that = this;
if ("cust_ids" in this.value) {
this.value.cust_ids.forEach(function(value) {
emit(value.cust_id, {
zip_id: that._id,
zip_cd: that.value.zip_cd
});
});
}
};
reduceCZ = function(k, values) {
var result = {};
values.forEach(function(value) {
var field;
for (field in value) {
if (value.hasOwnProperty(field)) {
result[field] = value[field];
}
}
});
return result;
};
db.cust_zip_joined.drop();
db.cust_zip.mapReduce(mapCZ, reduceCZ, {"out": "cust_zip_joined"});
db.cust_zip_joined.find().pretty();
var flattenMRCollection=function(dbName,collectionName) {
var collection=db.getSiblingDB(dbName)[collectionName];
var i=0;
var bulk=collection.initializeUnorderedBulkOp();
collection.find({ value: { $exists: true } }).addOption(16).forEach(function(result) {
print((++i));
//collection.update({_id: result._id},result.value);
bulk.find({_id: result._id}).replaceOne(result.value);
if(i%1000==0)
{
print("Executing bulk...");
bulk.execute();
bulk=collection.initializeUnorderedBulkOp();
}
});
bulk.execute();
};
flattenMRCollection("mydb","cust_zip_joined");
db.cust_zip_joined.find().pretty();