Cuando una tarea NO debe estar en el request
Tu API recibe POST /api/pedidos/{id}/factura/ que tiene que generar un PDF. Si tarda 4 segundos, el cliente espera 4 segundos en cada request. Si tarda 40 segundos, el balanceador devuelve 504. Y aunque tarde poco, esos 4 segundos ocupan un worker que no puede atender otras peticiones.
La regla:
Si una tarea tarda mas de 1 segundo, encola en lugar de hacerla en el request. La API responde con
job_iden milisegundos, el worker procesa cuando puede.
Casos clasicos:
- Generar PDFs, Excel o reportes pesados.
- Enviar emails, SMS o notificaciones push masivas.
- Sincronizar con APIs externas lentas (CRM, ERP).
- Procesar uploads (transcodificar video, extraer texto OCR).
- Recalcular agregados (estadisticas, dashboards).
- Llamar a LLMs largos (>5 s).
Celery + Redis: arquitectura
flowchart LR
C[Cliente] -->|POST factura| API[DRF]
API -->|task.delay| BR[(Redis broker)]
API -->|201 + job_id| C
BR --> W1[Celery worker 1]
BR --> W2[Celery worker 2]
W1 -->|resultado| BK[(Redis backend)]
W2 -->|resultado| BK
C -->|GET /jobs/job_id| API
API -->|consulta estado| BK
- Broker: Redis (recomendado) o RabbitMQ. Cola de tareas pendientes.
- Worker: proceso aparte (
celery -A core worker -l info) que coge tareas y las ejecuta. - Result backend: Redis (o BBDD). Almacena estado y resultado de cada job.
Instalacion y configuracion
pip install celery[redis] django-celery-results
# settings.py
INSTALLED_APPS = [..., "django_celery_results"]
CELERY_BROKER_URL = "redis://redis:6379/0"
CELERY_RESULT_BACKEND = "django-db" # o "redis://redis:6379/1"
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TIMEZONE = "Europe/Madrid"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 300 # mata tareas > 5 min
CELERY_TASK_SOFT_TIME_LIMIT = 240 # avisa antes
# core/celery.py
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
# core/__init__.py
from .celery import app as celery_app
__all__ = ("celery_app",)
python manage.py migrate django_celery_results
Definir tareas
# pedidos/tasks.py
from celery import shared_task
from pedidos.models import Pedido
from pedidos.servicios import generar_pdf_factura, enviar_email_factura
@shared_task(
bind = True,
autoretry_for = (Exception,),
retry_backoff = True, # 1, 2, 4, 8, ... segundos
retry_backoff_max= 600,
retry_jitter = True,
max_retries = 5,
)
def generar_factura(self, pedido_id: int):
pedido = Pedido.objects.get(id=pedido_id)
pdf = generar_pdf_factura(pedido)
enviar_email_factura(pedido, pdf)
return {"pedido_id": pedido_id, "pdf_url": pdf.url}
bind=Trueinyectaself, lo que permiteself.retry()manual o llamar aself.request.id.autoretry_for + retry_backoffautomatiza reintentos con backoff exponencial.
Disparar desde una APIView
# pedidos/views.py
from rest_framework.viewsets import GenericViewSet
from rest_framework.mixins import RetrieveModelMixin
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework import status
from celery.result import AsyncResult
from pedidos.tasks import generar_factura
from pedidos.models import Pedido
class PedidoViewSet(RetrieveModelMixin, GenericViewSet):
queryset = Pedido.objects.all()
@action(detail=True, methods=["post"], url_path="factura")
def factura(self, request, pk=None):
pedido = self.get_object()
async_result = generar_factura.delay(pedido.id)
return Response(
{"job_id": async_result.id, "status": "PENDING"},
status=status.HTTP_202_ACCEPTED,
)
Status HTTP 202 Accepted es la respuesta semanticamente correcta cuando se acepta una peticion para procesarla en background, no
200 OKni201 Created.
Endpoint de consulta de jobs
# jobs/views.py
from celery.result import AsyncResult
from rest_framework.views import APIView
from rest_framework.response import Response
class JobStatusView(APIView):
def get(self, request, job_id):
result = AsyncResult(job_id)
return Response({
"job_id": job_id,
"status": result.status, # PENDING, STARTED, SUCCESS, FAILURE, RETRY
"result": result.result if result.successful() else None,
"error": str(result.result) if result.failed() else None,
})
# urls.py
path("api/jobs/<str:job_id>/", JobStatusView.as_view()),
El cliente sigue el patron polling:
async function esperarJob(jobId) {
while (true) {
const r = await fetch(`/api/jobs/${jobId}/`);
const data = await r.json();
if (data.status === "SUCCESS") return data.result;
if (data.status === "FAILURE") throw new Error(data.error);
await new Promise(r => setTimeout(r, 1000));
}
}
Para reducir el polling, combina con WebSockets (Django Channels) y haz
group_senddesde la tarea cuando termina.
Tareas periodicas con Celery Beat
Para ejecutar algo cada hora o cada noche:
pip install django-celery-beat
# settings.py
INSTALLED_APPS += ["django_celery_beat"]
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
celery -A core worker -l info
celery -A core beat -l info
Crea una entrada desde el admin Django o programaticamente:
from django_celery_beat.models import PeriodicTask, CrontabSchedule
schedule = CrontabSchedule.objects.create(hour=3, minute=0)
PeriodicTask.objects.create(
name = "Limpieza nocturna",
task = "pedidos.tasks.limpieza_diaria",
crontab = schedule,
)
Es preferible al
crondel sistema porque es trazable (cada ejecucion se registra) y se gestiona desde el admin sin acceso SSH.
Integracion con drf-spectacular
from drf_spectacular.utils import extend_schema, OpenApiResponse, OpenApiTypes
@extend_schema(
request = None,
responses = {
202: OpenApiResponse(
response = OpenApiTypes.OBJECT,
description = "Tarea encolada. Devuelve job_id para consultar estado.",
examples = [
{"value": {"job_id": "uuid", "status": "PENDING"}},
],
),
},
description = "Genera la factura PDF en background y la envia por email.",
)
@action(detail=True, methods=["post"], url_path="factura")
def factura(self, request, pk=None):
...
Errores y patrones a evitar
- Pasar objetos Django como parametros. Pasa siempre ids (
generar_factura.delay(pedido.id)), nogenerar_factura.delay(pedido). Los workers no comparten estado y serializar el objeto rompe lazy fields. - Olvidar idempotencia. Si autoretry esta activo, la tarea puede ejecutarse N veces. Implementa la logica como idempotente o usa una
IdempotencyKeyinterna. - No limitar
max_retries. Una tarea conmax_retries=Noney un bug puede saturar el broker. - Resultados gigantes en el backend. Si la tarea genera un PDF de 50 MB, NO lo metas en
return. Sube a S3 y devuelve la URL. - Single broker para todo. Para volumen alto, separa colas por prioridad (
celery -Q alta,baja) para que tareas largas no bloqueen las urgentes.
Monitorizacion
- Flower:
pip install flowerycelery -A core flower --port=5555. Dashboard web con tareas, workers, retries. - Sentry: integra con
celery_integrationpara capturar excepciones de workers. - Metricas Prometheus con
celery_exporter.
Alan Sastre
Ingeniero de Software y formador, CEO en CertiDevs
Ingeniero de software especializado en Full Stack y en Inteligencia Artificial. Como CEO de CertiDevs, Django es una de sus áreas de expertise. Con más de 15 años programando, 6K seguidores en LinkedIn y experiencia como formador, Alan se dedica a crear contenido educativo de calidad para desarrolladores de todos los niveles.
Más tutoriales de Django
Explora más contenido relacionado con Django y continúa aprendiendo con nuestros tutoriales gratuitos.
Aprendizajes de esta lección
Configurar Celery con broker Redis y backend Redis. Anadir CELERY_BROKER_URL y CELERY_RESULT_BACKEND a settings. Crear shared_tasks con bind=True y autoretry_for. Disparar tareas desde una APIView con .delay() o .apply_async(). Devolver job_id como respuesta HTTP. Crear endpoint /api/jobs/{id}/ para consultar estado (PENDING, STARTED, SUCCESS, FAILURE, RETRY). Configurar celery beat para tareas periodicas.
Cursos que incluyen esta lección
Esta lección forma parte de los siguientes cursos estructurados con rutas de aprendizaje