我如何(在MongoDB)结合数据从多个集合到一个集合?

我可以使用地图减少,如果是,然后如何?

我非常感谢一些例子,因为我是一个新手。


当前回答

Mongorestore有这样一个特性,即在数据库中已经存在的数据之上追加数据,所以这个行为可以用于组合两个集合:

mongodump文物 collection2.rename(文物) mongorestore

还没有尝试过,但它可能比map/reduce方法执行得更快。

其他回答

Mongorestore有这样一个特性,即在数据库中已经存在的数据之上追加数据,所以这个行为可以用于组合两个集合:

mongodump文物 collection2.rename(文物) mongorestore

还没有尝试过,但它可能比map/reduce方法执行得更快。

你必须在你的应用层做这些。如果您正在使用ORM,它可以使用注释(或类似的东西)来提取存在于其他集合中的引用。我只使用过Morphia, @Reference注释在查询时获取被引用的实体,因此我能够避免自己在代码中这样做。

如果mongodb没有批量插入,我们循环small_collection中的所有对象,并将它们逐个插入到big_collection中:

db.small_collection.find().forEach(function(obj){ 
   db.big_collection.insert(obj)
});

虽然不能实时执行,但可以多次运行map-reduce,通过使用MongoDB 1.8+ map/reduce中的“reduce”out选项将数据合并在一起(参见http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-Outputoptions)。您需要在两个集合中都有一些可以用作_id的键。

例如,假设您有一个用户集合和一个评论集合,并且您希望有一个新的集合,其中包含每个评论的一些用户统计信息。

让我们说users集合有以下字段:

_id firstName 姓 国家 性别 年龄

然后comments集合有以下字段:

_id 用户标识 评论 创建

你可以这样做:

var mapUsers, mapComments, reduce;
db.users_comments.remove();

// setup sample data - wouldn't actually use this in production
db.users.remove();
db.comments.remove();
db.users.save({firstName:"Rich",lastName:"S",gender:"M",country:"CA",age:"18"});
db.users.save({firstName:"Rob",lastName:"M",gender:"M",country:"US",age:"25"});
db.users.save({firstName:"Sarah",lastName:"T",gender:"F",country:"US",age:"13"});
var users = db.users.find();
db.comments.save({userId: users[0]._id, "comment": "Hey, what's up?", created: new ISODate()});
db.comments.save({userId: users[1]._id, "comment": "Not much", created: new ISODate()});
db.comments.save({userId: users[0]._id, "comment": "Cool", created: new ISODate()});
// end sample data setup

mapUsers = function() {
    var values = {
        country: this.country,
        gender: this.gender,
        age: this.age
    };
    emit(this._id, values);
};
mapComments = function() {
    var values = {
        commentId: this._id,
        comment: this.comment,
        created: this.created
    };
    emit(this.userId, values);
};
reduce = function(k, values) {
    var result = {}, commentFields = {
        "commentId": '', 
        "comment": '',
        "created": ''
    };
    values.forEach(function(value) {
        var field;
        if ("comment" in value) {
            if (!("comments" in result)) {
                result.comments = [];
            }
            result.comments.push(value);
        } else if ("comments" in value) {
            if (!("comments" in result)) {
                result.comments = [];
            }
            result.comments.push.apply(result.comments, value.comments);
        }
        for (field in value) {
            if (value.hasOwnProperty(field) && !(field in commentFields)) {
                result[field] = value[field];
            }
        }
    });
    return result;
};
db.users.mapReduce(mapUsers, reduce, {"out": {"reduce": "users_comments"}});
db.comments.mapReduce(mapComments, reduce, {"out": {"reduce": "users_comments"}});
db.users_comments.find().pretty(); // see the resulting collection

此时,您将拥有一个名为users_comments的新集合,其中包含合并的数据,您现在可以使用它了。这些简化的集合都有_id,这是你在map函数中发出的键,然后所有的值都是value键内的子对象-这些值不在这些简化文档的顶层。

这是一个比较简单的例子。您可以重复使用更多的集合,只要您想继续构建减少的集合。您还可以在该过程中对数据进行总结和聚合。随着聚合和保存现有字段的逻辑变得更加复杂,您可能会定义多个reduce函数。

您还会注意到,现在每个用户都有一个文档,数组中包含该用户的所有评论。如果我们合并的数据是一对一的关系,而不是一对多的关系,它将是平坦的,你可以简单地使用这样的reduce函数:

