我正在用MongoDB作为数据存储创建一种后台作业队列系统。在生成工人处理作业之前,我如何“监听”MongoDB集合的插入?

是否需要每隔几秒钟轮询一次,以查看与上次相比是否有任何更改,或者是否有一种方法可以让脚本等待插入的发生?

这是一个PHP项目,我正在工作,但请随意回答在Ruby或语言不可知。


当前回答

看看这个:改变流

2018年1月10日——3.6版

*编辑:我写了一篇关于如何做到这一点的文章https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

https://docs.mongodb.com/v3.6/changeStreams/


这是mongodb 3.6中的新功能 https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10

$ mongod --version
db version v3.6.2

为了使用changeStreams,数据库必须是一个复制集

关于复制集的更多信息: https://docs.mongodb.com/manual/replication/

默认情况下,数据库将是“独立的”。

如何将一个独立的副本集:https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/


下面的示例是一个实际应用程序,说明如何使用它。 *特别针对Node。

/* file.js */
'use strict'


module.exports = function (
    app,
    io,
    User // Collection Name
) {
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) {
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) {
            console.log('COLLECTION CHANGED');

            User.find({}, (err, data) => {
                if (err) throw err;

                if (data) {
                    // RESEND ALL USERS
                    socket.emit('users', data);
                }
            });
        });
    });
};
/* END - file.js */

有用的链接: https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set https://docs.mongodb.com/manual/tutorial/change-streams-example

https://docs.mongodb.com/v3.6/tutorial/change-streams-example http://plusnconsulting.com/post/MongoDB-Change-Streams

其他回答

自从MongoDB 3.6以来,会有一个新的通知API叫做Change Streams,你可以使用它。有关示例,请参阅这篇博客文章。例子如下:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])

这里有一个工作的java示例。

 MongoClient mongoClient = new MongoClient();
    DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs");

    DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
            .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

    System.out.println("== open cursor ==");

    Runnable task = () -> {
        System.out.println("\tWaiting for events");
        while (cur.hasNext()) {
            DBObject obj = cur.next();
            System.out.println( obj );

        }
    };
    new Thread(task).start();

键是这里给出的QUERY OPTIONS。

你也可以改变查找查询,如果你不需要每次都加载所有的数据。

BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range
query.put("op", "i"); //Only insert operation

DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

你的想法听起来很像触发器。MongoDB不支持任何触发器,但是有些人已经使用一些技巧“滚出了自己的”触发器。这里的关键是oplog。

当您在Replica Set中运行MongoDB时,所有的MongoDB操作都会被记录到一个操作日志(称为oplog)。oplog基本上只是对数据所做修改的一个运行列表。Replicas Sets的功能是监听这个oplog上的更改,然后在本地应用这些更改。

这听起来熟悉吗?

我不能在这里详细说明整个过程,这是几页的文档,但你需要的工具是可用的。

首先是对oplog的一些评论 -简要描述 -本地集合的布局(其中包含oplog)

您还需要利用可尾游标。这将为您提供一种侦听更改的方法,而不是轮询更改。注意,复制使用可尾游标,因此这是一个受支持的特性。

MongoDB有所谓的上限集合和可尾游标,允许MongoDB将数据推送到侦听器。

有上限的集合本质上是一个固定大小且只允许插入的集合。下面是创建一个的样子:

db.createCollection("messages", { capped: true, size: 100000000 })

MongoDB可Tailable游标(原文由Jonathan H. Wage撰写)

Ruby

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}

Python(作者:罗伯特·斯图尔特)

from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)

Perl (by Max)

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}

额外的资源:

Ruby/Node.js教程,指导您创建一个应用程序,侦听MongoDB capped collection中的插入。

一篇详细讨论可尾标的文章。

PHP、Ruby、Python和Perl使用可尾游标的示例。

实际上,与其观察输出,不如使用mongoose schema提供的中间件在插入新内容时注意到它

您可以捕捉插入新文档的事件,并在插入完成后执行一些操作