Spring Boot

SpringBoot

Tutorial SpringBoot: RxJava en Spring Web

Aprende programación reactiva utilizando RxJava en Spring Web sobre Spring Boot para comunicarte de forma reactiva con API REST y servicios externos desde backend.

Aprende SpringBoot GRATIS y certifícate

Introducción a RxJava y cómo agregarlo en proyectos Spring Web con maven

RxJava es una biblioteca que implementa el paradigma de programación reactiva siguiendo el modelo de ReactiveX, permitiendo manejar flujos de datos de forma asíncrona y con operaciones funcionales. Aunque Spring Web es tradicionalmente sincrónico y bloqueante, la integración de RxJava en proyectos Spring Boot permite mejorar la eficiencia y escalabilidad de las aplicaciones al gestionar de manera más efectiva recursos y eventos asíncronos.

Para incorporar RxJava en un proyecto Spring Boot 3 con Spring Web, es necesario añadir las dependencias adecuadas en el archivo pom.xml de Maven. Esto permitirá utilizar las clases y métodos de RxJava en los componentes de la aplicación, como servicios y controladores. A continuación, se detallan los pasos para integrar RxJava correctamente.

Paso 1: Añadir la dependencia de RxJava en Maven

En el archivo pom.xml, es necesario incluir la dependencia de RxJava. Se recomienda utilizar la versión más reciente compatible con el proyecto. La configuración mínima sería:

<dependencies>
    <!-- Otras dependencias del proyecto -->
    <dependency>
        <groupId>io.reactivex.rxjava3</groupId>
        <artifactId>rxjava</artifactId>
        <version>3.1.9</version>
    </dependency>
</dependencies>

Es importante verificar que la versión de RxJava sea la adecuada y esté actualizada según las necesidades del proyecto. Una vez agregada la dependencia, Maven descargará automáticamente los archivos necesarios al reconstruir el proyecto.

Paso 2: Importar las clases de RxJava en el código

Con la dependencia añadida, ya es posible utilizar las clases de RxJava en los componentes de la aplicación. Por ejemplo, en un servicio, se pueden importar e implementar tipos como Observable o Single para manejar flujos de datos:

import io.reactivex.rxjava3.core.Observable;
import org.springframework.stereotype.Service;

@Service
public class UsuarioService {

    public Observable<Usuario> obtenerUsuarios() {
        return Observable.create(emitter -> {
            try {
                // Lógica para obtener usuarios
                List<Usuario> usuarios = /* obtener lista de usuarios */;
                for (Usuario usuario : usuarios) {
                    emitter.onNext(usuario);
                }
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        });
    }
}

En este ejemplo, se crea un Observable que emite elementos de tipo Usuario, proporcionando una forma reactiva de manejar la información.

Paso 3: Utilizar RxJava en controladores Spring

En los controladores, se pueden definir endpoints que utilicen los tipos de RxJava para devolver respuestas asíncronas. Aunque Spring Web es bloqueante, el uso de RxJava permite gestionar mejor las operaciones asíncronas internamente. Un ejemplo de controlador podría ser:

import io.reactivex.rxjava3.core.Observable;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UsuarioController {

    private final UsuarioService usuarioService;

    public UsuarioController(UsuarioService usuarioService) {
        this.usuarioService = usuarioService;
    }

    @GetMapping("/usuarios")
    public Observable<Usuario> obtenerUsuarios() {
        return usuarioService.obtenerUsuarios();
    }
}

Aquí, el endpoint /usuarios devuelve un Observable de usuarios, permitiendo al cliente suscribirse y recibir los datos conforme estén disponibles.

Paso 4: Manejo de suscripciones y flujos de datos

Es fundamental gestionar correctamente las suscripciones y el flujo de datos para evitar fugas de memoria o comportamientos inesperados. RxJava proporciona varios operadores y herramientas para manipular y controlar los flujos, como map, filter o flatMap.

Por ejemplo, para transformar los datos antes de enviarlos al cliente:

@GetMapping("/usuarios/nombres")
public Observable<String> obtenerNombresUsuarios() {
    return usuarioService.obtenerUsuarios()
            .map(Usuario::getNombre);
}

En este caso, se utiliza el operador map para transformar cada objeto Usuario en su nombre, enviando al cliente un flujo de cadenas de texto.

Consideraciones al integrar RxJava en Spring Web

Aunque RxJava aporta beneficios en la gestión de operaciones asíncronas, es importante considerar que Spring Web es un framework bloqueante. Esto significa que, a nivel de servidor, las peticiones seguirán siendo gestionadas por hilos que pueden quedar bloqueados esperando respuestas. Sin embargo, al utilizar RxJava, se puede mejorar la eficiencia interna del manejo de datos y operaciones, especialmente en servicios que realizan llamadas a recursos externos o requieren procesamiento asíncrono.

Además, es crucial manejar correctamente los hilos y planificadores (schedulers) de RxJava para evitar bloqueos innecesarios. Por defecto, las operaciones de RxJava se ejecutan en el hilo donde se creó el flujo, pero se pueden especificar planificadores para cambiar este comportamiento:

import io.reactivex.rxjava3.schedulers.Schedulers;

@GetMapping("/usuarios/async")
public Observable<Usuario> obtenerUsuariosAsync() {
    return usuarioService.obtenerUsuarios()
            .subscribeOn(Schedulers.io());
}

Aquí, se indica que la suscripción al Observable se realizará en un hilo del planificador io(), evitando bloquear el hilo principal.

Integración con otros componentes y buenas prácticas

Al utilizar RxJava en un proyecto Spring Boot 3, se pueden integrar sus flujos con otros componentes del ecosistema Spring, como RestTemplate o RestClient o bases de datos. Por ejemplo, para realizar llamadas a servicios externos de forma reactiva:

public Observable<DatosExternos> obtenerDatosExternos() {
    return Observable.fromCallable(() -> restTemplate.getForObject(url, DatosExternos.class));
}

Se recomienda también centralizar el manejo de errores y excepciones utilizando los operadores proporcionados por RxJava, mejorando la resiliencia de la aplicación.

Tipos de RxJava: Observable, Single, Maybe, Completable, Flowable

En RxJava, los tipos base son fundamentales para modelar y manejar flujos de datos asíncronos. Cada tipo ofrece una abstracción específica que se adapta a distintos escenarios de programación reactiva en aplicaciones Spring Web. A continuación, se detallan los principales tipos: Observable, Single, Maybe, Completable y Flowable, junto con sus características y usos habituales.

Observable

El Observable es el tipo más común en RxJava y representa una secuencia de datos que puede emitir cero o más valores, y posteriormente finalizar con éxito o error. Es idóneo para flujos de datos que generen múltiples elementos a lo largo del tiempo, como lecturas de bases de datos o eventos de usuario.

Ejemplo de uso de Observable en un servicio:

import io.reactivex.rxjava3.core.Observable;
import org.springframework.stereotype.Service;

@Service
public class ProductoService {

