SpringBoot
Tutorial SpringBoot: Operadores reactivos básicos
Aprende a crear, transformar, filtrar, combinar datos con los operadores reactivos de Reactor sobre los tipos Mono y Flux en Spring WebFlux utilizando Spring Boot.
Aprende SpringBoot GRATIS y certifícateOperadores de creación: just, empty, never, error, from, range, interval
Los operadores de creación en Reactor son fundamentales para iniciar flujos reactivos a partir de diferentes fuentes de datos. Estos operadores permiten generar secuencias Mono
o Flux
según las necesidades específicas de la aplicación.
Operador just
El operador just
crea un Mono
o Flux
a partir de uno o varios elementos proporcionados. Es una forma sencilla de iniciar un flujo con valores conocidos.
Mono<String> monoJust = Mono.just("Hola Mundo");
Flux<Integer> fluxJust = Flux.just(1, 2, 3, 4, 5);
En el ejemplo anterior, monoJust
emitirá el mensaje "Hola Mundo" y completará, mientras que fluxJust
emitirá una secuencia de números del 1 al 5.
Operador empty
El operador empty
genera un Mono
o Flux
que completa sin emitir ningún elemento. Es útil cuando se desea representar una ausencia de datos sin errores.
Mono<String> monoEmpty = Mono.empty();
Flux<String> fluxEmpty = Flux.empty();
Tanto monoEmpty
como fluxEmpty
completarán inmediatamente sin emitir valores. Este comportamiento es útil en casos donde un flujo podría o no tener datos, y la ausencia de ellos es un resultado válido.
Operador never
El operador never
crea un flujo que no emite elementos ni completa. Se mantiene en ejecución indefinidamente, lo cual puede ser útil en pruebas o situaciones específicas donde se requiere un flujo inactivo.
Mono<String> monoNever = Mono.never();
Flux<String> fluxNever = Flux.never();
Estos flujos no emitirán valores ni señales de finalización, manteniendo al suscriptor en espera de eventos que nunca ocurrirán.
Operador error
El operador error
genera un flujo que inmediatamente emite una señal de error. Es útil para representar condiciones de fallo desde el inicio del flujo.
Mono<String> monoError = Mono.error(new RuntimeException("Error ocurrido"));
Flux<String> fluxError = Flux.error(new RuntimeException("Error en el flujo"));
Al suscribirse a estos flujos, se recibirá la señal de error con la excepción proporcionada, permitiendo su manejo mediante operadores de control de errores.
Operador from
El operador from
permite crear flujos a partir de distintos tipos de fuentes, como CompletableFuture
, Stream
, Iterable
o incluso otros Publisher
.
CompletableFuture<String> future = CompletableFuture.completedFuture("Dato asíncrono");
Mono<String> monoFromFuture = Mono.fromFuture(future);
List<String> lista = Arrays.asList("A", "B", "C");
Flux<String> fluxFromIterable = Flux.fromIterable(lista);
En este ejemplo, monoFromFuture
emitirá el resultado del **CompletableFuture**
, mientras que fluxFromIterable
emitirá los elementos de la lista proporcionada.
Operador range
El operador range
crea un Flux
que emite una secuencia de números enteros en un rango específico.
Flux<Integer> fluxRange = Flux.range(10, 5);
El flujo fluxRange
emitirá los números enteros desde 10 hasta 14. Este operador es útil para generar secuencias numéricas de forma concisa.
Operador interval
El operador interval
genera un Flux
que emite secuencias de números largos (Long
) a intervalos de tiempo regulares.
Flux<Long> fluxInterval = Flux.interval(Duration.ofSeconds(1));
En este caso, fluxInterval
emitirá un número cada segundo, empezando desde cero y aumentando en uno con cada emisión. Este operador es especialmente útil para tareas programadas o temporizadores.
Estos operadores proporcionan diversas formas de iniciar flujos reactivos en Reactor, adaptándose a diferentes escenarios y fuentes de datos. Al combinarlos con otros operadores, es posible construir pipelines de procesamiento de datos complejos y eficientes.
Operadores de transformación: map, flatMap, concatMap, switchMap
Los operadores de transformación en Reactor son esenciales para modificar los elementos emitidos por flujos reactivos, permitiendo adaptarlos a las necesidades específicas de procesamiento.
El operador **map**
transforma cada elemento emitido por un flujo aplicando una función de manera sincrónica. Es útil para operaciones directas y rápidas sobre los datos.
Flux<String> nombres = Flux.just("Ana", "Luis", "Marta");
Flux<Integer> longitudes = nombres.map(String::length);
En este ejemplo, el **Flux**
longitudes
emitirá los valores 3, 4 y 5, correspondientes a la longitud de cada nombre.
Cuando la transformación de cada elemento implica una operación que devuelve un **Publisher**
, como otra fuente de datos reactiva, se utiliza el operador **flatMap**
. Este operador aplanará los flujos resultantes en un único flujo, permitiendo el manejo de operaciones asíncronas.
Flux<String> usuarios = Flux.just("usuario1", "usuario2", "usuario3");
Flux<String> detalles = usuarios.flatMap(this::obtenerDetallesUsuario);
Aquí, obtenerDetallesUsuario
es un método que devuelve un **Mono<String>**
con los detalles de cada usuario. El **flatMap**
combinará todos los **Mono**
en un solo **Flux**
detalles
.
Si es importante mantener el orden de emisión de los elementos transformados, se debe emplear el operador **concatMap**
. A diferencia de **flatMap**
, **concatMap**
garantiza que las suscripciones a los flujos internos ocurren de forma secuencial.
Flux<String> mensajes = Flux.just("Primero", "Segundo", "Tercero");
Flux<String> procesados = mensajes.concatMap(this::procesarMensaje);
En este caso, procesarMensaje
devuelve un **Flux<String>**
por cada mensaje. El uso de **concatMap**
asegura que los mensajes se procesen uno tras otro, respetando el orden original.
El operador **switchMap**
también transforma cada elemento en un **Publisher**
, pero siempre suscribe al flujo más reciente, cancelando los anteriores. Es especialmente útil en situaciones donde solo interesa el último resultado disponible, como en búsquedas en tiempo real.
Flux<String> consultas = obtenerConsultasUsuario();
Flux<List<Resultado>> resultados = consultas.switchMap(this::buscarResultados);
Si el usuario realiza varias consultas rápidamente, **switchMap**
cancelará las búsquedas anteriores y solo procesará la más reciente, evitando cargas innecesarias en el sistema.
Entender cuándo utilizar cada operador es crucial para el desarrollo de aplicaciones reactivas eficientes. Mientras **map**
es adecuado para transformaciones simples, **flatMap**
, **concatMap**
y **switchMap**
ofrecen control sobre operaciones más complejas y asíncronas, permitiendo manejar flujos internos y controlar el comportamiento en cuanto a concurrencia y orden de emisión.
Operadores de filtrado: filter, take, takeLast, skip, skipLast
Los operadores de filtrado en Reactor permiten controlar y limitar las emisiones de flujos reactivos, facilitando la manipulación eficiente de los datos. Estos operadores son esenciales para seleccionar elementos específicos de un Flux o un Mono según distintas condiciones.
Operador filter
El operador filter
emite únicamente los elementos que cumplen con una condición determinada. Recibe una función predicado que evalúa cada elemento y decide si debe ser emitido.
Flux<Integer> numeros = Flux.range(1, 10);
Flux<Integer> pares = numeros.filter(n -> n % 2 == 0);
pares.subscribe(System.out::println);
// Salida: 2, 4, 6, 8, 10
En este ejemplo, el Flux pares
emitirá solo los números pares del 1 al 10. El operador **filter**
es útil para descartar elementos no deseados basándose en criterios lógicos.
Operador take
El operador take
limita el número de emisiones a las primeras N
emitidas por el flujo original. Es especialmente útil cuando solo se necesitan los primeros elementos de una secuencia.
Flux<String> dias = Flux.just("Lunes", "Martes", "Miércoles", "Jueves", "Viernes");
Flux<String> primerosTresDias = dias.take(3);
primerosTresDias.subscribe(System.out::println);
// Salida: Lunes, Martes, Miércoles
Aquí, se toman los primeros tres días de la semana laboral. El operador **take**
es una forma sencilla de acotar flujos a una cantidad específica de elementos desde el inicio.
Operador takeLast
El operador takeLast
emite los últimos N
elementos de un flujo. A diferencia de take
, este operador espera a que el flujo original complete para emitir los elementos finales.
Flux<String> letras = Flux.just("A", "B", "C", "D", "E");
Flux<String> ultimasDosLetras = letras.takeLast(2);
ultimasDosLetras.subscribe(System.out::println);
// Salida: D, E
En este caso, el Flux ultimasDosLetras
emitirá las últimas dos letras de la secuencia. El operador **takeLast**
es útil cuando se requiere obtener elementos finales después de procesar o recibir todo el flujo.
Operador skip
El operador skip
omite las primeras N
emisiones del flujo original y emite el resto. Es útil para ignorar elementos iniciales que no son relevantes para el procesamiento posterior.
Flux<Integer> numeros = Flux.range(1, 10);
Flux<Integer> sinPrimerosCinco = numeros.skip(5);
sinPrimerosCinco.subscribe(System.out::println);
// Salida: 6, 7, 8, 9, 10
Aquí, se omiten los primeros cinco números, y el Flux sinPrimerosCinco
emite del 6 al 10. El operador **skip**
permite focalizarse en una porción específica del flujo, eliminando emisiones iniciales innecesarias.
Operador skipLast
El operador skipLast
descarta las últimas N
emisiones de un flujo. Como takeLast
, necesita esperar a que el flujo original complete para determinar cuáles elementos omitir al final.
Flux<String> palabras = Flux.just("uno", "dos", "tres", "cuatro", "cinco");
Flux<String> sinUltimasDos = palabras.skipLast(2);
sinUltimasDos.subscribe(System.out::println);
// Salida: uno, dos, tres
En este ejemplo, el Flux sinUltimasDos
emite todas las palabras excepto las dos últimas. El operador **skipLast**
es útil cuando se desean excluir elementos finales de un flujo una vez completado.
Combinación de operadores de filtrado
Los operadores de filtrado pueden combinarse para lograr un control más granular sobre las emisiones. Por ejemplo:
Flux<Integer> numeros = Flux.range(1, 100);
Flux<Integer> resultado = numeros
.filter(n -> n % 2 == 0)
.skip(10)
.take(5);
resultado.subscribe(System.out::println);
// Emite los números pares del 22 al 30
En este caso, se filtran los números pares, se omiten los primeros diez y se toman los siguientes cinco. La capacidad de encadenar operadores permite construir flujos reactivos altamente personalizados y eficientes.
Consideraciones sobre la asincronía:
Es importante tener en cuenta que operadores como takeLast
y skipLast
requieren conocer el tamaño total del flujo, por lo que pueden introducir comportamientos asíncronos que afecten al rendimiento. En flujos infinitos o muy grandes, su uso debe evaluarse cuidadosamente.
Uso de operadores en contexto
En aplicaciones reactivas con Spring WebFlux, estos operadores permiten procesar y transformar datos de manera no bloqueante. Por ejemplo, al manejar peticiones HTTP que devuelven listas filtradas:
@GetMapping("/usuarios/activos")
public Flux<Usuario> obtenerUsuariosActivos() {
return usuarioRepository.findAll()
.filter(Usuario::esActivo)
.take(50);
}
En este controlador, se obtienen todos los usuarios desde el repositorio reactivo y se filtran aquellos que están activos, limitando la respuesta a los primeros 50 usuarios. El uso de operadores de filtrado garantiza respuestas rápidas y eficientes, optimizando el consumo de recursos.
Los operadores de filtrado como **filter**
, **take**
, **takeLast**
, **skip**
y **skipLast**
son herramientas esenciales en la programación reactiva con Reactor. Permiten manipular y controlar la emisión de datos de forma declarativa y eficiente, adaptándose a las necesidades específicas de cada aplicación.
Operadores de combinación: merge, concat, zip, startWith
Los operadores de combinación en Reactor permiten fusionar y coordinar múltiples flujos reactivos, facilitando el manejo concurrente de secuencias de datos. Estos operadores son fundamentales para construir flujos complejos y sincronizar emisiones de diferentes fuentes.
Operador merge
El operador merge
combina las emisiones de varios flujos Flux en uno solo, emitiendo los elementos tan pronto como estén disponibles. Este operador es adecuado cuando se desea mezclar flujos que pueden emitir datos de forma asíncrona.
Flux<String> flujo1 = Flux.just("A", "B", "C");
Flux<String> flujo2 = Flux.just("1", "2", "3");
Flux<String> flujoCombinado = Flux.merge(flujo1, flujo2);
flujoCombinado.subscribe(System.out::println);
// Posible salida: A, 1, B, 2, C, 3
En este ejemplo, **flujoCombinado**
emitirá los elementos de flujo1
y flujo2
intercalados según la disponibilidad, sin garantizar ningún orden específico. El operador **merge**
es útil cuando el orden de las emisiones no es crítico y se busca maximizar el rendimiento mediante la concurrencia.
Operador concat
El operador concat
también combina múltiples flujos Flux, pero a diferencia de merge
, asegura que las emisiones ocurren en secuencia, uno tras otro. No iniciará la emisión del siguiente flujo hasta que el anterior haya completado.
Flux<String> flujo1 = Flux.just("A", "B", "C");
Flux<String> flujo2 = Flux.just("1", "2", "3");
Flux<String> flujoConcatenado = Flux.concat(flujo1, flujo2);
flujoConcatenado.subscribe(System.out::println);
// Salida: A, B, C, 1, 2, 3
Aquí, **flujoConcatenado**
emitirá primero todos los elementos de flujo1
y luego los de flujo2
, manteniendo el orden de los flujos originales. El operador **concat**
es apropiado cuando es importante preservar la secuencia de los datos.
Operador zip
El operador zip
combina varios flujos Flux coordinando sus emisiones en función de la posición. Emite un nuevo elemento cuando todos los flujos fuente han emitido un elemento, combinándolos mediante una función especificada.
Flux<String> letras = Flux.just("A", "B", "C");
Flux<Integer> numeros = Flux.just(1, 2, 3);
Flux<String> flujoEmparejado = Flux.zip(letras, numeros, (letra, numero) -> letra + numero);
flujoEmparejado.subscribe(System.out::println);
// Salida: A1, B2, C3
En este ejemplo, **flujoEmparejado**
emitirá combinaciones de los elementos de letras
y numeros
en función de su posición respectiva. El operador **zip**
es útil cuando se requiere sincronizar flujos y crear pares o tuplas de datos correlacionados.
Es importante destacar que si los flujos tienen longitud diferente, **zip**
completará cuando el flujo más corto haya emitido todos sus elementos, ya que no habrá más elementos para combinar.
Operador startWith
El operador startWith
permite prefijar elementos o flujos al inicio de un flujo existente. Es decir, las emisiones especificadas en **startWith**
se producirán antes que las del flujo original.
Flux<String> flujoOriginal = Flux.just("X", "Y", "Z");
Flux<String> flujoConPrefijo = flujoOriginal.startWith("A", "B", "C");
flujoConPrefijo.subscribe(System.out::println);
// Salida: A, B, C, X, Y, Z
En este caso, **flujoConPrefijo**
emitirá primero los elementos "A", "B", "C", y luego continuará con las emisiones de flujoOriginal
. El operador **startWith**
es útil para agregar valores iniciales o mensajes de encabezado antes de un flujo de datos.
Además, **startWith**
puede recibir otro flujo Flux o un Mono como prefijo:
Flux<String> prefijo = Flux.just("Inicio1", "Inicio2");
Flux<String> flujoConPrefijoFlux = flujoOriginal.startWith(prefijo);
flujoConPrefijoFlux.subscribe(System.out::println);
// Salida: Inicio1, Inicio2, X, Y, Z
Combinación de operadores
Los operadores de combinación pueden encadenarse para construir flujos más elaborados. Por ejemplo, combinar múltiples flujos con prefijos y mantener el orden:
Flux<String> flujo1 = Flux.just("Datos1", "Datos2");
Flux<String> flujo2 = Flux.just("MásDatos1", "MásDatos2");
Flux<String> prefijo = Flux.just("Inicio");
Flux<String> flujoFinal = Flux.concat(prefijo, flujo1, flujo2);
flujoFinal.subscribe(System.out::println);
// Salida: Inicio, Datos1, Datos2, MásDatos1, MásDatos2
En este ejemplo, se utiliza **concat**
para asegurar que las emisiones ocurran en el orden especificado, iniciando con el prefijo y continuando con los datos de flujo1
y flujo2
.
Consideraciones sobre la concurrencia y el rendimiento
El operador **merge**
ejecuta los flujos fuente de manera concurrente, lo que puede mejorar el rendimiento en contextos donde los flujos producen emisiones de forma asíncrona o a diferentes ritmos. Sin embargo, es importante manejar adecuadamente la concurrencia y estar atento a posibles condiciones de carrera.
Por otro lado, **concat**
ejecuta los flujos secuencialmente, esperando a que cada flujo complete antes de iniciar el siguiente. Esto garantiza un uso más predecible de los recursos, pero puede ser menos eficiente en términos de tiempo total si los flujos tienen demoras intrínsecas.
Uso de los operadores en contextos prácticos
Estos operadores son especialmente útiles en aplicaciones reactivas que requieren combinar datos de múltiples fuentes, como llamadas a servicios externos, acceso a bases de datos o procesamiento de eventos.
Por ejemplo, supongamos que tenemos dos repositorios reactivos y deseamos obtener y combinar datos de ambos:
Flux<Producto> productosNacionales = productoRepository.findProductosNacionales();
Flux<Producto> productosImportados = productoRepository.findProductosImportados();
Flux<Producto> todosLosProductos = Flux.merge(productosNacionales, productosImportados);
En este caso, **todosLosProductos**
emitirá productos tanto nacionales como importados conforme estén disponibles, aprovechando la asíncronía para mejorar la eficiencia.
Si es necesario procesar los productos en un orden específico, podríamos utilizar **concat**
:
Flux<Producto> todosLosProductosOrdenados = Flux.concat(productosNacionales, productosImportados);
Así, primero se procesarán los productos nacionales y, una vez completados, se iniciará con los importados.
Sincronización de flujos con zip
Para situaciones en las que es necesario sincronizar datos relacionados, **zip**
es la herramienta adecuada. Por ejemplo, combinar información de usuarios y sus direcciones:
Flux<Usuario> usuarios = usuarioRepository.findAll();
Flux<Direccion> direcciones = direccionRepository.findAll();
Flux<UsuarioConDireccion> usuariosConDirecciones = Flux.zip(usuarios, direcciones)
.map(tuple -> new UsuarioConDireccion(tuple.getT1(), tuple.getT2()));
Aquí, se asume que los flujos usuarios
y direcciones
están alineados de manera que el primer usuario corresponde a la primera dirección, y así sucesivamente. El operador **zip**
coordina las emisiones para que esto sea posible.
Agregando elementos iniciales con startWith
Para situaciones donde se desea iniciar un flujo con valores predeterminados o mensajes de inicio, **startWith**
es la elección adecuada. Por ejemplo, en una aplicación de chat reactiva:
Flux<Mensaje> mensajes = obtenerMensajesChat();
Flux<Mensaje> bienvenida = Flux.just(new Mensaje("Sistema", "Bienvenido al chat"));
Flux<Mensaje> flujoChat = mensajes.startWith(bienvenida);
Así, cuando un usuario se conecta, recibirá primero el mensaje de bienvenida, seguido de los mensajes del chat en tiempo real.
Estos operadores de combinación amplían las capacidades de flujo en Reactor, permitiendo manejar y coordinar múltiples secuencias de datos de manera eficiente y flexible. Su comprensión y uso adecuado son cruciales para desarrollar aplicaciones reactivas robustas con Spring Boot 3.
Operadores de errores: onErrorReturn, onErrorResume, doOnNext, doOnError, doOnComplete
En la programación reactiva con Reactor, el manejo de errores es esencial para garantizar la resiliencia y confiabilidad de las aplicaciones. Los operadores de error permiten gestionar excepciones y eventos inesperados de manera fluida dentro de los flujos reactivos.
Operador onErrorReturn
El operador onErrorReturn
ofrece una forma sencilla de proporcionar un valor alternativo en caso de que ocurra un error en el flujo. Al interceptar la señal de error, emite un valor predeterminado y completa el flujo de manera normal.
Flux<String> flujoConError = Flux.just("1", "2", "a", "4")
.map(dato -> {
// Intento de conversión que puede fallar
return Integer.parseInt(dato);
})
.map(numero -> "Número: " + numero)
.onErrorReturn("Valor por defecto en caso de error");
flujoConError.subscribe(System.out::println);
En este ejemplo, al intentar convertir la cadena "a" a entero, se produce una excepción. Gracias a onErrorReturn
, el flujo emite "Valor por defecto en caso de error" y completa sin propagar el error al suscriptor.
Operador onErrorResume
El operador onErrorResume
permite recuperar de un error cambiando a un flujo alternativo. A diferencia de onErrorReturn
, que emite un solo valor, onErrorResume
puede continuar la emisión con otro **Publisher**
.
Flux<String> flujoRecuperado = Flux.just("5", "6", "b", "8")
.map(dato -> Integer.parseInt(dato))
.map(numero -> "Número: " + numero)
.onErrorResume(error -> {
// Logica de recuperación con un nuevo flujo
return Flux.just("Error recuperado, continuando con nuevos datos", "9", "10")
.map(nuevoDato -> "Número: " + nuevoDato);
});
flujoRecuperado.subscribe(System.out::println);
En este caso, al ocurrir un error, onErrorResume
cambia el flujo original por uno nuevo, permitiendo que la aplicación siga procesando otros datos sin interrumpirse.
Operador doOnNext
El operador doOnNext
se utiliza para ejecutar acciones secundarias en cada elemento emitido sin alterar el flujo. Es útil para registrar información, auditar eventos o realizar operaciones auxiliares.
Flux<String> flujoAuditado = Flux.just("X", "Y", "Z")
.doOnNext(elemento -> {
// Acción secundaria, como un registro
System.out.println("Elemento procesado: " + elemento);
})
.map(elemento -> "Procesado: " + elemento);
flujoAuditado.subscribe(System.out::println);
Aquí, doOnNext
imprime cada elemento antes de que sea transformado por el operador map
. Esto permite monitorear el flujo sin interferir en la secuencia de datos.
Operador doOnError
El operador doOnError
permite ejecutar una acción cuando ocurre un error, sin manejarlo ni prevenir su propagación. Es útil para registrar errores o realizar operaciones de limpieza.
Flux<String> flujoConRegistroDeError = Flux.just("10", "20", "c", "40")
.map(dato -> Integer.parseInt(dato))
.doOnError(error -> {
// Registro del error
System.err.println("Error detectado: " + error.getMessage());
})
.map(numero -> "Número: " + numero);
flujoConRegistroDeError.subscribe(
System.out::println,
error -> System.err.println("Suscriptor recibe el error: " + error.getMessage())
);
Aunque doOnError
registra el error, no lo maneja. El error sigue propagándose al suscriptor, quien debe decidir cómo reaccionar ante la situación.
Operador doOnComplete
El operador doOnComplete
se activa cuando el flujo completa exitosamente, permitiendo ejecutar acciones finales. Es útil para operaciones de notificación, liberación de recursos o actualización de estado.
Flux<String> flujoConFinalizacion = Flux.just("A", "B", "C")
.map(String::toLowerCase)
.doOnComplete(() -> {
// Acción al completar el flujo
System.out.println("Flujo completado exitosamente");
});
flujoConFinalizacion.subscribe(System.out::println);
En este ejemplo, al terminar de procesar todos los elementos, doOnComplete
imprime un mensaje indicando la finalización exitosa del flujo.
Combinación de operadores para manejo robusto de errores
Es común combinar varios operadores de error para construir flujos más robustos. Por ejemplo, se puede usar doOnError
para registrar el error y luego onErrorResume
para recuperarse:
Flux<String> flujoRobusto = Flux.just("100", "200", "d", "400")
.map(dato -> Integer.parseInt(dato))
.doOnError(error -> {
System.err.println("Error registrado: " + error.getMessage());
})
.onErrorResume(error -> {
return Flux.just(300, 400, 500).map(numero -> "Número recuperado: " + numero);
});
flujoRobusto.subscribe(System.out::println);
Aquí, el flujo registra el error y se recupera con nuevos datos, manteniendo la aplicación operativa y evitando interrupciones inesperadas.
Manejo de errores en operaciones asíncronas
En operaciones que involucran llamadas asíncronas, el manejo de errores adquiere mayor relevancia. Por ejemplo, al realizar peticiones a servicios externos:
Mono<String> llamadaExterna = realizarLlamadaExterna()
.doOnError(error -> {
// Registro del error en la llamada externa
System.err.println("Fallo en llamada externa: " + error.getMessage());
})
.onErrorReturn("Respuesta predeterminada");
llamadaExterna.subscribe(System.out::println);
Al tratarse de una operación asíncrona, es fundamental gestionar adecuadamente los errores para proporcionar respuestas consistentes y evitar bloqueos en el flujo principal.
Importancia del manejo de errores en aplicaciones reactivas
El uso adecuado de estos operadores es crítico para mantener la integridad y confiabilidad de las aplicaciones reactivas. Un manejo de errores bien implementado mejora la experiencia del usuario y facilita el mantenimiento del sistema.
Al diseñar flujos con Reactor, es esencial anticipar posibles fallos y decidir cómo gestionarlos. Los operadores onErrorReturn
y onErrorResume
ofrecen estrategias para continuar operando ante errores, mientras que doOnError
y doOnComplete
permiten agregar lógica adicional sin alterar la secuencia principal.
Ejemplo práctico en un contexto web
Imagine que se desarrolla un servicio web que procesa solicitudes de usuarios y puede fallar al consultar una base de datos:
@GetMapping("/datos/{id}")
public Mono<ResponseEntity<String>> obtenerDatos(@PathVariable String id) {
return servicioDatos.obtenerDatoPorId(id)
.doOnError(error -> {
// Registro del error
System.err.println("Error al obtener datos: " + error.getMessage());
})
.onErrorReturn("Información no disponible")
.map(dato -> ResponseEntity.ok(dato));
}
En este caso, si ocurre un error al obtener los datos, el servicio responde con un mensaje alternativo, manteniendo la disponibilidad del endpoint y proporcionando al cliente una respuesta coherente.
El manejo de errores mediante operadores como onErrorReturn
, onErrorResume
, doOnNext
, doOnError
y doOnComplete
es fundamental en la programación reactiva con Reactor. Estos operadores proporcionan las herramientas necesarias para construir flujos resilientes y garantizar que las aplicaciones respondan de manera apropiada ante situaciones inesperadas, mejorando así la robustez y confiabilidad del sistema.
Operadores de acumulación y estadística: reduce, count
En la programación reactiva con Reactor, existen operadores que permiten acumular y realizar operaciones estadísticas sobre los elementos emitidos por un flujo. Los operadores **reduce**
y **count**
son fundamentales para resumir secuencias de datos de manera eficiente.
El operador **reduce**
realiza una operación de acumulación sobre los elementos de un flujo, produciendo un único resultado. Funciona aplicando una función binaria de manera iterativa sobre cada elemento emitido y el acumulador anterior. Este operador es útil para calcular sumas, productos o concatenaciones de elementos.
Por ejemplo, para sumar todos los números emitidos por un Flux:
Flux<Integer> numeros = Flux.range(1, 5); // Emite números del 1 al 5
Mono<Integer> suma = numeros.reduce((acumulador, siguiente) -> acumulador + siguiente);
suma.subscribe(resultado -> System.out.println("La suma es: " + resultado));
// Salida: La suma es: 15
En este caso, **reduce**
acumula los valores sumándolos, y el resultado final es emitido por un Mono. Es importante destacar que el operador retorna un Mono, ya que el resultado es un único valor.
Si se desea proporcionar un valor inicial al acumulador, se puede utilizar la versión sobrecargada de **reduce**
que acepta un valor de identidad:
Mono<Integer> sumaConIdentidad = numeros.reduce(10, (acumulador, siguiente) -> acumulador + siguiente);
sumaConIdentidad.subscribe(resultado -> System.out.println("La suma con identidad es: " + resultado));
// Salida: La suma con identidad es: 25
Aquí, el valor inicial del acumulador es 10, por lo que la suma total será 25. El uso de un valor de identidad puede ser útil en escenarios donde se requiere un punto de partida específico para la acumulación.
El operador **count**
calcula el número total de elementos emitidos por un flujo. Devuelve un Mono con la cantidad de emisiones, lo cual es especialmente útil para estadísticas y métricas.
Por ejemplo, para contar el número de elementos en un flujo de cadenas:
Flux<String> palabras = Flux.just("reactivo", "programación", "asíncrono", "backpressure");
Mono<Long> totalPalabras = palabras.count();
totalPalabras.subscribe(cantidad -> System.out.println("Número de palabras: " + cantidad));
// Salida: Número de palabras: 4
El operador **count**
se encarga de contar las emisiones y proporcionar el total una vez que el flujo completa. Es importante tener en cuenta el tipo de retorno Mono, ya que la cantidad puede ser un número grande y se utiliza Long para representarlo.
Además de estos operadores, es común combinarlos con otros para realizar cálculos más complejos. Por ejemplo, calcular la longitud total de todas las palabras en un flujo:
Mono<Integer> longitudTotal = palabras
.map(String::length)
.reduce(0, (acumulador, longitud) -> acumulador + longitud);
longitudTotal.subscribe(total -> System.out.println("Longitud total de todas las palabras: " + total));
// Salida: Longitud total de todas las palabras: 36
En este ejemplo, primero se mapea cada palabra a su longitud utilizando **map**
, y luego se acumulan las longitudes con **reduce**
, iniciando desde cero. El resultado es la suma de las longitudes de todas las palabras emitidas.
Otra aplicación práctica es concatenar todas las cadenas de un flujo en una sola cadena:
Mono<String> fraseCompleta = palabras.reduce((acumulador, siguiente) -> acumulador + " " + siguiente);
fraseCompleta.subscribe(frase -> System.out.println("Frase completa: " + frase));
// Salida: Frase completa: reactivo programación asíncrono backpressure
El operador **reduce**
permite combinar las palabras en una única frase. Este patrón es útil para generar resultados agregados a partir de emisiones individuales.
Es relevante mencionar que si se utiliza **reduce**
en un flujo vacío, el resultado será un Mono vacío, ya que no hay elementos para acumular. Para manejar este caso, se puede proporcionar un valor de identidad o utilizar el operador **defaultIfEmpty**
:
Flux<Integer> flujoVacio = Flux.empty();
Mono<Integer> sumaVacia = flujoVacio.reduce((a, b) -> a + b)
.defaultIfEmpty(0); // Proporciona un valor por defecto si el Mono está vacío
sumaVacia.subscribe(resultado -> System.out.println("Resultado en flujo vacío: " + resultado));
// Salida: Resultado en flujo vacío: 0
El operador **defaultIfEmpty**
emite un valor predeterminado en caso de que el Mono generado por **reduce**
no emita ningún elemento debido a un flujo vacío.
En cuanto al operador **count**
, también puede ser útil en combinación con otros operadores para realizar validaciones o controlar el flujo de la aplicación. Por ejemplo, verificar si un flujo contiene un número específico de elementos:
Mono<Boolean> tieneTresElementos = palabras.count()
.map(cantidad -> cantidad == 3);
tieneTresElementos.subscribe(esTres -> System.out.println("¿El flujo tiene tres elementos? " + esTres));
// Salida: ¿El flujo tiene tres elementos? false
En este caso, se cuenta el número de palabras y luego se verifica si la cantidad es igual a tres, emitiendo un valor booleano.
Para flujos grandes, es importante considerar el impacto en el rendimiento al utilizar estos operadores, ya que **reduce**
y **count**
requieren procesar todos los elementos antes de emitir el resultado final. En situaciones donde el flujo es infinito o muy extenso, puede ser necesario aplicar operadores como **take**
para limitar el número de emisiones:
Flux<Long> flujoInfinito = Flux.interval(Duration.ofMillis(100));
Mono<Long> conteoLimitado = flujoInfinito
.take(50) // Toma solo los primeros 50 elementos
.count();
conteoLimitado.subscribe(cantidad -> System.out.println("Cantidad de elementos tomados: " + cantidad));
// Salida: Cantidad de elementos tomados: 50
Aquí, el operador **take**
limita el flujo a las primeras 50 emisiones, permitiendo que **count**
pueda calcular el total sin esperar indefinidamente.
Es relevante aprovechar los operadores de acumulación y estadística para extraer información valiosa de los flujos reactivos. Por ejemplo, calcular el promedio de una serie de números:
Flux<Integer> valores = Flux.just(10, 20, 30, 40, 50);
Mono<Double> promedio = valores
.collectList() // Recopila los elementos en una lista
.flatMap(lista -> {
int suma = lista.stream().mapToInt(Integer::intValue).sum();
double promedioCalculado = (double) suma / lista.size();
return Mono.just(promedioCalculado);
});
promedio.subscribe(valor -> System.out.println("Promedio: " + valor));
// Salida: Promedio: 30.0
En este ejemplo, se utiliza **collectList**
para recopilar todos los elementos emitidos en una lista, y luego se calcula el promedio. Aunque no existe un operador específico **average**
en Reactor, se puede lograr este resultado combinando operadores y aplicando lógica personalizada.
Otro caso de uso es encontrar el valor máximo o mínimo en un flujo:
Mono<Integer> maximo = valores.reduce(Integer::max);
maximo.subscribe(valor -> System.out.println("Valor máximo: " + valor));
// Salida: Valor máximo: 50
Utilizando **reduce**
con **Integer::max**
, se obtiene el valor máximo de los elementos emitidos. De manera similar, se puede utilizar **Integer::min**
para obtener el valor mínimo.
Es fundamental entender que los operadores **reduce**
y **count**
procesan el flujo completo antes de emitir un resultado. Esto significa que se debe tener cautela al utilizarlos en flujos muy largos o potencialmente infinitos, ya que pueden causar consumo excesivo de memoria o bloquear la emisión indefinidamente.
En aplicaciones donde se requiere un procesamiento más eficiente de grandes volúmenes de datos, es recomendable explorar operadores como **reduce**
con reducción parcial o técnicas de ventana (windowing), aunque estas caen fuera del alcance de esta sección.
Finalmente, es importante mencionar que los operadores de acumulación y estadística son esenciales para analizar y resumir datos en aplicaciones reactivas. Su uso adecuado permite extraer insights y realizar cálculos agregados de manera declarativa, aprovechando el modelo reactivo para procesar secuencias de datos de forma eficiente y no bloqueante.
Ejercicios de esta lección Operadores reactivos básicos
Evalúa tus conocimientos de esta lección Operadores reactivos básicos con nuestros retos de programación de tipo Test, Puzzle, Código y Proyecto con VSCode, guiados por IA.
API Query By Example (QBE)
Identificadores y relaciones JPA
Borrar datos de base de datos
Web y Test Starters
Métodos find en repositorios
Controladores Spring MVC
Inserción de datos
CRUD Customers Spring MVC + Spring Data JPA
Backend API REST con Spring Boot
Controladores Spring REST
Uso de Spring con Thymeleaf
API Specification
Registro de usuarios
Crear entidades JPA
Asociaciones en JPA
Asociaciones de entidades JPA
Integración con Vue
Consultas JPQL
Open API y cómo agregarlo en Spring Boot
Uso de Controladores REST
Repositorios reactivos
Inyección de dependencias
Introducción a Spring Boot
CRUD y JPA Repository
Inyección de dependencias
Vista en Spring MVC con Thymeleaf
Servicios en Spring
Operadores Reactivos
Configuración de Vue
Entidades JPA
Integración con Angular
API Specification
API Query By Example (QBE)
Controladores MVC
Anotaciones y mapeo en JPA
Consultas JPQL con @Query en Spring Data JPA
Repositorios Spring Data
Inyección de dependencias
Data JPA y Mail Starters
Configuración de Angular
Controladores Spring REST
Configuración de Controladores MVC
Consultas JPQL con @Query en Spring Data JPA
Actualizar datos de base de datos
Verificar token JWT en peticiones
Login de usuarios
Integración con React
Configuración de React
Todas las lecciones de SpringBoot
Accede a todas las lecciones de SpringBoot y aprende con ejemplos prácticos de código y ejercicios de programación con IDE web sin instalar nada.
Introducción A Spring Boot
Introducción Y Entorno
Spring Boot Starters
Introducción Y Entorno
Inyección De Dependencias
Introducción Y Entorno
Controladores Spring Mvc
Spring Web
Vista En Spring Mvc Con Thymeleaf
Spring Web
Controladores Spring Rest
Spring Web
Open Api Y Cómo Agregarlo En Spring Boot
Spring Web
Servicios En Spring
Spring Web
Clientes Resttemplate Y Restclient
Spring Web
Rxjava En Spring Web
Spring Web
Crear Entidades Jpa
Persistencia Spring Data
Asociaciones De Entidades Jpa
Persistencia Spring Data
Repositorios Spring Data
Persistencia Spring Data
Métodos Find En Repositorios
Persistencia Spring Data
Inserción De Datos
Persistencia Spring Data
Actualizar Datos De Base De Datos
Persistencia Spring Data
Borrar Datos De Base De Datos
Persistencia Spring Data
Consultas Jpql Con @Query En Spring Data Jpa
Persistencia Spring Data
Api Query By Example (Qbe)
Persistencia Spring Data
Api Specification
Persistencia Spring Data
Repositorios Reactivos
Persistencia Spring Data
Introducción E Instalación De Apache Kafka
Mensajería Asíncrona
Crear Proyecto Con Apache Kafka
Mensajería Asíncrona
Creación De Producers
Mensajería Asíncrona
Creación De Consumers
Mensajería Asíncrona
Kafka Streams En Spring Boot
Mensajería Asíncrona
Introducción A Spring Webflux
Reactividad Webflux
Spring Data R2dbc
Reactividad Webflux
Controlador Rest Reactivo Basado En Anotaciones
Reactividad Webflux
Controlador Rest Reactivo Funcional
Reactividad Webflux
Operadores Reactivos Básicos
Reactividad Webflux
Operadores Reactivos Avanzados
Reactividad Webflux
Cliente Reactivo Webclient
Reactividad Webflux
Introducción A Spring Security
Seguridad Con Spring Security
Seguridad Basada En Formulario En Mvc Con Thymeleaf
Seguridad Con Spring Security
Registro De Usuarios
Seguridad Con Spring Security
Login De Usuarios
Seguridad Con Spring Security
Verificar Token Jwt En Peticiones
Seguridad Con Spring Security
Seguridad Jwt En Api Rest Spring Web
Seguridad Con Spring Security
Seguridad Jwt En Api Rest Reactiva Spring Webflux
Seguridad Con Spring Security
Autenticación Y Autorización Con Anotaciones
Seguridad Con Spring Security
Testing Unitario De Componentes Y Servicios
Testing Con Spring Test
Testing De Repositorios Spring Data Jpa
Testing Con Spring Test
Testing Controladores Spring Mvc Con Thymeleaf
Testing Con Spring Test
Testing Controladores Rest Con Json
Testing Con Spring Test
Testing De Aplicaciones Reactivas Webflux
Testing Con Spring Test
Testing De Seguridad Spring Security
Testing Con Spring Test
Testing Con Apache Kafka
Testing Con Spring Test
Integración Con Angular
Integración Frontend
Integración Con React
Integración Frontend
Integración Con Vue
Integración Frontend
En esta lección
Objetivos de aprendizaje de esta lección
- Operadores de creación
- Operadores de transformación
- Operadores de filtrado
- Operadores de combinación
- Operadores de errores
- Operadores de acumulación y estadística