Celery

Python 分布式任务队列 - 异步执行、定时调度、重试机制、实时处理

TL;DR

是什么:Python 的分布式任务队列,支持实时处理。

为什么用:异步任务执行、调度、重试、多代理支持、监控。

Quick Start

安装

pip install celery redis

创建任务tasks.py):

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

启动 worker

celery -A tasks worker --loglevel=info

调用任务

from tasks import add

# 异步调用
result = add.delay(4, 4)
print(result.get())  # 8

# 或使用 apply_async
result = add.apply_async((4, 4), countdown=10)  # 10 秒后执行

Cheatsheet

命令描述
celery -A app worker启动 worker
celery -A app beat启动调度器
celery -A app flower启动监控 UI
celery -A app inspect active显示活动任务
celery -A app purge清除所有消息
celery -A app status显示 worker 状态

Gotchas

项目结构

# proj/celery.py
from celery import Celery

app = Celery('proj')
app.config_from_object('proj.celeryconfig')
app.autodiscover_tasks(['proj.tasks'])

# proj/celeryconfig.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/0'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'UTC'
enable_utc = True

任务选项

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_data(self, data):
    try:
        # 处理数据
        return result
    except Exception as exc:
        raise self.retry(exc=exc)

@app.task(rate_limit='10/m')  # 每分钟 10 个
def limited_task():
    pass

@app.task(time_limit=300)  # 5 分钟超时
def long_task():
    pass

任务链

from celery import chain, group, chord

# Chain:顺序执行
result = chain(add.s(2, 2), add.s(4), add.s(8))()

# Group:并行执行
result = group(add.s(2, 2), add.s(4, 4))()

# Chord:group + 回调
result = chord([add.s(i, i) for i in range(10)], sum_all.s())()

定时任务(beat)

# celeryconfig.py
from celery.schedules import crontab

beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
    'daily-report': {
        'task': 'tasks.send_report',
        'schedule': crontab(hour=7, minute=30),
    },
}

使用 Flower 监控

pip install flower
celery -A proj flower --port=5555
# 访问 http://localhost:5555

Next Steps