    public Observable<Producto> obtenerProductos() {
        return Observable.create(emitter -> {
            try {
                List<Producto> productos = // Lógica para obtener productos
                for (Producto producto : productos) {
                    emitter.onNext(producto);
                }
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        });
    }
}

En este ejemplo, se crea un Observable que emite una serie de objetos Producto, uno por uno, y concluye cuando se han emitido todos.

Single

El Single es un tipo especializado que emite exactamente un valor o un error. Es adecuado para operaciones que retornan un único resultado, como consultas específicas a una base de datos o llamadas a servicios que devuelven un solo objeto.

Ejemplo de uso de Single:

import io.reactivex.rxjava3.core.Single;
import org.springframework.stereotype.Service;

@Service
public class UsuarioService {

    public Single<Usuario> obtenerUsuarioPorId(Long id) {
        return Single.create(emitter -> {
            try {
                Usuario usuario = // Lógica para obtener usuario por ID
                if (usuario != null) {
                    emitter.onSuccess(usuario);
                } else {
                    emitter.onError(new NoSuchElementException("Usuario no encontrado"));
                }
            } catch (Exception e) {
                emitter.onError(e);
            }
        });
    }
}

Aquí, el Single emitirá el Usuario encontrado o un error si no existe.

Maybe

El Maybe combina características de Single y Completable. Puede emitir un valor, no emitir nada o emitir un error. Es útil cuando una operación podría devolver un resultado o no, como al buscar un registro que podría no existir.

Ejemplo de uso de Maybe:

import io.reactivex.rxjava3.core.Maybe;
import org.springframework.stereotype.Service;

@Service
public class PedidoService {

    public Maybe<Pedido> obtenerPedidoPorCodigo(String codigo) {
        return Maybe.create(emitter -> {
            try {
                Pedido pedido = // Lógica para obtener pedido por código
                if (pedido != null) {
                    emitter.onSuccess(pedido);
                } else {
                    emitter.onComplete();
                }
            } catch (Exception e) {
                emitter.onError(e);
            }
        });
    }
}

En este caso, el Maybe emitirá el Pedido si existe, completará sin emitir si no se encuentra, o emitirá un error en caso de excepción.

Completable

El Completable representa una operación que no emite ningún valor, pero que avisa de su finalización o error. Es adecuado para acciones donde el resultado no es relevante, como operaciones de guardado o actualización que no necesitan devolver datos.

Ejemplo de uso de Completable:

import io.reactivex.rxjava3.core.Completable;
import org.springframework.stereotype.Service;

@Service
public class NotificacionService {

    public Completable enviarNotificacion(Notificacion notificacion) {
        return Completable.create(emitter -> {
            try {
                // Lógica para enviar notificación
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        });
    }
}

En este ejemplo, el Completable simplemente indica si la notificación se envió correctamente o si ocurrió un error.

Flowable

El Flowable es similar al Observable, pero está diseñado para manejar flujos de datos que pueden emitir una gran cantidad de elementos, gestionando la presión de retroceso (backpressure). Es esencial cuando se trabaja con fuentes que generan datos más rápido de lo que pueden procesarse.

Ejemplo de uso de Flowable con manejo de backpressure:

import io.reactivex.rxjava3.core.Flowable;
import org.springframework.stereotype.Service;

@Service
public class EventoService {

