如何检索队列中尚未处理的任务列表?


当前回答

要获得队列上的任务数,你可以使用flower库,下面是一个简化的例子:

from flower.utils.broker import Broker
from django.conf import settings

def get_queue_length(queue):
    broker = Broker(settings.CELERY_BROKER_URL)
    queues_result = broker.queues([queue])
    return queues_result.result()[0]['messages']

其他回答

如果你正在使用rabbitMQ,在终端中使用这个:

sudo rabbitmqctl list_queues

它将打印带有挂起任务数量的队列列表。例如:

Listing queues ...
0b27d8c59fba4974893ec22d478a7093    0
0e0a2da9828a48bc86fe993b210d984f    0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7    0
15c036ad25884b82839495fb29bd6395    1
celerey_mail_worker@torob2.celery.pidbox    0
celery  166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa   0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6   0

右边列的数字是队列中的任务数。在上面,芹菜队列有166个待处理的任务。

我得出的结论是,获得队列上的作业数量的最佳方法是使用rabbitmqctl,正如这里多次建议的那样。为了允许任何选择的用户使用sudo运行命令,我遵循了这里的说明(我跳过了编辑配置文件部分,因为我不介意在命令之前键入sudo)。

我还获取了jamesc的grep和cut代码片段,并将其封装在子进程调用中。

from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE)
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))

如果您控制任务的代码,那么您可以通过让任务在第一次执行时触发一个微不足道的重试来解决这个问题,然后检查inspect().reserved()。重试将任务注册到结果后端,芹菜可以看到这一点。任务必须接受self或context作为第一个参数,这样我们才能访问重试计数。

@task(bind=True)
def mytask(self):
    if self.request.retries == 0:
        raise self.retry(exc=MyTrivialError(), countdown=1)
    ...

这个解决方案与代理无关。你不必担心你是用RabbitMQ还是Redis来存储任务。

编辑:经过测试,我发现这只是一个部分的解决方案。预留的大小受限于worker的预取设置。

据我所知,芹菜没有提供API来检查队列中等待的任务。这是特定于代理的。例如,如果你使用Redis作为代理,那么检查在芹菜(默认)队列中等待的任务就像这样简单:

连接到代理 在芹菜列表中列出项目(以LRANGE命令为例)

请记住,这些任务等待可用的员工来挑选。您的集群可能有一些正在运行的任务——这些任务不会在这个列表中,因为它们已经被选中了。

检索特定队列中的任务的过程是特定于代理的。

我认为获取正在等待的任务的唯一方法是保留一个已启动任务的列表,并让任务在启动时将自己从列表中删除。

通过rabbitmqctl和list_queues,你可以了解有多少任务正在等待,而不是任务本身:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

如果你想要的包括正在处理的任务,但还没有完成,你可以保留一个任务列表,并检查它们的状态:

from tasks import add
result = add.delay(4, 4)

result.ready() # True if finished

或者让芹菜使用CELERY_RESULT_BACKEND存储结果,并检查哪些任务不在其中。