Back to Blogs
python
celery

python celery

Soloman
2020-09-10

Taste of Celery

1 初窥门径

Celery是专注实时处理和任务调度的分布式任务队列。Celery 大致有两种应用场景,一种是异步任务,一种是定时任务。

Celery的组成:

  1. task。异步任务或者是定时任务,我们可以定义这些任务,然后发送到 broker
  2. broker。broker 可以理解成消息中间件,用于获取异步或者定时任务,形成一个或多个消息队列,然后发送给 worker 处理这些消息。broker 的形式可以是 Redis,RabbitMQ 或者其他,这里我们使用 Redis 作为消息中间件
  3. worker。worker 是处理消息的程序,获取 broker 中的消息,然后在 worker 中执行,然后根据配置决定将处理结果发送到 backend
  4. result_backend。在 worker 处理完消息之后会有 return 或者没有返回结果,都会根据配置将结果发送出来,可以配置成发送到 redis 中,也可以将之存储到 database 中
  5. beat。主要用于调用定时任务,根据设定好的定时任务,比如每天晚上十点执行某个函数,beat 则会在相应的时间将这个 task 发送给 broker,然后 worker 获取任务进行处理

Celery架构

注意:异步任务是不经过 beat 处理,直接发送给 broker 的。在上面的结构中,broker 需要将相应的服务比如 redis 运行起来,而 worker 和 beat 需要在手动用程序运行,而且每次更改了定时策略之后需要重新启动 beat 和 worker 才能生效。

2 小试牛刀

先安装celery 和 redis 的依赖

pip3 install celery
pip3 install redis

创建celery项目如下

proj/__init__.py
    /celery.py
    /tasks1.py
    /tasks2.py

其中 celery.py 内容为 celery 实例化以及一些基础配置。broker和backend分别使用redis的两个库

from celery import Celery

app = Celery('proj',
             broker='redis://localhost/0',
             backend='redis://localhost/1',
             include=['proj.tasks1', 'proj.tasks2'])

# 设置系统对结果的留存时间600s
app.conf.update(
    result_expires=600
)

if __name__ == '__main__':
    app.start()

模拟两个任务,在上面的配置中已通过include引入

# tasks1.py
from .celery import app

@app.task
def add(x, y):
    return x + y

# tasks2.py
from .celery import app

@app.task
def mul(x, y):
    return x * y

在 proj 同级的文件夹下启动celery,说是 celery 的启动,其实是 worker 的启动(中间件broker是 redis 也需要启动)

celery -A proj worker -l INFO

在 proj 同级的文件夹下执行 python3,进入 python 的交互界面,模拟待处理任务

from proj.tasks1 import add
res = add.delay(1, 2)
print(res.id)

# 获取延时任务的结果
print(res.result)
# 也可以使用 get() 
print(res.get())

# 判断函数运行是否完成
print(res.ready())

# 任务执行是否失败,返回布尔型数据
is_failed = res.failed()

# 任务执行是否成功,返回布尔型数据
is_successful = res.successful()

# 执行的任务所处的状态
# state 的值会在 PENDING,STARTED,SUCCESS,RETRY,FAILURE 这几种状态中
# 分别是 待处理中,任务已经开始,成功,重试中,失败
state = res.state

3 Celery配置方式

1. 类的方式加载配置

注意在 Config 类中的 broker_url 和 result_backend 与直接实例化 Celery 时写入的参数名称是不一样的

# celery.py
from celery import Celery

app = Celery()
class Config:
    include = ['proj.tasks1', 'proj.tasks2']
    broker_url = 'redis://localhost:6379/0'
    result_backend = 'redis://localhost:6379/1'
    
app.config_from_object(Config)

if __name__ == '__main__':
    app.start()

2. 文件的形式加载配置

在 proj 的文件夹中新建一个文件 celeryconfig.py

proj/__init__.py
    /celery.py
    /celeryconfig.py
    /tasks1.py
    /tasks2.py

编辑配置文件 celeryconfig.py

# celeryconfig.py
broker_url = 'redis://localhost/0'
result_backend = 'redis://localhost/1'
include = ['proj.tasks1', 'proj.tasks2']

引入配置文件