    public Flowable<Evento> suscribirseEventos() {
        return Flowable.create(emitter -> {
            // Lógica para emitir eventos continuamente
            // Se debe gestionar la backpressure correctamente
        }, BackpressureStrategy.BUFFER);
    }
}

El Flowable requiere especificar una estrategia de backpressure, como BUFFER, para determinar cómo manejar el exceso de datos.

Integración en Controladores Spring Web

Al utilizar estos tipos en controladores Spring Web, es posible manejar peticiones y respuestas de forma reactiva, mejorando la eficiencia en operaciones asíncronas.

Ejemplo de controlador con diferentes tipos:

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Completable;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api")
public class ApiController {

    private final ProductoService productoService;
    private final UsuarioService usuarioService;
    private final PedidoService pedidoService;
    private final NotificacionService notificacionService;

    public ApiController(ProductoService productoService,
                         UsuarioService usuarioService,
                         PedidoService pedidoService,
                         NotificacionService notificacionService) {
        this.productoService = productoService;
        this.usuarioService = usuarioService;
        this.pedidoService = pedidoService;
        this.notificacionService = notificacionService;
    }

    @GetMapping("/productos")
    public Observable<Producto> listarProductos() {
        return productoService.obtenerProductos();
    }

    @GetMapping("/usuarios/{id}")
    public Single<Usuario> obtenerUsuario(@PathVariable Long id) {
        return usuarioService.obtenerUsuarioPorId(id);
    }

    @GetMapping("/pedidos/{codigo}")
    public Maybe<Pedido> obtenerPedido(@PathVariable String codigo) {
        return pedidoService.obtenerPedidoPorCodigo(codigo);
    }

    @PostMapping("/notificaciones")
    public Completable enviarNotificacion(@RequestBody Notificacion notificacion) {
        return notificacionService.enviarNotificacion(notificacion);
    }
}

Este controlador muestra cómo integrar los distintos tipos de RxJava en endpoints REST, permitiendo una programación más reactiva y manejando de forma eficiente las operaciones asíncronas.

Elección del tipo adecuado:

La elección entre Observable, Single, Maybe, Completable y Flowable depende del comportamiento esperado y de la naturaleza de los datos:

