Tareas asincronas en DRF con Celery y Redis

Avanzado
Django
Django
Actualizado: 19/04/2026

Cuando una tarea NO debe estar en el request

Tu API recibe POST /api/pedidos/{id}/factura/ que tiene que generar un PDF. Si tarda 4 segundos, el cliente espera 4 segundos en cada request. Si tarda 40 segundos, el balanceador devuelve 504. Y aunque tarde poco, esos 4 segundos ocupan un worker que no puede atender otras peticiones.

La regla:

Si una tarea tarda mas de 1 segundo, encola en lugar de hacerla en el request. La API responde con job_id en milisegundos, el worker procesa cuando puede.

Casos clasicos:

  • Generar PDFs, Excel o reportes pesados.
  • Enviar emails, SMS o notificaciones push masivas.
  • Sincronizar con APIs externas lentas (CRM, ERP).
  • Procesar uploads (transcodificar video, extraer texto OCR).
  • Recalcular agregados (estadisticas, dashboards).
  • Llamar a LLMs largos (>5 s).

Celery + Redis: arquitectura

flowchart LR
    C[Cliente] -->|POST factura| API[DRF]
    API -->|task.delay| BR[(Redis broker)]
    API -->|201 + job_id| C
    BR --> W1[Celery worker 1]
    BR --> W2[Celery worker 2]
    W1 -->|resultado| BK[(Redis backend)]
    W2 -->|resultado| BK
    C -->|GET /jobs/job_id| API
    API -->|consulta estado| BK
  • Broker: Redis (recomendado) o RabbitMQ. Cola de tareas pendientes.
  • Worker: proceso aparte (celery -A core worker -l info) que coge tareas y las ejecuta.
  • Result backend: Redis (o BBDD). Almacena estado y resultado de cada job.

Instalacion y configuracion

pip install celery[redis] django-celery-results
# settings.py
INSTALLED_APPS = [..., "django_celery_results"]

CELERY_BROKER_URL          = "redis://redis:6379/0"
CELERY_RESULT_BACKEND      = "django-db"  # o "redis://redis:6379/1"
CELERY_TASK_SERIALIZER     = "json"
CELERY_RESULT_SERIALIZER   = "json"
CELERY_ACCEPT_CONTENT      = ["json"]
CELERY_TIMEZONE            = "Europe/Madrid"
CELERY_TASK_TRACK_STARTED  = True
CELERY_TASK_TIME_LIMIT     = 300        # mata tareas > 5 min
CELERY_TASK_SOFT_TIME_LIMIT = 240       # avisa antes
# core/celery.py
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
# core/__init__.py
from .celery import app as celery_app
__all__ = ("celery_app",)
python manage.py migrate django_celery_results

Definir tareas

# pedidos/tasks.py
from celery import shared_task
from pedidos.models import Pedido
from pedidos.servicios import generar_pdf_factura, enviar_email_factura

@shared_task(
    bind             = True,
    autoretry_for    = (Exception,),
    retry_backoff    = True,         # 1, 2, 4, 8, ... segundos
    retry_backoff_max= 600,
    retry_jitter     = True,
    max_retries      = 5,
)
def generar_factura(self, pedido_id: int):
    pedido = Pedido.objects.get(id=pedido_id)
    pdf    = generar_pdf_factura(pedido)
    enviar_email_factura(pedido, pdf)
    return {"pedido_id": pedido_id, "pdf_url": pdf.url}

bind=True inyecta self, lo que permite self.retry() manual o llamar a self.request.id. autoretry_for + retry_backoff automatiza reintentos con backoff exponencial.

Disparar desde una APIView

# pedidos/views.py
from rest_framework.viewsets import GenericViewSet
from rest_framework.mixins import RetrieveModelMixin
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework import status
from celery.result import AsyncResult
from pedidos.tasks import generar_factura
from pedidos.models import Pedido

class PedidoViewSet(RetrieveModelMixin, GenericViewSet):
    queryset = Pedido.objects.all()

    @action(detail=True, methods=["post"], url_path="factura")
    def factura(self, request, pk=None):
        pedido = self.get_object()
        async_result = generar_factura.delay(pedido.id)
        return Response(
            {"job_id": async_result.id, "status": "PENDING"},
            status=status.HTTP_202_ACCEPTED,
        )

Status HTTP 202 Accepted es la respuesta semanticamente correcta cuando se acepta una peticion para procesarla en background, no 200 OK ni 201 Created.

Endpoint de consulta de jobs

# jobs/views.py
from celery.result import AsyncResult
from rest_framework.views import APIView
from rest_framework.response import Response

