Spring Boot

SpringBoot

Tutorial SpringBoot: Operadores reactivos avanzados

Aprende programación reactiva con los operadores Reactor avanzados para control de flujo, programación temporal, combinación, backpressure, schedulers en Spring WebFlux sobre Spring Boot.

Aprende SpringBoot GRATIS y certifícate

Control de flujo: buffer, window, limitRate, throttleFirst, throttleLast

En programación reactiva, el control de flujo es esencial para gestionar adecuadamente la emisión y consumo de datos en sistemas asíncronos. Reactor ofrece operadores avanzados como buffer, window, limitRate, throttleFirst y throttleLast que permiten regular y optimizar el manejo de secuencias de datos.

El operador buffer recopila elementos emitidos por un Flux en una colección, emitiéndolos juntos como una lista cuando se cumple una condición específica. Por ejemplo, se puede agrupar cada cierto número de elementos:

Flux.range(1, 10)
    .buffer(3)
    .subscribe(lista -> System.out.println("Buffer recibido: " + lista));

Este código agrupa los números del 1 al 10 en listas de tres elementos, emitiendo: [1, 2, 3], [4, 5, 6], [7, 8, 9] y [10]. El uso de buffer es útil para procesos que requieren manejar conjuntos de datos simultáneamente, mejorando la eficiencia al reducir la frecuencia de operaciones sobre elementos individuales.

El operador window es similar a buffer, pero en lugar de recolectar los elementos en una colección, los agrupa en sub-Flux. Cada sub-Flux representa una ventana de elementos que puede ser procesada de forma independiente y reactiva:

Flux.range(1, 10)
    .window(3)
    .subscribe(ventana -> 
        ventana.collectList().subscribe(lista -> 
            System.out.println("Ventana recibida: " + lista)));

Aquí, window(3) divide el flujo original en ventanas de tres elementos. Esto es especialmente útil cuando se desea aplicar operaciones reactivas a grupos de elementos manteniendo la naturaleza asíncrona del procesamiento.

Con limitRate, es posible controlar la velocidad a la que un suscriptor consume elementos de un Flux. Este operador limita el número de elementos solicitados al publicador, evitando sobrecargas en el consumidor:

Flux.range(1, 1000)
    .limitRate(100)
    .subscribe(elemento -> procesar(elemento));

En este ejemplo, limitRate(100) permite que el suscriptor procese los elementos en lotes de 100, evitando que una cantidad excesiva de datos inunde el sistema y potencialmente cause problemas de rendimiento o memoria.

Los operadores throttleFirst y throttleLast permiten controlar la emisión de elementos en función del tiempo. Con throttleFirst, se emite el primer elemento que llega en cada intervalo de tiempo especificado, ignorando los siguientes hasta que el intervalo se reinicia:

Flux.interval(Duration.ofMillis(200))
    .throttleFirst(Duration.ofSeconds(1))
    .take(5)
    .subscribe(valor -> System.out.println("ThrottleFirst recibido: " + valor));

Este código emitirá un valor cada segundo, capturando el primer elemento disponible en ese lapso. Es útil en escenarios donde se necesita limitar la tasa de procesamiento, como en la implementación de funcionalidades de anti-rebote en interfaces de usuario.

Por el contrario, throttleLast (también conocido como sample) emite el último elemento producido en cada intervalo de tiempo especificado:

Flux.interval(Duration.ofMillis(200))
    .throttleLast(Duration.ofSeconds(1))
    .take(5)
    .subscribe(valor -> System.out.println("ThrottleLast recibido: " + valor));

En este caso, se emite el último valor generado dentro de cada segundo. Este patrón es adecuado cuando se requiere obtener el dato más reciente en intervalos regulares, como en actualizaciones periódicas de estado.

Estos operadores son herramientas poderosas para regular el flujo de datos en aplicaciones reactivas, permitiendo gestionar eficientemente recursos y adaptarse a distintas condiciones de carga. Al utilizar buffer y window, se controlan los agrupamientos de elementos; con limitRate, se ajusta la velocidad de consumo; y con throttleFirst y throttleLast, se modula la emisión de elementos en función del tiempo.

Programación temporal: delayElements, delaySequence, timeout

En aplicaciones reactivas, el manejo de operaciones temporales es esencial para controlar la sincronización y el ritmo en que se procesan los eventos. Reactor proporciona operadores como delayElements, delaySequence y timeout que permiten introducir retrasos y gestionar tiempo de espera en flujos reactivos.

El operador delayElements introduce un retraso entre la emisión de cada elemento de un Flux o Mono. Esto es útil cuando se desea espaciar la emisión de eventos para evitar saturar al consumidor o simular un flujo de datos con retardo. Por ejemplo:

Flux.range(1, 5)
    .delayElements(Duration.ofSeconds(1))
    .subscribe(elemento -> System.out.println("Elemento recibido: " + elemento));

