# Web组件-celery

# 什么是celery

  • 处理大量消息的分布式系统,节点处理任务
  • 本质上就是一个任务调度框架
  • 本身并不具备存储任务的功能,需要搭配消息队列RabbitMQ、Redis缓存、数据库等

使用场景

  • 异步任务:一些耗时的操作可以交给celery异步执行,而不用等着程序处理完才知道结果。比如:视频转码、邮件发送、消息推送等等;
  • 定时任务:比如定时推送消息、定时爬取数据、定时统计数据等等;

# celery架构

  • producer:生产者,专门用来生产任务(task);
  • celery beat:任务调度器,调度器进程会读取配置文件的内容,周期性地将配置文件里面到期需要执行的任务发送给消息队列,说白了就是生产定时任务;
  • broker:任务队列,用于存放生产者和调度器生产的任务。一般使用消息队列或者 Redis 来存储,当然具有存储功能的数据库也是可以的。这一部分是 celery 所不提供的,需要依赖第三方。作用就是接收任务,存进队列;
  • worker:任务的执行单元,会将任务从队列中顺序取出并执行;
  • backend:用于在任务结束之后保存状态信息和结果,以便查询,一般是数据库,当然只要具备存储功能都可以作为 backend;
python 3.10.5
pip install celery==5.1.2
pip install pika     --> RabbitMQ驱动
pip install redis-py --> Redis驱动

# celery实现异步任务

  • 新建工程 celery_demo,新建文件app.py
# 文件名:app.py
import time
# 这个 Celery 就类似于 flask.Flask
# 然后实例化得到一个app
from celery import Celery

# 指定一个 name、以及 broker 的地址、backend 的地址
app = Celery(
    "achilles",
    # 这里使用我服务器上的 Redis
    # broker 用 1 号库, backend 用 2 号库
    broker="redis://127.0.0.1:6379/1",
    backend="redis://127.0.0.1:6379/2")

# 这里通过 @app.task 对函数进行装饰
# 那么之后我们便可调用 task.delay 创建一个任务
@app.task
def task(name, age):
    print("准备执行任务啦")
    time.sleep(3)
    return f"name is {name}, age is {age}"
  • 启动worker

5.0 开始这种做法就不允许了,必须写成 celery -A app worker ...,因为 -A 变成了一个全局参数,所以它不应该放在 worker 的后面,而是要放在 worker 的前面

执行命令如下:

以上就前台启动了一个 worker,正在等待从队列中获取任务,图中也显示了相应的信息。然而此时队列中并没有任务,所以我们需要在另一个文件中创建任务并发送到队列里面去。

import time
from app import task

# 从 app 导入 task, 创建任务, 但是注意: 不要直接调用 task
# 因为那样的话就在本地执行了, 我们的目的是将任务发送到队列里面去
# 然后让监听队列的 worker 从队列里面取任务并执行
# 而 task 被 @app.task 装饰, 所以它不再是原来的 task 了
# 我们需要调用它的 delay 方法


# 调用 delay 之后, 就会创建一个任务
# 然后发送到队列里面去, 也就是我们这里的 Redis
# 至于参数, 普通调用的时候怎么传, 在 delay 里面依旧怎么传
start = time.perf_counter()
task.delay("achilles", 17)
print(
    time.perf_counter() - start
)  #  0.04128062492236495

然后执行该文件,发现只用了 0.04 秒,而 task 里面明明 sleep 了 3 秒。所以说明这一步是不会阻塞的,调用 task.delay 只是创建一个任务并发送至队列。我们再看一下 worker 的输出信息:

可以看到任务已经被消费者接收并且消费了,而且调用 delay 方法是不会阻塞的,花费的那 0.04 秒是用在了其它地方,比如连接 Redis 发送任务等等。

然后我们再来看看 Redis 中存储的信息 1 号库用作 broker,负责存储任务; 2 号库用作 backend,负责存储执行结果

select 2
keys *
keys celery*
get key 

以上我们就启动了一个 worker 并成功消费了队列中的任务,并且还从 Redis 里面拿到了执行信息。当然啦,如果只能通过查询 backend 才能拿到信息的话,那 celery 就太不智能了。我们也可以直接从程序中获取。

# 直接查询任务执行信息

Redis(backend)里面存储了很多关于任务的信息,这些信息我们可以直接在程序中获取。

from app import task

