我如何(在MongoDB)结合数据从多个集合到一个集合?

我可以使用地图减少,如果是,然后如何?

我非常感谢一些例子,因为我是一个新手。


当前回答

MongoDB 3.2现在允许通过$lookup聚合阶段将多个集合中的数据合并为一个集合。作为一个实际的例子,假设您有关于书籍的数据,分为两个不同的集合。

第一种藏书,称为书籍,有以下资料:

{
    "isbn": "978-3-16-148410-0",
    "title": "Some cool book",
    "author": "John Doe"
}
{
    "isbn": "978-3-16-148999-9",
    "title": "Another awesome book",
    "author": "Jane Roe"
}

第二个集合名为books_selling_data,包含以下数据:

{
    "_id": ObjectId("56e31bcf76cdf52e541d9d26"),
    "isbn": "978-3-16-148410-0",
    "copies_sold": 12500
}
{
    "_id": ObjectId("56e31ce076cdf52e541d9d28"),
    "isbn": "978-3-16-148999-9",
    "copies_sold": 720050
}
{
    "_id": ObjectId("56e31ce076cdf52e541d9d29"),
    "isbn": "978-3-16-148999-9",
    "copies_sold": 1000
}

要合并这两个集合,只需按以下方式使用$lookup:

db.books.aggregate([{
    $lookup: {
            from: "books_selling_data",
            localField: "isbn",
            foreignField: "isbn",
            as: "copies_sold"
        }
}])

在此聚合之后,图书集合将看起来如下所示:

{
    "isbn": "978-3-16-148410-0",
    "title": "Some cool book",
    "author": "John Doe",
    "copies_sold": [
        {
            "_id": ObjectId("56e31bcf76cdf52e541d9d26"),
            "isbn": "978-3-16-148410-0",
            "copies_sold": 12500
        }
    ]
}
{
    "isbn": "978-3-16-148999-9",
    "title": "Another awesome book",
    "author": "Jane Roe",
    "copies_sold": [
        {
            "_id": ObjectId("56e31ce076cdf52e541d9d28"),
            "isbn": "978-3-16-148999-9",
            "copies_sold": 720050
        },
        {
            "_id": ObjectId("56e31ce076cdf52e541d9d28"),
            "isbn": "978-3-16-148999-9",
            "copies_sold": 1000
        }
    ]
}

有几件事值得注意:

“from”集合(在本例中为books_selling_data)不能被分片。 “as”字段将是一个数组,如上面的例子所示。 $lookup stage上的"localField"和"foreignField"选项如果在各自的集合中不存在,出于匹配目的将被视为空($lookup文档中有一个完美的例子)。

因此,作为结论,如果您想合并两个集合,在这种情况下,有一个平坦的copies_sold字段,其中包含售出的总副本,您将不得不多做一些工作,可能会使用一个中间集合,然后$out到最终集合。

其他回答

是的,你可以,拿我今天写的这个效用函数来说

function shangMergeCol() {
  tcol= db.getCollection(arguments[0]);
  for (var i=1; i<arguments.length; i++){
    scol= db.getCollection(arguments[i]);
    scol.find().forEach(
        function (d) {
            tcol.insert(d);
        }
    )
  }
}

你可以传递给这个函数任意数量的集合,第一个将是目标集合。其余所有集合都是要传输到目标集合的源。

代码片段。礼貌-关于堆栈溢出的多个帖子,包括这一篇。

 db.cust.drop();
 db.zip.drop();
 db.cust.insert({cust_id:1, zip_id: 101});
 db.cust.insert({cust_id:2, zip_id: 101});
 db.cust.insert({cust_id:3, zip_id: 101});
 db.cust.insert({cust_id:4, zip_id: 102});
 db.cust.insert({cust_id:5, zip_id: 102});

 db.zip.insert({zip_id:101, zip_cd:'AAA'});
 db.zip.insert({zip_id:102, zip_cd:'BBB'});
 db.zip.insert({zip_id:103, zip_cd:'CCC'});

mapCust = function() {
    var values = {
        cust_id: this.cust_id
    };
    emit(this.zip_id, values);
};

mapZip = function() {
    var values = {
    zip_cd: this.zip_cd
    };
    emit(this.zip_id, values);
};

reduceCustZip =  function(k, values) {
    var result = {};
    values.forEach(function(value) {
    var field;
        if ("cust_id" in value) {
            if (!("cust_ids" in result)) {
                result.cust_ids = [];
            }
            result.cust_ids.push(value);
        } else {
    for (field in value) {
        if (value.hasOwnProperty(field) ) {
                result[field] = value[field];
        }
         };  
       }
      });
       return result;
};