En este código, cada elemento se emite con un retraso de un segundo respecto al anterior. La salida será:

Elemento recibido: 1
Elemento recibido: 2
Elemento recibido: 3
Elemento recibido: 4
Elemento recibido: 5

con un segundo de diferencia entre cada línea. Es importante destacar que delayElements no bloquea el hilo desde el que se emite el Flux; en su lugar, programa la emisión de cada elemento utilizando un Scheduler por defecto, que es el parallel() en Reactor.

Si se necesita retrasar la emisión completa de la secuencia, se utiliza delaySequence. Este operador retrasa el inicio de la emisión por un periodo específico, manteniendo el intervalo original entre elementos. Un ejemplo práctico es:

Flux<String> flujoMensajes = Flux.just("Mensaje 1", "Mensaje 2", "Mensaje 3")
    .delaySequence(Duration.ofSeconds(5));

flujoMensajes.subscribe(mensaje -> System.out.println("Recibido: " + mensaje));

Aquí, el flujo comenzará a emitir los mensajes después de un retraso total de cinco segundos, pero sin introducir demoras entre cada mensaje. La salida será:

(Tras 5 segundos de espera)

Recibido: Mensaje 1
Recibido: Mensaje 2
Recibido: Mensaje 3

Este operador es útil cuando se requiere una espera inicial antes de procesar una secuencia completa, por ejemplo, para esperar a que un recurso esté disponible o para sincronizar flujos.

Por otro lado, el operador timeout es fundamental para manejar situaciones en las que una operación reactiva puede tardar más de lo esperado. Este operador permite establecer un tiempo máximo de espera para la emisión de los elementos. Si se supera este tiempo, se produce un error de tipo TimeoutException. Un ejemplo de uso es:

Flux<Long> flujoIntervalo = Flux.interval(Duration.ofSeconds(2))
    .timeout(Duration.ofSeconds(1))
    .onErrorReturn(-1L);

flujoIntervalo.subscribe(valor -> System.out.println("Valor: " + valor));

En este caso, como el intervalo de emisión es de dos segundos y el timeout está configurado en un segundo, se producirá una excepción después del primer segundo sin recibir ningún elemento. Gracias a onErrorReturn, se captura el error y se emite -1L en su lugar, obteniendo la salida:

Valor: -1

El operador timeout es esencial para evitar que un flujo reactivo quede bloqueado indefinidamente ante demoras inesperadas, permitiendo implementar políticas de reintento o proporcionar valores por defecto.

También es posible proporcionar un flujo alternativo al producirse el timeout. Por ejemplo:

Flux<String> flujoDatos = obtenerDatosConRetraso()
    .timeout(Duration.ofSeconds(3), flujoAlternativo());

flujoDatos.subscribe(dato -> System.out.println("Dato: " + dato));

En este código, si obtenerDatosConRetraso() no emite ningún elemento en tres segundos, el flujo pasará a emitir los elementos proporcionados por flujoAlternativo(). Esto permite ofrecer una ruta de respaldo ante retrasos, mejorando la resiliencia de la aplicación.

Es relevante mencionar que el uso de timeout debe ser cuidadoso en sistemas donde las demoras pueden ser comunes, ya que un tiempo de espera demasiado corto puede provocar errores frecuentes. Ajustar adecuadamente los tiempos y manejar las excepciones de forma apropiada es clave para un comportamiento robusto.

En resumen, los operadores delayElements, delaySequence y timeout ofrecen mecanismos precisos para controlar el timing en flujos reactivos. Permiten introducir retrasos estratégicos, simular latencias y gestionar tiempos de espera, aportando flexibilidad y control en la programación reactiva con Reactor y Spring Boot 3.

Combinación avanzada: combineLatest, withLatestFrom, groupBy

En aplicaciones reactivas complejas, es común trabajar con múltiples fuentes de datos que necesitan ser combinadas o agrupadas. Los operadores avanzados combineLatest, withLatestFrom y groupBy proporcionan mecanismos eficaces para fusionar y organizar flujos de datos en Reactor.

El operador combineLatest combina varios Publisher y emite un nuevo valor cada vez que alguno de ellos emite, utilizando los últimos valores emitidos por cada uno. Este operador es útil cuando se desea reaccionar ante cualquier cambio en las fuentes, trabajando siempre con los datos más recientes disponibles.

Por ejemplo, si se tienen dos Flux que representan movimientos en los ejes X e Y de un dispositivo, y se desea obtener la posición actual cada vez que alguno de los ejes cambia:

Flux<Integer> ejeX = Flux.just(0, 1, 2, 3)
    .delayElements(Duration.ofMillis(500));

Flux<Integer> ejeY = Flux.just(0, -1, -2)
    .delayElements(Duration.ofMillis(700));

