SpringBoot
Tutorial SpringBoot: Operadores reactivos avanzados
Aprende programación reactiva con los operadores Reactor avanzados para control de flujo, programación temporal, combinación, backpressure, schedulers en Spring WebFlux sobre Spring Boot.
Aprende SpringBoot GRATIS y certifícateControl de flujo: buffer, window, limitRate, throttleFirst, throttleLast
En programación reactiva, el control de flujo es esencial para gestionar adecuadamente la emisión y consumo de datos en sistemas asíncronos. Reactor ofrece operadores avanzados como buffer
, window
, limitRate
, throttleFirst
y throttleLast
que permiten regular y optimizar el manejo de secuencias de datos.
El operador buffer
recopila elementos emitidos por un Flux en una colección, emitiéndolos juntos como una lista cuando se cumple una condición específica. Por ejemplo, se puede agrupar cada cierto número de elementos:
Flux.range(1, 10)
.buffer(3)
.subscribe(lista -> System.out.println("Buffer recibido: " + lista));
Este código agrupa los números del 1 al 10 en listas de tres elementos, emitiendo: [1, 2, 3]
, [4, 5, 6]
, [7, 8, 9]
y [10]
. El uso de buffer
es útil para procesos que requieren manejar conjuntos de datos simultáneamente, mejorando la eficiencia al reducir la frecuencia de operaciones sobre elementos individuales.
El operador window
es similar a buffer
, pero en lugar de recolectar los elementos en una colección, los agrupa en sub-Flux. Cada sub-Flux representa una ventana de elementos que puede ser procesada de forma independiente y reactiva:
Flux.range(1, 10)
.window(3)
.subscribe(ventana ->
ventana.collectList().subscribe(lista ->
System.out.println("Ventana recibida: " + lista)));
Aquí, window(3)
divide el flujo original en ventanas de tres elementos. Esto es especialmente útil cuando se desea aplicar operaciones reactivas a grupos de elementos manteniendo la naturaleza asíncrona del procesamiento.
Con limitRate
, es posible controlar la velocidad a la que un suscriptor consume elementos de un Flux. Este operador limita el número de elementos solicitados al publicador, evitando sobrecargas en el consumidor:
Flux.range(1, 1000)
.limitRate(100)
.subscribe(elemento -> procesar(elemento));
En este ejemplo, limitRate(100)
permite que el suscriptor procese los elementos en lotes de 100, evitando que una cantidad excesiva de datos inunde el sistema y potencialmente cause problemas de rendimiento o memoria.
Los operadores throttleFirst
y throttleLast
permiten controlar la emisión de elementos en función del tiempo. Con throttleFirst
, se emite el primer elemento que llega en cada intervalo de tiempo especificado, ignorando los siguientes hasta que el intervalo se reinicia:
Flux.interval(Duration.ofMillis(200))
.throttleFirst(Duration.ofSeconds(1))
.take(5)
.subscribe(valor -> System.out.println("ThrottleFirst recibido: " + valor));
Este código emitirá un valor cada segundo, capturando el primer elemento disponible en ese lapso. Es útil en escenarios donde se necesita limitar la tasa de procesamiento, como en la implementación de funcionalidades de anti-rebote en interfaces de usuario.
Por el contrario, throttleLast
(también conocido como sample
) emite el último elemento producido en cada intervalo de tiempo especificado:
Flux.interval(Duration.ofMillis(200))
.throttleLast(Duration.ofSeconds(1))
.take(5)
.subscribe(valor -> System.out.println("ThrottleLast recibido: " + valor));
En este caso, se emite el último valor generado dentro de cada segundo. Este patrón es adecuado cuando se requiere obtener el dato más reciente en intervalos regulares, como en actualizaciones periódicas de estado.
Estos operadores son herramientas poderosas para regular el flujo de datos en aplicaciones reactivas, permitiendo gestionar eficientemente recursos y adaptarse a distintas condiciones de carga. Al utilizar buffer
y window
, se controlan los agrupamientos de elementos; con limitRate
, se ajusta la velocidad de consumo; y con throttleFirst
y throttleLast
, se modula la emisión de elementos en función del tiempo.
Programación temporal: delayElements, delaySequence, timeout
En aplicaciones reactivas, el manejo de operaciones temporales es esencial para controlar la sincronización y el ritmo en que se procesan los eventos. Reactor proporciona operadores como delayElements
, delaySequence
y timeout
que permiten introducir retrasos y gestionar tiempo de espera en flujos reactivos.
El operador delayElements
introduce un retraso entre la emisión de cada elemento de un Flux o Mono. Esto es útil cuando se desea espaciar la emisión de eventos para evitar saturar al consumidor o simular un flujo de datos con retardo. Por ejemplo:
Flux.range(1, 5)
.delayElements(Duration.ofSeconds(1))
.subscribe(elemento -> System.out.println("Elemento recibido: " + elemento));
En este código, cada elemento se emite con un retraso de un segundo respecto al anterior. La salida será:
Elemento recibido: 1
Elemento recibido: 2
Elemento recibido: 3
Elemento recibido: 4
Elemento recibido: 5
con un segundo de diferencia entre cada línea. Es importante destacar que delayElements
no bloquea el hilo desde el que se emite el Flux; en su lugar, programa la emisión de cada elemento utilizando un Scheduler por defecto, que es el parallel()
en Reactor.
Si se necesita retrasar la emisión completa de la secuencia, se utiliza delaySequence
. Este operador retrasa el inicio de la emisión por un periodo específico, manteniendo el intervalo original entre elementos. Un ejemplo práctico es:
Flux<String> flujoMensajes = Flux.just("Mensaje 1", "Mensaje 2", "Mensaje 3")
.delaySequence(Duration.ofSeconds(5));
flujoMensajes.subscribe(mensaje -> System.out.println("Recibido: " + mensaje));
Aquí, el flujo comenzará a emitir los mensajes después de un retraso total de cinco segundos, pero sin introducir demoras entre cada mensaje. La salida será:
(Tras 5 segundos de espera)
Recibido: Mensaje 1
Recibido: Mensaje 2
Recibido: Mensaje 3
Este operador es útil cuando se requiere una espera inicial antes de procesar una secuencia completa, por ejemplo, para esperar a que un recurso esté disponible o para sincronizar flujos.
Por otro lado, el operador timeout
es fundamental para manejar situaciones en las que una operación reactiva puede tardar más de lo esperado. Este operador permite establecer un tiempo máximo de espera para la emisión de los elementos. Si se supera este tiempo, se produce un error de tipo TimeoutException
. Un ejemplo de uso es:
Flux<Long> flujoIntervalo = Flux.interval(Duration.ofSeconds(2))
.timeout(Duration.ofSeconds(1))
.onErrorReturn(-1L);
flujoIntervalo.subscribe(valor -> System.out.println("Valor: " + valor));
En este caso, como el intervalo de emisión es de dos segundos y el timeout
está configurado en un segundo, se producirá una excepción después del primer segundo sin recibir ningún elemento. Gracias a onErrorReturn
, se captura el error y se emite -1L
en su lugar, obteniendo la salida:
Valor: -1
El operador timeout
es esencial para evitar que un flujo reactivo quede bloqueado indefinidamente ante demoras inesperadas, permitiendo implementar políticas de reintento o proporcionar valores por defecto.
También es posible proporcionar un flujo alternativo al producirse el timeout. Por ejemplo:
Flux<String> flujoDatos = obtenerDatosConRetraso()
.timeout(Duration.ofSeconds(3), flujoAlternativo());
flujoDatos.subscribe(dato -> System.out.println("Dato: " + dato));
En este código, si obtenerDatosConRetraso()
no emite ningún elemento en tres segundos, el flujo pasará a emitir los elementos proporcionados por flujoAlternativo()
. Esto permite ofrecer una ruta de respaldo ante retrasos, mejorando la resiliencia de la aplicación.
Es relevante mencionar que el uso de timeout
debe ser cuidadoso en sistemas donde las demoras pueden ser comunes, ya que un tiempo de espera demasiado corto puede provocar errores frecuentes. Ajustar adecuadamente los tiempos y manejar las excepciones de forma apropiada es clave para un comportamiento robusto.
En resumen, los operadores delayElements
, delaySequence
y timeout
ofrecen mecanismos precisos para controlar el timing en flujos reactivos. Permiten introducir retrasos estratégicos, simular latencias y gestionar tiempos de espera, aportando flexibilidad y control en la programación reactiva con Reactor y Spring Boot 3.
Combinación avanzada: combineLatest, withLatestFrom, groupBy
En aplicaciones reactivas complejas, es común trabajar con múltiples fuentes de datos que necesitan ser combinadas o agrupadas. Los operadores avanzados combineLatest, withLatestFrom y groupBy proporcionan mecanismos eficaces para fusionar y organizar flujos de datos en Reactor.
El operador combineLatest combina varios Publisher y emite un nuevo valor cada vez que alguno de ellos emite, utilizando los últimos valores emitidos por cada uno. Este operador es útil cuando se desea reaccionar ante cualquier cambio en las fuentes, trabajando siempre con los datos más recientes disponibles.
Por ejemplo, si se tienen dos Flux que representan movimientos en los ejes X e Y de un dispositivo, y se desea obtener la posición actual cada vez que alguno de los ejes cambia:
Flux<Integer> ejeX = Flux.just(0, 1, 2, 3)
.delayElements(Duration.ofMillis(500));
Flux<Integer> ejeY = Flux.just(0, -1, -2)
.delayElements(Duration.ofMillis(700));
Flux.combineLatest(ejeX, ejeY, (x, y) -> "Posición: (" + x + ", " + y + ")")
.subscribe(System.out::println);
En este ejemplo, combineLatest emite una nueva posición cada vez que el eje X o el eje Y emiten un valor, utilizando siempre el último valor conocido de cada eje. De esta manera, se obtiene una secuencia que refleja en tiempo real la posición del dispositivo.
El operador withLatestFrom combina un Flux fuente con los últimos valores emitidos por otros Publishers, pero emite valores solo cuando el Flux fuente emite. A diferencia de combineLatest, que emite cuando cualquiera de los Publishers emite, withLatestFrom se centra en el flujo principal.
Por ejemplo, supongamos que se tiene un Flux que emite órdenes de compra y otro que proporciona las tasas de cambio actuales. Se desea combinar cada orden con la tasa de cambio más reciente en el momento de la emisión de la orden:
Flux<Orden> ordenes = obtenerOrdenes(); // Flux<Orden>
Flux<Double> tasaCambio = obtenerTasasCambio(); // Flux<Double>
ordenes.withLatestFrom(tasaCambio, (orden, tasa) -> convertirMoneda(orden, tasa))
.subscribe(ordenConvertida -> procesarOrden(ordenConvertida));
En este ejemplo, cada vez que se emite una orden en el Flux de ordenes, se combina con la tasa de cambio más reciente proporcionada por el Flux tasaCambio. El operador withLatestFrom asegura que utilizamos la información más actualizada de la tasa de cambio en el momento de procesar cada orden.
El operador groupBy sirve para agrupar elementos de un Flux en función de una clave, creando un GroupedFlux por cada grupo identificado. Este operador es útil cuando se necesita procesar o segregar datos basados en algún criterio específico.
Por ejemplo, si se tiene un Flux de eventos de usuario y se desea agruparlos por tipo de evento:
Flux<EventoUsuario> eventos = obtenerEventosUsuario(); // Flux<EventoUsuario>
eventos.groupBy(EventoUsuario::getTipo)
.subscribe(grupo ->
grupo.subscribe(evento ->
System.out.println("Tipo: " + grupo.key() + ", Evento: " + evento)));
En este ejemplo, groupBy agrupa los eventos por su tipo. Cada GroupedFlux emitido representa un grupo de eventos del mismo tipo, y se puede procesar de manera independiente. El método key() proporciona la clave del grupo, permitiendo identificar el tipo de evento asociado.
Es importante considerar que al utilizar groupBy, el número de grupos puede afectar al consumo de memoria, ya que cada grupo mantiene su propio flujo. Por ello, es recomendable aplicar operadores como take, limit o procesar cada grupo de forma que no permanezca abierto indefinidamente.
La combinación de estos operadores avanzados permite construir flujos reactivos más sofisticados, adaptándose a las necesidades de aplicaciones modernas. El uso de combineLatest y withLatestFrom facilita la sincronización de múltiples fuentes de datos, mientras que groupBy habilita el procesamiento segmentado basado en claves.
Errores y reintentos: retry, retryWhen, onErrorContinue
En programación reactiva, el manejo adecuado de errores es fundamental para garantizar la robustez y resiliencia de las aplicaciones. Reactor proporciona operadores como retry
, retryWhen
y onErrorContinue
que permiten gestionar y recuperar flujos ante situaciones de error de manera eficiente.
El operador retry
permite reintentar la suscripción a un Publisher en caso de que ocurra una excepción. Al utilizar retry
, el flujo volverá a suscribirse desde el inicio cada vez que se produzca un error, hasta completar un número de reintentos especificado o indefinidamente si no se indica límite.
Por ejemplo, supongamos un flujo que puede fallar intermitentemente al obtener datos de una fuente externa:
Flux<String> flujoInestable = obtenerDatosInestables();
flujoInestable
.retry(3)
.subscribe(
dato -> System.out.println("Dato recibido: " + dato),
error -> System.err.println("Error después de reintentos: " + error.getMessage())
);
En este ejemplo, el operador retry(3)
indica que se reintentará la suscripción hasta tres veces en caso de error. Si después de los reintentos persiste el fallo, se propagará el error al suscriptor.
Es importante considerar que retry
reinicia la suscripción desde el principio, lo que implica que todas las operaciones anteriores se repetirán. Esto puede no ser deseable si el procesamiento previo es costoso o si se desean reintentar únicamente a partir del punto de fallo.
Para escenarios más avanzados, el operador retryWhen
ofrece mayor control sobre el comportamiento de reintento. retryWhen
acepta una función que recibe un flujo de errores y devuelve un Publisher que determina cuándo realizar el reintento. Esto permite, por ejemplo, introducir reintentos exponenciales u otras estrategias de espera entre intentos.
A continuación, se muestra un ejemplo de uso de retryWhen
con una estrategia de reintentos con demora incremental:
Flux<String> flujoDatos = obtenerDatosRemotos();
flujoDatos
.retryWhen(companion ->
companion.zipWith(Flux.range(1, 4), (error, intento) -> intento)
.flatMap(reintento -> {
System.out.println("Reintento nº " + reintento);
return Mono.delay(Duration.ofSeconds(reintento));
})
)
.subscribe(
dato -> System.out.println("Dato recibido: " + dato),
error -> System.err.println("Error tras reintentos: " + error.getMessage())
);
En este código, se configura retryWhen
para que reintente hasta cuatro veces, con una espera creciente entre cada intento. Se utiliza zipWith
para asociar el error con el número de intento, y flatMap
para introducir la demora correspondiente.
Es crucial manejar cuidadosamente los reintentos para evitar bucles infinitos o saturación de recursos. Establecer límites y considerar el uso de demoras aleatorias o políticas de retroceso (backoff) ayuda a reducir la carga sobre los sistemas y mejorar el comportamiento bajo condiciones de error.
El operador onErrorContinue
permite continuar procesando el flujo incluso si se produce un error en uno de los elementos. En lugar de cancelar todo el flujo ante una excepción, onErrorContinue
ignora el error y sigue procesando los siguientes elementos.
Por ejemplo, en un flujo donde algunos elementos pueden provocar una excepción al ser procesados, se puede utilizar onErrorContinue
para manejar esos casos:
Flux<String> flujoDatos = obtenerListaDatos();
flujoDatos
.map(dato -> procesarDato(dato))
.onErrorContinue((error, dato) -> System.err.println("Error al procesar '" + dato + "': " + error.getMessage()))
.subscribe(resultado -> System.out.println("Resultado: " + resultado));
En este ejemplo, si procesarDato
lanza una excepción para algún elemento, el operador onErrorContinue
captura el error y permite que el flujo continúe con el siguiente dato. Se proporciona una función que recibe el error y el elemento que lo causó, permitiendo realizar acciones como registrar el incidente.
Es importante tener en cuenta que onErrorContinue puede omitir errores silenciosamente, lo que podría esconder problemas críticos en el procesamiento de datos. Por ello, su uso debe estar justificado y ser acorde con los requisitos de la aplicación.
Al utilizar estos operadores, es fundamental entender la naturaleza de los errores en flujos reactivos. Reactor distingue entre errores transitorios, que pueden resolverse con un reintento, y errores fatales, que requieren atención inmediata o no son recuperables. Diseñar estrategias adecuadas de manejo de errores mejora la resiliencia y confiabilidad del sistema.
Además, es recomendable combinar los operadores de reintento con filtros que determinen para qué tipos de excepciones se deben aplicar. Por ejemplo, es posible reintentar únicamente ante excepciones de tipo TimeoutException
:
flujoDatos
.retryWhen(companion ->
companion.ofType(TimeoutException.class)
.flatMap(err -> {
System.out.println("Reintentando tras TimeoutException");
return Mono.delay(Duration.ofSeconds(1));
})
)
En este caso, retryWhen
filtrará los errores para aplicar reintentos solo cuando se trate de una TimeoutException, proporcionando un control más fino sobre el flujo de reintentos.
En líneas generales, los operadores retry
, retryWhen
y onErrorContinue
son herramientas valiosas para manejar errores en flujos reactivos. Su uso adecuado permite construir aplicaciones más robustas y capaces de adaptarse a condiciones adversas, manteniendo una experiencia fluida para el usuario.
Backpressure: onBackpressureBuffer, onBackpressureDrop, onBackpressureLatest
En sistemas de programación reactiva, una de las preocupaciones fundamentales es el manejo adecuado del flujo de datos entre productores y consumidores. Cuando la velocidad de emisión del productor supera la capacidad de procesamiento del consumidor, se genera una situación conocida como backpressure o contrapresión. Para gestionar eficientemente este escenario, Reactor proporciona operadores específicos: onBackpressureBuffer
, onBackpressureDrop
y onBackpressureLatest
.
El operador onBackpressureBuffer
almacena temporalmente en un buffer los elementos emitidos que el consumidor aún no ha procesado. Esto permite al sistema acomodar ráfagas de datos sin perder información:
Flux.interval(Duration.ofMillis(1))
.onBackpressureBuffer()
.subscribe(valor -> {
// Procesamiento intensivo
procesarDato(valor);
});
En este ejemplo, se genera un Flux que emite un elemento cada milisegundo. Si el método procesarDato
tarda más en ejecutarse, los elementos se acumulan en el buffer hasta que puedan ser procesados. Sin embargo, es importante considerar que el buffer puede crecer indefinidamente, lo que podría provocar problemas de memoria si el productor sigue superando al consumidor.
Para limitar el tamaño del buffer y evitar desbordamientos, es posible especificar una capacidad máxima:
Flux.interval(Duration.ofMillis(1))
.onBackpressureBuffer(1000)
.subscribe(valor -> procesarDato(valor));
Aquí, el buffer almacena hasta 1000 elementos. Si se supera este límite, se lanzará una excepción BufferOverflowException
, a menos que se defina una estrategia alternativa.
Cuando no es viable almacenar todos los elementos producidos, el operador onBackpressureDrop
permite descartar aquellos que el consumidor no pueda procesar a tiempo:
Flux.interval(Duration.ofMillis(1))
.onBackpressureDrop()
.subscribe(valor -> procesarDato(valor));
Con onBackpressureDrop
, los elementos excedentes se eliminan, asegurando que el consumidor solo procese lo que su capacidad permite. Esta estrategia es útil en aplicaciones donde es aceptable perder algunos datos para mantener la estabilidad y rendimiento del sistema. Además, es posible manejar los elementos descartados proporcionando un callback:
Flux.interval(Duration.ofMillis(1))
.onBackpressureDrop(elementoDescartado ->
System.out.println("Elemento descartado: " + elementoDescartado))
.subscribe(valor -> procesarDato(valor));
Este enfoque permite realizar acciones adicionales, como registrar o analizar los datos perdidos, mejorando el monitoreo y diagnóstico del sistema.
El operador onBackpressureLatest
es idóneo cuando solo interesa el dato más reciente. Con este operador, si el consumidor no puede procesar todos los elementos, se retiene únicamente el último elemento emitido, descartando los anteriores no procesados:
Flux.interval(Duration.ofMillis(1))
.onBackpressureLatest()
.subscribe(valor -> procesarDato(valor));
Esta estrategia es especialmente útil en contextos donde es crucial mantener actualizada la información más reciente, como en sistemas de actualización de estados o mediciones en tiempo real.
Es relevante destacar que estos operadores de backpressure se aplican cuando no existe un control de demanda efectivo entre productor y consumidor. En situaciones ideales, la programación reactiva utiliza el mecanismo de demanda bajo suscripción (request), donde el consumidor solicita al productor la cantidad exacta de elementos que puede manejar. Sin embargo, cuando el productor no puede ajustar su ritmo de emisión, como en fuentes de eventos externas, estos operadores son esenciales para gestionar la contrapresión.
Un ejemplo práctico que combina estas estrategias es el siguiente:
Flux<String> flujoEventos = obtenerEventosExternos();
flujoEventos
.onBackpressureBuffer(
500,
elementoDescartado -> registrarEventoPerdido(elementoDescartado),
BufferOverflowStrategy.DROP_OLDEST
)
.subscribe(this::procesarEvento);
En este caso, se establece un buffer limitado a 500 elementos y, al superar este límite, se aplica la estrategia DROP_OLDEST
, que descarta los elementos más antiguos en favor de los nuevos. Además, se registra cada elemento descartado mediante la función registrarEventoPerdido
, lo que facilita el seguimiento de datos perdidos.
La selección del operador adecuado depende de los requisitos de cada aplicación. Si es crítico procesar todos los datos y se dispone de suficiente memoria, onBackpressureBuffer
es apropiado. Si es preferible mantener el rendimiento sacrificando algunos datos, onBackpressureDrop
es más conveniente. Cuando solo importa el dato más reciente, onBackpressureLatest
es la opción indicada.
Es aconsejable diseñar aplicaciones reactivas considerando el control de flujo desde el inicio, implementando mecanismos que permitan al consumidor regular la velocidad del productor. Esto reduce la necesidad de utilizar operadores de backpressure y ayuda a construir sistemas más eficientes y escalables.
Planificación y subprocesos Schedulers: subscribeOn, publishOn, parallel
En programación reactiva con Reactor, la gestión de los hilos de ejecución es esencial para optimizar el rendimiento y la eficiencia de las aplicaciones. Los Schedulers proporcionan una forma de controlar en qué hilos se ejecutan las operaciones reactivas, permitiendo gestionar la planificación y la concurrencia de manera precisa. Los operadores subscribeOn
, publishOn
y el uso del Scheduler parallel
son herramientas clave para este propósito.
El operador subscribeOn
especifica el Scheduler en el que se suscribirán los suscriptores a un Publisher. Esto significa que determina el hilo desde el cual se iniciará la ejecución del flujo reactivo. Por ejemplo:
Flux<Integer> flujoNumeros = Flux.range(1, 5)
.map(numero -> {
System.out.println("map 1 - Hilo: " + Thread.currentThread().getName());
return numero * 2;
})
.subscribeOn(Schedulers.boundedElastic())
.map(numero -> {
System.out.println("map 2 - Hilo: " + Thread.currentThread().getName());
return numero + 1;
});
flujoNumeros.subscribe(valor ->
System.out.println("Suscriptor - Hilo: " + Thread.currentThread().getName() + ", Valor: " + valor));
En este ejemplo, al utilizar subscribeOn(Schedulers.boundedElastic())
, se indica que la suscripción al Flux y las operaciones posteriores se ejecutarán en el Scheduler boundedElastic, que es adecuado para operaciones bloqueantes o de larga duración. La salida mostrará que los hilos utilizados en las operaciones map
y en el suscriptor pertenecen al pool del Scheduler seleccionado.
Por otro lado, el operador publishOn
cambia el contexto de ejecución a partir del punto en que se aplica en la cadena de operadores. Esto permite alternar entre diferentes Schedulers en distintas partes del flujo. Veamos un ejemplo:
Flux<String> flujoDatos = Flux.just("A", "B", "C")
.map(dato -> {
System.out.println("map 1 - Hilo: " + Thread.currentThread().getName());
return dato.toLowerCase();
})
.publishOn(Schedulers.parallel())
.map(dato -> {
System.out.println("map 2 - Hilo: " + Thread.currentThread().getName());
return dato + "1";
});
flujoDatos.subscribe(dato ->
System.out.println("Suscriptor - Hilo: " + Thread.currentThread().getName() + ", Dato: " + dato));
En este caso, las operaciones previas a publishOn
se ejecutan en el hilo inicial, mientras que las operaciones posteriores se cambian al Scheduler parallel, que utiliza un pool de hilos optimizado para tareas CPU-bound. Esto permite aprovechar múltiples núcleos del procesador para operaciones intensivas en cálculo.
Es importante destacar que subscribeOn
solo tiene efecto cuando se coloca al principio del flujo. Si se utiliza múltiples veces, solo el primer subscribeOn
será efectivo, ya que la suscripción ocurre una única vez. En cambio, publishOn
puede aplicarse en varios puntos del flujo para cambiar el contexto de ejecución según sea necesario.
El Scheduler parallel
está diseñado para operaciones que pueden ejecutarse en paralelo, aprovechando el paralelismo a nivel de CPU. Cuando se utiliza junto con publishOn
o subscribeOn
, permite distribuir la carga de trabajo entre varios hilos. Por ejemplo:
Flux.range(1, 10)
.parallel()
.runOn(Schedulers.parallel())
.map(numero -> {
System.out.println("Procesando " + numero + " en hilo " + Thread.currentThread().getName());
return numero * 2;
})
.sequential()
.subscribe(resultado ->
System.out.println("Resultado: " + resultado));
En este ejemplo, el uso de parallel()
transforma el Flux en un ParallelFlux, permitiendo procesamiento en paralelo. La combinación con runOn(Schedulers.parallel())
especifica el Scheduler donde se ejecutarán las operaciones en paralelo. Finalmente, sequential()
convierte el ParallelFlux de vuelta a un Flux secuencial para su suscripción. Esto es útil para procesar grandes cantidades de datos de manera eficiente.
Es fundamental comprender las diferencias entre subscribeOn
y publishOn
. Mientras que subscribeOn
afecta principalmente al hilo desde el cual se suscribe al Publisher y, por ende, a las operaciones upstream, publishOn
modifica el hilo de ejecución a partir de su posición en el flujo. Por lo tanto, son operadores complementarios que deben utilizarse según las necesidades del flujo reactivo.
Otro aspecto relevante es el Scheduler immediate(), que ejecuta las operaciones en el mismo hilo que invoca el operador. Por defecto, si no se especifica un Scheduler, las operaciones se ejecutan en el hilo donde se realiza la suscripción. Sin embargo, para mejorar el rendimiento y evitar bloquear el hilo principal, es recomendable utilizar Schedulers apropiados.
Cuando se trabaja con aplicaciones web reactivas en Spring Boot, es común manejar flujos de datos que interactúan con fuentes externas, como bases de datos o servicios remotos. En estos casos, el Scheduler boundedElastic es adecuado para operaciones que pueden bloquear, ya que gestiona un pool de hilos elásticos que se expande bajo demanda:
Mono<String> resultado = realizarOperacionBloqueante()
.subscribeOn(Schedulers.boundedElastic());
resultado.subscribe(res ->
System.out.println("Operación completada: " + res));
Aquí, realizarOperacionBloqueante()
representa una llamada a un método que ejecuta una tarea que puede bloquear, como una llamada a una API externa o una consulta a una base de datos tradicional. Al usar subscribeOn(Schedulers.boundedElastic())
, se evita bloquear el hilo de evento de Netty, manteniendo la reactividad de la aplicación.
Es esencial evitar bloquear hilos en el contexto reactivo predeterminado, ya que puede causar problemas de rendimiento y afectar la escalabilidad. Utilizar Schedulers adecuados asegura que las operaciones se ejecuten en hilos apropiados sin comprometer la naturaleza no bloqueante de la aplicación.
Además, es posible crear un Scheduler personalizado utilizando Schedulers.newBoundedElastic()
o Schedulers.newParallel()
, configurando parámetros como el tamaño del pool de hilos y el tiempo de vida de los mismos. Esto ofrece flexibilidad para adaptar la gestión de hilos a las necesidades específicas de la aplicación.
Por ejemplo, para crear un Scheduler paralelo personalizado:
Scheduler schedulerPersonalizado = Schedulers.newParallel("mi-paralelo", 8);
Flux<Integer> flujoPersonalizado = Flux.range(1, 100)
.publishOn(schedulerPersonalizado)
.map(this::procesar);
flujoPersonalizado.subscribe(resultado ->
System.out.println("Resultado: " + resultado));
En este caso, Schedulers.newParallel("mi-paralelo", 8)
crea un Scheduler con un pool de 8 hilos, identificados con el prefijo "mi-paralelo". Esto permite controlar el grado de paralelismo y optimizar el uso de recursos según la carga esperada.
Al utilizar Schedulers, es importante también manejar adecuadamente los recursos. Aunque Reactor gestiona automáticamente los pools de hilos para los Schedulers compartidos, si se crean Schedulers personalizados, es recomendable cerrarlos al finalizar su uso para liberar recursos:
schedulerPersonalizado.dispose();
La comprensión y uso correcto de subscribeOn
, publishOn
y los Schedulers como parallel
son fundamentales para optimizar la ejecución de flujos reactivos en aplicaciones con Spring Boot 3. Permiten controlar la concurrencia, evitar bloqueos y aprovechar al máximo los recursos disponibles, mejorando el rendimiento y la escalabilidad de las aplicaciones reactivas.
Ejercicios de esta lección Operadores reactivos avanzados
Evalúa tus conocimientos de esta lección Operadores reactivos avanzados con nuestros retos de programación de tipo Test, Puzzle, Código y Proyecto con VSCode, guiados por IA.
API Query By Example (QBE)
Identificadores y relaciones JPA
Borrar datos de base de datos
Web y Test Starters
Métodos find en repositorios
Controladores Spring MVC
Inserción de datos
CRUD Customers Spring MVC + Spring Data JPA
Backend API REST con Spring Boot
Controladores Spring REST
Uso de Spring con Thymeleaf
API Specification
Registro de usuarios
Crear entidades JPA
Asociaciones en JPA
Asociaciones de entidades JPA
Integración con Vue
Consultas JPQL
Open API y cómo agregarlo en Spring Boot
Uso de Controladores REST
Repositorios reactivos
Inyección de dependencias
Introducción a Spring Boot
CRUD y JPA Repository
Inyección de dependencias
Vista en Spring MVC con Thymeleaf
Servicios en Spring
Operadores Reactivos
Configuración de Vue
Entidades JPA
Integración con Angular
API Specification
API Query By Example (QBE)
Controladores MVC
Anotaciones y mapeo en JPA
Consultas JPQL con @Query en Spring Data JPA
Repositorios Spring Data
Inyección de dependencias
Data JPA y Mail Starters
Configuración de Angular
Controladores Spring REST
Configuración de Controladores MVC
Consultas JPQL con @Query en Spring Data JPA
Actualizar datos de base de datos
Verificar token JWT en peticiones
Login de usuarios
Integración con React
Configuración de React
Todas las lecciones de SpringBoot
Accede a todas las lecciones de SpringBoot y aprende con ejemplos prácticos de código y ejercicios de programación con IDE web sin instalar nada.
Introducción A Spring Boot
Introducción Y Entorno
Spring Boot Starters
Introducción Y Entorno
Inyección De Dependencias
Introducción Y Entorno
Controladores Spring Mvc
Spring Web
Vista En Spring Mvc Con Thymeleaf
Spring Web
Controladores Spring Rest
Spring Web
Open Api Y Cómo Agregarlo En Spring Boot
Spring Web
Servicios En Spring
Spring Web
Clientes Resttemplate Y Restclient
Spring Web
Rxjava En Spring Web
Spring Web
Crear Entidades Jpa
Persistencia Spring Data
Asociaciones De Entidades Jpa
Persistencia Spring Data
Repositorios Spring Data
Persistencia Spring Data
Métodos Find En Repositorios
Persistencia Spring Data
Inserción De Datos
Persistencia Spring Data
Actualizar Datos De Base De Datos
Persistencia Spring Data
Borrar Datos De Base De Datos
Persistencia Spring Data
Consultas Jpql Con @Query En Spring Data Jpa
Persistencia Spring Data
Api Query By Example (Qbe)
Persistencia Spring Data
Api Specification
Persistencia Spring Data
Repositorios Reactivos
Persistencia Spring Data
Introducción E Instalación De Apache Kafka
Mensajería Asíncrona
Crear Proyecto Con Apache Kafka
Mensajería Asíncrona
Creación De Producers
Mensajería Asíncrona
Creación De Consumers
Mensajería Asíncrona
Kafka Streams En Spring Boot
Mensajería Asíncrona
Introducción A Spring Webflux
Reactividad Webflux
Spring Data R2dbc
Reactividad Webflux
Controlador Rest Reactivo Basado En Anotaciones
Reactividad Webflux
Controlador Rest Reactivo Funcional
Reactividad Webflux
Operadores Reactivos Básicos
Reactividad Webflux
Operadores Reactivos Avanzados
Reactividad Webflux
Cliente Reactivo Webclient
Reactividad Webflux
Introducción A Spring Security
Seguridad Con Spring Security
Seguridad Basada En Formulario En Mvc Con Thymeleaf
Seguridad Con Spring Security
Registro De Usuarios
Seguridad Con Spring Security
Login De Usuarios
Seguridad Con Spring Security
Verificar Token Jwt En Peticiones
Seguridad Con Spring Security
Seguridad Jwt En Api Rest Spring Web
Seguridad Con Spring Security
Seguridad Jwt En Api Rest Reactiva Spring Webflux
Seguridad Con Spring Security
Autenticación Y Autorización Con Anotaciones
Seguridad Con Spring Security
Testing Unitario De Componentes Y Servicios
Testing Con Spring Test
Testing De Repositorios Spring Data Jpa
Testing Con Spring Test
Testing Controladores Spring Mvc Con Thymeleaf
Testing Con Spring Test
Testing Controladores Rest Con Json
Testing Con Spring Test
Testing De Aplicaciones Reactivas Webflux
Testing Con Spring Test
Testing De Seguridad Spring Security
Testing Con Spring Test
Testing Con Apache Kafka
Testing Con Spring Test
Integración Con Angular
Integración Frontend
Integración Con React
Integración Frontend
Integración Con Vue
Integración Frontend
En esta lección
Objetivos de aprendizaje de esta lección
- Control de flujo
- Programación temporal
- Combinación avanzada
- Errores y reintentos
- Backpressure
- Planificación y subprocesos Schedulers