res = task.delay("achilles", 17)
print(type(res))
"""
<class 'celery.result.AsyncResult'>
"""
# 直接打印,显示任务的 id
print(res)
"""
7ee6f5c8-b08e-4383-9a1c-93212a3a86c3
"""
# 获取状态, 显然此刻没有执行完
# 因此结果是PENDING, 表示等待状态
print(res.status)
"""
PENDING
"""
# 获取 id,两种方式均可
print(res.task_id)
print(res.id)
"""
7ee6f5c8-b08e-4383-9a1c-93212a3a86c3
7ee6f5c8-b08e-4383-9a1c-93212a3a86c3
"""
# 获取任务执行结束时的时间
# 任务还没有结束, 所以返回None
print(res.date_done)
"""
None
"""
# 获取任务的返回值, 可以通过 result 或者 get()
# 注意: 如果是 result, 那么任务还没有执行完的话会直接返回 None
# 如果是 get(), 那么会阻塞直到任务完成
print(res.result)
print(res.get())
"""
None
name is achilles, age is 17
"""
# 再次查看状态和执行结束时的时间
# 发现 status 变成SUCCESS
# date_done 变成了执行结束时的时间
print(res.status)
# 但显示的是 UTC 时间
print(res.date_done)
"""
SUCCESS
2022-11-03 03:24:17.758106
"""

另外我们说结果需要存储在 backend 中,如果没有配置 backend,那么获取结果的时候会报错。至于 backend,因为它是存储结果的,所以一般会保存在数据库中,因为要持久化。我这里为了方便,就还是保存在 Redis 中。

# celery.result.AsyncResult 对象

调用完任务工厂的 delay 方法之后,会创建一个任务并发送至队列,同时返回一个 AsyncResult 对象,基于此对象我们可以拿到任务执行时的所有信息。但是 AsyncResult 对象我们也可以手动构造,举个例子:

import time
# 我们不光要导入 task, 还要导入里面的 app
from app import app, task
# 导入 AsyncResult 这个类
from celery.result import AsyncResult

# 发送任务到队列当中
res = task.delay("achilles", 17)

# 传入任务的 id 和 app, 创建 AsyncResult 对象
async_result = AsyncResult(res.id, app=app)

# 此时的这个 res 和 async_result 之间是等价的
# 两者都是 AsyncResult 对象, 它们所拥有的方法也是一样的
# 下面用谁都可以
while True:
    # 等价于async_result.state == "SUCCESS"
    if async_result.successful():
        print(async_result.get())
        break
    # 等价于async_result.state == "FAILURE"
    elif async_result.failed():
        print("任务执行失败")
    elif async_result.status == "PENDING":
        print("任务正在被执行")
    elif async_result.status == "RETRY":
        print("任务执行异常正在重试")
    elif async_result.status == "REJECTED":
        print("任务被拒绝接收")
    elif async_result.status == "REVOKED":
        print("任务被取消")
    else:
        print("其它的一些状态")
    time.sleep(0.8)

"""
任务正在被执行
任务正在被执行
任务正在被执行
任务正在被执行
name is achilles, age is 17
"""

其他查看方法

from app import task

res = task.delay("achilles", 17)

# 1. ready():查看任务状态,返回布尔值。
# 任务执行完成返回 True,否则为 False
# 那么问题来了,它和 successful() 有什么区别呢?
# successful() 是在任务执行成功之后返回 True, 否则返回 False
# 而 ready() 只要是任务没有处于阻塞状态就会返回 True
# 比如执行成功、执行失败、被 worker 拒收都看做是已经 ready 了
print(res.ready())  
"""
False
"""

# 2. wait():和之前的 get 一样, 因为在源码中写了: wait = get
# 所以调用哪个都可以, 不过 wait 可能会被移除,建议直接用 get 就行
print(res.wait())
print(res.get())
"""
name is achilles, age is 17
name is achilles, age is 17
"""

# 3. trackback:如果任务抛出了一个异常,可以获取原始的回溯信息
# 执行成功就是 None
print(res.traceback)  
"""
None
"""

# celery的配置