Flux.combineLatest(ejeX, ejeY, (x, y) -> "Posición: (" + x + ", " + y + ")")
    .subscribe(System.out::println);

En este ejemplo, combineLatest emite una nueva posición cada vez que el eje X o el eje Y emiten un valor, utilizando siempre el último valor conocido de cada eje. De esta manera, se obtiene una secuencia que refleja en tiempo real la posición del dispositivo.

El operador withLatestFrom combina un Flux fuente con los últimos valores emitidos por otros Publishers, pero emite valores solo cuando el Flux fuente emite. A diferencia de combineLatest, que emite cuando cualquiera de los Publishers emite, withLatestFrom se centra en el flujo principal.

Por ejemplo, supongamos que se tiene un Flux que emite órdenes de compra y otro que proporciona las tasas de cambio actuales. Se desea combinar cada orden con la tasa de cambio más reciente en el momento de la emisión de la orden:

Flux<Orden> ordenes = obtenerOrdenes(); // Flux<Orden>
Flux<Double> tasaCambio = obtenerTasasCambio(); // Flux<Double>

ordenes.withLatestFrom(tasaCambio, (orden, tasa) -> convertirMoneda(orden, tasa))
    .subscribe(ordenConvertida -> procesarOrden(ordenConvertida));

En este ejemplo, cada vez que se emite una orden en el Flux de ordenes, se combina con la tasa de cambio más reciente proporcionada por el Flux tasaCambio. El operador withLatestFrom asegura que utilizamos la información más actualizada de la tasa de cambio en el momento de procesar cada orden.

El operador groupBy sirve para agrupar elementos de un Flux en función de una clave, creando un GroupedFlux por cada grupo identificado. Este operador es útil cuando se necesita procesar o segregar datos basados en algún criterio específico.

Por ejemplo, si se tiene un Flux de eventos de usuario y se desea agruparlos por tipo de evento:

Flux<EventoUsuario> eventos = obtenerEventosUsuario(); // Flux<EventoUsuario>

eventos.groupBy(EventoUsuario::getTipo)
    .subscribe(grupo -> 
        grupo.subscribe(evento -> 
            System.out.println("Tipo: " + grupo.key() + ", Evento: " + evento)));

En este ejemplo, groupBy agrupa los eventos por su tipo. Cada GroupedFlux emitido representa un grupo de eventos del mismo tipo, y se puede procesar de manera independiente. El método key() proporciona la clave del grupo, permitiendo identificar el tipo de evento asociado.

Es importante considerar que al utilizar groupBy, el número de grupos puede afectar al consumo de memoria, ya que cada grupo mantiene su propio flujo. Por ello, es recomendable aplicar operadores como take, limit o procesar cada grupo de forma que no permanezca abierto indefinidamente.

La combinación de estos operadores avanzados permite construir flujos reactivos más sofisticados, adaptándose a las necesidades de aplicaciones modernas. El uso de combineLatest y withLatestFrom facilita la sincronización de múltiples fuentes de datos, mientras que groupBy habilita el procesamiento segmentado basado en claves.

Errores y reintentos: retry, retryWhen, onErrorContinue

En programación reactiva, el manejo adecuado de errores es fundamental para garantizar la robustez y resiliencia de las aplicaciones. Reactor proporciona operadores como retry, retryWhen y onErrorContinue que permiten gestionar y recuperar flujos ante situaciones de error de manera eficiente.

El operador retry permite reintentar la suscripción a un Publisher en caso de que ocurra una excepción. Al utilizar retry, el flujo volverá a suscribirse desde el inicio cada vez que se produzca un error, hasta completar un número de reintentos especificado o indefinidamente si no se indica límite.

Por ejemplo, supongamos un flujo que puede fallar intermitentemente al obtener datos de una fuente externa:

Flux<String> flujoInestable = obtenerDatosInestables();

flujoInestable
    .retry(3)
    .subscribe(
        dato -> System.out.println("Dato recibido: " + dato),
        error -> System.err.println("Error después de reintentos: " + error.getMessage())
    );

En este ejemplo, el operador retry(3) indica que se reintentará la suscripción hasta tres veces en caso de error. Si después de los reintentos persiste el fallo, se propagará el error al suscriptor.

Es importante considerar que retry reinicia la suscripción desde el principio, lo que implica que todas las operaciones anteriores se repetirán. Esto puede no ser deseable si el procesamiento previo es costoso o si se desean reintentar únicamente a partir del punto de fallo.

Para escenarios más avanzados, el operador retryWhen ofrece mayor control sobre el comportamiento de reintento. retryWhen acepta una función que recibe un flujo de errores y devuelve un Publisher que determina cuándo realizar el reintento. Esto permite, por ejemplo, introducir reintentos exponenciales u otras estrategias de espera entre intentos.

