如何检索队列中尚未处理的任务列表?


当前回答

EDIT:查看获取队列中任务列表的其他答案。

你应该看这里: 芹菜指南-检查工人

基本上是这样的:

my_app = Celery(...)

# Inspect all nodes.
i = my_app.control.inspect()

# Show the items that have an ETA or are scheduled for later processing
i.scheduled()

# Show tasks that are currently active.
i.active()

# Show tasks that have been claimed by workers
i.reserved()

这取决于你想要什么

其他回答

如果你正在使用rabbitMQ,在终端中使用这个:

sudo rabbitmqctl list_queues

它将打印带有挂起任务数量的队列列表。例如:

Listing queues ...
0b27d8c59fba4974893ec22d478a7093    0
0e0a2da9828a48bc86fe993b210d984f    0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7    0
15c036ad25884b82839495fb29bd6395    1
celerey_mail_worker@torob2.celery.pidbox    0
celery  166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa   0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6   0

右边列的数字是队列中的任务数。在上面,芹菜队列有166个待处理的任务。

我得出的结论是,获得队列上的作业数量的最佳方法是使用rabbitmqctl,正如这里多次建议的那样。为了允许任何选择的用户使用sudo运行命令,我遵循了这里的说明(我跳过了编辑配置文件部分,因为我不介意在命令之前键入sudo)。

我还获取了jamesc的grep和cut代码片段,并将其封装在子进程调用中。

from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE)
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))

要从后端检索任务,使用这个

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)

Redis json序列化的复制粘贴解决方案:

def get_celery_queue_items(queue_name):
    import base64
    import json  

    # Get a configured instance of a celery app:
    from yourproject.celery import app as celery_app

    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
        decoded_tasks = []

    for task in tasks:
        j = json.loads(task)
        body = json.loads(base64.b64decode(j['body']))
        decoded_tasks.append(body)

    return decoded_tasks

它与Django一起工作。只是别忘了改变你的项目。

from celery.task.control import inspect
def key_in_list(k, l):
    return bool([True for i in l if k in i.values()])

def check_task(task_id):
    task_value_dict = inspect().active().values()
    for task_list in task_value_dict:
        if self.key_in_list(task_id, task_list):
             return True
    return False