  • Observable: Para múltiples emisiones de datos, como listas o secuencias continuas.
  • Single: Cuando se espera un único resultado obligatorio.
  • Maybe: Si el resultado puede existir o no, y ambas posibilidades son normales.
  • Completable: Cuando solo interesa el éxito o fallo de una operación sin datos adicionales.
  • Flowable: En casos de grandes volúmenes de datos o cuando la fuente puede producir elementos más rápido de lo que se consumen.

Manejo de hilos y concurrencia:

Es importante tener en cuenta la concurrencia al trabajar con RxJava en aplicaciones Spring Web. Si bien Spring Web es bloqueante, RxJava permite manejar operaciones asíncronas de manera más eficiente. Se pueden utilizar schedulers para controlar en qué hilos se ejecutan las operaciones.

Ejemplo de especificar un scheduler:

import io.reactivex.rxjava3.schedulers.Schedulers;

public Observable<Producto> obtenerProductosAsync() {
    return productoService.obtenerProductos()
            .subscribeOn(Schedulers.io());
}

Al usar Schedulers.io(), se designa que la suscripción se realizará en un hilo de I/O, liberando el hilo principal y mejorando la eficiencia de la aplicación.

Operadores: map, flatMap, filter, zip, combinación y reducción

En RxJava, los operadores son fundamentales para manipular y transformar flujos de datos de manera declarativa y eficiente. Estos operadores permiten aplicar funciones a los datos emitidos, combinar flujos y reducir secuencias a valores únicos. A continuación, se explorarán los operadores más utilizados: map, flatMap, filter, zip, así como técnicas de combinación y reducción en el contexto de aplicaciones Spring Web con RxJava.

Operador map

El operador map transforma los elementos emitidos por un Observable aplicando una función a cada uno de ellos. Es útil cuando se necesita convertir los datos de un tipo a otro o modificar su contenido sin alterar la secuencia.

Ejemplo de uso de map:

import io.reactivex.rxjava3.core.Observable;

public Observable<String> obtenerNombresProductos() {
    return productoService.obtenerProductos()
            .map(producto -> producto.getNombre().toUpperCase());
}

En este ejemplo, se obtiene un Observable de productos y se aplica map para transformar cada Producto en su nombre en mayúsculas. El resultado es un flujo de cadenas de texto con los nombres transformados.

Operador flatMap

El operador flatMap transforma cada elemento emitido en un Observable, y luego aplana las emisiones resultantes en un único flujo. Es especialmente útil cuando cada elemento inicial desencadena una operación asíncrona que devuelve otro Observable.

Ejemplo de uso de flatMap:

import io.reactivex.rxjava3.core.Observable;

public Observable<DetalleProducto> obtenerDetallesProductos() {
    return productoService.obtenerProductos()
            .flatMap(producto -> detalleService.obtenerDetalleProducto(producto.getId()));
}

Aquí, por cada Producto emitido, se obtiene un Observable de DetalleProducto llamando al servicio de detalles. flatMap se encarga de consolidar todos los flujos en uno solo, proporcionando un mecanismo eficaz para manejar operaciones dependientes que son asíncronas.

Operador filter

El operador filter permite filtrar los elementos emitidos por un Observable según una condición especificada. Solo los elementos que cumplen la predicado proporcionado se emiten al suscriptor.

Ejemplo de uso de filter:

import io.reactivex.rxjava3.core.Observable;

public Observable<Producto> obtenerProductosDisponibles() {
    return productoService.obtenerProductos()
            .filter(producto -> producto.isEnStock());
}

En este caso, se filtran los productos para emitir solo aquellos que estén en stock. El operador filter evalúa cada producto y emite únicamente los que satisfacen la condición.

Operador zip

El operador zip combina las emisiones de múltiples Observables en una sola emisión aplicando una función combinadora. Es útil cuando se necesitan sincronizar varios flujos y combinarlos en un único resultado.

Ejemplo de uso de zip:

import io.reactivex.rxjava3.core.Observable;

public Observable<ProductoConCategoria> obtenerProductosConCategoria() {
    Observable<Producto> productos = productoService.obtenerProductos();
    Observable<Categoria> categorias = categoriaService.obtenerCategorias();

    return Observable.zip(productos, categorias, (producto, categoria) -> 
        new ProductoConCategoria(producto, categoria));
}

En este ejemplo, se combinan las emisiones de productos y categorías para crear objetos ProductoConCategoria. El operador zip sincroniza los dos Observables y aplica la función combinadora a los elementos correspondientes.

Combinación de Observables

Además de zip, RxJava ofrece otros operadores para combinar flujos, como merge, concat y combineLatest. Estos operadores permiten gestionar múltiples fuentes de datos de diferentes maneras según el comportamiento deseado.

Ejemplo de uso de merge:

import io.reactivex.rxjava3.core.Observable;

public Observable<Evento> obtenerEventos() {
    Observable<Evento> eventosSistema = sistemaService.obtenerEventosSistema();
    Observable<Evento> eventosUsuario = usuarioService.obtenerEventosUsuario();

    return Observable.merge(eventosSistema, eventosUsuario);
}

Con merge, se combinan las emisiones de ambos Observables en un único flujo de eventos, emitidos conforme lleguen, sin esperar a sincronizarlos.

Reducción de secuencias

La reducción es un proceso para convertir una secuencia de elementos en un único valor acumulado. RxJava proporciona operadores como reduce y collect para este propósito.

Ejemplo de uso de reduce:

import io.reactivex.rxjava3.core.Single;

public Single<Integer> contarProductosDisponibles() {
    return productoService.obtenerProductos()
            .filter(Producto::isEnStock)
            .reduce(0, (contador, producto) -> contador + 1);
}

Este ejemplo cuenta los productos disponibles sumando uno por cada producto filtrado. El operador reduce aplica una función acumulativa a los elementos emitidos, resultando en un Single con el valor final.

Ejemplo de uso de collect:

import io.reactivex.rxjava3.core.Single;
import java.util.ArrayList;
import java.util.List;

public Single<List<String>> obtenerListaNombresProductos() {
    return productoService.obtenerProductos()
            .map(Producto::getNombre)
            .collect(ArrayList::new, List::add);
}

Aquí, se recopilan los nombres de los productos en una lista usando collect, que permite especificar el contenedor y la función para agregar elementos.

Aplicación práctica en Spring Web

Integrar estos operadores en una aplicación Spring Boot 3 con Spring Web mejora la manipulación de datos en servicios y controladores. A continuación, se muestra cómo utilizarlos en un servicio:

import io.reactivex.rxjava3.core.Observable;
import org.springframework.stereotype.Service;

@Service
public class EstadisticaService {

    public Observable<EstadisticaProducto> calcularEstadisticasProductos() {
        return productoService.obtenerProductos()
                .filter(Producto::isEnStock)
                .flatMap(this::obtenerVentasProducto)
                .reduce(new EstadisticaProducto(), this::acumularEstadisticas)
                .toObservable();
    }

    private Observable<Venta> obtenerVentasProducto(Producto producto) {
        return ventaService.obtenerVentasPorProducto(producto.getId());
    }

