Configurar Kafka binder (bootstrap.servers)
Antes de implementar un producer con Spring Cloud Stream, necesitamos configurar el binder de Kafka que actuará como el puente entre nuestra aplicación Spring Boot y el broker de Kafka. Esta configuración incluye las dependencias necesarias, la conexión al broker y el entorno de desarrollo con Docker.
Dependencias del proyecto
Para integrar Spring Cloud Stream con Kafka, añadimos la dependencia del Kafka binder que incluye todo lo necesario para la comunicación:
Maven (pom.xml):
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
Gradle (build.gradle):
implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
Esta dependencia incluye automáticamente el Spring Cloud Stream core, el Kafka binder y las librerías del cliente de Kafka, proporcionando toda la funcionalidad necesaria para producir mensajes.
Configuración del binder en application.yml
La configuración del binder de Kafka se define en el archivo application.yml
y especifica cómo conectarse al broker y qué configuraciones aplicar:
spring:
application:
name: producer-service
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
auto-create-topics: true
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
bindings:
output:
destination: user-events
content-type: application/json
Los parámetros clave de esta configuración son:
brokers
: Especifica la dirección del broker de Kafka (localhost:9092 para desarrollo)auto-create-topics
: Permite que se creen automáticamente los topics si no existenkey.serializer
: Define cómo serializar las claves de los mensajesvalue.serializer
: Define cómo serializar el contenido del mensaje (JSON en este caso)destination
: Nombre del topic de Kafka donde se enviarán los mensajescontent-type
: Tipo de contenido para la serialización automática
Entorno de desarrollo con Docker
Para facilitar el desarrollo local, creamos un archivo Docker Compose que levanta Kafka y Zookeeper automáticamente:
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
Para iniciar el entorno ejecutamos:
docker-compose up -d
Configuración alternativa con propiedades específicas
También podemos configurar propiedades específicas de Kafka directamente en el binder para un control más granular:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
producer-properties:
acks: all
retries: 3
batch.size: 16384
linger.ms: 5
buffer.memory: 33554432
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
Estas propiedades del producer controlan aspectos como:
acks
: Nivel de confirmación requerido (all = máxima durabilidad)retries
: Número de reintentos en caso de errorbatch.size
: Tamaño del batch para envío en loteslinger.ms
: Tiempo de espera para llenar el batch
Verificación del entorno
Para confirmar que Kafka está funcionando correctamente, podemos usar las herramientas CLI incluidas en el contenedor:
# Listar topics existentes
docker exec kafka kafka-topics --bootstrap-server localhost:9092 --list
# Crear un topic manualmente (opcional, ya que auto-create-topics está habilitado)
docker exec kafka kafka-topics --bootstrap-server localhost:9092 --create --topic user-events --partitions 1 --replication-factor 1
Con esta configuración base del binder de Kafka, nuestra aplicación Spring Boot ya está preparada para conectarse al broker y enviar mensajes utilizando Spring Cloud Stream. El siguiente paso será implementar el componente producer que genere y publique los mensajes en el topic configurado.
Producer: Supplier y envío de mensajes
Una vez configurado el binder de Kafka, implementamos el componente producer utilizando el modelo funcional de Spring Cloud Stream. Este modelo se basa en interfaces funcionales estándar de Java como Supplier
, Function
y Consumer
, proporcionando una API más limpia y moderna para el manejo de mensajes.
Implementación con Supplier automático
El enfoque más directo para crear un producer es mediante un bean Supplier
que emite mensajes de forma periódica. Spring Cloud Stream detecta automáticamente estos beans y los conecta con los bindings configurados:
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
@Bean
public Supplier<UserEvent> userEventSupplier() {
return () -> {
UserEvent event = new UserEvent(
UUID.randomUUID().toString(),
"user-" + System.currentTimeMillis(),
"USER_CREATED",
Instant.now()
);
System.out.println("Enviando evento: " + event);
return event;
};
}
}
La clase UserEvent
representa el mensaje que enviaremos:
public class UserEvent {
private String eventId;
private String userId;
private String eventType;
private Instant timestamp;
// Constructor
public UserEvent(String eventId, String userId, String eventType, Instant timestamp) {
this.eventId = eventId;
this.userId = userId;
this.eventType = eventType;
this.timestamp = timestamp;
}
// Getters y setters
public String getEventId() { return eventId; }
public void setEventId(String eventId) { this.eventId = eventId; }
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getEventType() { return eventType; }
public void setEventType(String eventType) { this.eventType = eventType; }
public Instant getTimestamp() { return timestamp; }
public void setTimestamp(Instant timestamp) { this.timestamp = timestamp; }
@Override
public String toString() {
return String.format("UserEvent{eventId='%s', userId='%s', eventType='%s', timestamp=%s}",
eventId, userId, eventType, timestamp);
}
}
Configuración del binding para Supplier
El binding conecta nuestro Supplier con el topic de Kafka. En application.yml
configuramos la salida:
spring:
cloud:
stream:
function:
definition: userEventSupplier
bindings:
userEventSupplier-out-0:
destination: user-events
content-type: application/json
kafka:
bindings:
userEventSupplier-out-0:
producer:
configuration:
acks: all
retries: 3
El patrón de nomenclatura userEventSupplier-out-0
indica:
userEventSupplier
: Nombre del bean Supplierout
: Dirección de salida (output)0
: Índice del binding (para múltiples salidas)
Control de frecuencia de emisión
Por defecto, Spring Cloud Stream ejecuta el Supplier
cada 1 segundo. Podemos ajustar esta frecuencia:
spring:
cloud:
stream:
poller:
fixed-delay: 5000 # 5 segundos
max-messages-per-poll: 1
Para mayor control, implementamos un Supplier condicional que solo emite cuando hay datos:
@Bean
public Supplier<UserEvent> conditionalSupplier() {
return () -> {
// Lógica condicional para decidir si emitir
if (shouldEmitMessage()) {
return createUserEvent();
}
return null; // No emite mensaje
};
}
private boolean shouldEmitMessage() {
// Implementar lógica de negocio
return ThreadLocalRandom.current().nextBoolean();
}
Producer bajo demanda con StreamBridge
Para casos donde necesitamos enviar mensajes de forma reactiva (por ejemplo, desde un endpoint REST), utilizamos StreamBridge
:
@RestController
@RequestMapping("/api/events")
public class EventController {
private final StreamBridge streamBridge;
public EventController(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
@PostMapping("/user")
public ResponseEntity<String> createUserEvent(@RequestBody CreateUserRequest request) {
UserEvent event = new UserEvent(
UUID.randomUUID().toString(),
request.getUserId(),
"USER_CREATED",
Instant.now()
);
// Enviar mensaje al topic configurado
boolean sent = streamBridge.send("user-events", event);
if (sent) {
return ResponseEntity.ok("Evento enviado correctamente");
} else {
return ResponseEntity.status(500).body("Error al enviar evento");
}
}
@PostMapping("/user/{userId}/update")
public ResponseEntity<String> updateUserEvent(@PathVariable String userId,
@RequestBody Map<String, Object> updates) {
UserEvent event = new UserEvent(
UUID.randomUUID().toString(),
userId,
"USER_UPDATED",
Instant.now()
);
streamBridge.send("user-events", event);
return ResponseEntity.ok("Evento de actualización enviado");
}
}
La clase CreateUserRequest
para el endpoint:
public class CreateUserRequest {
private String userId;
private String email;
private String name;
// Constructor, getters y setters
public CreateUserRequest() {}
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
}
Envío de mensajes con headers personalizados
Podemos enriquecer los mensajes con headers adicionales usando Message<T>
:
@Bean
public Supplier<Message<UserEvent>> enrichedUserEventSupplier() {
return () -> {
UserEvent event = createUserEvent();
return MessageBuilder
.withPayload(event)
.setHeader("source", "user-service")
.setHeader("version", "1.0")
.setHeader(KafkaHeaders.KEY, event.getUserId())
.setHeader(KafkaHeaders.PARTITION, calculatePartition(event.getUserId()))
.build();
};
}
private Integer calculatePartition(String userId) {
// Lógica simple de particionamiento
return Math.abs(userId.hashCode()) % 3; // 3 particiones
}
Pruebas del producer
Para verificar que nuestro producer funciona correctamente, podemos usar el consumidor de consola de Kafka:
# Consumir mensajes del topic
docker exec -it kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic user-events \
--from-beginning \
--property print.key=true \
--property print.headers=true
También podemos monitorizar la aplicación activando logs detallados:
logging:
level:
org.springframework.cloud.stream: DEBUG
org.springframework.kafka: DEBUG
org.apache.kafka: WARN
Manejo básico de errores
Para gestionar errores en el envío, configuramos reintentos y logs:
@Bean
public Supplier<UserEvent> resilientSupplier() {
return () -> {
try {
return createUserEvent();
} catch (Exception e) {
System.err.println("Error creando evento: " + e.getMessage());
return null; // No envía mensaje en caso de error
}
};
}
Con esta implementación del producer, nuestra aplicación puede generar y enviar mensajes tanto de forma automática (Supplier) como bajo demanda (StreamBridge), proporcionando flexibilidad para diferentes patrones de uso en arquitecturas de microservicios.
Fuentes y referencias
Documentación oficial y recursos externos para profundizar en SpringBoot
Documentación oficial de SpringBoot
Alan Sastre
Ingeniero de Software y formador, CEO en CertiDevs
Ingeniero de software especializado en Full Stack y en Inteligencia Artificial. Como CEO de CertiDevs, SpringBoot es una de sus áreas de expertise. Con más de 15 años programando, 6K seguidores en LinkedIn y experiencia como formador, Alan se dedica a crear contenido educativo de calidad para desarrolladores de todos los niveles.
Más tutoriales de SpringBoot
Explora más contenido relacionado con SpringBoot y continúa aprendiendo con nuestros tutoriales gratuitos.
Aprendizajes de esta lección
- Configurar el binder de Kafka en Spring Cloud Stream para conectar con un broker Kafka.
- Implementar un producer usando el modelo funcional con Supplier para emitir mensajes.
- Ajustar la frecuencia de emisión y controlar el envío condicional de mensajes.
- Utilizar StreamBridge para enviar mensajes bajo demanda desde controladores REST.
- Enriquecer mensajes con headers personalizados y manejar errores básicos en el envío.
Cursos que incluyen esta lección
Esta lección forma parte de los siguientes cursos estructurados con rutas de aprendizaje