How do I delete all pending tasks without knowing the task_id of each task?


From the docs:

$ celery -A proj purge

or

from proj.celery import app
app.control.purge()

(Edit: updated with current methods.)
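As a small aside, purge() returns the number of messages it discarded, so you can log that count; a minimal sketch, assuming proj.celery exposes the configured app as in the snippet above:

from proj.celery import app

# Discard every waiting task on the default queue and report how many
# messages were removed (purge() returns that count as an int).
num_purged = app.control.purge()
print(f'{num_purged} message(s) purged')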


For Celery 3.0+:

$ celery purge

To purge a specific queue:

$ celery -Q queue_name purge

I found that celery purge doesn't work for my more complex Celery configuration. I use multiple named queues for different purposes:

$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ...  # Output sorted, whitespaced for readability
celery                                          0   2
celery@web01.celery.pidbox                      0   1
celery@web02.celery.pidbox                      0   1
apns                                            0   1
apns@web01.celery.pidbox                        0   1
analytics                                       1   1
analytics@web01.celery.pidbox                   0   1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0      0   1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1      0   1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54   0   1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866   0   1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99   0   1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e   0   1

The first column is the queue name, the second is the number of messages waiting in the queue, and the third is the number of listeners for that queue. The queues are:

celery     - Queue for standard, idempotent celery tasks
apns       - Queue for Apple Push Notification Service tasks, not quite as idempotent
analytics  - Queue for long running nightly analytics
*.pidbox   - Queue for worker commands, such as shutdown and reset, one per worker (2 celery workers, one apns worker, one analytics worker)
bcast.*    - Broadcast queues, for sending messages to all workers listening to a queue (rather than just the first to grab it)
celeryev.* - Celery event queues, for reporting task analytics

The analytics task is a brute-force task that worked great on small data sets, but now takes more than 24 hours to process. Occasionally, something will go wrong and it will get stuck waiting on the database. It needs to be re-written, but until then, when it gets stuck I kill the task, empty the queue, and try again. I detect "stuckness" by looking at the message count for the analytics queue, which should be 0 (finished analytics) or 1 (waiting for last night's analytics to finish). 2 or higher is bad, and I get an email.
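For reference, that "stuckness" check can be scripted; this is only a sketch that parses the same rabbitmqctl output shown above, with a hypothetical alert() helper standing in for the e-mail notification:

import subprocess

STUCK_THRESHOLD = 2  # 0 or 1 waiting messages is normal, 2+ means stuck

def analytics_backlog():
    # Same command as above: list queue names and waiting message counts
    out = subprocess.check_output(
        ['sudo', 'rabbitmqctl', 'list_queues', '-p', 'celery', 'name', 'messages'],
        text=True)
    for line in out.splitlines():
        parts = line.split()
        if len(parts) == 2 and parts[0] == 'analytics':
            return int(parts[1])
    return 0

if analytics_backlog() >= STUCK_THRESHOLD:
    alert('analytics queue looks stuck')  # hypothetical notification hook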

celery purge offers to delete tasks from one of the broadcast queues, and I don't see an option to pick a different named queue.

Here's my process:

$ sudo /etc/init.d/celeryd stop  # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics  # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop  # Confirm dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers  # Confirm messages is 0
$ sudo /etc/init.d/celeryd start

Celery 2.x and 3.x:

For example, when using a worker with the -Q parameter to define queues:

celery worker -Q queue1,queue2,queue3

then celery purge will not work, because you cannot pass queue parameters to it. It will only delete the default queue. The solution is to start your workers with the --purge parameter, like this:

celery worker -Q queue1,queue2,queue3 --purge

However, this will also run the worker.

Another option is to use celery's amqp subcommand:

celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3
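The same deletion can also be done from Python through the app's broker connection; a rough sketch, assuming an AMQP-style broker and the same placeholder queue names as above:

from proj.celery import app

# Delete the named queues outright (this drops the queue itself, not just
# its messages), mirroring `celery amqp queue.delete <name>`.
with app.connection() as conn:
    channel = conn.default_channel
    for name in ('queue1', 'queue2', 'queue3'):
        channel.queue_delete(name)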

Celery 3+:

CLI:

$ celery -A proj purge

Programmatically:

>>> from proj.celery import app
>>> app.control.purge()

http://docs.celeryproject.org/en/latest/faq.html#how-do-i-purge-all-waiting-tasks


Celery 3+

http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks

CLI

Purge a named queue:

 celery -A proj amqp queue.purge <queue name>

Purge the configured queue:

celery -A proj purge

I’ve purged messages, but there are still messages left in the queue?

Answer: Tasks are acknowledged (removed from the queue) as soon as they are actually executed. After the worker has received a task, it will take some time until it is actually executed, especially if there are a lot of tasks already waiting for execution. Messages that are not acknowledged are held on to by the worker until it closes the connection to the broker (AMQP server). When that connection is closed (e.g. because the worker was stopped) the tasks will be re-sent by the broker to the next available worker (or the same worker when it has been restarted), so to properly purge the queue of waiting tasks you have to stop all the workers, and then purge the tasks using celery.control.purge().

So, to purge the entire queue, the workers must be stopped.
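Relatedly, before stopping the workers you can see which messages they are currently holding on to (the prefetched, unacknowledged tasks that a purge will not touch) with the inspect API; a minimal sketch, assuming proj.celery exposes the configured app:

from proj.celery import app

inspector = app.control.inspect()

# reserved(): tasks a worker has prefetched but not yet started; these are
# the unacknowledged messages that survive a purge until the worker stops.
# Inspect calls return None when no worker replies, hence the `or {}`.
for worker, tasks in (inspector.reserved() or {}).items():
    print(worker, len(tasks), 'reserved task(s)')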


1. To properly purge the queue of waiting tasks, you have to stop all the workers (http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are-still-messages-left-in-the-queue):

$ sudo rabbitmqctl stop

or (if RabbitMQ / the message broker is managed by Supervisor):

$ sudo supervisorctl stop all

2. ...and then purge the tasks from a specific queue:

$ cd <source_dir>
$ celery amqp queue.purge <queue name>

3. Start RabbitMQ again:

$ sudo rabbitmqctl start

or (if RabbitMQ is managed by Supervisor):

$ sudo supervisorctl start all

Celery 4+: the celery purge command purges all configured task queues:

celery -A APPNAME purge

Programmatically:

from proj.celery import app
app.control.purge()

All pending tasks will be purged. Reference: Celery docs.


If you want to remove all pending tasks, as well as the active and reserved ones, in order to completely stop Celery, this is what worked for me:

from proj.celery import app
from celery.task.control import inspect, revoke

# remove pending tasks
app.control.purge()

# remove active tasks
i = inspect()
jobs = i.active()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)

# remove reserved tasks
jobs = i.reserved()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)
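Note that the celery.task.control import above is from the legacy API; on newer Celery releases the same idea can be expressed through app.control, roughly like this (a sketch, not a drop-in replacement for the snippet above):

from proj.celery import app

# Purge waiting tasks, then revoke anything the workers have already picked up.
app.control.purge()

inspector = app.control.inspect()
for jobs in (inspector.active() or {}, inspector.reserved() or {}):
    for worker, tasks in jobs.items():
        for task in tasks:
            # terminate=True also kills the process currently executing the task
            app.control.revoke(task['id'], terminate=True)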

For Celery 5.0+, using RabbitMQ as the broker:

We first need to establish a new connection from our program to the broker, and bind the connection to the queues we want to purge.

# proj/celery.py defines the configured app:
#     from celery import Celery
#     app = Celery('proj')

from proj.celery import app

queues = ['queue_A', 'queue_B', 'queue_C']
with app.connection_for_write() as conn:
    conn.connect()
    for queue in queues:
        count = app.amqp.queues[queue].bind(conn).purge()
        print(f'Purged {queue}: {count} message(s)')

For Celery 5.0+, to do this from the CLI and target a specific queue:

celery -A APP_NAME purge -Q QUEUE_NAME

If you want to do it in a single step like I did, add the -f option at the end to skip the confirmation step.