    private EstadisticaProducto acumularEstadisticas(EstadisticaProducto estadistica, Venta venta) {
        // Lógica para acumular estadísticas
        estadistica.incrementarVentas(venta.getCantidad());
        return estadistica;
    }
}

En este servicio, se combinan varios operadores para filtrar productos, obtener ventas asociadas y reducir la información a unas estadísticas acumuladas.

Consideraciones sobre el manejo de errores

Es esencial manejar los errores de forma adecuada en los flujos de RxJava. Operadores como onErrorReturn, onErrorResumeNext o retry permiten definir comportamientos ante excepciones.

Ejemplo de manejo de errores:

public Observable<Producto> obtenerProductosConRecuperacion() {
    return productoService.obtenerProductos()
            .onErrorResumeNext(Observable.empty());
}

Si ocurre un error al obtener los productos, onErrorResumeNext permite continuar con un Observable alternativo, en este caso vacío.

Uso de operadores en controladores

Los controladores pueden beneficiarse de los operadores para procesar peticiones y formar la respuesta adecuada.

Ejemplo de controlador utilizando operadores:

import io.reactivex.rxjava3.core.Observable;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ReporteController {

    private final EstadisticaService estadisticaService;

    public ReporteController(EstadisticaService estadisticaService) {
        this.estadisticaService = estadisticaService;
    }

    @GetMapping("/estadisticas/productos")
    public Observable<EstadisticaProducto> obtenerEstadisticasProductos() {
        return estadisticaService.calcularEstadisticasProductos();
    }
}

El controlador expone un endpoint que devuelve las estadísticas calculadas utilizando los operadores de RxJava, integrándose de manera fluida en la arquitectura de Spring Web.

Uso de RxJava con RestClient para invocar API REST externas

En aplicaciones Spring Boot 3, es común la necesidad de invocar API REST externas de manera eficiente, manejando adecuadamente la concurrencia y las operaciones asíncronas. La combinación de RxJava con el nuevo RestClient de Spring permite realizar estas llamadas de forma reactiva, mejorando el rendimiento y la escalabilidad de la aplicación.

El RestClient, introducido en Spring Framework 6, es el sucesor del tradicional RestTemplate. Este nuevo cliente HTTP ofrece una API más moderna y fluida, facilitando la integración con paradigmas reactivos. Al utilizar RxJava, podemos envolver las llamadas al RestClient en Observables o Singles, permitiendo manejar las respuestas de las API externas de forma asíncrona y reactiva.

Para empezar, es necesario configurar el RestClient en nuestra aplicación. A diferencia del WebClient utilizado en aplicaciones no bloqueantes con WebFlux, el RestClient está diseñado para ser utilizado en aplicaciones bloqueantes con Spring Web. A continuación, se muestra cómo crear una instancia del RestClient:

import org.springframework.web.client.RestClient;

RestClient restClient = RestClient.builder()
    .defaultHeader("Accept", "application/json")
    .build();

Una vez configurado el RestClient, podemos integrarlo con RxJava para realizar llamadas a servicios externos y procesar las respuestas de manera reactiva. Por ejemplo, si deseamos obtener información de un servicio externo que proporciona datos de usuarios, podemos crear un método que devuelva un Observable de tipo Usuario:

import io.reactivex.rxjava3.core.Observable;

public Observable<Usuario> obtenerUsuarioPorId(Long id) {
    return Observable.create(emitter -> {
        try {
            Usuario usuario = restClient.get()
                .uri("https://api.ejemplo.com/usuarios/{id}", id)
                .retrieve()
                .body(Usuario.class);
            emitter.onNext(usuario);
            emitter.onComplete();
        } catch (Exception e) {
            emitter.onError(e);
        }
    });
}

En este ejemplo, se utiliza el método create de Observable para envolver la llamada al RestClient. El emitter se encarga de emitir el resultado o el error según corresponda. De esta forma, quienes consuman este método podrán suscribirse al Observable y manejar la respuesta de forma asíncrona y no bloqueante.

Es importante manejar adecuadamente los hilos al trabajar con operaciones de I/O. Podemos especificar en qué scheduler se ejecutará el observable utilizando el método subscribeOn de RxJava. Por ejemplo:

return Observable.create(emitter -> {
    // Lógica de llamada al RestClient
}).subscribeOn(Schedulers.io());

El Schedulers.io() está optimizado para operaciones de I/O y permite que las llamadas al RestClient no bloqueen el hilo principal. De esta manera, mejoramos la concurrencia y la capacidad de respuesta de nuestra aplicación.

Además de los Observables, podemos utilizar otros tipos de RxJava como Single o Flowable, dependiendo de las necesidades. Si sabemos que la respuesta será un único elemento, podemos optar por un Single:

import io.reactivex.rxjava3.core.Single;

public Single<Usuario> obtenerUsuarioPorId(Long id) {
    return Single.fromCallable(() -> restClient.get()
        .uri("https://api.ejemplo.com/usuarios/{id}", id)
        .retrieve()
        .body(Usuario.class)
    ).subscribeOn(Schedulers.io());
}

En este caso, utilizamos el método fromCallable de Single, que simplifica la creación del flujo al manejar automáticamente las excepciones y emitir un error en caso de que ocurra alguna. Esto hace que el código sea más limpio y conciso.

Para realizar operaciones más complejas, como obtener una lista de elementos o manejar paginación, podemos aprovechar los operadores de RxJava. Por ejemplo, para obtener todos los usuarios de una API externa que proporciona paginación:

public Observable<Usuario> obtenerTodosLosUsuarios() {
    return Observable.create(emitter -> {
        int pagina = 1;
        boolean hayMasPaginas = true;

        while (hayMasPaginas && !emitter.isDisposed()) {
            List<Usuario> usuarios = restClient.get()
                .uri("https://api.ejemplo.com/usuarios?page={pagina}", pagina)
                .retrieve()
                .body(new ParameterizedTypeReference<List<Usuario>>() {});
            
            if (usuarios != null && !usuarios.isEmpty()) {
                for (Usuario usuario : usuarios) {
                    emitter.onNext(usuario);
                }
                pagina++;
            } else {
                hayMasPaginas = false;
            }
        }

        emitter.onComplete();
    }).subscribeOn(Schedulers.io());
}

Aquí, el Observable emite cada Usuario obtenido de las diferentes páginas de la API externa. Se controla la paginación mediante un bucle, y se verifica si el emitter ha sido disposed para evitar emitir después de que el suscriptor se haya desuscrito.

Otra ventaja de utilizar RxJava con RestClient es la posibilidad de combinar múltiples llamadas a API externas de forma eficiente. Por ejemplo, si necesitamos obtener información de varias fuentes y combinar los resultados:

public Single<DatosCombinados> obtenerDatosCombinados(Long idUsuario) {
    Single<Usuario> usuarioSingle = obtenerUsuarioPorId(idUsuario);
    Single<List<Post>> postsSingle = obtenerPostsPorUsuario(idUsuario);

    return usuarioSingle.zipWith(postsSingle, (usuario, posts) -> 
        new DatosCombinados(usuario, posts)
    );
}

En este ejemplo, utilizamos el operador zipWith para combinar las respuestas de dos llamadas diferentes. Esto permite manejar de manera elegante y concisa la unificación de datos provenientes de múltiples servicios.

Además, RxJava facilita el manejo de errores y reintentos en llamadas a servicios externos. Podemos utilizar operadores como retry o onErrorResumeNext para definir comportamientos ante fallos:

return obtenerUsuarioPorId(id)
    .retry(3)
    .onErrorResumeNext(error -> {
        // Lógica alternativa o valor por defecto
        return Single.just(new UsuarioPorDefecto());
    });

Con retry(3) indicamos que se reintente la operación hasta tres veces en caso de fallo. Si tras los reintentos la operación sigue fallando, onErrorResumeNext permite proporcionar una alternativa, como devolver un usuario por defecto.

Al integrar RxJava con RestClient, es importante también considerar la gestión de recursos y evitar potenciales fugas de memoria. Asegurarse de que las suscripciones se manejen correctamente y que los observers se dispongan cuando ya no sean necesarios es clave para mantener la aplicación eficiente.

Al utilizar RxJava en aplicaciones Spring Boot 3 con Spring Web, potenciamos la capacidad de realizar llamadas a API REST externas de manera reactiva. Esto mejora la eficiencia y la escalabilidad, permitiendo manejar de forma óptima las operaciones asíncronas y la manipulación de flujos de datos.

Ejemplo RxJava: consumir datos de Fake Store API con RestClient

1. Crear la clase Product

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Product {
    private int id;
    private String title;
    private double price;
    private String category;
    private String description;
    private String image;
}

2. Servicio CRUD usando RestClient y RxJava

import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.RestClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;

import java.util.List;

@Service
public class ProductService {

