Taste of Celery
1 初窥门径
Celery是专注实时处理和任务调度的分布式任务队列。Celery 大致有两种应用场景,一种是异步任务,一种是定时任务。
Celery的组成:
- task。异步任务或者是定时任务,我们可以定义这些任务,然后发送到 broker
- broker。broker 可以理解成消息中间件,用于获取异步或者定时任务,形成一个或多个消息队列,然后发送给 worker 处理这些消息。broker 的形式可以是 Redis,RabbitMQ 或者其他,这里我们使用 Redis 作为消息中间件
- worker。worker 是处理消息的程序,获取 broker 中的消息,然后在 worker 中执行,然后根据配置决定将处理结果发送到 backend
- result_backend。在 worker 处理完消息之后会有 return 或者没有返回结果,都会根据配置将结果发送出来,可以配置成发送到 redis 中,也可以将之存储到 database 中
- beat。主要用于调用定时任务,根据设定好的定时任务,比如每天晚上十点执行某个函数,beat 则会在相应的时间将这个 task 发送给 broker,然后 worker 获取任务进行处理

注意:异步任务是不经过 beat 处理,直接发送给 broker 的。在上面的结构中,broker 需要将相应的服务比如 redis 运行起来,而 worker 和 beat 需要在手动用程序运行,而且每次更改了定时策略之后需要重新启动 beat 和 worker 才能生效。
2 小试牛刀
先安装celery 和 redis 的依赖
pip3 install celery
pip3 install redis
创建celery项目如下
proj/__init__.py
/celery.py
/tasks1.py
/tasks2.py
其中 celery.py 内容为 celery 实例化以及一些基础配置。broker和backend分别使用redis的两个库
from celery import Celery
app = Celery('proj',
broker='redis://localhost/0',
backend='redis://localhost/1',
include=['proj.tasks1', 'proj.tasks2'])
# 设置系统对结果的留存时间600s
app.conf.update(
result_expires=600
)
if __name__ == '__main__':
app.start()
模拟两个任务,在上面的配置中已通过include引入
# tasks1.py
from .celery import app
@app.task
def add(x, y):
return x + y
# tasks2.py
from .celery import app
@app.task
def mul(x, y):
return x * y
在 proj 同级的文件夹下启动celery,说是 celery 的启动,其实是 worker 的启动(中间件broker是 redis 也需要启动)
celery -A proj worker -l INFO
在 proj 同级的文件夹下执行 python3,进入 python 的交互界面,模拟待处理任务
from proj.tasks1 import add
res = add.delay(1, 2)
print(res.id)
# 获取延时任务的结果
print(res.result)
# 也可以使用 get()
print(res.get())
# 判断函数运行是否完成
print(res.ready())
# 任务执行是否失败,返回布尔型数据
is_failed = res.failed()
# 任务执行是否成功,返回布尔型数据
is_successful = res.successful()
# 执行的任务所处的状态
# state 的值会在 PENDING,STARTED,SUCCESS,RETRY,FAILURE 这几种状态中
# 分别是 待处理中,任务已经开始,成功,重试中,失败
state = res.state
3 Celery配置方式
1. 类的方式加载配置
注意在 Config 类中的 broker_url 和 result_backend 与直接实例化 Celery 时写入的参数名称是不一样的
# celery.py
from celery import Celery
app = Celery()
class Config:
include = ['proj.tasks1', 'proj.tasks2']
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
app.config_from_object(Config)
if __name__ == '__main__':
app.start()
2. 文件的形式加载配置
在 proj 的文件夹中新建一个文件 celeryconfig.py
proj/__init__.py
/celery.py
/celeryconfig.py
/tasks1.py
/tasks2.py
编辑配置文件 celeryconfig.py
# celeryconfig.py
broker_url = 'redis://localhost/0'
result_backend = 'redis://localhost/1'
include = ['proj.tasks1', 'proj.tasks2']
引入配置文件
# celery.py
from celery import Celery
from . import celeryconfig
app = Celery()
app.config_from_object(celeryconfig)
if __name__ == '__main__':
app.start()
无论是把配置写入 Config 类中,还是写入文件 celeryconfig.py,这里的思想都是将配置集中处理,在一处管理所有的配置内容。
在这里我们引入配置的方式都是 config_from_object(),还有一个更新配置的方式是 app.conf.update()。这里要说明的是,只要运行了 config_from_object() 函数,在此之前的变量都会被覆盖失效,如果我们要新增初始 config 之外的配置,则需要在调用 config_from_object() 函数之后调用 app.conf.update()
基本配置参数介绍
# 设置时区北京时间
app.conf.update(
enable_utc=False,
timezone='Asia/Shanghai',
)
# 设置 broker 和 result_backend 的地址
app.conf.update(
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
)
# 如果是 broker 和 backend 带密码123456的配置
app.conf.update(
broker_url = 'redis://:123456@localhost:6380/0'
result_backend = 'redis://:123456@localhost:6380/1'
)
4 Task的定义
# 普通任务使用 @app.task 作为装饰器
@app.task
def add(x, y):
return x + y
# 在 Django 系统中使用 celery,可以使用 @shared_task 来修饰
from celery import shared_task
@shared_task
def add(x, y):
return x + y
# 如果是 celery 的任务和其他装饰器一起联用,需要将 celery 的装饰器放在最上面
@app.task
@decorator1
@decorator2
def add(x, y):
return x + y
# 指定task名称
@app.task(name="tasks1.add")
def add(x, y):
return x + y
# 任务重试:每隔 10s 执行一次,一共执行 5 次,5次之后还是不成功则会报错
# autoretry_for 表示的是某种报错情况下重试,我们定义的 Exception 表示任何错误都重试
# default_retry_delay 表示重试间隔时长,默认值是 3 * 60s,即三分钟,我们设置的是 10s
# retry_kwargs 是一个 dict,其中 max_retries 参数,表示的是最大重试次数,我们定为 5
@app.task(autoretry_for=(Exception, ), default_retry_delay=10, retry_kwargs={'max_retries': 5})
def div(x, y):
return x / y
# retry_backoff_max 是重试的最大的间隔时间,比如重试次数设置的很大
# retry_backoff 的间隔时间重复达到了这个值之后就不再增大了
@app.task(autoretry_for=(Exception, ), retry_backoff=2, retry_backoff_max=40, retry_jitter=False, retry_kwargs={'max_retries': 8})
def div(x, y):
return x / y
# 忽略任务运行结果
@app.task(ignore_result=True)
def add(x, y):
return x + y
# 或者通过 app.conf 的配置来禁用结果的保存
app.conf.update(
task_ignore_result=True
)
# 或者执行单个任务的时候禁用
from proj.tasks1 import add
add.apply_async((1, 2), ignore_result=True)
# task 的调用,一个是 apply_async(),一个是 delay()。delay() 是不带参数执行的 apply_async()
# 现在开始 10s 后开始运行
add.apply_async((1, 2), countdown=10)
# 距现在60秒后开始执行,两分钟后过期
add.apply_async((1, 2), countdown=60, expires=120)
# 用 eta 参数来指定 10s 后运行
from datetime import datetime, timedelta
now = datetime.now()
add.apply_async((1, 2), eta=now + timedelta(seconds=10))
5 Django中使用Celery
在 settings.py 所在的文件夹下配置如下几个文件。注意本 django 的项目名称为 nb
nb/nb/
/__init__.py
/celery.py
/settings.py
# /celery.py
import os
from celery import Celery
# 为 celery 程序设置默认的 Django 配置
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'nb.settings')
app = Celery('nb')
# 表示从 Django 的配置中加载 celery 的配置
# namespace='CELERY' 表示 celery 的配置必须是以 'CELERY' 为前缀
# 在 settings.py 中所有 CELERY 开头的值都可以被捕捉到并作为 celery 的配置项
app.config_from_object('django.conf:settings', namespace='CELERY')
app.conf.update(
task_ignore_result=True
)
# 我们需要设置的定时任务 或者延时任务都可以在对应的 app下定义,task 的定义我们在下面介绍
# 然后通过下面的命令自动发现对应的任务
# 比如想要在 blog 下设置一个延时任务,可以创建 blog/tasks.py,然后 celery 就可以自己发现对应的任务
app.autodiscover_tasks()
# /settings.py
# 在 settings.py 中的配置如下
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_BROKER_URL = "redis://localhost/0"
CELERY_RESULT_BACKEND = "redis://localhost/1"
# /__init__.py
# 为了确保 celery 的 app 在 Django 运行的时候被加载,我们还需要在 nb/nb/init.py 中引入 celery_app
# 每个 app 下的 tasks.py 中被 @shared_task 修饰的延时或定时任务就可以使用了
from .celery import app as celery_app
__all__ = ('celery_app',)
在某个django的app下(例如blog)创建tasks.py
# nb/blog/tasks.py
from celery import shared_task
@shared_task
def test_add(x, y):
return x + y
在django项目根目录下运行worker
celery -A nb worker -l INFO
测试一下,在 django 系统的根目录下进入 django 的 shell,也就是使用 python3 manage.py shell,然后调用 task 测试
from blog.tasks import test_add
test_add.delay(1, 2)
之前都是用redis来保存结果,也可以直接使用MySQL等数据库保存结果,例如使用 Django 的 MySQL 数据库保存 task 结果可以了解一下django-celery-results库
pip3 install django-celery-results
6 Celery的消息队列
存储task的默认队列 task_default_queue
from nb.celery import app
app.conf.task_default_queue
自定义队列
from kombu import Queue
app.conf.task_queues = (
Queue('blog_tasks', ),
)
每次调用时指定队列名字
from blog.tasks import add
add.apply_async((1, 2), queue='blog_tasks')
# 调用 delay() 函数,即不指定 queue 的话,会发现 task 不能被 worker 处理
add.delay(1, 2)
如果需要在调用 task 的时候不指定队列,使用系统默认的队列,则需要额外指定一个 task_default_queue
from kombu import Queue
app.conf.task_queues = (
Queue('blog_tasks'),
Queue('default_queue'),
)
app.conf.task_default_queue = 'default_queue'
想让不同函数使用不同指定的 queue,我们可以使用 task_routes 配置项
假设django项目现在有两个app,blog 和 polls,这两个app 下都有各自的 tasks
# blog/tasks.py
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def minus(x, y):
return x - y
# polls/tasks.py
from celery import shared_task
@shared_task
def multi(x, y):
return x * y
我们想要实现的功能是,polls/tasks.py 下的所有的任务以及 blog/tasks.py 下的 add() 函数进入 queue_1 队列。blog/tasks.py 下的 minus() 函数进入 queue_2 队列。其他所有的 task 都走默认的队列 default_queue
from kombu import Queue
# 创建队列
app.conf.task_queues = (
Queue('queue_1'),
Queue('queue_2'),
Queue('default_queue'),
)
# 按需求分配队列
app.conf.task_routes = {
'polls.tasks.*': {
'queue': 'queue_1',
},
'blog.tasks.add': {
'queue': 'queue_1',
},
'blog.tasks.minus': {
'queue': 'queue_2',
},
}
app.conf.task_default_queue = 'default_queue'
7 Celery worker 介绍
一般来说,当我们直接启动 worker 的时候,会默认同时起好几个 worker 进程。不指定 worker 的数量,worker 的进程会默认是所在机器的 CPU 的数量。也可以通过 concurrency 参数来指定启动 worker 的进程数。
# 3个进程
celery -A nb worker --concurrency=3 -l INFO
celery -A nb worker -c 3 -l INFO
# Windows 启动方式
celery -A nb worker -l INFO --pool=solo
可以在运行 worker 的时候指定 worker 只消费特定队列的 task,这个特定队列,可以是一个,也可以是多个,用逗号分隔开。
celery -A nb worker -l INFO -Q queue_1,queue_2
# 列出所有活跃的queues
celery -A nb inspect active_queues
# 列出指定 worker 的活跃queues
celery -A nb inspect active_queues -d worker1@localhost
当然在Python代码中也能直接获取
# 获取所有的队列信息
from nb.celery import app
app.control.inspect().active_queues()
# 获取指定 worker 的队列信息
app.control.inspect(['worker1@localhost']).active_queues()
worker 的状态检测,app.control.inspect() 函数可以检测正在运行的 worker 信息
from nb.celery import app
# 获取所有节点
i = app.control.inspect()
# 输入数组参数,表示获取多个节点worker信息
i = app.control.inspect(['worker1@localhost', 'worker2@localhost'])
# 输入单个worker名,指定获取worker信息
i = app.control.inspect('worker1@localhost')
# 获取已经注册的task列表,输出 dict,worker 的名称为 key,task 列表为 value
# 输出结果为 worker 及其下的 task name
# 输出示例为 {'worker1@localhost': ['blog.tasks.add', 'blog.tasks.minus', 'polls.tasks.multi']}
i.registered()
# 输出 worker 正在执行的 task
i.active()
# 即将运行的 task
i.scheduled()
# 输出queue队列中等待的 task,例如有任务在 queue 中积压还没来得及处理
i.reserved()
# 使用 ping() 函数,得到 pong 字符串的回复表明该 worker 是存活的,类似 redis 的存活检测操作
# 输出 [{'worker1@localhost': {'ok': 'pong'}}]
app.control.ping(timeout=0.5)
app.control.ping(['worker1@localhost'])
8 定时任务和crontab配置
周期或者定时任务,比如说每天晚上零点零分需要运行一遍某个函数,或者每隔半小时运行一遍该函数,都是这种任务的范畴。celery 有一个组件叫做 beat,就是我们定时任务的调度器,所有的定时任务都由 beat 发出。
首先统一设置时间为北京时间
# settings.py
# django 时区设置
TIME_ZONE = "Asia/Shanghai"
USE_TZ = False
# celery 时区设置
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = False
定义两个定时任务,key为任务名称,task指向我们定义的任务,schedule定时任务的策略,args定时任务的所需参数
from celery.schedules import crontab
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'blog.tasks.add',
'schedule': 30,
'args': (16, 16),
},
'schedule_test_add': {
'task': 'blog.tasks.minus',
'schedule': crontab(minute="50", hour="23"),
},
}
运行定时任务需要启动两个服务,一个是 beat,一个是 worker。一般来说我们会先启动 worker,再启动 beat,这样 beat 有一些立即发出的任务就可以直接被 worker 接收然后运行。beat 的启动方式和 worker 启动方式一致,将 worker 改成 beat 即可
celery -A nb beat -l INFO
crontab 的配置与 Linux 服务器上的 crontab 服务类似
from celery.schedules import crontab
# 默认的每分钟执行一次
crontab()
# 每天晚上11点11分执行一次
crontab(minute=23, hour=11)
# 分别在 23点11分,23点33分,23点55分,分别执行一次,
crontab(minute="11,33,55", hour=23)
# 23点之内,每隔5分钟执行一次任务
crontab(minute="*/5", hour=23)
# 23点的 10-20分钟内每分钟执行一次
crontab(minute="10-20", hour=23)
# 对于分钟的这些操作,对于小时是同样生效
# 指定8点,12点,18点的30分执行一次吃饭任务
crontab(minute=30, hour="8,12,18")
# 每个小时执行一次
crontab(minute=0, hour="*/1")
# 当 n = 1 的时候 1可以省略,即为
crontab(minute=0, hour="*")
# 在周一,周三,周五三天的零点执行
crontab(minute=0, hour=0, day_of_week="1,3,5")
在3号,5号,7号,9号,以及每个偶数日的零点零分执行一次
crontab(minute=0, hour=0, day_of_month="3,5,7,9,*/2")
以上启动 Django 系统,worker 和 beat 服务,系统的定时任务就只有固定的这几个,写死在系统里。当然,也可以使用一些 celery 的函数来手动向系统里添加定时任务,但是有一个更好的方法来管理操作这些定时任务,那就是将这些定时任务写入到数据库中,使用增删改查操作,有兴趣可以了解一下django-celery-beat库
pip3 install django-celery-beat