Eventos en DRF con Apache Kafka

Avanzado
Django
Django
Actualizado: 19/04/2026

Por que Kafka y no llamadas HTTP

Tu API publica PedidoCreado y necesitan reaccionar:

  • El servicio de inventario decrementa stock.
  • El servicio de email envia confirmacion.
  • El servicio de analytics registra el evento.
  • El servicio de CRM actualiza el historial.

Con HTTP sincrono, tu vista hace 4 POSTs y si uno falla, todo falla. Si el CRM esta lento (5 s), tu API tambien. Acoplas vida de tu API a la salud de servicios que ni controlas.

Con Kafka publicas un evento PedidoCreado en un topic. Cada consumidor lo lee a su ritmo, retrocede en el log si necesita reprocesar, y la caida de un servicio no afecta a los demas. Es la base de las arquitecturas event-driven modernas.

Diferencia clave: en Kafka el log persiste (dias o ilimitado), no es una cola transitoria como Redis. Permite replay (reprocesar eventos viejos) y multiples consumidores independientes del mismo topic.

Esquema canonico del evento

{
  "id":           "evt_01J3K8...",
  "type":         "PedidoCreado",
  "version":      "v1",
  "occurred_at":  "2026-04-18T10:34:21Z",
  "actor": { "type": "user", "id": "u_42" },
  "data": {
    "pedido_id": "ord_8923",
    "total":     "59.90",
    "moneda":    "EUR",
    "lineas":    [{ "producto_id": 1, "cantidad": 2 }]
  }
}

Mismo formato que webhooks (uniformidad). El version permite evolucionar sin romper consumidores antiguos.

Productor con confluent-kafka

pip install confluent-kafka
# core/kafka.py
import json
from confluent_kafka import Producer
from django.conf import settings

producer = Producer({
    "bootstrap.servers": settings.KAFKA_BROKERS,     # "kafka:9092"
    "client.id":         "drf-api",
    "acks":              "all",                      # espera ack de todos los replicas
    "enable.idempotence": True,                      # no duplica si retransmite
    "compression.type":  "lz4",
})

def publicar(topic: str, key: str, value: dict):
    producer.produce(
        topic = topic,
        key   = key.encode(),
        value = json.dumps(value, separators=(",", ":")).encode(),
        on_delivery = _on_delivery,
    )
    producer.poll(0)  # NO bloquea; procesa callbacks pendientes

def _on_delivery(err, msg):
    if err:
        # log a Sentry / structured log
        print(f"kafka delivery failed: {err}")

Llamada sencilla: publicar("pedidos", str(pedido.id), evento_dict).

Publicar desde signals

# pedidos/signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.utils import timezone
from uuid import uuid4
from pedidos.models import Pedido
from core.kafka import publicar

@receiver(post_save, sender=Pedido)
def emitir_pedido_creado(sender, instance, created, **kwargs):
    if not created:
        return
    publicar("pedidos.events", str(instance.id), {
        "id":          f"evt_{uuid4().hex}",
        "type":        "PedidoCreado",
        "version":     "v1",
        "occurred_at": timezone.now().isoformat(),
        "data": {
            "pedido_id": str(instance.id),
            "total":     str(instance.total),
            "user_id":   instance.usuario_id,
        },
    })

Cuidado: signals + Kafka no son atomicos. Si la transaccion BBDD hace rollback DESPUES del send, publicas un evento fantasma. Solucion: outbox pattern mas abajo.

El outbox pattern

Para garantizar exactamente una vez la consistencia entre BBDD y Kafka:

  1. En la misma transaccion atomica que crea el Pedido, inserta una fila en outbox.
  2. Un proceso aparte (relay) lee outbox y publica en Kafka.
  3. Tras ack exitoso, marca la fila como enviada (o la borra).
# core/models.py
class OutboxEvent(models.Model):
    topic       = models.CharField(max_length=128)
    key         = models.CharField(max_length=64)
    payload     = models.JSONField()
    created_at  = models.DateTimeField(auto_now_add=True)
    sent_at     = models.DateTimeField(null=True)
# pedidos/views.py
from django.db import transaction
from core.models import OutboxEvent

class PedidoViewSet(ModelViewSet):
    def perform_create(self, serializer):
        with transaction.atomic():
            pedido = serializer.save()
            OutboxEvent.objects.create(
                topic   = "pedidos.events",
                key     = str(pedido.id),
                payload = {
                    "id":         f"evt_{uuid4().hex}",
                    "type":       "PedidoCreado",
                    "occurred_at": timezone.now().isoformat(),
                    "data": {"pedido_id": str(pedido.id)},
                },
            )
# core/management/commands/outbox_relay.py
import time
from django.core.management.base import BaseCommand
from django.utils import timezone
from core.models import OutboxEvent
from core.kafka import publicar, producer

class Command(BaseCommand):
    def handle(self, *a, **kw):
        while True:
            qs = OutboxEvent.objects.filter(sent_at__isnull=True).order_by("created_at")[:100]
            for ev in qs:
                publicar(ev.topic, ev.key, ev.payload)
            producer.flush()
            qs.update(sent_at=timezone.now())
            time.sleep(1)