    private final RestClient restClient;

    public ProductService() {
        this.restClient = RestClient.builder()
                .baseUrl("https://fakestoreapi.com/products")
                .build();
    }

    // Obtener todos los productos
    public Single<List<Product>> getAllProducts() {
        return Single.fromCallable(() ->
                restClient.get()
                        .retrieve()
                        .bodyToFlux(Product.class)
                        .collectList()
                        .block()
        ).subscribeOn(Schedulers.io());
    }

    // Obtener un producto por ID
    public Single<Product> getProductById(int productId) {
        return Single.fromCallable(() ->
                restClient.get()
                        .uri("/{id}", productId)
                        .retrieve()
                        .bodyToMono(Product.class)
                        .block()
        ).subscribeOn(Schedulers.io());
    }

    // Crear un nuevo producto
    public Single<Product> createProduct(Product product) {
        return Single.fromCallable(() ->
                restClient.post()
                        .body(product)
                        .retrieve()
                        .bodyToMono(Product.class)
                        .block()
        ).subscribeOn(Schedulers.io());
    }

    // Actualizar un producto por ID (PUT)
    public Single<Product> updateProduct(int productId, Product product) {
        return Single.fromCallable(() ->
                restClient.put()
                        .uri("/{id}", productId)
                        .body(product)
                        .retrieve()
                        .bodyToMono(Product.class)
                        .block()
        ).subscribeOn(Schedulers.io());
    }

