How do I retrieve the list of tasks in a queue that are yet to be processed?


Current answer

The conclusion I've come to is that the best way to get the number of jobs on a queue is to use rabbitmqctl, as has been suggested several times here. To allow any chosen user to run the command with sudo, I followed the instructions here (I skipped editing the profile part, since I don't mind typing sudo before the command).

I also grabbed jamesc's grep and cut snippet and wrapped it up in subprocess calls.

from subprocess import Popen, PIPE

# Equivalent of: sudo rabbitmqctl list_queues -p <vhost> | grep '^celery\s' | cut -f2
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtual host]"], stdout=PIPE)
p2 = Popen(["grep", "-e", r"^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()  # let p1 receive SIGPIPE if p2 exits early
p2.stdout.close()  # let p2 receive SIGPIPE if p3 exits early
print("number of jobs on queue: %i" % int(p3.communicate()[0]))

EDIT: see the other answers below for getting a list of the tasks in the queue.


Other answers

You should look here: Celery Guide - Inspecting Workers

Basically something like this:

from celery import Celery

my_app = Celery(...)

# Inspect all nodes.
i = my_app.control.inspect()

# Show the items that have an ETA or are scheduled for later processing
i.scheduled()

# Show tasks that are currently active.
i.active()

# Show tasks that have been claimed by workers
i.reserved()
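
Each of these calls returns a mapping of worker name to a list of task dicts (or None if no workers replied), so a small helper is needed to flatten the result. Below is a minimal sketch, assuming the my_app instance above; list_pending_task_ids is just an illustrative name:

def list_pending_task_ids(app):
    """Collect the ids of tasks that are reserved or scheduled but not yet running."""
    i = app.control.inspect()
    task_ids = []
    # reserved(): tasks prefetched by a worker; scheduled(): tasks waiting for an ETA/countdown
    for result in (i.reserved(), i.scheduled()):
        for worker, tasks in (result or {}).items():
            for task in tasks:
                # scheduled() nests the task info under a 'request' key
                request = task.get('request', task)
                task_ids.append(request['id'])
    return task_ids

print(list_pending_task_ids(my_app))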

It depends on what you want.

If you are using Celery + Django, the simplest way is to inspect tasks using commands directly from your terminal in your virtual environment, or using the full path to celery:

Doc: http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers

$ celery inspect reserved
$ celery inspect active
$ celery inspect registered
$ celery inspect scheduled

Also, if you are using Celery + RabbitMQ, you can inspect the list of queues using the following command:

More info: https://linux.die.net/man/1/rabbitmqctl

$ sudo rabbitmqctl list_queues

If you are using Redis and you don't use prioritized tasks, this is actually pretty simple. To get the task count:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
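
The same count can be read from Python with the redis-py client. A minimal sketch, assuming Redis on localhost with the default port, database 0, and the default 'celery' queue name; adjust these to match your broker settings:

import redis

# Count the messages waiting in the default Celery queue.
r = redis.StrictRedis(host='localhost', port=6379, db=0)
print(r.llen('celery'))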

But prioritized tasks use a different key in redis, so the full picture is slightly more complicated: you need to query redis for every priority of task. In python (and from the Flower project), this looks like:

import redis  # redis-py client; `settings` below refers to your project's settings (e.g. Django settings)

PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]


def make_queue_name_for_pri(queue, pri):
    """Make a queue name for redis

    Celery uses PRIORITY_SEP to separate different priorities of tasks into
    different queues in Redis. Each queue-priority combination becomes a key in
    redis with names like:

     - batch1\x06\x163 <-- P3 queue named batch1

    There's more information about this in Github, but it doesn't look like it 
    will change any time soon:

      - https://github.com/celery/kombu/issues/422

    In that ticket the code below, from the Flower project, is referenced:

      - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135

    :param queue: The name of the queue to make a name for.
    :param pri: The priority to make a name with.
    :return: A name for the queue-priority pair.
    """
    if pri not in DEFAULT_PRIORITY_STEPS:
        raise ValueError('Priority not in priority steps')
    return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
                                (queue, '', '')))


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
                      DEFAULT_PRIORITY_STEPS]
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return sum([r.llen(x) for x in priority_names])
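
Usage is then simply:

get_queue_length()           # the default 'celery' queue
get_queue_length('batch1')   # or any named queue, e.g. the 'batch1' queue from the docstring above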

If you want to retrieve an actual task, you can use something like:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1

From there you'll have to deserialize the returned list. In my case I was able to accomplish this with something like:

import base64
import json
import pickle

r = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
# base64.decodestring() was removed in Python 3.9; b64decode() is the replacement
pickle.loads(base64.b64decode(json.loads(l[0])['body']))

Just be warned that deserializing tasks can take a moment, and you'll need to adjust the commands above to work with the various priorities (see the sketch below).
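
For example, here is a sketch that reuses make_queue_name_for_pri() and DEFAULT_PRIORITY_STEPS from above and takes the redis connection (like r in the previous snippet) as an argument; it assumes the same pickle/json/base64 message layout shown there, and get_all_pending_tasks is just an illustrative name:

import base64
import json
import pickle

def get_all_pending_tasks(r, queue_name='celery'):
    """Deserialize every message sitting in all priority sub-queues of queue_name."""
    tasks = []
    for pri in DEFAULT_PRIORITY_STEPS:
        key = make_queue_name_for_pri(queue_name, pri)
        for raw in r.lrange(key, 0, -1):
            body = json.loads(raw)['body']
            tasks.append(pickle.loads(base64.b64decode(body)))
    return tasks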

from celery.task.control import inspect

def key_in_list(k, l):
    # True if k appears among the values of any dict in the list l
    return bool([True for i in l if k in i.values()])

def check_task(task_id):
    # Return True if a task with the given id is currently being executed by any worker
    task_value_dict = inspect().active().values()
    for task_list in task_value_dict:
        if key_in_list(task_id, task_list):
            return True
    return False
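
A usage sketch, where some_task stands in for one of your own task functions:

result = some_task.delay()    # some_task is a placeholder for one of your @app.task functions
print(check_task(result.id))  # True while a worker is actively executing it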