我如何(在MongoDB)结合数据从多个集合到一个集合?

我可以使用地图减少,如果是,然后如何?

我非常感谢一些例子,因为我是一个新手。


当前回答

MongoDB 3.2现在允许通过$lookup聚合阶段将多个集合中的数据合并为一个集合。作为一个实际的例子,假设您有关于书籍的数据,分为两个不同的集合。

第一种藏书,称为书籍,有以下资料:

{
    "isbn": "978-3-16-148410-0",
    "title": "Some cool book",
    "author": "John Doe"
}
{
    "isbn": "978-3-16-148999-9",
    "title": "Another awesome book",
    "author": "Jane Roe"
}

第二个集合名为books_selling_data,包含以下数据:

{
    "_id": ObjectId("56e31bcf76cdf52e541d9d26"),
    "isbn": "978-3-16-148410-0",
    "copies_sold": 12500
}
{
    "_id": ObjectId("56e31ce076cdf52e541d9d28"),
    "isbn": "978-3-16-148999-9",
    "copies_sold": 720050
}
{
    "_id": ObjectId("56e31ce076cdf52e541d9d29"),
    "isbn": "978-3-16-148999-9",
    "copies_sold": 1000
}

要合并这两个集合,只需按以下方式使用$lookup:

db.books.aggregate([{
    $lookup: {
            from: "books_selling_data",
            localField: "isbn",
            foreignField: "isbn",
            as: "copies_sold"
        }
}])

在此聚合之后,图书集合将看起来如下所示:

{
    "isbn": "978-3-16-148410-0",
    "title": "Some cool book",
    "author": "John Doe",
    "copies_sold": [
        {
            "_id": ObjectId("56e31bcf76cdf52e541d9d26"),
            "isbn": "978-3-16-148410-0",
            "copies_sold": 12500
        }
    ]
}
{
    "isbn": "978-3-16-148999-9",
    "title": "Another awesome book",
    "author": "Jane Roe",
    "copies_sold": [
        {
            "_id": ObjectId("56e31ce076cdf52e541d9d28"),
            "isbn": "978-3-16-148999-9",
            "copies_sold": 720050
        },
        {
            "_id": ObjectId("56e31ce076cdf52e541d9d28"),
            "isbn": "978-3-16-148999-9",
            "copies_sold": 1000
        }
    ]
}

有几件事值得注意:

“from”集合(在本例中为books_selling_data)不能被分片。 “as”字段将是一个数组,如上面的例子所示。 $lookup stage上的"localField"和"foreignField"选项如果在各自的集合中不存在,出于匹配目的将被视为空($lookup文档中有一个完美的例子)。

因此,作为结论,如果您想合并两个集合,在这种情况下,有一个平坦的copies_sold字段,其中包含售出的总副本,您将不得不多做一些工作,可能会使用一个中间集合,然后$out到最终集合。

其他回答

如果mongodb没有批量插入,我们循环small_collection中的所有对象,并将它们逐个插入到big_collection中:

db.small_collection.find().forEach(function(obj){ 
   db.big_collection.insert(obj)
});

MongoDB 3.2现在允许通过$lookup聚合阶段将多个集合中的数据合并为一个集合。作为一个实际的例子,假设您有关于书籍的数据,分为两个不同的集合。

第一种藏书,称为书籍,有以下资料:

{
    "isbn": "978-3-16-148410-0",
    "title": "Some cool book",
    "author": "John Doe"
}
{
    "isbn": "978-3-16-148999-9",
    "title": "Another awesome book",
    "author": "Jane Roe"
}

第二个集合名为books_selling_data,包含以下数据:

{
    "_id": ObjectId("56e31bcf76cdf52e541d9d26"),
    "isbn": "978-3-16-148410-0",
    "copies_sold": 12500
}
{
    "_id": ObjectId("56e31ce076cdf52e541d9d28"),
    "isbn": "978-3-16-148999-9",
    "copies_sold": 720050
}
{
    "_id": ObjectId("56e31ce076cdf52e541d9d29"),
    "isbn": "978-3-16-148999-9",
    "copies_sold": 1000
}

要合并这两个集合,只需按以下方式使用$lookup:

db.books.aggregate([{
    $lookup: {
            from: "books_selling_data",
            localField: "isbn",
            foreignField: "isbn",
            as: "copies_sold"
        }
}])

在此聚合之后,图书集合将看起来如下所示:

{
    "isbn": "978-3-16-148410-0",
    "title": "Some cool book",
    "author": "John Doe",
    "copies_sold": [
        {
            "_id": ObjectId("56e31bcf76cdf52e541d9d26"),
            "isbn": "978-3-16-148410-0",
            "copies_sold": 12500
        }
    ]
}
{
    "isbn": "978-3-16-148999-9",
    "title": "Another awesome book",
    "author": "Jane Roe",
    "copies_sold": [
        {
            "_id": ObjectId("56e31ce076cdf52e541d9d28"),
            "isbn": "978-3-16-148999-9",
            "copies_sold": 720050
        },
        {
            "_id": ObjectId("56e31ce076cdf52e541d9d28"),
            "isbn": "978-3-16-148999-9",
            "copies_sold": 1000
        }
    ]
}

有几件事值得注意:

“from”集合(在本例中为books_selling_data)不能被分片。 “as”字段将是一个数组,如上面的例子所示。 $lookup stage上的"localField"和"foreignField"选项如果在各自的集合中不存在,出于匹配目的将被视为空($lookup文档中有一个完美的例子)。

