Spring Cloud Stream producer con Kafka

Intermedio
SpringBoot
SpringBoot
Actualizado: 09/10/2025

Configurar Kafka binder (bootstrap.servers)

Antes de implementar un producer con Spring Cloud Stream, necesitamos configurar el binder de Kafka que actuará como el puente entre nuestra aplicación Spring Boot y el broker de Kafka. Esta configuración incluye las dependencias necesarias, la conexión al broker y el entorno de desarrollo con Docker.

Dependencias del proyecto

Para integrar Spring Cloud Stream con Kafka, añadimos la dependencia del Kafka binder que incluye todo lo necesario para la comunicación:

Maven (pom.xml):

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

Gradle (build.gradle):

implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'

Esta dependencia incluye automáticamente el Spring Cloud Stream core, el Kafka binder y las librerías del cliente de Kafka, proporcionando toda la funcionalidad necesaria para producir mensajes.

Configuración del binder en application.yml

La configuración del binder de Kafka se define en el archivo application.yml y especifica cómo conectarse al broker y qué configuraciones aplicar:

spring:
  application:
    name: producer-service
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: true
          configuration:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
      bindings:
        output:
          destination: user-events
          content-type: application/json

Los parámetros clave de esta configuración son:

  • brokers: Especifica la dirección del broker de Kafka (localhost:9092 para desarrollo)
  • auto-create-topics: Permite que se creen automáticamente los topics si no existen
  • key.serializer: Define cómo serializar las claves de los mensajes
  • value.serializer: Define cómo serializar el contenido del mensaje (JSON en este caso)
  • destination: Nombre del topic de Kafka donde se enviarán los mensajes
  • content-type: Tipo de contenido para la serialización automática

Entorno de desarrollo con Docker

Para facilitar el desarrollo local, creamos un archivo Docker Compose que levanta Kafka y Zookeeper automáticamente:

# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

Para iniciar el entorno ejecutamos:

docker-compose up -d

Configuración alternativa con propiedades específicas

También podemos configurar propiedades específicas de Kafka directamente en el binder para un control más granular:

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          producer-properties:
            acks: all
            retries: 3
            batch.size: 16384
            linger.ms: 5
            buffer.memory: 33554432
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: org.springframework.kafka.support.serializer.JsonSerializer

Estas propiedades del producer controlan aspectos como:

  • acks: Nivel de confirmación requerido (all = máxima durabilidad)
  • retries: Número de reintentos en caso de error
  • batch.size: Tamaño del batch para envío en lotes
  • linger.ms: Tiempo de espera para llenar el batch

Verificación del entorno

Para confirmar que Kafka está funcionando correctamente, podemos usar las herramientas CLI incluidas en el contenedor:

# Listar topics existentes
docker exec kafka kafka-topics --bootstrap-server localhost:9092 --list

# Crear un topic manualmente (opcional, ya que auto-create-topics está habilitado)
docker exec kafka kafka-topics --bootstrap-server localhost:9092 --create --topic user-events --partitions 1 --replication-factor 1

Con esta configuración base del binder de Kafka, nuestra aplicación Spring Boot ya está preparada para conectarse al broker y enviar mensajes utilizando Spring Cloud Stream. El siguiente paso será implementar el componente producer que genere y publique los mensajes en el topic configurado.

Producer: Supplier y envío de mensajes

Una vez configurado el binder de Kafka, implementamos el componente producer utilizando el modelo funcional de Spring Cloud Stream. Este modelo se basa en interfaces funcionales estándar de Java como Supplier, Function y Consumer, proporcionando una API más limpia y moderna para el manejo de mensajes.

Implementación con Supplier automático

El enfoque más directo para crear un producer es mediante un bean Supplier que emite mensajes de forma periódica. Spring Cloud Stream detecta automáticamente estos beans y los conecta con los bindings configurados:

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Bean
    public Supplier<UserEvent> userEventSupplier() {
        return () -> {
            UserEvent event = new UserEvent(
                UUID.randomUUID().toString(),
                "user-" + System.currentTimeMillis(),
                "USER_CREATED",
                Instant.now()
            );
            
            System.out.println("Enviando evento: " + event);
            return event;
        };
    }
}

La clase UserEvent representa el mensaje que enviaremos:

public class UserEvent {
    private String eventId;
    private String userId;
    private String eventType;
    private Instant timestamp;

    // Constructor
    public UserEvent(String eventId, String userId, String eventType, Instant timestamp) {
        this.eventId = eventId;
        this.userId = userId;
        this.eventType = eventType;
        this.timestamp = timestamp;
    }

    // Getters y setters
    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }
    
    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    
    public Instant getTimestamp() { return timestamp; }
    public void setTimestamp(Instant timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return String.format("UserEvent{eventId='%s', userId='%s', eventType='%s', timestamp=%s}",
                eventId, userId, eventType, timestamp);
    }
}

Configuración del binding para Supplier

El binding conecta nuestro Supplier con el topic de Kafka. En application.yml configuramos la salida:

spring:
  cloud:
    stream:
      function:
        definition: userEventSupplier
      bindings:
        userEventSupplier-out-0:
          destination: user-events
          content-type: application/json
      kafka:
        bindings:
          userEventSupplier-out-0:
            producer:
              configuration:
                acks: all
                retries: 3

El patrón de nomenclatura userEventSupplier-out-0 indica:

  • userEventSupplier: Nombre del bean Supplier
  • out: Dirección de salida (output)
  • 0: Índice del binding (para múltiples salidas)

