Celery

File de taches distribuee Python - execution async, planification, reessais, traitement temps reel

TL;DR

Quoi: File de tâches distribuée pour Python avec traitement en temps réel.

Pourquoi: Exécution de tâches async, planification, réessais, multiples brokers, monitoring.

Quick Start

Installer:

pip install celery redis

Créer des tâches (tasks.py):

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

Démarrer le worker:

celery -A tasks worker --loglevel=info

Appeler des tâches:

from tasks import add

# Appel async
result = add.delay(4, 4)
print(result.get())  # 8

# Ou avec apply_async
result = add.apply_async((4, 4), countdown=10)  # Exécuter dans 10 secondes

Cheatsheet

CommandeDescription
celery -A app workerDémarrer le worker
celery -A app beatDémarrer le planificateur
celery -A app flowerDémarrer l’UI de monitoring
celery -A app inspect activeAfficher les tâches actives
celery -A app purgePurger tous les messages
celery -A app statusAfficher le statut du worker

Gotchas

Project structure

# proj/celery.py
from celery import Celery

app = Celery('proj')
app.config_from_object('proj.celeryconfig')
app.autodiscover_tasks(['proj.tasks'])

# proj/celeryconfig.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/0'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'UTC'
enable_utc = True

Task options

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_data(self, data):
    try:
        # Traiter les données
        return result
    except Exception as exc:
        raise self.retry(exc=exc)

@app.task(rate_limit='10/m')  # 10 par minute
def limited_task():
    pass

@app.task(time_limit=300)  # Timeout de 5 minutes
def long_task():
    pass

Task chaining

from celery import chain, group, chord

# Chain: exécuter en séquence
result = chain(add.s(2, 2), add.s(4), add.s(8))()

# Group: exécuter en parallèle
result = group(add.s(2, 2), add.s(4, 4))()

# Chord: group + callback
result = chord([add.s(i, i) for i in range(10)], sum_all.s())()

Periodic tasks (beat)

# celeryconfig.py
from celery.schedules import crontab

beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
    'daily-report': {
        'task': 'tasks.send_report',
        'schedule': crontab(hour=7, minute=30),
    },
}

Monitoring with Flower

pip install flower
celery -A proj flower --port=5555
# Accès à http://localhost:5555

Next Steps