# celery.py
from celery import Celery
from . import celeryconfig


app = Celery()
app.config_from_object(celeryconfig)

if __name__ == '__main__':
    app.start()

无论是把配置写入 Config 类中,还是写入文件 celeryconfig.py,这里的思想都是将配置集中处理,在一处管理所有的配置内容。

在这里我们引入配置的方式都是 config_from_object(),还有一个更新配置的方式是 app.conf.update()。这里要说明的是,只要运行了 config_from_object() 函数,在此之前的变量都会被覆盖失效,如果我们要新增初始 config 之外的配置,则需要在调用 config_from_object() 函数之后调用 app.conf.update()

基本配置参数介绍

# 设置时区北京时间
app.conf.update(
    enable_utc=False,
    timezone='Asia/Shanghai',
)

# 设置 broker 和 result_backend 的地址
app.conf.update(
    broker_url = 'redis://localhost:6379/0'
    result_backend = 'redis://localhost:6379/1'
)

# 如果是 broker 和 backend 带密码123456的配置
app.conf.update(
    broker_url = 'redis://:123456@localhost:6380/0'
    result_backend = 'redis://:123456@localhost:6380/1'
)

4 Task的定义

# 普通任务使用 @app.task 作为装饰器
@app.task
def add(x, y):
    return x + y

# 在 Django 系统中使用 celery,可以使用 @shared_task 来修饰
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

# 如果是 celery 的任务和其他装饰器一起联用,需要将 celery 的装饰器放在最上面
@app.task
@decorator1
@decorator2
def add(x, y):
    return x + y

# 指定task名称
@app.task(name="tasks1.add")
def add(x, y):
    return x + y

# 任务重试:每隔 10s 执行一次,一共执行 5 次,5次之后还是不成功则会报错
# autoretry_for 表示的是某种报错情况下重试,我们定义的 Exception 表示任何错误都重试
# default_retry_delay 表示重试间隔时长,默认值是 3 * 60s,即三分钟,我们设置的是 10s
# retry_kwargs 是一个 dict,其中 max_retries 参数,表示的是最大重试次数,我们定为 5
@app.task(autoretry_for=(Exception, ),  default_retry_delay=10, retry_kwargs={'max_retries': 5})
def div(x, y):
    return x / y

# retry_backoff_max 是重试的最大的间隔时间,比如重试次数设置的很大
# retry_backoff 的间隔时间重复达到了这个值之后就不再增大了
@app.task(autoretry_for=(Exception, ), retry_backoff=2, retry_backoff_max=40, retry_jitter=False, retry_kwargs={'max_retries': 8})
def div(x, y):
    return x / y

# 忽略任务运行结果
@app.task(ignore_result=True)
def add(x, y):
    return x + y
# 或者通过 app.conf 的配置来禁用结果的保存
app.conf.update(
    task_ignore_result=True
)
# 或者执行单个任务的时候禁用
from proj.tasks1 import add
add.apply_async((1, 2), ignore_result=True)

# task 的调用,一个是 apply_async(),一个是 delay()。delay() 是不带参数执行的 apply_async()
# 现在开始 10s 后开始运行
add.apply_async((1, 2), countdown=10)
# 距现在60秒后开始执行,两分钟后过期
add.apply_async((1, 2), countdown=60, expires=120)

# 用 eta 参数来指定 10s 后运行
from datetime import datetime, timedelta

now = datetime.now()
add.apply_async((1, 2), eta=now + timedelta(seconds=10))

5 Django中使用Celery

在 settings.py 所在的文件夹下配置如下几个文件。注意本 django 的项目名称为 nb

nb/nb/
    /__init__.py
    /celery.py
    /settings.py


# /celery.py
import os
from celery import Celery

# 为 celery 程序设置默认的 Django 配置
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'nb.settings')

app = Celery('nb')

# 表示从 Django 的配置中加载 celery 的配置
# namespace='CELERY' 表示 celery 的配置必须是以 'CELERY' 为前缀
# 在 settings.py 中所有 CELERY 开头的值都可以被捕捉到并作为 celery 的配置项
app.config_from_object('django.conf:settings', namespace='CELERY')

app.conf.update(
    task_ignore_result=True
)