celery 的配置不同,所表现出来的性能也不同,比如序列化的方式、连接队列的方式,单线程、多线程、多进程等等。那么 celery 都有那些配置呢?

  • broker_url:broker 的地址,就是类 Celery 里面传入的 broker 参数。
  • result_backend:存储结果地址,就是类 Celery 里面传入的 backend 参数。
  • task_serializer:任务序列化方式,支持以下几种:
    • binary:二进制序列化方式,pickle 模块默认的序列化方法;
    • json:支持多种语言,可解决多语言的问题,但通用性不高;
    • xml:标签语言,和 json 定位相似;
    • msgpack:二进制的类 json 序列化,但比 json 更小、更快;
    • yaml:表达能力更强、支持的类型更多,但是在 Python里面的性能不如 json; 根据情况,选择合适的类型。如果不是跨语言的话,直接选择 binary 即可,默认是 json。
  • result_serializer:任务执行结果序列化方式,支持的方式和任务序列化方式一致。
  • result_expires:任务结果的过期时间,单位是秒。
  • accept_content:指定任务接受的内容序列化类型(序列化),一个列表,比如:["msgpack", "binary", "json"]。
  • timezone:时区,默认是 UTC 时区。
  • enable_utc:是否开启 UTC 时区,默认为 True;如果为 False,则使用本地时区。
  • task_publish_retry:发送消息失败时是否重试,默认为 True。
  • worker_concurrency:并发的 worker 数量。
  • worker_prefetch_multiplier:每次 worker 从任务队列中获取的任务数量。
  • worker_max_tasks_per_child:每个 worker 执行多少次就会被杀掉,默认是无限的。
  • task_time_limit:单个任务执行的最大时间,单位是秒。
  • task_default_queue :设置默认的队列名称,如果一个消息不符合其它的队列规则,就会放在默认队列里面。如果什么都不设置的话,数据都会发送到默认的队列中。
  • task_queues :设置详细的队列
# 将 RabbitMQ 作为 broker 时需要使用
task_queues = {
    # 这是指定的默认队列
    "default": { 
        "exchange": "default",
        "exchange_type": "direct",
        "routing_key": "default"
    },
    # 凡是 topic 开头的 routing key
    # 都会被放到这个队列
    "topicqueue": { 
        "routing_key": "topic.#",
        "exchange": "topic_exchange",
        "exchange_type": "topic",
    },
    "task_eeg": { # 设置扇形交换机
        "exchange": "tasks",
        "exchange_type": "fanout",
        "binding_key": "tasks",
    },
}

celery 的配置非常多,不止我们上面说的那些,更多配置可以查看官网,写的比较详细。

https://docs.celeryq.dev/en/latest/userguide/configuration.html#general-settings (opens new window)

值得一提的是,在 5.0 之前配置项都是大写的,而从 5.0 开始配置项改成小写了。不过老的写法目前仍然支持,只是启动的时候会抛警告,并且在 6.0 的时候不再兼容老的写法。

然后下面我们就根据配置文件的方式启动 celery,当前目录结构如下:

# 发送任务时指定参数

delay 方法的 *args 和 **kwargs 就是函数的参数,它会传递给 apply_async 的 args 和 kwargs。而其它的参数就是发送任务时所设置的一些参数,我们这里重点介绍一下 apply_async 的其它参数。

  • countdown:倒计时,表示任务延迟多少秒之后再执行,参数为整型;
  • eta:任务的开始时间,datetime 类型,如果指定了 countdown,那么这个参数就不应该再指定;
  • expires:datetime 或者整型,如果到规定时间、或者未来的多少秒之内,任务还没有发送到队列被 - worker 执行,那么该任务将被丢弃;
  • shadow:重新指定任务的名称,覆盖 app.py 创建任务时日志上所指定的名字;
  • retry:任务失败之后是否重试,bool 类型;
  • retry_policy:重试所采用的策略,如果指定这个参数,那么 retry 必须要为 True。参数类型是一个字典,里面参数如下:
    • max_retries : 最大重试次数,默认为 3 次;
    • interval_start : 重试等待的时间间隔秒数,默认为 0,表示直接重试不等待;
    • interval_step : 每次重试让重试间隔增加的秒数,可以是数字或浮点数,默认为 0.2;
    • interval_max : 重试间隔最大的秒数,即通过 interval_step 增大到多少秒之后, 就不在增加了, 可以是数字或者浮点数;
  • routing_key:自定义路由键,针对 RabbitMQ;
  • queue:指定发送到哪个队列,针对 RabbitMQ;
  • exchange:指定发送到哪个交换机,针对 RabbitMQ;
  • priority:任务队列的优先级,0-9 之间,对于 RabbitMQ 而言,0是最高级;
  • serializer:任务序列化方法,通常不设置;
  • compression:压缩方案,通常有zlib、bzip2;
  • headers:为任务添加额外的消息头;
  • link:任务成功执行后的回调方法,是一个signature对象,可以用作关联任务;
  • link_error: 任务失败后的回调方法,是一个signature对象;

举个例子:

>>> from app import add
# 使用 apply_async,要注意参数的传递
# 位置参数使用元组或者列表,关键字参数使用字典
# 因为是args和kwargs, 不是 *args和 **kwargs
>>> add.apply_async([3], {"y": 4}, 
...                 task_id="linda", 
...                 countdown=5).get()