reduce = function(k, values) {
    var result = {};
    values.forEach(function(value) {
        var field;
        for (field in value) {
            if (value.hasOwnProperty(field)) {
                result[field] = value[field];
            }
        }
    });
    return result;
};

如果你想平铺users_comments集合,所以每个注释只有一个文档,另外运行这个:

var map, reduce;
map = function() {
    var debug = function(value) {
        var field;
        for (field in value) {
            print(field + ": " + value[field]);
        }
    };
    debug(this);
    var that = this;
    if ("comments" in this.value) {
        this.value.comments.forEach(function(value) {
            emit(value.commentId, {
                userId: that._id,
                country: that.value.country,
                age: that.value.age,
                comment: value.comment,
                created: value.created,
            });
        });
    }
};
reduce = function(k, values) {
    var result = {};
    values.forEach(function(value) {
        var field;
        for (field in value) {
            if (value.hasOwnProperty(field)) {
                result[field] = value[field];
            }
        }
    });
    return result;
};
db.users_comments.mapReduce(map, reduce, {"out": "comments_with_demographics"});

这个技巧绝对不应该在飞行中执行。它适用于定期更新合并数据的cron作业或类似的工作。您可能希望在新集合上运行ensureIndex,以确保对它执行的查询能够快速运行(请记住,您的数据仍然在值键中,因此如果您要在注释创建时间上索引comments_with_demographic,那么它将是db.comments_with_demographic .ensureIndex({"value.created": 1});

代码片段。礼貌-关于堆栈溢出的多个帖子,包括这一篇。

 db.cust.drop();
 db.zip.drop();
 db.cust.insert({cust_id:1, zip_id: 101});
 db.cust.insert({cust_id:2, zip_id: 101});
 db.cust.insert({cust_id:3, zip_id: 101});
 db.cust.insert({cust_id:4, zip_id: 102});
 db.cust.insert({cust_id:5, zip_id: 102});

 db.zip.insert({zip_id:101, zip_cd:'AAA'});
 db.zip.insert({zip_id:102, zip_cd:'BBB'});
 db.zip.insert({zip_id:103, zip_cd:'CCC'});

mapCust = function() {
    var values = {
        cust_id: this.cust_id
    };
    emit(this.zip_id, values);
};

mapZip = function() {
    var values = {
    zip_cd: this.zip_cd
    };
    emit(this.zip_id, values);
};

reduceCustZip =  function(k, values) {
    var result = {};
    values.forEach(function(value) {
    var field;
        if ("cust_id" in value) {
            if (!("cust_ids" in result)) {
                result.cust_ids = [];
            }
            result.cust_ids.push(value);
        } else {
    for (field in value) {
        if (value.hasOwnProperty(field) ) {
                result[field] = value[field];
        }
         };  
       }
      });
       return result;
};


db.cust_zip.drop();
db.cust.mapReduce(mapCust, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.zip.mapReduce(mapZip, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.cust_zip.find();


mapCZ = function() {
    var that = this;
    if ("cust_ids" in this.value) {
        this.value.cust_ids.forEach(function(value) {
            emit(value.cust_id, {
                zip_id: that._id,
                zip_cd: that.value.zip_cd
            });
        });
    }
};

reduceCZ = function(k, values) {
    var result = {};
    values.forEach(function(value) {
        var field;
        for (field in value) {
            if (value.hasOwnProperty(field)) {
                result[field] = value[field];
            }
        }
    });
    return result;
};
db.cust_zip_joined.drop();
db.cust_zip.mapReduce(mapCZ, reduceCZ, {"out": "cust_zip_joined"}); 
db.cust_zip_joined.find().pretty();


var flattenMRCollection=function(dbName,collectionName) {
    var collection=db.getSiblingDB(dbName)[collectionName];

    var i=0;
    var bulk=collection.initializeUnorderedBulkOp();
    collection.find({ value: { $exists: true } }).addOption(16).forEach(function(result) {
        print((++i));
        //collection.update({_id: result._id},result.value);

        bulk.find({_id: result._id}).replaceOne(result.value);

        if(i%1000==0)
        {
            print("Executing bulk...");
            bulk.execute();
            bulk=collection.initializeUnorderedBulkOp();
        }
    });
    bulk.execute();
};


flattenMRCollection("mydb","cust_zip_joined");
db.cust_zip_joined.find().pretty();