因此,作为结论,如果您想合并两个集合,在这种情况下,有一个平坦的copies_sold字段,其中包含售出的总副本,您将不得不多做一些工作,可能会使用一个中间集合,然后$out到最终集合。

从Mongo 4.4开始,我们可以通过将新的$unionWith聚合阶段与$group的新的$accumulator操作符耦合在聚合管道中实现这种连接:

// > db.users.find()
//   [{ user: 1, name: "x" }, { user: 2, name: "y" }]
// > db.books.find()
//   [{ user: 1, book: "a" }, { user: 1, book: "b" }, { user: 2, book: "c" }]
// > db.movies.find()
//   [{ user: 1, movie: "g" }, { user: 2, movie: "h" }, { user: 2, movie: "i" }]
db.users.aggregate([
  { $unionWith: "books"  },
  { $unionWith: "movies" },
  { $group: {
    _id: "$user",
    user: {
      $accumulator: {
        accumulateArgs: ["$name", "$book", "$movie"],
        init: function() { return { books: [], movies: [] } },
        accumulate: function(user, name, book, movie) {
          if (name) user.name = name;
          if (book) user.books.push(book);
          if (movie) user.movies.push(movie);
          return user;
        },
        merge: function(userV1, userV2) {
          if (userV2.name) userV1.name = userV2.name;
          userV1.books.concat(userV2.books);
          userV1.movies.concat(userV2.movies);
          return userV1;
        },
        lang: "js"
      }
    }
  }}
])
// { _id: 1, user: { books: ["a", "b"], movies: ["g"], name: "x" } }
// { _id: 2, user: { books: ["c"], movies: ["h", "i"], name: "y" } }

$unionWith combines records from the given collection within documents already in the aggregation pipeline. After the 2 union stages, we thus have all users, books and movies records within the pipeline. We then $group records by $user and accumulate items using the $accumulator operator allowing custom accumulations of documents as they get grouped: the fields we're interested in accumulating are defined with accumulateArgs. init defines the state that will be accumulated as we group elements. the accumulate function allows performing a custom action with a record being grouped in order to build the accumulated state. For instance, if the item being grouped has the book field defined, then we update the books part of the state. merge is used to merge two internal states. It's only used for aggregations running on sharded clusters or when the operation exceeds memory limits.

是的,你可以,拿我今天写的这个效用函数来说

function shangMergeCol() {
  tcol= db.getCollection(arguments[0]);
  for (var i=1; i<arguments.length; i++){
    scol= db.getCollection(arguments[i]);
    scol.find().forEach(
        function (d) {
            tcol.insert(d);
        }
    )
  }
}

你可以传递给这个函数任意数量的集合,第一个将是目标集合。其余所有集合都是要传输到目标集合的源。

代码片段。礼貌-关于堆栈溢出的多个帖子,包括这一篇。

 db.cust.drop();
 db.zip.drop();
 db.cust.insert({cust_id:1, zip_id: 101});
 db.cust.insert({cust_id:2, zip_id: 101});
 db.cust.insert({cust_id:3, zip_id: 101});
 db.cust.insert({cust_id:4, zip_id: 102});
 db.cust.insert({cust_id:5, zip_id: 102});

 db.zip.insert({zip_id:101, zip_cd:'AAA'});
 db.zip.insert({zip_id:102, zip_cd:'BBB'});
 db.zip.insert({zip_id:103, zip_cd:'CCC'});

mapCust = function() {
    var values = {
        cust_id: this.cust_id
    };
    emit(this.zip_id, values);
};

mapZip = function() {
    var values = {
    zip_cd: this.zip_cd
    };
    emit(this.zip_id, values);
};

reduceCustZip =  function(k, values) {
    var result = {};
    values.forEach(function(value) {
    var field;
        if ("cust_id" in value) {
            if (!("cust_ids" in result)) {
                result.cust_ids = [];
            }
            result.cust_ids.push(value);
        } else {
    for (field in value) {
        if (value.hasOwnProperty(field) ) {
                result[field] = value[field];
        }
         };  
       }
      });
       return result;
};


db.cust_zip.drop();
db.cust.mapReduce(mapCust, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.zip.mapReduce(mapZip, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.cust_zip.find();


mapCZ = function() {
    var that = this;
    if ("cust_ids" in this.value) {
        this.value.cust_ids.forEach(function(value) {
            emit(value.cust_id, {
                zip_id: that._id,
                zip_cd: that.value.zip_cd
            });
        });
    }
};

reduceCZ = function(k, values) {
    var result = {};
    values.forEach(function(value) {
        var field;
        for (field in value) {
            if (value.hasOwnProperty(field)) {
                result[field] = value[field];
            }
        }
    });
    return result;
};
db.cust_zip_joined.drop();
db.cust_zip.mapReduce(mapCZ, reduceCZ, {"out": "cust_zip_joined"}); 
db.cust_zip_joined.find().pretty();


var flattenMRCollection=function(dbName,collectionName) {
    var collection=db.getSiblingDB(dbName)[collectionName];

    var i=0;
    var bulk=collection.initializeUnorderedBulkOp();
    collection.find({ value: { $exists: true } }).addOption(16).forEach(function(result) {
        print((++i));
        //collection.update({_id: result._id},result.value);

        bulk.find({_id: result._id}).replaceOne(result.value);

        if(i%1000==0)
        {
            print("Executing bulk...");
            bulk.execute();
            bulk=collection.initializeUnorderedBulkOp();
        }
    });
    bulk.execute();
};


flattenMRCollection("mydb","cust_zip_joined");
db.cust_zip_joined.find().pretty();