A continuación, se muestra un ejemplo de uso de retryWhen con una estrategia de reintentos con demora incremental:

Flux<String> flujoDatos = obtenerDatosRemotos();

flujoDatos
    .retryWhen(companion -> 
        companion.zipWith(Flux.range(1, 4), (error, intento) -> intento)
                 .flatMap(reintento -> {
                     System.out.println("Reintento nº " + reintento);
                     return Mono.delay(Duration.ofSeconds(reintento));
                 })
    )
    .subscribe(
        dato -> System.out.println("Dato recibido: " + dato),
        error -> System.err.println("Error tras reintentos: " + error.getMessage())
    );

En este código, se configura retryWhen para que reintente hasta cuatro veces, con una espera creciente entre cada intento. Se utiliza zipWith para asociar el error con el número de intento, y flatMap para introducir la demora correspondiente.

Es crucial manejar cuidadosamente los reintentos para evitar bucles infinitos o saturación de recursos. Establecer límites y considerar el uso de demoras aleatorias o políticas de retroceso (backoff) ayuda a reducir la carga sobre los sistemas y mejorar el comportamiento bajo condiciones de error.

El operador onErrorContinue permite continuar procesando el flujo incluso si se produce un error en uno de los elementos. En lugar de cancelar todo el flujo ante una excepción, onErrorContinue ignora el error y sigue procesando los siguientes elementos.

Por ejemplo, en un flujo donde algunos elementos pueden provocar una excepción al ser procesados, se puede utilizar onErrorContinue para manejar esos casos:

Flux<String> flujoDatos = obtenerListaDatos();

flujoDatos
    .map(dato -> procesarDato(dato))
    .onErrorContinue((error, dato) -> System.err.println("Error al procesar '" + dato + "': " + error.getMessage()))
    .subscribe(resultado -> System.out.println("Resultado: " + resultado));

En este ejemplo, si procesarDato lanza una excepción para algún elemento, el operador onErrorContinue captura el error y permite que el flujo continúe con el siguiente dato. Se proporciona una función que recibe el error y el elemento que lo causó, permitiendo realizar acciones como registrar el incidente.

Es importante tener en cuenta que onErrorContinue puede omitir errores silenciosamente, lo que podría esconder problemas críticos en el procesamiento de datos. Por ello, su uso debe estar justificado y ser acorde con los requisitos de la aplicación.

Al utilizar estos operadores, es fundamental entender la naturaleza de los errores en flujos reactivos. Reactor distingue entre errores transitorios, que pueden resolverse con un reintento, y errores fatales, que requieren atención inmediata o no son recuperables. Diseñar estrategias adecuadas de manejo de errores mejora la resiliencia y confiabilidad del sistema.

Además, es recomendable combinar los operadores de reintento con filtros que determinen para qué tipos de excepciones se deben aplicar. Por ejemplo, es posible reintentar únicamente ante excepciones de tipo TimeoutException:

flujoDatos
    .retryWhen(companion ->
        companion.ofType(TimeoutException.class)
                 .flatMap(err -> {
                     System.out.println("Reintentando tras TimeoutException");
                     return Mono.delay(Duration.ofSeconds(1));
                 })
    )

En este caso, retryWhen filtrará los errores para aplicar reintentos solo cuando se trate de una TimeoutException, proporcionando un control más fino sobre el flujo de reintentos.

En líneas generales, los operadores retry, retryWhen y onErrorContinue son herramientas valiosas para manejar errores en flujos reactivos. Su uso adecuado permite construir aplicaciones más robustas y capaces de adaptarse a condiciones adversas, manteniendo una experiencia fluida para el usuario.

Backpressure: onBackpressureBuffer, onBackpressureDrop, onBackpressureLatest

En sistemas de programación reactiva, una de las preocupaciones fundamentales es el manejo adecuado del flujo de datos entre productores y consumidores. Cuando la velocidad de emisión del productor supera la capacidad de procesamiento del consumidor, se genera una situación conocida como backpressure o contrapresión. Para gestionar eficientemente este escenario, Reactor proporciona operadores específicos: onBackpressureBuffer, onBackpressureDrop y onBackpressureLatest.

El operador onBackpressureBuffer almacena temporalmente en un buffer los elementos emitidos que el consumidor aún no ha procesado. Esto permite al sistema acomodar ráfagas de datos sin perder información:

Flux.interval(Duration.ofMillis(1))
    .onBackpressureBuffer()
    .subscribe(valor -> {
        // Procesamiento intensivo
        procesarDato(valor);
    });

En este ejemplo, se genera un Flux que emite un elemento cada milisegundo. Si el método procesarDato tarda más en ejecutarse, los elementos se acumulan en el buffer hasta que puedan ser procesados. Sin embargo, es importante considerar que el buffer puede crecer indefinidamente, lo que podría provocar problemas de memoria si el productor sigue superando al consumidor.

