Celery 笔记

1.首先执行需要在项目目录下,每个worker都需要一份注册代码以及任务的代码

from __future__ import absolute_import
from celery import Celery
app = Celery('src', include=['src.task'])# 这里申明了代码在src目录下,任务脚本包括了src下的task.py

app.config_from_object('src.celeryconfig')# 额外的配置python脚本

if __name__ == '__main__':
    app.start()

2.任务脚本需要注解@app.task, 构造一些执行相关函数

from __future__ import absolute_import
import time
from src.app import app
from celery import chain, chord


@app.task
def add(x, y):
    return x,y

3. 任务的执行

以下两种都可以驱动任务的执行, 区别就是delay是简化版
add.delay()
add.apply_async()
apply_async在delay的基础上支持传递控制任务执行相关熟悉如下
arg 作用
link 该参数接收一个回调函数或者函数列表,将在主任务完成后执行
link_error 该参数接收一个回调函数或者函数列表,将在主任务执行出错后执行
countdown 接收一个数字,任务将在countdown秒后执行
eta 接收一个datetime对象,任务将在该时间执行
expires 接收一个数字或者datetime对象,任务的到期时间
这里还有一个概念就做签名Signatures,以下方式都会返回Signatures
add.subtask((2, 2), {'debug': True}, countdown=10)
add.s()
add.si()

s() 是 subtask() 的语法糖
si() 是 s(immutable=True) 的语法糖
immutable的意思是不接收上一轮次的任务结果作为本轮任务的输入

Signatures的作用是在task,如add()的基础上封装附带,自带一些参数
如subtask((2, 2), {'debug': True}, countdown=10)意思就是已经填满了
add函数所需要的x,y参数(args),并传递debug=True(kwargs),任务在启动后10秒正式执行

4. 任务的结果获取

add.delay()
add.apply_async()
都会返回一个AsyncResult的实例,通过这个实例的get()获取结果, 或者collect() 获取全链结果
通过ready() 等等函数可获取任务执行状态

5. 任务的并发,串行执行

并发执行推荐使用
group(add.s(2, 2), add.s(4, 4)).apply_async()
这里的意思是所有group内任务都会同时执行

并发有回调要求,或串行任务带额外条件判断推荐使用
chord(add.s(i, i)for i in xrange(10))(tsum.s()).get()
这里是计算1+1+2+2+...+10+10的任务,分解开来就是先执行1+1,2+2,3+3...,全部完成后调用回调tsum.s(), 汇总所有结果

串行推荐使用
chain(add.s(2, 2), add.s(4))
这里是先调用2+2, 然后2+2=4的结果会作为下一轮输入,与第二个add相加,得到4+4=8,后返回
KAI Python, 编程语言