如何检索队列中尚未处理的任务列表?
当前回答
如果您控制任务的代码,那么您可以通过让任务在第一次执行时触发一个微不足道的重试来解决这个问题,然后检查inspect().reserved()。重试将任务注册到结果后端,芹菜可以看到这一点。任务必须接受self或context作为第一个参数,这样我们才能访问重试计数。
@task(bind=True)
def mytask(self):
if self.request.retries == 0:
raise self.retry(exc=MyTrivialError(), countdown=1)
...
这个解决方案与代理无关。你不必担心你是用RabbitMQ还是Redis来存储任务。
编辑:经过测试,我发现这只是一个部分的解决方案。预留的大小受限于worker的预取设置。
其他回答
如果你正在使用rabbitMQ,在终端中使用这个:
sudo rabbitmqctl list_queues
它将打印带有挂起任务数量的队列列表。例如:
Listing queues ...
0b27d8c59fba4974893ec22d478a7093 0
0e0a2da9828a48bc86fe993b210d984f 0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7 0
15c036ad25884b82839495fb29bd6395 1
celerey_mail_worker@torob2.celery.pidbox 0
celery 166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa 0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6 0
右边列的数字是队列中的任务数。在上面,芹菜队列有166个待处理的任务。
要从后端检索任务,使用这个
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)
EDIT:查看获取队列中任务列表的其他答案。
你应该看这里: 芹菜指南-检查工人
基本上是这样的:
my_app = Celery(...)
# Inspect all nodes.
i = my_app.control.inspect()
# Show the items that have an ETA or are scheduled for later processing
i.scheduled()
# Show tasks that are currently active.
i.active()
# Show tasks that have been claimed by workers
i.reserved()
这取决于你想要什么
Redis json序列化的复制粘贴解决方案:
def get_celery_queue_items(queue_name):
import base64
import json
# Get a configured instance of a celery app:
from yourproject.celery import app as celery_app
with celery_app.pool.acquire(block=True) as conn:
tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
decoded_tasks = []
for task in tasks:
j = json.loads(task)
body = json.loads(base64.b64decode(j['body']))
decoded_tasks.append(body)
return decoded_tasks
它与Django一起工作。只是别忘了改变你的项目。
我得出的结论是,获得队列上的作业数量的最佳方法是使用rabbitmqctl,正如这里多次建议的那样。为了允许任何选择的用户使用sudo运行命令,我遵循了这里的说明(我跳过了编辑配置文件部分,因为我不介意在命令之前键入sudo)。
我还获取了jamesc的grep和cut代码片段,并将其封装在子进程调用中。
from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE)
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))
推荐文章
- 证书验证失败:无法获得本地颁发者证书
- 当使用pip3安装包时,“Python中的ssl模块不可用”
- 无法切换Python与pyenv
- Python if not == vs if !=
- 如何从scikit-learn决策树中提取决策规则?
- 为什么在Mac OS X v10.9 (Mavericks)的终端中apt-get功能不起作用?
- 将旋转的xtick标签与各自的xtick对齐
- 为什么元组可以包含可变项?
- 如何合并字典的字典?
- 如何创建类属性?
- 不区分大小写的“in”
- 在Python中获取迭代器中的元素个数
- 解析日期字符串并更改格式
- 使用try和。Python中的if
- 如何在Python中获得所有直接子目录