Para limitar el tamaño del buffer y evitar desbordamientos, es posible especificar una capacidad máxima:

Flux.interval(Duration.ofMillis(1))
    .onBackpressureBuffer(1000)
    .subscribe(valor -> procesarDato(valor));

Aquí, el buffer almacena hasta 1000 elementos. Si se supera este límite, se lanzará una excepción BufferOverflowException, a menos que se defina una estrategia alternativa.

Cuando no es viable almacenar todos los elementos producidos, el operador onBackpressureDrop permite descartar aquellos que el consumidor no pueda procesar a tiempo:

Flux.interval(Duration.ofMillis(1))
    .onBackpressureDrop()
    .subscribe(valor -> procesarDato(valor));

Con onBackpressureDrop, los elementos excedentes se eliminan, asegurando que el consumidor solo procese lo que su capacidad permite. Esta estrategia es útil en aplicaciones donde es aceptable perder algunos datos para mantener la estabilidad y rendimiento del sistema. Además, es posible manejar los elementos descartados proporcionando un callback:

Flux.interval(Duration.ofMillis(1))
    .onBackpressureDrop(elementoDescartado -> 
        System.out.println("Elemento descartado: " + elementoDescartado))
    .subscribe(valor -> procesarDato(valor));

Este enfoque permite realizar acciones adicionales, como registrar o analizar los datos perdidos, mejorando el monitoreo y diagnóstico del sistema.

El operador onBackpressureLatest es idóneo cuando solo interesa el dato más reciente. Con este operador, si el consumidor no puede procesar todos los elementos, se retiene únicamente el último elemento emitido, descartando los anteriores no procesados:

Flux.interval(Duration.ofMillis(1))
    .onBackpressureLatest()
    .subscribe(valor -> procesarDato(valor));

Esta estrategia es especialmente útil en contextos donde es crucial mantener actualizada la información más reciente, como en sistemas de actualización de estados o mediciones en tiempo real.

Es relevante destacar que estos operadores de backpressure se aplican cuando no existe un control de demanda efectivo entre productor y consumidor. En situaciones ideales, la programación reactiva utiliza el mecanismo de demanda bajo suscripción (request), donde el consumidor solicita al productor la cantidad exacta de elementos que puede manejar. Sin embargo, cuando el productor no puede ajustar su ritmo de emisión, como en fuentes de eventos externas, estos operadores son esenciales para gestionar la contrapresión.

Un ejemplo práctico que combina estas estrategias es el siguiente:

Flux<String> flujoEventos = obtenerEventosExternos();
flujoEventos
    .onBackpressureBuffer(
        500,
        elementoDescartado -> registrarEventoPerdido(elementoDescartado),
        BufferOverflowStrategy.DROP_OLDEST
    )
    .subscribe(this::procesarEvento);

En este caso, se establece un buffer limitado a 500 elementos y, al superar este límite, se aplica la estrategia DROP_OLDEST, que descarta los elementos más antiguos en favor de los nuevos. Además, se registra cada elemento descartado mediante la función registrarEventoPerdido, lo que facilita el seguimiento de datos perdidos.

La selección del operador adecuado depende de los requisitos de cada aplicación. Si es crítico procesar todos los datos y se dispone de suficiente memoria, onBackpressureBuffer es apropiado. Si es preferible mantener el rendimiento sacrificando algunos datos, onBackpressureDrop es más conveniente. Cuando solo importa el dato más reciente, onBackpressureLatest es la opción indicada.

Es aconsejable diseñar aplicaciones reactivas considerando el control de flujo desde el inicio, implementando mecanismos que permitan al consumidor regular la velocidad del productor. Esto reduce la necesidad de utilizar operadores de backpressure y ayuda a construir sistemas más eficientes y escalables.

Planificación y subprocesos Schedulers: subscribeOn, publishOn, parallel

En programación reactiva con Reactor, la gestión de los hilos de ejecución es esencial para optimizar el rendimiento y la eficiencia de las aplicaciones. Los Schedulers proporcionan una forma de controlar en qué hilos se ejecutan las operaciones reactivas, permitiendo gestionar la planificación y la concurrencia de manera precisa. Los operadores subscribeOn, publishOn y el uso del Scheduler parallel son herramientas clave para este propósito.

El operador subscribeOn especifica el Scheduler en el que se suscribirán los suscriptores a un Publisher. Esto significa que determina el hilo desde el cual se iniciará la ejecución del flujo reactivo. Por ejemplo:

Flux<Integer> flujoNumeros = Flux.range(1, 5)
    .map(numero -> {
        System.out.println("map 1 - Hilo: " + Thread.currentThread().getName());
        return numero * 2;
    })
    .subscribeOn(Schedulers.boundedElastic())
    .map(numero -> {
        System.out.println("map 2 - Hilo: " + Thread.currentThread().getName());
        return numero + 1;
    });

