我正在用MongoDB作为数据存储创建一种后台作业队列系统。在生成工人处理作业之前,我如何“监听”MongoDB集合的插入?

是否需要每隔几秒钟轮询一次,以查看与上次相比是否有任何更改,或者是否有一种方法可以让脚本等待插入的发生?

这是一个PHP项目,我正在工作,但请随意回答在Ruby或语言不可知。


当前回答

实际上,与其观察输出,不如使用mongoose schema提供的中间件在插入新内容时注意到它

您可以捕捉插入新文档的事件,并在插入完成后执行一些操作

其他回答

或者,你可以使用标准的Mongo findupdate方法,在回调中,在回调运行时触发EventEmitter事件(在Node中)。

应用程序或体系结构中侦听此事件的任何其他部分都将收到更新通知,并将任何相关数据发送到那里。这是实现Mongo通知的一个非常简单的方法。

在3.6允许使用数据库之后,以下数据库触发类型:

事件驱动触发器——用于自动更新相关文档、通知下游服务、传播数据以支持混合工作负载、数据完整性和审计 计划触发器——对于计划数据检索、传播、归档和分析工作负载非常有用

登录到您的Atlas帐户,选择触发器界面,并添加新的触发器:

展开每个部分以了解更多设置或详细信息。

MongoDB有所谓的上限集合和可尾游标,允许MongoDB将数据推送到侦听器。

有上限的集合本质上是一个固定大小且只允许插入的集合。下面是创建一个的样子:

db.createCollection("messages", { capped: true, size: 100000000 })

MongoDB可Tailable游标(原文由Jonathan H. Wage撰写)

Ruby

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}

Python(作者:罗伯特·斯图尔特)

from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)

Perl (by Max)

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}

额外的资源:

Ruby/Node.js教程,指导您创建一个应用程序,侦听MongoDB capped collection中的插入。

一篇详细讨论可尾标的文章。

PHP、Ruby、Python和Perl使用可尾游标的示例。

许多这些答案只会给你新的记录,而不是更新和/或非常低效

要做到这一点,唯一可靠、高效的方法是在本地db: oplog上创建一个可尾游标。rs集合得到所有的变化到MongoDB和做什么,你会。(MongoDB甚至在内部或多或少地支持复制!)

oplog包含内容的解释: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/

一个Node.js库的例子,它提供了一个关于oplog可以做什么的API: https://github.com/cayasso/mongo-oplog

自从MongoDB 3.6以来,会有一个新的通知API叫做Change Streams,你可以使用它。有关示例,请参阅这篇博客文章。例子如下:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])