7

# 创建任务工厂的另一种方式

# celery_demo/tasks/task1.py
from app import app

@app.task
def add(x, y):
    return x + y

@app.task
def sub(x, y):
    return x - y
    
# celery_demo/app.py    
from celery import Celery
import config

# 通过 include 指定存放任务的 py 文件
# 注意它和 worker 启动路径之间的关系
# 我们是在 celery_demo 目录下启动的 worker
# 所以应该写成 "tasks.task1"
# 如果是在 celery_demo 的上一级目录启动 worker
# 那么这里就要指定为 "celery_demo.tasks.task1"
# 当然启动时的 -A app 也要换成 -A celery_demo.app
app = Celery(__name__, include=["tasks.task1"])
# 如果还有其它文件,比如 task2.py, task3.py
# 那么就把 "tasks.task2", "tasks.task3" 加进去
app.config_from_object(config)

所以还是要将 app.py 中的 app 导入到 task1.py 中直接创建任务工厂,但如果再将 task1.py 中的任务工厂导入到 app.py 中就会发生循环导入。于是 celery 提供了一个 include 参数,可以在创建 app 的时候自动将里面所有的任务工厂加载进来,然后启动并告诉 worker。

# Task对象

我们之前通过对一个函数使用 @app.task 即可将其变成一个任务工厂,而这个任务工厂就是一个 Task 实例对象。而我们在使用 @app.task 的时候,其实是可以加上很多的参数的,常用参数如下:

  • name:默认的任务名是一个uuid,我们可以通过 name 参数指定任务名,当然这个 name 就是 - apply_async 的参数 name。如果在 apply_async 中指定了,那么以 apply_async 指定的为准;
  • bind:一个 bool 值,表示是否和任务工厂进行绑定。如果绑定,任务工厂会作为参数传递到方法中;
  • base:定义任务的基类,用于定义回调函数,当任务到达某个状态时触发不同的回调函数,默认是 Task,所以我们一般会自己写一个类然后继承 Task;
  • default_retry_delay:设置该任务重试的延迟机制,当任务执行失败后,会自动重试,单位是秒,默认是3分钟;
  • serializer:指定序列化的方法;
# celery_demo/tasks/task1.py
from app import app

@app.task(name="你好")
def add(x, y):
    return x + y

@app.task(name="我不好", bind=True)
def sub(self, x, y):
    """
    如果 bind=True,则需要多指定一个 self
    这个 self 就是对应的任务工厂
    """
    # self.request 是一个 celery.task.Context 对象
    # 获取它的属性字典,即可拿到该任务的所有属性
    print(self.request.__dict__)
    return x - y

最后再来说一说 @app.task 里面的 base 参数。

# celery_demo/tasks/task1.py
from celery import app
from app import Task

class MyTask(Task):
    """
    自定义一个类,继承自celery.Task
    exc: 失败时的错误的类型;
    task_id: 任务的id;
    args: 任务函数的位置参数;
    kwargs: 任务函数的关键字参数;
    einfo: 失败时的异常详细信息;
    retval: 任务成功执行的返回值;
    """
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """任务失败时执行"""

    def on_success(self, retval, task_id, args, kwargs):
        """任务成功时执行"""
        print("任务执行成功")

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """任务重试时执行"""


# 使用 @app.task 的时候,指定 base 即可
# 然后任务在执行的时候,会触发 MyTask 里面的回调函数
@app.task(name="地灵殿", base=MyTask)
def add(x, y):
    print("加法计算")
    return x + y

# 自定义任务流

# celery_demo/tasks/task1.py
from app import app

@app.task()
def add(x, y):
    print("加法计算")
    return x + y

@app.task()
def sub(x, y):
    print("减法计算")
    return x - y

@app.task()
def mul(x, y):
    print("乘法计算")
    return x * y

@app.task()
def div(x, y):
    print("除法计算")
    return x // y

然后来导入它们,创建任务,并将这些任务划分到一个组中。

>>> from tasks.task1 import add, sub, mul, div
>>> from celery import group
# 调用 signature 方法,得到 signature 对象
# 此时 t1.delay() 和 add.delay(2, 3) 是等价的
>>> t1 = add.signature(args=(2, 3))
>>> t2 = sub.signature(args=(2, 3))
>>> t3 = mul.signature(args=(2, 3))
>>> t4 = div.signature(args=(4, 2))
# 但是变成 signature 对象之后,
# 我们可以将其放到一个组里面
>>> gp = group(t1, t2, t3, t4)
# 执行组任务
# 返回 celery.result.GroupResult 对象
>>> res = gp()
# 每个组也有一个唯一 id
>>> print("组id:", res.id)id: 65f32cc4-b8ce-4bf8-916b-f5cc359a901a