flujoNumeros.subscribe(valor -> 
    System.out.println("Suscriptor - Hilo: " + Thread.currentThread().getName() + ", Valor: " + valor));

En este ejemplo, al utilizar subscribeOn(Schedulers.boundedElastic()), se indica que la suscripción al Flux y las operaciones posteriores se ejecutarán en el Scheduler boundedElastic, que es adecuado para operaciones bloqueantes o de larga duración. La salida mostrará que los hilos utilizados en las operaciones map y en el suscriptor pertenecen al pool del Scheduler seleccionado.

Por otro lado, el operador publishOn cambia el contexto de ejecución a partir del punto en que se aplica en la cadena de operadores. Esto permite alternar entre diferentes Schedulers en distintas partes del flujo. Veamos un ejemplo:

Flux<String> flujoDatos = Flux.just("A", "B", "C")
    .map(dato -> {
        System.out.println("map 1 - Hilo: " + Thread.currentThread().getName());
        return dato.toLowerCase();
    })
    .publishOn(Schedulers.parallel())
    .map(dato -> {
        System.out.println("map 2 - Hilo: " + Thread.currentThread().getName());
        return dato + "1";
    });

flujoDatos.subscribe(dato ->
    System.out.println("Suscriptor - Hilo: " + Thread.currentThread().getName() + ", Dato: " + dato));

En este caso, las operaciones previas a publishOn se ejecutan en el hilo inicial, mientras que las operaciones posteriores se cambian al Scheduler parallel, que utiliza un pool de hilos optimizado para tareas CPU-bound. Esto permite aprovechar múltiples núcleos del procesador para operaciones intensivas en cálculo.

Es importante destacar que subscribeOn solo tiene efecto cuando se coloca al principio del flujo. Si se utiliza múltiples veces, solo el primer subscribeOn será efectivo, ya que la suscripción ocurre una única vez. En cambio, publishOn puede aplicarse en varios puntos del flujo para cambiar el contexto de ejecución según sea necesario.

El Scheduler parallel está diseñado para operaciones que pueden ejecutarse en paralelo, aprovechando el paralelismo a nivel de CPU. Cuando se utiliza junto con publishOn o subscribeOn, permite distribuir la carga de trabajo entre varios hilos. Por ejemplo:

Flux.range(1, 10)
    .parallel()
    .runOn(Schedulers.parallel())
    .map(numero -> {
        System.out.println("Procesando " + numero + " en hilo " + Thread.currentThread().getName());
        return numero * 2;
    })
    .sequential()
    .subscribe(resultado -> 
        System.out.println("Resultado: " + resultado));

En este ejemplo, el uso de parallel() transforma el Flux en un ParallelFlux, permitiendo procesamiento en paralelo. La combinación con runOn(Schedulers.parallel()) especifica el Scheduler donde se ejecutarán las operaciones en paralelo. Finalmente, sequential() convierte el ParallelFlux de vuelta a un Flux secuencial para su suscripción. Esto es útil para procesar grandes cantidades de datos de manera eficiente.

Es fundamental comprender las diferencias entre subscribeOn y publishOn. Mientras que subscribeOn afecta principalmente al hilo desde el cual se suscribe al Publisher y, por ende, a las operaciones upstream, publishOn modifica el hilo de ejecución a partir de su posición en el flujo. Por lo tanto, son operadores complementarios que deben utilizarse según las necesidades del flujo reactivo.

Otro aspecto relevante es el Scheduler immediate(), que ejecuta las operaciones en el mismo hilo que invoca el operador. Por defecto, si no se especifica un Scheduler, las operaciones se ejecutan en el hilo donde se realiza la suscripción. Sin embargo, para mejorar el rendimiento y evitar bloquear el hilo principal, es recomendable utilizar Schedulers apropiados.

Cuando se trabaja con aplicaciones web reactivas en Spring Boot, es común manejar flujos de datos que interactúan con fuentes externas, como bases de datos o servicios remotos. En estos casos, el Scheduler boundedElastic es adecuado para operaciones que pueden bloquear, ya que gestiona un pool de hilos elásticos que se expande bajo demanda:

Mono<String> resultado = realizarOperacionBloqueante()
    .subscribeOn(Schedulers.boundedElastic());

resultado.subscribe(res ->
    System.out.println("Operación completada: " + res));

Aquí, realizarOperacionBloqueante() representa una llamada a un método que ejecuta una tarea que puede bloquear, como una llamada a una API externa o una consulta a una base de datos tradicional. Al usar subscribeOn(Schedulers.boundedElastic()), se evita bloquear el hilo de evento de Netty, manteniendo la reactividad de la aplicación.