class JobStatusView(APIView):
    def get(self, request, job_id):
        result = AsyncResult(job_id)
        return Response({
            "job_id":  job_id,
            "status":  result.status,         # PENDING, STARTED, SUCCESS, FAILURE, RETRY
            "result":  result.result if result.successful() else None,
            "error":   str(result.result) if result.failed() else None,
        })
# urls.py
path("api/jobs/<str:job_id>/", JobStatusView.as_view()),

El cliente sigue el patron polling:

async function esperarJob(jobId) {
    while (true) {
        const r = await fetch(`/api/jobs/${jobId}/`);
        const data = await r.json();
        if (data.status === "SUCCESS") return data.result;
        if (data.status === "FAILURE") throw new Error(data.error);
        await new Promise(r => setTimeout(r, 1000));
    }
}

Para reducir el polling, combina con WebSockets (Django Channels) y haz group_send desde la tarea cuando termina.

Tareas periodicas con Celery Beat

Para ejecutar algo cada hora o cada noche:

pip install django-celery-beat
# settings.py
INSTALLED_APPS += ["django_celery_beat"]
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
celery -A core worker -l info
celery -A core beat   -l info

Crea una entrada desde el admin Django o programaticamente:

from django_celery_beat.models import PeriodicTask, CrontabSchedule

schedule = CrontabSchedule.objects.create(hour=3, minute=0)
PeriodicTask.objects.create(
    name      = "Limpieza nocturna",
    task      = "pedidos.tasks.limpieza_diaria",
    crontab   = schedule,
)

Es preferible al cron del sistema porque es trazable (cada ejecucion se registra) y se gestiona desde el admin sin acceso SSH.

Integracion con drf-spectacular

from drf_spectacular.utils import extend_schema, OpenApiResponse, OpenApiTypes

@extend_schema(
    request   = None,
    responses = {
        202: OpenApiResponse(
            response    = OpenApiTypes.OBJECT,
            description = "Tarea encolada. Devuelve job_id para consultar estado.",
            examples = [
                {"value": {"job_id": "uuid", "status": "PENDING"}},
            ],
        ),
    },
    description = "Genera la factura PDF en background y la envia por email.",
)
@action(detail=True, methods=["post"], url_path="factura")
def factura(self, request, pk=None):
    ...

Errores y patrones a evitar

  • Pasar objetos Django como parametros. Pasa siempre ids (generar_factura.delay(pedido.id)), no generar_factura.delay(pedido). Los workers no comparten estado y serializar el objeto rompe lazy fields.
  • Olvidar idempotencia. Si autoretry esta activo, la tarea puede ejecutarse N veces. Implementa la logica como idempotente o usa una IdempotencyKey interna.
  • No limitar max_retries. Una tarea con max_retries=None y un bug puede saturar el broker.
  • Resultados gigantes en el backend. Si la tarea genera un PDF de 50 MB, NO lo metas en return. Sube a S3 y devuelve la URL.
  • Single broker para todo. Para volumen alto, separa colas por prioridad (celery -Q alta,baja) para que tareas largas no bloqueen las urgentes.

Monitorizacion

  • Flower: pip install flower y celery -A core flower --port=5555. Dashboard web con tareas, workers, retries.
  • Sentry: integra con celery_integration para capturar excepciones de workers.
  • Metricas Prometheus con celery_exporter.
Alan Sastre - Autor del tutorial

Alan Sastre

Ingeniero de Software y formador, CEO en CertiDevs

Ingeniero de software especializado en Full Stack y en Inteligencia Artificial. Como CEO de CertiDevs, Django es una de sus áreas de expertise. Con más de 15 años programando, 6K seguidores en LinkedIn y experiencia como formador, Alan se dedica a crear contenido educativo de calidad para desarrolladores de todos los niveles.

Más tutoriales de Django

Explora más contenido relacionado con Django y continúa aprendiendo con nuestros tutoriales gratuitos.

Aprendizajes de esta lección

Configurar Celery con broker Redis y backend Redis. Anadir CELERY_BROKER_URL y CELERY_RESULT_BACKEND a settings. Crear shared_tasks con bind=True y autoretry_for. Disparar tareas desde una APIView con .delay() o .apply_async(). Devolver job_id como respuesta HTTP. Crear endpoint /api/jobs/{id}/ para consultar estado (PENDING, STARTED, SUCCESS, FAILURE, RETRY). Configurar celery beat para tareas periodicas.

Cursos que incluyen esta lección

Esta lección forma parte de los siguientes cursos estructurados con rutas de aprendizaje