# 调用 get 方法也会阻塞,知道组里面任务全部完成
>>> print("组结果:", res.get())
组结果: [5, -1, 6, 2]

# celery 实现定时任务

回顾一下 celery 的架构,里面除了 producer 之外还有一个 celery beat,也就是调度器。我们调用任务工厂的 delay 方法,手动将任务发送到队列,此时就相当于 producer。如果是设置定时任务,那么会由调度器自动将任务添加到队列。

我们在 tasks 目录里面再创建一个 period_task1.py 文件。

# celery_demo/tasks/period_task1.py
from celery.schedules import crontab
from app import app
from .task1 import task1, task2, task3, task4


@app.on_after_configure.connect
def period_task(sender, **kwargs):
    # 第一个参数为 schedule,可以是 float,或者 crontab
    # crontab 后面会说,第二个参数是任务,第三个参数是名字
    sender.add_periodic_task(10.0, task1.s(),
                             name="每10秒执行一次")
    sender.add_periodic_task(15.0, task2.s("task2"),
                             name="每15秒执行一次")
    sender.add_periodic_task(20.0, task3.s(),
                             name="每20秒执行一次")
    sender.add_periodic_task(
        crontab(hour=18, minute=5, day_of_week=0),
        task4.s("task4"),
        name="每个星期天的18:05运行一次"
    )

# celery_demo/tasks/task1.py 
from app import app

@app.task
def task1():
    print("我是task1")
    return "task1你好"

@app.task
def task2(name):
    print(f"我是{name}")
    return f"{name}你好"

@app.task
def task3():
    print("我是task3")
    return "task3你好"

@app.task
def task4(name):
    print(f"我是{name}")
    return f"{name}你好"

既然使用了定时任务,那么一定要设置时区。

# celery_demo/config.py
broker_url = "redis://127.0.0.1:6379/1"
result_backend = "redis://127.0.0.1:6379/2"
# 之前说过,celery 默认使用 utc 时间
# 其实我们是可以手动禁用的,然后手动指定时区
enable_utc = False
timezone = "Asia/Shanghai"

最后是修改 app.py,将定时任务加进去。

from celery import Celery
import config

app = Celery(
    __name__,
    include=["tasks.task1", "tasks.period_task1"])
app.config_from_object(config)

启动定时任务

celery -A tasks.period_task1 beat -l info

此时我们就成功实现了定时任务,并且是通过定义函数、打上装饰器的方式实现的。除此之外,我们还可以通过配置的方式实现。

# celery_demo/tasks/period_task1.py
from celery.schedules import crontab
from app import app

# 此时也不需显式导入任务工厂了
# 直接以字符串的方式指定即可
app.conf.beat_schedule = {
    # 参数通过 args 和 kwargs 指定
    "每10秒执行一次": {"task": "tasks.task1.task1",
                 "schedule": 10.0},
    "每15秒执行一次": {"task": "tasks.task1.task2",
                 "schedule": 15.0,
                 "args": ("task2",)},
    "每20秒执行一次": {"task": "tasks.task1.task3",
                 "schedule": 20.0},
    "每个星期天的18:05运行一次": {"task": "tasks.task1.task4",
                        "schedule": crontab(hour=18,
                                            minute=5,
                                            day_of_week=0),
                        "args": ("task4",)}
}

# crontab 参数

  • minute:0-59,表示第几分钟触发,* 表示每分钟触发一次;

  • hour:0-23,表示第几个小时触发,* 表示每小时都会触发。比如 minute=2, hour=*,表示每小时的第二分钟触发一次;

  • day_of_week:0-6,表示一周的第几天触发,0 是星期天,1-6 分别是星期一到星期六,不习惯的话也可以用字符串 mon,tue,wed,thu,fri,sat,sun 表示;

  • month_of_year:当前年份的第几个月; 以上就是这些参数的含义,并且参数接收的值还可以是一些特殊的通配符:

  • :所有,比如 minute=,表示每分钟触发;

  • */a:所有可被 a 整除的时候触发;

  • a-b:a 到 b范围内触发;

  • a-b/c:范围 a-b 且能够被 c 整除的时候触发;

  • 2,10,40:比如 minute=2,10,40 表示第 2、10、40 分钟的时候触发;

上次更新: 11/3/2022, 1:04:57 PM