我正在用MongoDB作为数据存储创建一种后台作业队列系统。在生成工人处理作业之前,我如何“监听”MongoDB集合的插入?
是否需要每隔几秒钟轮询一次,以查看与上次相比是否有任何更改,或者是否有一种方法可以让脚本等待插入的发生?
这是一个PHP项目,我正在工作,但请随意回答在Ruby或语言不可知。
我正在用MongoDB作为数据存储创建一种后台作业队列系统。在生成工人处理作业之前,我如何“监听”MongoDB集合的插入?
是否需要每隔几秒钟轮询一次,以查看与上次相比是否有任何更改,或者是否有一种方法可以让脚本等待插入的发生?
这是一个PHP项目,我正在工作,但请随意回答在Ruby或语言不可知。
当前回答
自从MongoDB 3.6以来,会有一个新的通知API叫做Change Streams,你可以使用它。有关示例,请参阅这篇博客文章。例子如下:
cursor = client.my_db.my_collection.changes([
{'$match': {
'operationType': {'$in': ['insert', 'replace']}
}},
{'$match': {
'newDocument.n': {'$gte': 1}
}}
])
# Loops forever.
for change in cursor:
print(change['newDocument'])
其他回答
MongoDB 3.6版现在包含了变更流,它本质上是OpLog之上的API,允许触发/通知类用例。
下面是一个Java示例的链接: http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/
一个NodeJS的例子可能是这样的:
var MongoClient = require('mongodb').MongoClient;
MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
.then(function(client){
let db = client.db('MyStore')
let change_streams = db.collection('products').watch()
change_streams.on('change', function(change){
console.log(JSON.stringify(change));
});
});
你的想法听起来很像触发器。MongoDB不支持任何触发器,但是有些人已经使用一些技巧“滚出了自己的”触发器。这里的关键是oplog。
当您在Replica Set中运行MongoDB时,所有的MongoDB操作都会被记录到一个操作日志(称为oplog)。oplog基本上只是对数据所做修改的一个运行列表。Replicas Sets的功能是监听这个oplog上的更改,然后在本地应用这些更改。
这听起来熟悉吗?
我不能在这里详细说明整个过程,这是几页的文档,但你需要的工具是可用的。
首先是对oplog的一些评论 -简要描述 -本地集合的布局(其中包含oplog)
您还需要利用可尾游标。这将为您提供一种侦听更改的方法,而不是轮询更改。注意,复制使用可尾游标,因此这是一个受支持的特性。
MongoDB有所谓的上限集合和可尾游标,允许MongoDB将数据推送到侦听器。
有上限的集合本质上是一个固定大小且只允许插入的集合。下面是创建一个的样子:
db.createCollection("messages", { capped: true, size: 100000000 })
MongoDB可Tailable游标(原文由Jonathan H. Wage撰写)
Ruby
coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
if doc = cursor.next_document
puts doc
else
sleep 1
end
end
PHP
$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
if ($cursor->hasNext()) {
$doc = $cursor->getNext();
print_r($doc);
} else {
sleep(1);
}
}
Python(作者:罗伯特·斯图尔特)
from pymongo import Connection
import time
db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
try:
doc = cursor.next()
print doc
except StopIteration:
time.sleep(1)
Perl (by Max)
use 5.010;
use strict;
use warnings;
use MongoDB;
my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
if (defined(my $doc = $cursor->next))
{
say $doc;
}
else
{
sleep 1;
}
}
额外的资源:
Ruby/Node.js教程,指导您创建一个应用程序,侦听MongoDB capped collection中的插入。
一篇详细讨论可尾标的文章。
PHP、Ruby、Python和Perl使用可尾游标的示例。
或者,你可以使用标准的Mongo findupdate方法,在回调中,在回调运行时触发EventEmitter事件(在Node中)。
应用程序或体系结构中侦听此事件的任何其他部分都将收到更新通知,并将任何相关数据发送到那里。这是实现Mongo通知的一个非常简单的方法。
自从MongoDB 3.6以来,会有一个新的通知API叫做Change Streams,你可以使用它。有关示例,请参阅这篇博客文章。例子如下:
cursor = client.my_db.my_collection.changes([
{'$match': {
'operationType': {'$in': ['insert', 'replace']}
}},
{'$match': {
'newDocument.n': {'$gte': 1}
}}
])
# Loops forever.
for change in cursor:
print(change['newDocument'])