Control de frecuencia de emisión

Por defecto, Spring Cloud Stream ejecuta el Supplier cada 1 segundo. Podemos ajustar esta frecuencia:

spring:
  cloud:
    stream:
      poller:
        fixed-delay: 5000  # 5 segundos
        max-messages-per-poll: 1

Para mayor control, implementamos un Supplier condicional que solo emite cuando hay datos:

@Bean
public Supplier<UserEvent> conditionalSupplier() {
    return () -> {
        // Lógica condicional para decidir si emitir
        if (shouldEmitMessage()) {
            return createUserEvent();
        }
        return null; // No emite mensaje
    };
}

private boolean shouldEmitMessage() {
    // Implementar lógica de negocio
    return ThreadLocalRandom.current().nextBoolean();
}

Producer bajo demanda con StreamBridge

Para casos donde necesitamos enviar mensajes de forma reactiva (por ejemplo, desde un endpoint REST), utilizamos StreamBridge:

@RestController
@RequestMapping("/api/events")
public class EventController {

    private final StreamBridge streamBridge;

    public EventController(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @PostMapping("/user")
    public ResponseEntity<String> createUserEvent(@RequestBody CreateUserRequest request) {
        UserEvent event = new UserEvent(
            UUID.randomUUID().toString(),
            request.getUserId(),
            "USER_CREATED",
            Instant.now()
        );

        // Enviar mensaje al topic configurado
        boolean sent = streamBridge.send("user-events", event);
        
        if (sent) {
            return ResponseEntity.ok("Evento enviado correctamente");
        } else {
            return ResponseEntity.status(500).body("Error al enviar evento");
        }
    }

    @PostMapping("/user/{userId}/update")
    public ResponseEntity<String> updateUserEvent(@PathVariable String userId, 
                                                 @RequestBody Map<String, Object> updates) {
        UserEvent event = new UserEvent(
            UUID.randomUUID().toString(),
            userId,
            "USER_UPDATED",
            Instant.now()
        );

        streamBridge.send("user-events", event);
        return ResponseEntity.ok("Evento de actualización enviado");
    }
}

La clase CreateUserRequest para el endpoint:

public class CreateUserRequest {
    private String userId;
    private String email;
    private String name;

    // Constructor, getters y setters
    public CreateUserRequest() {}

    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }

    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

Envío de mensajes con headers personalizados

Podemos enriquecer los mensajes con headers adicionales usando Message<T>:

@Bean
public Supplier<Message<UserEvent>> enrichedUserEventSupplier() {
    return () -> {
        UserEvent event = createUserEvent();
        
        return MessageBuilder
            .withPayload(event)
            .setHeader("source", "user-service")
            .setHeader("version", "1.0")
            .setHeader(KafkaHeaders.KEY, event.getUserId())
            .setHeader(KafkaHeaders.PARTITION, calculatePartition(event.getUserId()))
            .build();
    };
}

private Integer calculatePartition(String userId) {
    // Lógica simple de particionamiento
    return Math.abs(userId.hashCode()) % 3; // 3 particiones
}

Pruebas del producer

Para verificar que nuestro producer funciona correctamente, podemos usar el consumidor de consola de Kafka:

# Consumir mensajes del topic
docker exec -it kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic user-events \
  --from-beginning \
  --property print.key=true \
  --property print.headers=true

También podemos monitorizar la aplicación activando logs detallados:

logging:
  level:
    org.springframework.cloud.stream: DEBUG
    org.springframework.kafka: DEBUG
    org.apache.kafka: WARN

Manejo básico de errores

Para gestionar errores en el envío, configuramos reintentos y logs:

@Bean
public Supplier<UserEvent> resilientSupplier() {
    return () -> {
        try {
            return createUserEvent();
        } catch (Exception e) {
            System.err.println("Error creando evento: " + e.getMessage());
            return null; // No envía mensaje en caso de error
        }
    };
}

Con esta implementación del producer, nuestra aplicación puede generar y enviar mensajes tanto de forma automática (Supplier) como bajo demanda (StreamBridge), proporcionando flexibilidad para diferentes patrones de uso en arquitecturas de microservicios.

Fuentes y referencias

Documentación oficial y recursos externos para profundizar en SpringBoot

Documentación oficial de SpringBoot
Alan Sastre - Autor del tutorial

Alan Sastre

Ingeniero de Software y formador, CEO en CertiDevs

Ingeniero de software especializado en Full Stack y en Inteligencia Artificial. Como CEO de CertiDevs, SpringBoot es una de sus áreas de expertise. Con más de 15 años programando, 6K seguidores en LinkedIn y experiencia como formador, Alan se dedica a crear contenido educativo de calidad para desarrolladores de todos los niveles.

Más tutoriales de SpringBoot

Explora más contenido relacionado con SpringBoot y continúa aprendiendo con nuestros tutoriales gratuitos.

Aprendizajes de esta lección

  • Configurar el binder de Kafka en Spring Cloud Stream para conectar con un broker Kafka.
  • Implementar un producer usando el modelo funcional con Supplier para emitir mensajes.
  • Ajustar la frecuencia de emisión y controlar el envío condicional de mensajes.
  • Utilizar StreamBridge para enviar mensajes bajo demanda desde controladores REST.
  • Enriquecer mensajes con headers personalizados y manejar errores básicos en el envío.

Cursos que incluyen esta lección

Esta lección forma parte de los siguientes cursos estructurados con rutas de aprendizaje