如何检索队列中尚未处理的任务列表?


当前回答

subprocess.run:

import subprocess
import re
active_process_txt = subprocess.run(['celery', '-A', 'my_proj', 'inspect', 'active'],
                                        stdout=subprocess.PIPE).stdout.decode('utf-8')
return len(re.findall(r'worker_pid', active_process_txt))

注意使用your_proj更改my_proj

其他回答

这在我的申请中很奏效:

def get_celery_queue_active_jobs(queue_name):
    connection = <CELERY_APP_INSTANCE>.connection()

    try:
        channel = connection.channel()
        name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
        active_jobs = []

        def dump_message(message):
            active_jobs.append(message.properties['application_headers']['task'])

        channel.basic_consume(queue=queue_name, callback=dump_message)

        for job in range(jobs):
            connection.drain_events()

        return active_jobs
    finally:
        connection.close()

Active_jobs将是一个字符串列表,对应于队列中的任务。

不要忘记将CELERY_APP_INSTANCE替换为您自己的。

感谢@ashish在这里为我指出了正确的方向:https://stackoverflow.com/a/19465670/9843399

要从后端检索任务,使用这个

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)
from celery.task.control import inspect
def key_in_list(k, l):
    return bool([True for i in l if k in i.values()])

def check_task(task_id):
    task_value_dict = inspect().active().values()
    for task_list in task_value_dict:
        if self.key_in_list(task_id, task_list):
             return True
    return False

据我所知,芹菜没有提供API来检查队列中等待的任务。这是特定于代理的。例如,如果你使用Redis作为代理,那么检查在芹菜(默认)队列中等待的任务就像这样简单:

连接到代理 在芹菜列表中列出项目(以LRANGE命令为例)

请记住,这些任务等待可用的员工来挑选。您的集群可能有一些正在运行的任务——这些任务不会在这个列表中,因为它们已经被选中了。

检索特定队列中的任务的过程是特定于代理的。

如果您控制任务的代码,那么您可以通过让任务在第一次执行时触发一个微不足道的重试来解决这个问题,然后检查inspect().reserved()。重试将任务注册到结果后端,芹菜可以看到这一点。任务必须接受self或context作为第一个参数,这样我们才能访问重试计数。

@task(bind=True)
def mytask(self):
    if self.request.retries == 0:
        raise self.retry(exc=MyTrivialError(), countdown=1)
    ...

这个解决方案与代理无关。你不必担心你是用RabbitMQ还是Redis来存储任务。

编辑:经过测试,我发现这只是一个部分的解决方案。预留的大小受限于worker的预取设置。