# 我们需要设置的定时任务 或者延时任务都可以在对应的 app下定义,task 的定义我们在下面介绍
# 然后通过下面的命令自动发现对应的任务
# 比如想要在 blog 下设置一个延时任务,可以创建 blog/tasks.py,然后 celery 就可以自己发现对应的任务
app.autodiscover_tasks()


# /settings.py
# 在 settings.py 中的配置如下
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_BROKER_URL = "redis://localhost/0"
CELERY_RESULT_BACKEND = "redis://localhost/1"


# /__init__.py
# 为了确保 celery 的 app 在 Django 运行的时候被加载,我们还需要在 nb/nb/init.py 中引入 celery_app
# 每个 app 下的 tasks.py 中被 @shared_task 修饰的延时或定时任务就可以使用了
from .celery import app as celery_app

__all__ = ('celery_app',)

在某个django的app下(例如blog)创建tasks.py

# nb/blog/tasks.py
from celery import shared_task

@shared_task
def test_add(x, y):
    return x + y

在django项目根目录下运行worker

celery -A nb worker -l INFO

测试一下,在 django 系统的根目录下进入 django 的 shell,也就是使用 python3 manage.py shell,然后调用 task 测试

from blog.tasks import test_add

test_add.delay(1, 2)

之前都是用redis来保存结果,也可以直接使用MySQL等数据库保存结果,例如使用 Django 的 MySQL 数据库保存 task 结果可以了解一下django-celery-results库

pip3 install django-celery-results

6 Celery的消息队列

存储task的默认队列 task_default_queue

from nb.celery import app

app.conf.task_default_queue

自定义队列

from kombu import Queue

app.conf.task_queues = (
    Queue('blog_tasks', ),
)

每次调用时指定队列名字

from blog.tasks import add

add.apply_async((1, 2), queue='blog_tasks')

# 调用 delay() 函数,即不指定 queue 的话,会发现 task 不能被 worker 处理
add.delay(1, 2)

如果需要在调用 task 的时候不指定队列,使用系统默认的队列,则需要额外指定一个 task_default_queue

from kombu import Queue

app.conf.task_queues = (
    Queue('blog_tasks'),
    Queue('default_queue'),
)
app.conf.task_default_queue = 'default_queue'

想让不同函数使用不同指定的 queue,我们可以使用 task_routes 配置项

假设django项目现在有两个app,blog 和 polls,这两个app 下都有各自的 tasks

# blog/tasks.py
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def minus(x, y):
    return x - y


# polls/tasks.py
from celery import shared_task

@shared_task
def multi(x, y):
    return x * y

我们想要实现的功能是,polls/tasks.py 下的所有的任务以及 blog/tasks.py 下的 add() 函数进入 queue_1 队列。blog/tasks.py 下的 minus() 函数进入 queue_2 队列。其他所有的 task 都走默认的队列 default_queue

from kombu import Queue

# 创建队列
app.conf.task_queues = (
    Queue('queue_1'),
    Queue('queue_2'),
    Queue('default_queue'),
)

# 按需求分配队列
app.conf.task_routes = {
    'polls.tasks.*': {
        'queue': 'queue_1',
    },
    'blog.tasks.add': {
        'queue': 'queue_1',
    },
    'blog.tasks.minus': {
        'queue': 'queue_2',
    },
}

app.conf.task_default_queue = 'default_queue'

7 Celery worker 介绍

一般来说,当我们直接启动 worker 的时候,会默认同时起好几个 worker 进程。不指定 worker 的数量,worker 的进程会默认是所在机器的 CPU 的数量。也可以通过 concurrency 参数来指定启动 worker 的进程数。

# 3个进程
celery -A nb worker --concurrency=3 -l INFO
celery -A nb worker -c 3 -l INFO

# Windows 启动方式
celery -A nb worker -l INFO --pool=solo

可以在运行 worker 的时候指定 worker 只消费特定队列的 task,这个特定队列,可以是一个,也可以是多个,用逗号分隔开。

celery -A nb worker -l INFO -Q queue_1,queue_2

# 列出所有活跃的queues
celery -A nb inspect active_queues

# 列出指定 worker 的活跃queues
celery -A nb inspect active_queues -d worker1@localhost