Es esencial evitar bloquear hilos en el contexto reactivo predeterminado, ya que puede causar problemas de rendimiento y afectar la escalabilidad. Utilizar Schedulers adecuados asegura que las operaciones se ejecuten en hilos apropiados sin comprometer la naturaleza no bloqueante de la aplicación.

Además, es posible crear un Scheduler personalizado utilizando Schedulers.newBoundedElastic() o Schedulers.newParallel(), configurando parámetros como el tamaño del pool de hilos y el tiempo de vida de los mismos. Esto ofrece flexibilidad para adaptar la gestión de hilos a las necesidades específicas de la aplicación.

Por ejemplo, para crear un Scheduler paralelo personalizado:

Scheduler schedulerPersonalizado = Schedulers.newParallel("mi-paralelo", 8);

Flux<Integer> flujoPersonalizado = Flux.range(1, 100)
    .publishOn(schedulerPersonalizado)
    .map(this::procesar);

flujoPersonalizado.subscribe(resultado ->
    System.out.println("Resultado: " + resultado));

En este caso, Schedulers.newParallel("mi-paralelo", 8) crea un Scheduler con un pool de 8 hilos, identificados con el prefijo "mi-paralelo". Esto permite controlar el grado de paralelismo y optimizar el uso de recursos según la carga esperada.

Al utilizar Schedulers, es importante también manejar adecuadamente los recursos. Aunque Reactor gestiona automáticamente los pools de hilos para los Schedulers compartidos, si se crean Schedulers personalizados, es recomendable cerrarlos al finalizar su uso para liberar recursos:

schedulerPersonalizado.dispose();

La comprensión y uso correcto de subscribeOn, publishOn y los Schedulers como parallel son fundamentales para optimizar la ejecución de flujos reactivos en aplicaciones con Spring Boot 3. Permiten controlar la concurrencia, evitar bloqueos y aprovechar al máximo los recursos disponibles, mejorando el rendimiento y la escalabilidad de las aplicaciones reactivas.

Aprende SpringBoot GRATIS online

Ejercicios de esta lección Operadores reactivos avanzados

Evalúa tus conocimientos de esta lección Operadores reactivos avanzados con nuestros retos de programación de tipo Test, Puzzle, Código y Proyecto con VSCode, guiados por IA.

API Query By Example (QBE)

Spring Boot
Test

Identificadores y relaciones JPA

Spring Boot
Puzzle

Borrar datos de base de datos

Spring Boot
Test

Web y Test Starters

Spring Boot
Puzzle

Métodos find en repositorios

Spring Boot
Test

Controladores Spring MVC

Spring Boot
Código

Inserción de datos

Spring Boot
Test

CRUD Customers Spring MVC + Spring Data JPA

Spring Boot
Proyecto

Backend API REST con Spring Boot

Spring Boot
Proyecto

Controladores Spring REST

Spring Boot
Código

Uso de Spring con Thymeleaf

Spring Boot
Puzzle

API Specification

Spring Boot
Puzzle

Registro de usuarios

Spring Boot
Test

Crear entidades JPA

Spring Boot
Código

Asociaciones en JPA

Spring Boot
Test

Asociaciones de entidades JPA

Spring Boot
Código

Integración con Vue

Spring Boot
Test

Consultas JPQL

Spring Boot
Código

Open API y cómo agregarlo en Spring Boot

Spring Boot
Puzzle

Uso de Controladores REST

Spring Boot
Puzzle

Repositorios reactivos

Spring Boot
Test

Inyección de dependencias

Spring Boot
Test

Introducción a Spring Boot

Spring Boot
Test

CRUD y JPA Repository

Spring Boot
Puzzle

Inyección de dependencias

Spring Boot
Código

Vista en Spring MVC con Thymeleaf

Spring Boot
Test

Servicios en Spring

Spring Boot
Código

Operadores Reactivos

Spring Boot
Puzzle

Configuración de Vue

Spring Boot
Puzzle

Entidades JPA

Spring Boot
Test

Integración con Angular

Spring Boot
Test

API Specification

Spring Boot
Test

API Query By Example (QBE)

Spring Boot
Puzzle

Controladores MVC

Spring Boot
Test

Anotaciones y mapeo en JPA

Spring Boot
Puzzle

Consultas JPQL con @Query en Spring Data JPA

Spring Boot
Test

Repositorios Spring Data

Spring Boot
Test

Inyección de dependencias

Spring Boot
Puzzle

Data JPA y Mail Starters

Spring Boot
Test

Configuración de Angular

Spring Boot
Puzzle

Controladores Spring REST

Spring Boot
Test

Configuración de Controladores MVC

Spring Boot
Puzzle

Consultas JPQL con @Query en Spring Data JPA

Spring Boot
Puzzle

Actualizar datos de base de datos

Spring Boot
Test

Verificar token JWT en peticiones

