如何检索队列中尚未处理的任务列表?


当前回答

要获得队列上的任务数,你可以使用flower库,下面是一个简化的例子:

from flower.utils.broker import Broker
from django.conf import settings

def get_queue_length(queue):
    broker = Broker(settings.CELERY_BROKER_URL)
    queues_result = broker.queues([queue])
    return queues_result.result()[0]['messages']

其他回答

芹菜检查模块似乎只知道从工作人员的角度来看的任务。如果你想查看队列中的消息(还没有被worker提取),我建议使用pyrabbit,它可以与rabbitmq http api接口,从队列中检索各种信息。

一个例子可以在这里找到: 使用芹菜检索队列长度(RabbitMQ, Django)

据我所知,芹菜没有提供API来检查队列中等待的任务。这是特定于代理的。例如,如果你使用Redis作为代理,那么检查在芹菜(默认)队列中等待的任务就像这样简单:

连接到代理 在芹菜列表中列出项目(以LRANGE命令为例)

请记住,这些任务等待可用的员工来挑选。您的集群可能有一些正在运行的任务——这些任务不会在这个列表中,因为它们已经被选中了。

检索特定队列中的任务的过程是特定于代理的。

如果你正在使用芹菜+Django,最简单的方法是在虚拟环境中直接从终端使用命令检查任务,或者使用芹菜的完整路径:

道格:http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke inspecting-workers

$ celery inspect reserved
$ celery inspect active
$ celery inspect registered
$ celery inspect scheduled

另外,如果你正在使用芹菜+RabbitMQ,你可以使用下面的命令检查队列列表:

更多信息:https://linux.die.net/man/1/rabbitmqctl

$ sudo rabbitmqctl list_queues

我认为获取正在等待的任务的唯一方法是保留一个已启动任务的列表,并让任务在启动时将自己从列表中删除。

通过rabbitmqctl和list_queues,你可以了解有多少任务正在等待,而不是任务本身:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

如果你想要的包括正在处理的任务,但还没有完成,你可以保留一个任务列表,并检查它们的状态:

from tasks import add
result = add.delay(4, 4)

result.ready() # True if finished

或者让芹菜使用CELERY_RESULT_BACKEND存储结果,并检查哪些任务不在其中。

如果你不使用优先级任务,这其实很简单,如果你使用的是Redis。获取任务计数:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

但是,优先级任务在redis中使用不同的键,所以整体情况稍微复杂一些。总的来说,您需要为任务的每个优先级查询redis。在python中(以及在Flower项目中),它看起来像:

PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]


def make_queue_name_for_pri(queue, pri):
    """Make a queue name for redis

    Celery uses PRIORITY_SEP to separate different priorities of tasks into
    different queues in Redis. Each queue-priority combination becomes a key in
    redis with names like:

     - batch1\x06\x163 <-- P3 queue named batch1

    There's more information about this in Github, but it doesn't look like it 
    will change any time soon:

      - https://github.com/celery/kombu/issues/422

    In that ticket the code below, from the Flower project, is referenced:

      - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135

    :param queue: The name of the queue to make a name for.
    :param pri: The priority to make a name with.
    :return: A name for the queue-priority pair.
    """
    if pri not in DEFAULT_PRIORITY_STEPS:
        raise ValueError('Priority not in priority steps')
    return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
                                (queue, '', '')))


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
                      DEFAULT_PRIORITY_STEPS]
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return sum([r.llen(x) for x in priority_names])

如果你想要获得一个实际的任务,你可以使用以下方法:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1

从那里,您必须反序列化返回的列表。以我为例,我可以通过以下方法来实现:

r = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))

请注意,反序列化可能需要一些时间,您需要调整上面的命令以处理不同的优先级。