当然在Python代码中也能直接获取

# 获取所有的队列信息
from nb.celery import app
app.control.inspect().active_queues()

# 获取指定 worker 的队列信息
app.control.inspect(['worker1@localhost']).active_queues()

worker 的状态检测,app.control.inspect() 函数可以检测正在运行的 worker 信息

from nb.celery import app

# 获取所有节点
i = app.control.inspect()

# 输入数组参数,表示获取多个节点worker信息
i = app.control.inspect(['worker1@localhost', 'worker2@localhost'])

# 输入单个worker名,指定获取worker信息
i = app.control.inspect('worker1@localhost')

# 获取已经注册的task列表,输出 dict,worker 的名称为 key,task 列表为 value
# 输出结果为 worker 及其下的 task name 
# 输出示例为 {'worker1@localhost': ['blog.tasks.add', 'blog.tasks.minus', 'polls.tasks.multi']}
i.registered()

# 输出 worker 正在执行的 task
i.active()

# 即将运行的 task
i.scheduled()

# 输出queue队列中等待的 task,例如有任务在 queue 中积压还没来得及处理
i.reserved()

# 使用 ping() 函数,得到 pong 字符串的回复表明该 worker 是存活的,类似 redis 的存活检测操作
# 输出 [{'worker1@localhost': {'ok': 'pong'}}]
app.control.ping(timeout=0.5)
app.control.ping(['worker1@localhost'])

8 定时任务和crontab配置

周期或者定时任务,比如说每天晚上零点零分需要运行一遍某个函数,或者每隔半小时运行一遍该函数,都是这种任务的范畴。celery 有一个组件叫做 beat,就是我们定时任务的调度器,所有的定时任务都由 beat 发出。

首先统一设置时间为北京时间

# settings.py

# django 时区设置
TIME_ZONE = "Asia/Shanghai"
USE_TZ = False

# celery 时区设置 
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = False

定义两个定时任务,key为任务名称,task指向我们定义的任务,schedule定时任务的策略,args定时任务的所需参数

from celery.schedules import crontab

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'blog.tasks.add',
        'schedule': 30,
        'args': (16, 16),
    },
    'schedule_test_add': {
        'task': 'blog.tasks.minus',
        'schedule': crontab(minute="50", hour="23"),
    },
}

运行定时任务需要启动两个服务,一个是 beat,一个是 worker。一般来说我们会先启动 worker,再启动 beat,这样 beat 有一些立即发出的任务就可以直接被 worker 接收然后运行。beat 的启动方式和 worker 启动方式一致,将 worker 改成 beat 即可

celery -A nb beat -l INFO

crontab 的配置与 Linux 服务器上的 crontab 服务类似

from celery.schedules import crontab

# 默认的每分钟执行一次
crontab()

# 每天晚上11点11分执行一次
crontab(minute=23, hour=11)

# 分别在 23点11分,23点33分,23点55分,分别执行一次,
crontab(minute="11,33,55", hour=23)

# 23点之内,每隔5分钟执行一次任务
crontab(minute="*/5", hour=23)

# 23点的 10-20分钟内每分钟执行一次
crontab(minute="10-20", hour=23)

# 对于分钟的这些操作,对于小时是同样生效
# 指定8点,12点,18点的30分执行一次吃饭任务
crontab(minute=30, hour="8,12,18")

# 每个小时执行一次
crontab(minute=0, hour="*/1")
# 当 n = 1 的时候 1可以省略,即为
crontab(minute=0, hour="*")

# 在周一,周三,周五三天的零点执行
crontab(minute=0, hour=0, day_of_week="1,3,5")

在3号,5号,7号,9号,以及每个偶数日的零点零分执行一次
crontab(minute=0, hour=0, day_of_month="3,5,7,9,*/2")

以上启动 Django 系统,worker 和 beat 服务,系统的定时任务就只有固定的这几个,写死在系统里。当然,也可以使用一些 celery 的函数来手动向系统里添加定时任务,但是有一个更好的方法来管理操作这些定时任务,那就是将这些定时任务写入到数据库中,使用增删改查操作,有兴趣可以了解一下django-celery-beat库

pip3 install django-celery-beat