Spring Boot
Test

Login de usuarios

Spring Boot
Test

Integración con React

Spring Boot
Test

Configuración de React

Spring Boot
Puzzle

Todas las lecciones de SpringBoot

Accede a todas las lecciones de SpringBoot y aprende con ejemplos prácticos de código y ejercicios de programación con IDE web sin instalar nada.

Introducción A Spring Boot

Spring Boot

Introducción Y Entorno

Spring Boot Starters

Spring Boot

Introducción Y Entorno

Inyección De Dependencias

Spring Boot

Introducción Y Entorno

Controladores Spring Mvc

Spring Boot

Spring Web

Vista En Spring Mvc Con Thymeleaf

Spring Boot

Spring Web

Controladores Spring Rest

Spring Boot

Spring Web

Open Api Y Cómo Agregarlo En Spring Boot

Spring Boot

Spring Web

Servicios En Spring

Spring Boot

Spring Web

Clientes Resttemplate Y Restclient

Spring Boot

Spring Web

Rxjava En Spring Web

Spring Boot

Spring Web

Crear Entidades Jpa

Spring Boot

Persistencia Spring Data

Asociaciones De Entidades Jpa

Spring Boot

Persistencia Spring Data

Repositorios Spring Data

Spring Boot

Persistencia Spring Data

Métodos Find En Repositorios

Spring Boot

Persistencia Spring Data

Inserción De Datos

Spring Boot

Persistencia Spring Data

Actualizar Datos De Base De Datos

Spring Boot

Persistencia Spring Data

Borrar Datos De Base De Datos

Spring Boot

Persistencia Spring Data

Consultas Jpql Con @Query En Spring Data Jpa

Spring Boot

Persistencia Spring Data

Api Query By Example (Qbe)

Spring Boot

Persistencia Spring Data

Api Specification

Spring Boot

Persistencia Spring Data

Repositorios Reactivos

Spring Boot

Persistencia Spring Data

Introducción E Instalación De Apache Kafka

Spring Boot

Mensajería Asíncrona

Crear Proyecto Con Apache Kafka

Spring Boot

Mensajería Asíncrona

Creación De Producers

Spring Boot

Mensajería Asíncrona

Creación De Consumers

Spring Boot

Mensajería Asíncrona

Kafka Streams En Spring Boot

Spring Boot

Mensajería Asíncrona

Introducción A Spring Webflux

Spring Boot

Reactividad Webflux

Spring Data R2dbc

Spring Boot

Reactividad Webflux

Controlador Rest Reactivo Basado En Anotaciones

Spring Boot

Reactividad Webflux

Controlador Rest Reactivo Funcional

Spring Boot

Reactividad Webflux

Operadores Reactivos Básicos

Spring Boot

Reactividad Webflux

Operadores Reactivos Avanzados

Spring Boot

Reactividad Webflux

Cliente Reactivo Webclient

Spring Boot

Reactividad Webflux

Introducción A Spring Security

Spring Boot

Seguridad Con Spring Security

Seguridad Basada En Formulario En Mvc Con Thymeleaf

Spring Boot

Seguridad Con Spring Security

Registro De Usuarios

Spring Boot

Seguridad Con Spring Security

Login De Usuarios

Spring Boot

Seguridad Con Spring Security

Verificar Token Jwt En Peticiones

Spring Boot

Seguridad Con Spring Security

Seguridad Jwt En Api Rest Spring Web

Spring Boot

Seguridad Con Spring Security

Seguridad Jwt En Api Rest Reactiva Spring Webflux

Spring Boot

Seguridad Con Spring Security

Autenticación Y Autorización Con Anotaciones

Spring Boot

Seguridad Con Spring Security

Testing Unitario De Componentes Y Servicios

Spring Boot

Testing Con Spring Test

Testing De Repositorios Spring Data Jpa Y Acceso A Datos Con Spring Test

Spring Boot

Testing Con Spring Test

Testing Controladores Spring Mvc Con Thymeleaf

Spring Boot

Testing Con Spring Test

Testing Controladores Rest Con Json

Spring Boot

Testing Con Spring Test

Testing De Aplicaciones Reactivas Webflux

Spring Boot

Testing Con Spring Test

Testing De Seguridad Spring Security

Spring Boot

Testing Con Spring Test

Testing Con Apache Kafka

Spring Boot

Testing Con Spring Test

Integración Con Angular

Spring Boot

Integración Frontend

Integración Con React

Spring Boot

Integración Frontend

Integración Con Vue

Spring Boot

Integración Frontend

Accede GRATIS a SpringBoot y certifícate

En esta lección

Objetivos de aprendizaje de esta lección

  • Control de flujo
  • Programación temporal
  • Combinación avanzada
  • Errores y reintentos
  • Backpressure
  • Planificación y subprocesos Schedulers