db.cust_zip.drop();
db.cust.mapReduce(mapCust, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.zip.mapReduce(mapZip, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.cust_zip.find();


mapCZ = function() {
    var that = this;
    if ("cust_ids" in this.value) {
        this.value.cust_ids.forEach(function(value) {
            emit(value.cust_id, {
                zip_id: that._id,
                zip_cd: that.value.zip_cd
            });
        });
    }
};

reduceCZ = function(k, values) {
    var result = {};
    values.forEach(function(value) {
        var field;
        for (field in value) {
            if (value.hasOwnProperty(field)) {
                result[field] = value[field];
            }
        }
    });
    return result;
};
db.cust_zip_joined.drop();
db.cust_zip.mapReduce(mapCZ, reduceCZ, {"out": "cust_zip_joined"}); 
db.cust_zip_joined.find().pretty();


var flattenMRCollection=function(dbName,collectionName) {
    var collection=db.getSiblingDB(dbName)[collectionName];

    var i=0;
    var bulk=collection.initializeUnorderedBulkOp();
    collection.find({ value: { $exists: true } }).addOption(16).forEach(function(result) {
        print((++i));
        //collection.update({_id: result._id},result.value);

        bulk.find({_id: result._id}).replaceOne(result.value);

        if(i%1000==0)
        {
            print("Executing bulk...");
            bulk.execute();
            bulk=collection.initializeUnorderedBulkOp();
        }
    });
    bulk.execute();
};


flattenMRCollection("mydb","cust_zip_joined");
db.cust_zip_joined.find().pretty();

MongoDB 3.2现在允许通过$lookup聚合阶段将多个集合中的数据合并为一个集合。作为一个实际的例子,假设您有关于书籍的数据,分为两个不同的集合。

第一种藏书,称为书籍,有以下资料:

{
    "isbn": "978-3-16-148410-0",
    "title": "Some cool book",
    "author": "John Doe"
}
{
    "isbn": "978-3-16-148999-9",
    "title": "Another awesome book",
    "author": "Jane Roe"
}

第二个集合名为books_selling_data,包含以下数据:

{
    "_id": ObjectId("56e31bcf76cdf52e541d9d26"),
    "isbn": "978-3-16-148410-0",
    "copies_sold": 12500
}
{
    "_id": ObjectId("56e31ce076cdf52e541d9d28"),
    "isbn": "978-3-16-148999-9",
    "copies_sold": 720050
}
{
    "_id": ObjectId("56e31ce076cdf52e541d9d29"),
    "isbn": "978-3-16-148999-9",
    "copies_sold": 1000
}

要合并这两个集合,只需按以下方式使用$lookup:

db.books.aggregate([{
    $lookup: {
            from: "books_selling_data",
            localField: "isbn",
            foreignField: "isbn",
            as: "copies_sold"
        }
}])

在此聚合之后,图书集合将看起来如下所示:

{
    "isbn": "978-3-16-148410-0",
    "title": "Some cool book",
    "author": "John Doe",
    "copies_sold": [
        {
            "_id": ObjectId("56e31bcf76cdf52e541d9d26"),
            "isbn": "978-3-16-148410-0",
            "copies_sold": 12500
        }
    ]
}
{
    "isbn": "978-3-16-148999-9",
    "title": "Another awesome book",
    "author": "Jane Roe",
    "copies_sold": [
        {
            "_id": ObjectId("56e31ce076cdf52e541d9d28"),
            "isbn": "978-3-16-148999-9",
            "copies_sold": 720050
        },
        {
            "_id": ObjectId("56e31ce076cdf52e541d9d28"),
            "isbn": "978-3-16-148999-9",
            "copies_sold": 1000
        }
    ]
}

有几件事值得注意:

“from”集合(在本例中为books_selling_data)不能被分片。 “as”字段将是一个数组,如上面的例子所示。 $lookup stage上的"localField"和"foreignField"选项如果在各自的集合中不存在,出于匹配目的将被视为空($lookup文档中有一个完美的例子)。

因此,作为结论,如果您想合并两个集合,在这种情况下,有一个平坦的copies_sold字段,其中包含售出的总副本,您将不得不多做一些工作,可能会使用一个中间集合,然后$out到最终集合。

虽然不能实时执行,但可以多次运行map-reduce,通过使用MongoDB 1.8+ map/reduce中的“reduce”out选项将数据合并在一起(参见http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-Outputoptions)。您需要在两个集合中都有一些可以用作_id的键。

例如,假设您有一个用户集合和一个评论集合,并且您希望有一个新的集合,其中包含每个评论的一些用户统计信息。

让我们说users集合有以下字段:

_id firstName 姓 国家 性别 年龄

然后comments集合有以下字段:

_id 用户标识 评论 创建

你可以这样做:

var mapUsers, mapComments, reduce;
db.users_comments.remove();

// setup sample data - wouldn't actually use this in production
db.users.remove();
db.comments.remove();
db.users.save({firstName:"Rich",lastName:"S",gender:"M",country:"CA",age:"18"});
db.users.save({firstName:"Rob",lastName:"M",gender:"M",country:"US",age:"25"});
db.users.save({firstName:"Sarah",lastName:"T",gender:"F",country:"US",age:"13"});
var users = db.users.find();
db.comments.save({userId: users[0]._id, "comment": "Hey, what's up?", created: new ISODate()});
db.comments.save({userId: users[1]._id, "comment": "Not much", created: new ISODate()});
db.comments.save({userId: users[0]._id, "comment": "Cool", created: new ISODate()});
// end sample data setup

mapUsers = function() {
    var values = {
        country: this.country,
        gender: this.gender,
        age: this.age
    };
    emit(this._id, values);
};
mapComments = function() {
    var values = {
        commentId: this._id,
        comment: this.comment,
        created: this.created
    };
    emit(this.userId, values);
};
reduce = function(k, values) {
    var result = {}, commentFields = {
        "commentId": '', 
        "comment": '',
        "created": ''
    };
    values.forEach(function(value) {
        var field;
        if ("comment" in value) {
            if (!("comments" in result)) {
                result.comments = [];
            }
            result.comments.push(value);
        } else if ("comments" in value) {
            if (!("comments" in result)) {
                result.comments = [];
            }
            result.comments.push.apply(result.comments, value.comments);
        }
        for (field in value) {
            if (value.hasOwnProperty(field) && !(field in commentFields)) {
                result[field] = value[field];
            }
        }
    });
    return result;
};
db.users.mapReduce(mapUsers, reduce, {"out": {"reduce": "users_comments"}});
db.comments.mapReduce(mapComments, reduce, {"out": {"reduce": "users_comments"}});
db.users_comments.find().pretty(); // see the resulting collection

此时,您将拥有一个名为users_comments的新集合,其中包含合并的数据,您现在可以使用它了。这些简化的集合都有_id,这是你在map函数中发出的键,然后所有的值都是value键内的子对象-这些值不在这些简化文档的顶层。

这是一个比较简单的例子。您可以重复使用更多的集合,只要您想继续构建减少的集合。您还可以在该过程中对数据进行总结和聚合。随着聚合和保存现有字段的逻辑变得更加复杂,您可能会定义多个reduce函数。

您还会注意到,现在每个用户都有一个文档,数组中包含该用户的所有评论。如果我们合并的数据是一对一的关系,而不是一对多的关系,它将是平坦的,你可以简单地使用这样的reduce函数:

reduce = function(k, values) {
    var result = {};
    values.forEach(function(value) {
        var field;
        for (field in value) {
            if (value.hasOwnProperty(field)) {
                result[field] = value[field];
            }
        }
    });
    return result;
};

如果你想平铺users_comments集合,所以每个注释只有一个文档,另外运行这个:

var map, reduce;
map = function() {
    var debug = function(value) {
        var field;
        for (field in value) {
            print(field + ": " + value[field]);
        }
    };
    debug(this);
    var that = this;
    if ("comments" in this.value) {
        this.value.comments.forEach(function(value) {
            emit(value.commentId, {
                userId: that._id,
                country: that.value.country,
                age: that.value.age,
                comment: value.comment,
                created: value.created,
            });
        });
    }
};
reduce = function(k, values) {
    var result = {};
    values.forEach(function(value) {
        var field;
        for (field in value) {
            if (value.hasOwnProperty(field)) {
                result[field] = value[field];
            }
        }
    });
    return result;
};
db.users_comments.mapReduce(map, reduce, {"out": "comments_with_demographics"});

这个技巧绝对不应该在飞行中执行。它适用于定期更新合并数据的cron作业或类似的工作。您可能希望在新集合上运行ensureIndex,以确保对它执行的查询能够快速运行(请记住,您的数据仍然在值键中,因此如果您要在注释创建时间上索引comments_with_demographic,那么它将是db.comments_with_demographic .ensureIndex({"value.created": 1});

非常基本的$lookup示例。

db.getCollection('users').aggregate([
    {
        $lookup: {
            from: "userinfo",
            localField: "userId",
            foreignField: "userId",
            as: "userInfoData"
        }
    },
    {
        $lookup: {
            from: "userrole",
            localField: "userId",
            foreignField: "userId",
            as: "userRoleData"
        }
    },
    { $unwind: { path: "$userInfoData", preserveNullAndEmptyArrays: true }},
    { $unwind: { path: "$userRoleData", preserveNullAndEmptyArrays: true }}
])

这里用到了

 { $unwind: { path: "$userInfoData", preserveNullAndEmptyArrays: true }}, 
 { $unwind: { path: "$userRoleData", preserveNullAndEmptyArrays: true }}

而不是

{ $unwind:"$userRoleData"} 
{ $unwind:"$userRoleData"}

因为{$unwind:"$userRoleData"}如果在$lookup中没有找到匹配的记录,将返回空或0结果。