Celery
Celery分布式任务队列框架基础
#resource / python
#type / concept
#status / growing
Celery
[!info] related notes
基础概念
核心组件:
- Broker:消息中间件(Redis / RabbitMQ / Redis Streams 等)
- Worker:执行任务进程(并发:prefork / gevent / eventlet / solo)
- Backend:存储任务结果(Redis / DB / RPC)
- Task:函数封装,支持重试、超时、序列化
- Beat:定时任务调度器
特性:
- 支持任务重试(指数退避)
- 任务路由(queue / exchange / routing_key)
- 任务链(chain)、和弦(chord)、分组(group)、映射(map / starmap)
- 结果忽略 / 持久化可选(提高性能)
- 软超时 / 硬超时
- 信号机制(prerun / postrun / task_failure)
适用场景:发送邮件、批量数据处理、耗时第三方 API 调用、图像/音视频处理、异步通知、定时清理、工作流编排。
使用指南
安装
pip install celery[redis]
定义与启动
# app.py
from celery import Celery
celery_app = Celery(
'demo',
broker='redis://127.0.0.1:6379/1',
backend='redis://127.0.0.1:6379/2'
)
@celery_app.task(bind=True, acks_late=True, autoretry_for=(Exception,), retry_backoff=2, max_retries=5)
def add(self, a: int, b: int):
return a + b
if __name__ == '__main__':
celery_app.worker_main(['worker', '-l', 'info'])
启动 Worker:
celery -A app.celery_app worker -l info -Q default -c 4
调用任务:
from app import add
res = add.delay(2, 3)
print(res.id, res.ready(), res.get(timeout=5))
任务组合
from celery import chain, group, chord
# 串行
chain(add.s(1,2), add.s(3))()
# 并行
group(add.s(i, i) for i in range(5))()
# 和弦:前面并行,回调汇总
chord(group(add.s(i, i) for i in range(5)))(add.s(10))
定时任务(Beat)
celery -A app.celery_app beat -l info
配置周期任务(celeryconfig.py):
from celery.schedules import crontab
beat_schedule = {
'daily-report': {
'task': 'app.add',
'schedule': crontab(hour=2, minute=0),
'args': (1,2)
}
}
timezone = 'Asia/Shanghai'
并发模式选择
- prefork:CPU 密集 / 通用
- gevent / eventlet:高 IO 并发(需 monkey patch)
- solo:调试
优雅关闭与幂等
- 避免任务副作用不可逆,确保可重放(幂等 key)
- 使用
acks_late=True+ 手动控制异常抛出
超时与重试
@celery_app.task(soft_time_limit=10, time_limit=20, autoretry_for=(TimeoutError,), retry_backoff=True)
def slow_job(): ...
实战经验
- 任务幂等:数据库唯一约束 / Redis SETNX 标记
- 避免传递超大参数:传引用 ID -> 后端再查询
- 任务拆分:长任务拆子任务 + 汇总(chord)
- 监控:flower / Prometheus metrics + backlog 长度
- 序列化:默认 JSON;复杂对象需转基础结构
- 限速:
task_rate_limit='10/m' - 优先级:RabbitMQ x-max-priority;Redis 支持有限(需多个队列替代)
- 压力控制:合理设置 prefetch multiplier(默认4)
- 避免死信堆积:配置任务过期(expires / visibility_timeout)
- 链路追踪:task_id 传递下游日志
常见问题:
- 任务重复执行:worker 重启 / ack 丢失 -> 幂等必做
- 结果 backend 膨胀:配置
result_expires+ 清理任务结果 - 队列消息挤压:拆分为多个 queue,按业务隔离
经验总结
Celery 适合作为“异步工作者池 + 调度编排”底座。针对超大规模与高吞吐事件流推荐 Kafka + 专用消费者服务。
控制点:幂等性 / 可观测性 / 任务粒度 / 超时重试策略。
替代比较:
- Dramatiq:更轻,类型提示友好
- RQ:简单 Redis 队列,不含复杂工作流
- Airflow:面向有向任务调度(批处理),非实时