Asi BBDD y Kafka acaban consistentes. Si el relay muere antes de marcar sent_at, en el siguiente ciclo reenvia: by design at-least-once, lo que el consumidor debe gestionar con event_id para dedup.

Consumer como management command

# core/management/commands/consume_pedidos.py
import json
from confluent_kafka import Consumer
from django.core.management.base import BaseCommand
from django.conf import settings
from inventario.servicios import descontar_stock

class Command(BaseCommand):
    def handle(self, *a, **kw):
        consumer = Consumer({
            "bootstrap.servers":  settings.KAFKA_BROKERS,
            "group.id":           "inventario-consumer",
            "auto.offset.reset":  "earliest",
            "enable.auto.commit": False,
        })
        consumer.subscribe(["pedidos.events"])

        try:
            while True:
                msg = consumer.poll(1.0)
                if msg is None:
                    continue
                if msg.error():
                    print(f"kafka error: {msg.error()}")
                    continue

                evento = json.loads(msg.value())
                if evento["type"] == "PedidoCreado":
                    descontar_stock(evento["data"]["pedido_id"])
                consumer.commit(asynchronous=False)
        finally:
            consumer.close()

group.id define un consumer group. Si lanzas N replicas con el mismo group.id, Kafka reparte particiones entre ellas (escalado horizontal automatico). Distintos group.id reciben todos los eventos (broadcasting).

Particiones y orden

Kafka garantiza orden dentro de una particion, no entre particiones. Estrategia comun: partition key = entidad (key = pedido_id). Asi todos los eventos del mismo pedido caen en la misma particion y se procesan en orden.

publicar("pedidos.events", key=str(pedido.id), value=evento)

Dead Letter Topic

Si una excepcion repetitiva impide procesar un evento, mover a un topic separado para inspeccion humana sin bloquear al consumer.

def procesar_con_dlt(evento):
    try:
        descontar_stock(evento["data"]["pedido_id"])
    except Exception as exc:
        publicar("pedidos.events.dlt", evento["id"], {
            **evento, "error": str(exc),
        })

Schema Registry (Avro / Protobuf)

En entornos grandes con muchos servicios, Avro + Schema Registry (Confluent) imponen un esquema versionado y validado. JSON puro es mas flexible pero permite que un servicio publique payloads malformados que rompen consumidores. Para empezar, JSON con version en el evento es suficiente.

Diagrama del flujo

sequenceDiagram
    participant API as DRF
    participant DB as Postgres
    participant OUT as Outbox table
    participant REL as Relay (mgmt cmd)
    participant K as Kafka
    participant C1 as Consumer Inventario
    participant C2 as Consumer Email
    API->>DB: INSERT Pedido (atomic)
    API->>OUT: INSERT OutboxEvent (atomic)
    REL->>OUT: SELECT pendientes
    REL->>K: produce PedidoCreado
    K-->>REL: ack
    REL->>OUT: UPDATE sent_at
    K->>C1: PedidoCreado
    K->>C2: PedidoCreado
    C1->>DB: UPDATE stock
    C2->>SMTP: send_mail

Cuando elegir Kafka frente a alternativas

  • Kafka: log persistente, replay, multiples consumidores. Ideal con muchos servicios y volumen >1k eventos/s.
  • RabbitMQ: cola transitoria con routing complejo. Bueno para tareas tipo Celery, no tanto para event sourcing.
  • Redis Streams: mas simple, suficiente para volumen moderado. Persiste pero menos garantias.
  • Postgres LISTEN/NOTIFY: gratis si ya tienes Postgres y volumen bajo. No persiste eventos perdidos.

Kafka tiene operatividad alta (Zookeeper o KRaft, particiones, backup). Solo merece la pena si ya tienes una arquitectura de microservicios o trafico relevante. Para un monolito DRF, Celery + Redis es mas que suficiente al principio.

Alan Sastre - Autor del tutorial

Alan Sastre

Ingeniero de Software y formador, CEO en CertiDevs

Ingeniero de software especializado en Full Stack y en Inteligencia Artificial. Como CEO de CertiDevs, Django es una de sus áreas de expertise. Con más de 15 años programando, 6K seguidores en LinkedIn y experiencia como formador, Alan se dedica a crear contenido educativo de calidad para desarrolladores de todos los niveles.

Más tutoriales de Django

Explora más contenido relacionado con Django y continúa aprendiendo con nuestros tutoriales gratuitos.

Aprendizajes de esta lección

Comprender el patron event-driven y por que desacopla servicios. Configurar un productor confluent-kafka en Django. Definir el esquema canonico del evento (event_id, event_type, occurred_at, data). Publicar eventos desde signals post_save o vistas DRF. Implementar el outbox pattern para garantizar atomicidad BBDD + envio Kafka. Crear consumers como management commands. Manejar fallos: dead letter topic, retries.

Cursos que incluyen esta lección

Esta lección forma parte de los siguientes cursos estructurados con rutas de aprendizaje