    // Eliminar un producto por ID
    public Single<Void> deleteProduct(int productId) {
        return Single.fromCallable(() -> {
            restClient.delete()
                    .uri("/{id}", productId)
                    .retrieve()
                    .toBodilessEntity()
                    .block();
            return null;
        }).subscribeOn(Schedulers.io());
    }
}

3. Controlador para exponer las operaciones CRUD

import io.reactivex.rxjava3.core.Single;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@RestController
@RequestMapping("/products")
public class ProductController {

    private final ProductService productService;

    public ProductController(ProductService productService) {
        this.productService = productService;
    }

    // Obtener todos los productos
    @GetMapping
    public Single<ResponseEntity<List<Product>>> getAllProducts() {
        return productService.getAllProducts()
                .map(ResponseEntity::ok);
    }

    // Obtener un producto por ID
    @GetMapping("/{id}")
    public Single<ResponseEntity<Product>> getProductById(@PathVariable int id) {
        return productService.getProductById(id)
                .map(ResponseEntity::ok)
                .onErrorReturn(throwable -> ResponseEntity.notFound().build());
    }

    // Crear un nuevo producto
    @PostMapping
    public Single<ResponseEntity<Product>> createProduct(@RequestBody Product product) {
        return productService.createProduct(product)
                .map(ResponseEntity::ok);
    }

    // Actualizar un producto por ID
    @PutMapping("/{id}")
    public Single<ResponseEntity<Product>> updateProduct(@PathVariable int id, @RequestBody Product product) {
        return productService.updateProduct(id, product)
                .map(ResponseEntity::ok);
    }

    // Eliminar un producto por ID
    @DeleteMapping("/{id}")
    public Single<ResponseEntity<Void>> deleteProduct(@PathVariable int id) {
        return productService.deleteProduct(id)
                .map(v -> ResponseEntity.noContent().build());
    }
}

4. Prueba de las operaciones

Ejemplo de JSON para crear producto

{
    "title": "Producto de ejemplo",
    "price": 29.99,
    "category": "jewelery",
    "description": "Un producto de prueba con descripción",
    "image": "https://example.com/product.jpg"
}

Rutas CRUD

