使用 RabbitMQ 延迟消息插件来调度celery任务。
RabbitMQ 延迟消息插件:
RabbitMQ 延迟交换插件用于实现消息到达交易所和传递到队列之间的等待时间。每次发布消息时,都可以指定以毫秒为单位的偏移量。
我们可以声明类型为“x-delayed-message”的交换,然后使用自定义标头 x-delay 发布任务,以毫秒为单位表示任务的延迟时间。消息将在 x 延迟毫秒后传递到相应的队列。
对 rabbitMq 队列和交换感到困惑,请单击此处获取相同的惊人解释(来自 rabbitMq 的核心开发人员之一)
此方法的问题:
- Celery 不支持开箱即用的“RabbitMQ 延迟消息插件”。因此,我们需要在 rabbit-MQ 中手动安装插件。
- 此插件的最新版本针对 RabbitMQ 3.10.x。3.9.x 之前的系列不受支持。
- 如果延迟消息的总数超过特定数量 (https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72)
- 有关详细信息,请参阅 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange#limitations
实施步骤
1、安装rabbitmq 延迟消息插件,创建延迟消息exchange,celery只能创建topic的交换机不会创建x-delayed-message交换。
首先,在rabbitmq中安装rabbitmq_delayed_message_exchange
插件。可以使用rabbitmq-plugins
命令启用插件。如下所示。
注意:有可能会提示没有此插件,需要去GitHub去下载插件并且上传到服务器。
# 开启 x-delayed-message类型的插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
然后再创建一个名为delay-tasks
的延迟交换机,可以使用以下命令创建一个x-delayed-message
类型的交换机。
rabbitmqadmin declare exchange name=delay-tasks type=x-delayed-message arguments='{"x-delayed-type":"direct"}'
或者直接在管理页面创建
声明使用该类型的交换,然后发布带有自定义标头的消息,以毫秒为单位表示消息的延迟时间。消息将在毫秒后传递到相应的队列。x-delayed-message
x-delay
2、celery配合rabbitmq延迟消息使用,创建一个celery应用程序并且使用rabbitmq的延迟消息。
import os
from typing import NamedTuple
from celery import Celery, platforms
from django.conf import settings
from kombu import Exchange, Queue
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'devops_api.settings')
app = Celery('devops_api', broker='amqp://guest@localhost//')
# 获取Django settings中的配置。
# app.config_from_object('django.conf:settings')
# 获取Django的应用配置 文件名需要命名为tasks.py
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# 可以使用root启动程序,不然使用root启动会出现报错。
# platforms.C_FORCE_ROOT = True
# 配置默认时区
# app.conf.timezone = 'Asia/Shanghai'
# 定义结构
class DelayedTaskDeliveryKit(NamedTuple):
destination_queue: Queue
destination_exchange: Exchange
routing_key: str
def get_delayed_task_delivery_kit(destination_queue_name: str) -> DelayedTaskDeliveryKit:
"""
For getting the utils using which we can schedule celery tasks.
Publish tasks with the custom header x-delay expressing a delay time
for the task in milliseconds.The message will be delivered to the
respective queues after x-delay milliseconds.
(For setting 'x-delay' header,
see: https://stackoverflow.com/questions/35449234/how-could-i-send-a-delayed-message-in-rabbitmq-using-the-rabbitmq-delayed-messag )
"""
# 要使用延迟消息功能,
# 声明一个 x 延迟消息类型的交换
destination_exchange = Exchange(
destination_queue_name,
type='x-delayed-message',
# 可以在 exchange.declare 期间传递的 x 延迟类型参数。
# 这里我们使用“直接”作为交换类型。这意味着插件
# 将具有与直接交换相同的路由行为。
arguments={"x-delayed-type": "direct"},
)
destination_queue = Queue(
destination_queue_name,
exchange=destination_exchange,
routing_key=destination_queue_name,
)
return DelayedTaskDeliveryKit(
destination_queue=destination_queue,
destination_exchange=destination_exchange,
routing_key=destination_queue_name,
)
# 更改celery的默认队列,需要将要创建的队列添加到celery配置中,因为对于celery,默认的交换机类型是direct, 对于使用rabbitmq 延迟消息插件的延迟任务,需要我们自己手动声明。
get_delayed_task_delivery: DelayedTaskDeliveryKit = get_delayed_task_delivery_kit("delay-tasks")
# 添加Queue 队列必须先添加一个默认的队列不然有些 tasks会找不到路由,
app.conf.task_default_queue = 'devops_api'
app.conf.task_queues = [
# 添加默认的 Queue
Queue('devops_api', routing_key='task.#'),
# 添加刚刚创建的 Queue
get_delayed_task_delivery.destination_queue
]
3、定义一个celery任务
from celery_app import app
@app.task()
def add(a: int, b: int):
print(f"{a} + {b} is {a+b}")
5、将任务发布到celery add
需要使用apply_async
方法将celery任务发送到延迟队列,并添加x-delay
,指定延迟值(以毫秒为单位)。
from tasks import add
if __name__ == '__main__':
add.apply_async(kwargs={"a": 1, "b": 3},, queue='delay-tasks', headers={'x-delay': 10000}
发表评论
共 0 条评论
暂无评论