  • GET /products: Lista todos los productos.
  • GET /products/{id}: Obtiene un producto específico por su ID.
  • POST /products: Crea un nuevo producto con los datos enviados en el cuerpo.
  • PUT /products/{id}: Actualiza un producto existente con los datos enviados en el cuerpo.
  • DELETE /products/{id}: Elimina un producto por su ID.
Aprende SpringBoot GRATIS online

Ejercicios de esta lección RxJava en Spring Web

Evalúa tus conocimientos de esta lección RxJava en Spring Web con nuestros retos de programación de tipo Test, Puzzle, Código y Proyecto con VSCode, guiados por IA.

API Query By Example (QBE)

Spring Boot
Test

Identificadores y relaciones JPA

Spring Boot
Puzzle

Borrar datos de base de datos

Spring Boot
Test

Web y Test Starters

Spring Boot
Puzzle

Métodos find en repositorios

Spring Boot
Test

Controladores Spring MVC

Spring Boot
Código

Inserción de datos

Spring Boot
Test

CRUD Customers Spring MVC + Spring Data JPA

Spring Boot
Proyecto

Backend API REST con Spring Boot

Spring Boot
Proyecto

Controladores Spring REST

Spring Boot
Código

Uso de Spring con Thymeleaf

Spring Boot
Puzzle

API Specification

Spring Boot
Puzzle

Registro de usuarios

Spring Boot
Test

Crear entidades JPA

Spring Boot
Código

Asociaciones en JPA

Spring Boot
Test

Asociaciones de entidades JPA

Spring Boot
Código

Integración con Vue

Spring Boot
Test

Consultas JPQL

Spring Boot
Código

Open API y cómo agregarlo en Spring Boot

Spring Boot
Puzzle

Uso de Controladores REST

Spring Boot
Puzzle

Repositorios reactivos

Spring Boot
Test

Inyección de dependencias

Spring Boot
Test

Introducción a Spring Boot

Spring Boot
Test

CRUD y JPA Repository

Spring Boot
Puzzle

Inyección de dependencias

Spring Boot
Código

Vista en Spring MVC con Thymeleaf

Spring Boot
Test

Servicios en Spring

Spring Boot
Código

Operadores Reactivos

Spring Boot
Puzzle

Configuración de Vue

Spring Boot
Puzzle

Entidades JPA

Spring Boot
Test

Integración con Angular

Spring Boot
Test

API Specification

Spring Boot
Test

API Query By Example (QBE)

Spring Boot
Puzzle

Controladores MVC

Spring Boot
Test

Anotaciones y mapeo en JPA

Spring Boot
Puzzle

Consultas JPQL con @Query en Spring Data JPA

Spring Boot
Test

Repositorios Spring Data

Spring Boot
Test

Inyección de dependencias

Spring Boot
Puzzle

Data JPA y Mail Starters

Spring Boot
Test

Configuración de Angular

Spring Boot
Puzzle

Controladores Spring REST

Spring Boot
Test

Configuración de Controladores MVC

Spring Boot
Puzzle

Consultas JPQL con @Query en Spring Data JPA

Spring Boot
Puzzle

Actualizar datos de base de datos

Spring Boot
Test

Verificar token JWT en peticiones

Spring Boot
Test

Login de usuarios

Spring Boot
Test

Integración con React

Spring Boot
Test

Configuración de React

Spring Boot
Puzzle

Todas las lecciones de SpringBoot

Accede a todas las lecciones de SpringBoot y aprende con ejemplos prácticos de código y ejercicios de programación con IDE web sin instalar nada.

Introducción A Spring Boot

Spring Boot

Introducción Y Entorno

Spring Boot Starters

Spring Boot

Introducción Y Entorno

Inyección De Dependencias

Spring Boot

Introducción Y Entorno

Controladores Spring Mvc

Spring Boot

Spring Web

Vista En Spring Mvc Con Thymeleaf

Spring Boot

Spring Web

Controladores Spring Rest

Spring Boot

Spring Web

Open Api Y Cómo Agregarlo En Spring Boot

Spring Boot

Spring Web

Servicios En Spring

Spring Boot

Spring Web

Clientes Resttemplate Y Restclient

Spring Boot

Spring Web

Rxjava En Spring Web

Spring Boot

Spring Web

Crear Entidades Jpa

Spring Boot

Persistencia Spring Data

Asociaciones De Entidades Jpa

Spring Boot

Persistencia Spring Data

Repositorios Spring Data

Spring Boot

Persistencia Spring Data

Métodos Find En Repositorios

Spring Boot

Persistencia Spring Data

Inserción De Datos

Spring Boot

Persistencia Spring Data

Actualizar Datos De Base De Datos

Spring Boot

Persistencia Spring Data

Borrar Datos De Base De Datos

Spring Boot

Persistencia Spring Data

Consultas Jpql Con @Query En Spring Data Jpa

Spring Boot

Persistencia Spring Data

Api Query By Example (Qbe)

Spring Boot

Persistencia Spring Data

Api Specification

Spring Boot

Persistencia Spring Data

Repositorios Reactivos

Spring Boot

Persistencia Spring Data

Introducción E Instalación De Apache Kafka

Spring Boot

Mensajería Asíncrona

Crear Proyecto Con Apache Kafka

Spring Boot

Mensajería Asíncrona

Creación De Producers

Spring Boot

Mensajería Asíncrona

Creación De Consumers

Spring Boot

Mensajería Asíncrona

Kafka Streams En Spring Boot

Spring Boot

Mensajería Asíncrona

Introducción A Spring Webflux

Spring Boot

Reactividad Webflux

Spring Data R2dbc

Spring Boot

Reactividad Webflux

Controlador Rest Reactivo Basado En Anotaciones

Spring Boot

Reactividad Webflux

Controlador Rest Reactivo Funcional

Spring Boot

Reactividad Webflux

Operadores Reactivos Básicos

Spring Boot

Reactividad Webflux

Operadores Reactivos Avanzados

Spring Boot

Reactividad Webflux

Cliente Reactivo Webclient

Spring Boot

Reactividad Webflux

Introducción A Spring Security

Spring Boot

Seguridad Con Spring Security

Seguridad Basada En Formulario En Mvc Con Thymeleaf

Spring Boot

Seguridad Con Spring Security

Registro De Usuarios

Spring Boot

Seguridad Con Spring Security

Login De Usuarios

Spring Boot

Seguridad Con Spring Security

Verificar Token Jwt En Peticiones

Spring Boot

Seguridad Con Spring Security

Seguridad Jwt En Api Rest Spring Web

Spring Boot

Seguridad Con Spring Security

Seguridad Jwt En Api Rest Reactiva Spring Webflux

Spring Boot

Seguridad Con Spring Security

Autenticación Y Autorización Con Anotaciones

Spring Boot

Seguridad Con Spring Security

Testing Unitario De Componentes Y Servicios

Spring Boot

Testing Con Spring Test

Testing De Repositorios Spring Data Jpa

Spring Boot

Testing Con Spring Test

Testing Controladores Spring Mvc Con Thymeleaf

Spring Boot

Testing Con Spring Test

Testing Controladores Rest Con Json

Spring Boot

Testing Con Spring Test

Testing De Aplicaciones Reactivas Webflux

Spring Boot

Testing Con Spring Test

Testing De Seguridad Spring Security

Spring Boot

Testing Con Spring Test

Testing Con Apache Kafka

Spring Boot

Testing Con Spring Test

Integración Con Angular

Spring Boot

Integración Frontend

Integración Con React

Spring Boot

Integración Frontend

Integración Con Vue

Spring Boot

Integración Frontend

Accede GRATIS a SpringBoot y certifícate

En esta lección

Objetivos de aprendizaje de esta lección

  • Comprender qué es la programación reactiva
  • Saber añadir la librería RxJava
  • Conocer los tipos existentes en RxJava
  • Conocer los operadores de RxJava