SpringBoot
Tutorial SpringBoot: RxJava en Spring Web
Aprende programación reactiva utilizando RxJava en Spring Web sobre Spring Boot para comunicarte de forma reactiva con API REST y servicios externos desde backend.
Aprende SpringBoot GRATIS y certifícateIntroducción a RxJava y cómo agregarlo en proyectos Spring Web con maven
RxJava es una biblioteca que implementa el paradigma de programación reactiva siguiendo el modelo de ReactiveX, permitiendo manejar flujos de datos de forma asíncrona y con operaciones funcionales. Aunque Spring Web es tradicionalmente sincrónico y bloqueante, la integración de RxJava en proyectos Spring Boot permite mejorar la eficiencia y escalabilidad de las aplicaciones al gestionar de manera más efectiva recursos y eventos asíncronos.
Para incorporar RxJava en un proyecto Spring Boot 3 con Spring Web, es necesario añadir las dependencias adecuadas en el archivo pom.xml de Maven. Esto permitirá utilizar las clases y métodos de RxJava en los componentes de la aplicación, como servicios y controladores. A continuación, se detallan los pasos para integrar RxJava correctamente.
Paso 1: Añadir la dependencia de RxJava en Maven
En el archivo pom.xml, es necesario incluir la dependencia de RxJava. Se recomienda utilizar la versión más reciente compatible con el proyecto. La configuración mínima sería:
<dependencies>
<!-- Otras dependencias del proyecto -->
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.1.9</version>
</dependency>
</dependencies>
Es importante verificar que la versión de RxJava sea la adecuada y esté actualizada según las necesidades del proyecto. Una vez agregada la dependencia, Maven descargará automáticamente los archivos necesarios al reconstruir el proyecto.
Paso 2: Importar las clases de RxJava en el código
Con la dependencia añadida, ya es posible utilizar las clases de RxJava en los componentes de la aplicación. Por ejemplo, en un servicio, se pueden importar e implementar tipos como Observable o Single para manejar flujos de datos:
import io.reactivex.rxjava3.core.Observable;
import org.springframework.stereotype.Service;
@Service
public class UsuarioService {
public Observable<Usuario> obtenerUsuarios() {
return Observable.create(emitter -> {
try {
// Lógica para obtener usuarios
List<Usuario> usuarios = /* obtener lista de usuarios */;
for (Usuario usuario : usuarios) {
emitter.onNext(usuario);
}
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
});
}
}
En este ejemplo, se crea un Observable que emite elementos de tipo Usuario, proporcionando una forma reactiva de manejar la información.
Paso 3: Utilizar RxJava en controladores Spring
En los controladores, se pueden definir endpoints que utilicen los tipos de RxJava para devolver respuestas asíncronas. Aunque Spring Web es bloqueante, el uso de RxJava permite gestionar mejor las operaciones asíncronas internamente. Un ejemplo de controlador podría ser:
import io.reactivex.rxjava3.core.Observable;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class UsuarioController {
private final UsuarioService usuarioService;
public UsuarioController(UsuarioService usuarioService) {
this.usuarioService = usuarioService;
}
@GetMapping("/usuarios")
public Observable<Usuario> obtenerUsuarios() {
return usuarioService.obtenerUsuarios();
}
}
Aquí, el endpoint /usuarios devuelve un Observable de usuarios, permitiendo al cliente suscribirse y recibir los datos conforme estén disponibles.
Paso 4: Manejo de suscripciones y flujos de datos
Es fundamental gestionar correctamente las suscripciones y el flujo de datos para evitar fugas de memoria o comportamientos inesperados. RxJava proporciona varios operadores y herramientas para manipular y controlar los flujos, como map, filter o flatMap.
Por ejemplo, para transformar los datos antes de enviarlos al cliente:
@GetMapping("/usuarios/nombres")
public Observable<String> obtenerNombresUsuarios() {
return usuarioService.obtenerUsuarios()
.map(Usuario::getNombre);
}
En este caso, se utiliza el operador map para transformar cada objeto Usuario en su nombre, enviando al cliente un flujo de cadenas de texto.
Consideraciones al integrar RxJava en Spring Web
Aunque RxJava aporta beneficios en la gestión de operaciones asíncronas, es importante considerar que Spring Web es un framework bloqueante. Esto significa que, a nivel de servidor, las peticiones seguirán siendo gestionadas por hilos que pueden quedar bloqueados esperando respuestas. Sin embargo, al utilizar RxJava, se puede mejorar la eficiencia interna del manejo de datos y operaciones, especialmente en servicios que realizan llamadas a recursos externos o requieren procesamiento asíncrono.
Además, es crucial manejar correctamente los hilos y planificadores (schedulers) de RxJava para evitar bloqueos innecesarios. Por defecto, las operaciones de RxJava se ejecutan en el hilo donde se creó el flujo, pero se pueden especificar planificadores para cambiar este comportamiento:
import io.reactivex.rxjava3.schedulers.Schedulers;
@GetMapping("/usuarios/async")
public Observable<Usuario> obtenerUsuariosAsync() {
return usuarioService.obtenerUsuarios()
.subscribeOn(Schedulers.io());
}
Aquí, se indica que la suscripción al Observable se realizará en un hilo del planificador io(), evitando bloquear el hilo principal.
Integración con otros componentes y buenas prácticas
Al utilizar RxJava en un proyecto Spring Boot 3, se pueden integrar sus flujos con otros componentes del ecosistema Spring, como RestTemplate o RestClient o bases de datos. Por ejemplo, para realizar llamadas a servicios externos de forma reactiva:
public Observable<DatosExternos> obtenerDatosExternos() {
return Observable.fromCallable(() -> restTemplate.getForObject(url, DatosExternos.class));
}
Se recomienda también centralizar el manejo de errores y excepciones utilizando los operadores proporcionados por RxJava, mejorando la resiliencia de la aplicación.
Tipos de RxJava: Observable, Single, Maybe, Completable, Flowable
En RxJava, los tipos base son fundamentales para modelar y manejar flujos de datos asíncronos. Cada tipo ofrece una abstracción específica que se adapta a distintos escenarios de programación reactiva en aplicaciones Spring Web. A continuación, se detallan los principales tipos: Observable, Single, Maybe, Completable y Flowable, junto con sus características y usos habituales.
Observable
El Observable es el tipo más común en RxJava y representa una secuencia de datos que puede emitir cero o más valores, y posteriormente finalizar con éxito o error. Es idóneo para flujos de datos que generen múltiples elementos a lo largo del tiempo, como lecturas de bases de datos o eventos de usuario.
Ejemplo de uso de Observable en un servicio:
import io.reactivex.rxjava3.core.Observable;
import org.springframework.stereotype.Service;
@Service
public class ProductoService {
public Observable<Producto> obtenerProductos() {
return Observable.create(emitter -> {
try {
List<Producto> productos = // Lógica para obtener productos
for (Producto producto : productos) {
emitter.onNext(producto);
}
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
});
}
}
En este ejemplo, se crea un Observable que emite una serie de objetos Producto, uno por uno, y concluye cuando se han emitido todos.
Single
El Single es un tipo especializado que emite exactamente un valor o un error. Es adecuado para operaciones que retornan un único resultado, como consultas específicas a una base de datos o llamadas a servicios que devuelven un solo objeto.
Ejemplo de uso de Single:
import io.reactivex.rxjava3.core.Single;
import org.springframework.stereotype.Service;
@Service
public class UsuarioService {
public Single<Usuario> obtenerUsuarioPorId(Long id) {
return Single.create(emitter -> {
try {
Usuario usuario = // Lógica para obtener usuario por ID
if (usuario != null) {
emitter.onSuccess(usuario);
} else {
emitter.onError(new NoSuchElementException("Usuario no encontrado"));
}
} catch (Exception e) {
emitter.onError(e);
}
});
}
}
Aquí, el Single emitirá el Usuario encontrado o un error si no existe.
Maybe
El Maybe combina características de Single y Completable. Puede emitir un valor, no emitir nada o emitir un error. Es útil cuando una operación podría devolver un resultado o no, como al buscar un registro que podría no existir.
Ejemplo de uso de Maybe:
import io.reactivex.rxjava3.core.Maybe;
import org.springframework.stereotype.Service;
@Service
public class PedidoService {
public Maybe<Pedido> obtenerPedidoPorCodigo(String codigo) {
return Maybe.create(emitter -> {
try {
Pedido pedido = // Lógica para obtener pedido por código
if (pedido != null) {
emitter.onSuccess(pedido);
} else {
emitter.onComplete();
}
} catch (Exception e) {
emitter.onError(e);
}
});
}
}
En este caso, el Maybe emitirá el Pedido si existe, completará sin emitir si no se encuentra, o emitirá un error en caso de excepción.
Completable
El Completable representa una operación que no emite ningún valor, pero que avisa de su finalización o error. Es adecuado para acciones donde el resultado no es relevante, como operaciones de guardado o actualización que no necesitan devolver datos.
Ejemplo de uso de Completable:
import io.reactivex.rxjava3.core.Completable;
import org.springframework.stereotype.Service;
@Service
public class NotificacionService {
public Completable enviarNotificacion(Notificacion notificacion) {
return Completable.create(emitter -> {
try {
// Lógica para enviar notificación
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
});
}
}
En este ejemplo, el Completable simplemente indica si la notificación se envió correctamente o si ocurrió un error.
Flowable
El Flowable es similar al Observable, pero está diseñado para manejar flujos de datos que pueden emitir una gran cantidad de elementos, gestionando la presión de retroceso (backpressure). Es esencial cuando se trabaja con fuentes que generan datos más rápido de lo que pueden procesarse.
Ejemplo de uso de Flowable con manejo de backpressure:
import io.reactivex.rxjava3.core.Flowable;
import org.springframework.stereotype.Service;
@Service
public class EventoService {
public Flowable<Evento> suscribirseEventos() {
return Flowable.create(emitter -> {
// Lógica para emitir eventos continuamente
// Se debe gestionar la backpressure correctamente
}, BackpressureStrategy.BUFFER);
}
}
El Flowable requiere especificar una estrategia de backpressure, como BUFFER, para determinar cómo manejar el exceso de datos.
Integración en Controladores Spring Web
Al utilizar estos tipos en controladores Spring Web, es posible manejar peticiones y respuestas de forma reactiva, mejorando la eficiencia en operaciones asíncronas.
Ejemplo de controlador con diferentes tipos:
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Completable;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api")
public class ApiController {
private final ProductoService productoService;
private final UsuarioService usuarioService;
private final PedidoService pedidoService;
private final NotificacionService notificacionService;
public ApiController(ProductoService productoService,
UsuarioService usuarioService,
PedidoService pedidoService,
NotificacionService notificacionService) {
this.productoService = productoService;
this.usuarioService = usuarioService;
this.pedidoService = pedidoService;
this.notificacionService = notificacionService;
}
@GetMapping("/productos")
public Observable<Producto> listarProductos() {
return productoService.obtenerProductos();
}
@GetMapping("/usuarios/{id}")
public Single<Usuario> obtenerUsuario(@PathVariable Long id) {
return usuarioService.obtenerUsuarioPorId(id);
}
@GetMapping("/pedidos/{codigo}")
public Maybe<Pedido> obtenerPedido(@PathVariable String codigo) {
return pedidoService.obtenerPedidoPorCodigo(codigo);
}
@PostMapping("/notificaciones")
public Completable enviarNotificacion(@RequestBody Notificacion notificacion) {
return notificacionService.enviarNotificacion(notificacion);
}
}
Este controlador muestra cómo integrar los distintos tipos de RxJava en endpoints REST, permitiendo una programación más reactiva y manejando de forma eficiente las operaciones asíncronas.
Elección del tipo adecuado:
La elección entre Observable, Single, Maybe, Completable y Flowable depende del comportamiento esperado y de la naturaleza de los datos:
- Observable: Para múltiples emisiones de datos, como listas o secuencias continuas.
- Single: Cuando se espera un único resultado obligatorio.
- Maybe: Si el resultado puede existir o no, y ambas posibilidades son normales.
- Completable: Cuando solo interesa el éxito o fallo de una operación sin datos adicionales.
- Flowable: En casos de grandes volúmenes de datos o cuando la fuente puede producir elementos más rápido de lo que se consumen.
Manejo de hilos y concurrencia:
Es importante tener en cuenta la concurrencia al trabajar con RxJava en aplicaciones Spring Web. Si bien Spring Web es bloqueante, RxJava permite manejar operaciones asíncronas de manera más eficiente. Se pueden utilizar schedulers para controlar en qué hilos se ejecutan las operaciones.
Ejemplo de especificar un scheduler:
import io.reactivex.rxjava3.schedulers.Schedulers;
public Observable<Producto> obtenerProductosAsync() {
return productoService.obtenerProductos()
.subscribeOn(Schedulers.io());
}
Al usar Schedulers.io(), se designa que la suscripción se realizará en un hilo de I/O, liberando el hilo principal y mejorando la eficiencia de la aplicación.
Operadores: map, flatMap, filter, zip, combinación y reducción
En RxJava, los operadores son fundamentales para manipular y transformar flujos de datos de manera declarativa y eficiente. Estos operadores permiten aplicar funciones a los datos emitidos, combinar flujos y reducir secuencias a valores únicos. A continuación, se explorarán los operadores más utilizados: map, flatMap, filter, zip, así como técnicas de combinación y reducción en el contexto de aplicaciones Spring Web con RxJava.
Operador map
El operador map transforma los elementos emitidos por un Observable aplicando una función a cada uno de ellos. Es útil cuando se necesita convertir los datos de un tipo a otro o modificar su contenido sin alterar la secuencia.
Ejemplo de uso de map:
import io.reactivex.rxjava3.core.Observable;
public Observable<String> obtenerNombresProductos() {
return productoService.obtenerProductos()
.map(producto -> producto.getNombre().toUpperCase());
}
En este ejemplo, se obtiene un Observable de productos y se aplica map para transformar cada Producto en su nombre en mayúsculas. El resultado es un flujo de cadenas de texto con los nombres transformados.
Operador flatMap
El operador flatMap transforma cada elemento emitido en un Observable, y luego aplana las emisiones resultantes en un único flujo. Es especialmente útil cuando cada elemento inicial desencadena una operación asíncrona que devuelve otro Observable.
Ejemplo de uso de flatMap:
import io.reactivex.rxjava3.core.Observable;
public Observable<DetalleProducto> obtenerDetallesProductos() {
return productoService.obtenerProductos()
.flatMap(producto -> detalleService.obtenerDetalleProducto(producto.getId()));
}
Aquí, por cada Producto emitido, se obtiene un Observable de DetalleProducto llamando al servicio de detalles. flatMap se encarga de consolidar todos los flujos en uno solo, proporcionando un mecanismo eficaz para manejar operaciones dependientes que son asíncronas.
Operador filter
El operador filter permite filtrar los elementos emitidos por un Observable según una condición especificada. Solo los elementos que cumplen la predicado proporcionado se emiten al suscriptor.
Ejemplo de uso de filter:
import io.reactivex.rxjava3.core.Observable;
public Observable<Producto> obtenerProductosDisponibles() {
return productoService.obtenerProductos()
.filter(producto -> producto.isEnStock());
}
En este caso, se filtran los productos para emitir solo aquellos que estén en stock. El operador filter evalúa cada producto y emite únicamente los que satisfacen la condición.
Operador zip
El operador zip combina las emisiones de múltiples Observables en una sola emisión aplicando una función combinadora. Es útil cuando se necesitan sincronizar varios flujos y combinarlos en un único resultado.
Ejemplo de uso de zip:
import io.reactivex.rxjava3.core.Observable;
public Observable<ProductoConCategoria> obtenerProductosConCategoria() {
Observable<Producto> productos = productoService.obtenerProductos();
Observable<Categoria> categorias = categoriaService.obtenerCategorias();
return Observable.zip(productos, categorias, (producto, categoria) ->
new ProductoConCategoria(producto, categoria));
}
En este ejemplo, se combinan las emisiones de productos y categorías para crear objetos ProductoConCategoria. El operador zip sincroniza los dos Observables y aplica la función combinadora a los elementos correspondientes.
Combinación de Observables
Además de zip, RxJava ofrece otros operadores para combinar flujos, como merge, concat y combineLatest. Estos operadores permiten gestionar múltiples fuentes de datos de diferentes maneras según el comportamiento deseado.
Ejemplo de uso de merge:
import io.reactivex.rxjava3.core.Observable;
public Observable<Evento> obtenerEventos() {
Observable<Evento> eventosSistema = sistemaService.obtenerEventosSistema();
Observable<Evento> eventosUsuario = usuarioService.obtenerEventosUsuario();
return Observable.merge(eventosSistema, eventosUsuario);
}
Con merge, se combinan las emisiones de ambos Observables en un único flujo de eventos, emitidos conforme lleguen, sin esperar a sincronizarlos.
Reducción de secuencias
La reducción es un proceso para convertir una secuencia de elementos en un único valor acumulado. RxJava proporciona operadores como reduce y collect para este propósito.
Ejemplo de uso de reduce:
import io.reactivex.rxjava3.core.Single;
public Single<Integer> contarProductosDisponibles() {
return productoService.obtenerProductos()
.filter(Producto::isEnStock)
.reduce(0, (contador, producto) -> contador + 1);
}
Este ejemplo cuenta los productos disponibles sumando uno por cada producto filtrado. El operador reduce aplica una función acumulativa a los elementos emitidos, resultando en un Single con el valor final.
Ejemplo de uso de collect:
import io.reactivex.rxjava3.core.Single;
import java.util.ArrayList;
import java.util.List;
public Single<List<String>> obtenerListaNombresProductos() {
return productoService.obtenerProductos()
.map(Producto::getNombre)
.collect(ArrayList::new, List::add);
}
Aquí, se recopilan los nombres de los productos en una lista usando collect, que permite especificar el contenedor y la función para agregar elementos.
Aplicación práctica en Spring Web
Integrar estos operadores en una aplicación Spring Boot 3 con Spring Web mejora la manipulación de datos en servicios y controladores. A continuación, se muestra cómo utilizarlos en un servicio:
import io.reactivex.rxjava3.core.Observable;
import org.springframework.stereotype.Service;
@Service
public class EstadisticaService {
public Observable<EstadisticaProducto> calcularEstadisticasProductos() {
return productoService.obtenerProductos()
.filter(Producto::isEnStock)
.flatMap(this::obtenerVentasProducto)
.reduce(new EstadisticaProducto(), this::acumularEstadisticas)
.toObservable();
}
private Observable<Venta> obtenerVentasProducto(Producto producto) {
return ventaService.obtenerVentasPorProducto(producto.getId());
}
private EstadisticaProducto acumularEstadisticas(EstadisticaProducto estadistica, Venta venta) {
// Lógica para acumular estadísticas
estadistica.incrementarVentas(venta.getCantidad());
return estadistica;
}
}
En este servicio, se combinan varios operadores para filtrar productos, obtener ventas asociadas y reducir la información a unas estadísticas acumuladas.
Consideraciones sobre el manejo de errores
Es esencial manejar los errores de forma adecuada en los flujos de RxJava. Operadores como onErrorReturn, onErrorResumeNext o retry permiten definir comportamientos ante excepciones.
Ejemplo de manejo de errores:
public Observable<Producto> obtenerProductosConRecuperacion() {
return productoService.obtenerProductos()
.onErrorResumeNext(Observable.empty());
}
Si ocurre un error al obtener los productos, onErrorResumeNext permite continuar con un Observable alternativo, en este caso vacío.
Uso de operadores en controladores
Los controladores pueden beneficiarse de los operadores para procesar peticiones y formar la respuesta adecuada.
Ejemplo de controlador utilizando operadores:
import io.reactivex.rxjava3.core.Observable;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ReporteController {
private final EstadisticaService estadisticaService;
public ReporteController(EstadisticaService estadisticaService) {
this.estadisticaService = estadisticaService;
}
@GetMapping("/estadisticas/productos")
public Observable<EstadisticaProducto> obtenerEstadisticasProductos() {
return estadisticaService.calcularEstadisticasProductos();
}
}
El controlador expone un endpoint que devuelve las estadísticas calculadas utilizando los operadores de RxJava, integrándose de manera fluida en la arquitectura de Spring Web.
Uso de RxJava con RestClient para invocar API REST externas
En aplicaciones Spring Boot 3, es común la necesidad de invocar API REST externas de manera eficiente, manejando adecuadamente la concurrencia y las operaciones asíncronas. La combinación de RxJava con el nuevo RestClient de Spring permite realizar estas llamadas de forma reactiva, mejorando el rendimiento y la escalabilidad de la aplicación.
El RestClient, introducido en Spring Framework 6, es el sucesor del tradicional RestTemplate. Este nuevo cliente HTTP ofrece una API más moderna y fluida, facilitando la integración con paradigmas reactivos. Al utilizar RxJava, podemos envolver las llamadas al RestClient en Observables o Singles, permitiendo manejar las respuestas de las API externas de forma asíncrona y reactiva.
Para empezar, es necesario configurar el RestClient en nuestra aplicación. A diferencia del WebClient utilizado en aplicaciones no bloqueantes con WebFlux, el RestClient está diseñado para ser utilizado en aplicaciones bloqueantes con Spring Web. A continuación, se muestra cómo crear una instancia del RestClient:
import org.springframework.web.client.RestClient;
RestClient restClient = RestClient.builder()
.defaultHeader("Accept", "application/json")
.build();
Una vez configurado el RestClient, podemos integrarlo con RxJava para realizar llamadas a servicios externos y procesar las respuestas de manera reactiva. Por ejemplo, si deseamos obtener información de un servicio externo que proporciona datos de usuarios, podemos crear un método que devuelva un Observable de tipo Usuario:
import io.reactivex.rxjava3.core.Observable;
public Observable<Usuario> obtenerUsuarioPorId(Long id) {
return Observable.create(emitter -> {
try {
Usuario usuario = restClient.get()
.uri("https://api.ejemplo.com/usuarios/{id}", id)
.retrieve()
.body(Usuario.class);
emitter.onNext(usuario);
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
});
}
En este ejemplo, se utiliza el método create de Observable para envolver la llamada al RestClient. El emitter se encarga de emitir el resultado o el error según corresponda. De esta forma, quienes consuman este método podrán suscribirse al Observable y manejar la respuesta de forma asíncrona y no bloqueante.
Es importante manejar adecuadamente los hilos al trabajar con operaciones de I/O. Podemos especificar en qué scheduler se ejecutará el observable utilizando el método subscribeOn de RxJava. Por ejemplo:
return Observable.create(emitter -> {
// Lógica de llamada al RestClient
}).subscribeOn(Schedulers.io());
El Schedulers.io() está optimizado para operaciones de I/O y permite que las llamadas al RestClient no bloqueen el hilo principal. De esta manera, mejoramos la concurrencia y la capacidad de respuesta de nuestra aplicación.
Además de los Observables, podemos utilizar otros tipos de RxJava como Single o Flowable, dependiendo de las necesidades. Si sabemos que la respuesta será un único elemento, podemos optar por un Single:
import io.reactivex.rxjava3.core.Single;
public Single<Usuario> obtenerUsuarioPorId(Long id) {
return Single.fromCallable(() -> restClient.get()
.uri("https://api.ejemplo.com/usuarios/{id}", id)
.retrieve()
.body(Usuario.class)
).subscribeOn(Schedulers.io());
}
En este caso, utilizamos el método fromCallable de Single, que simplifica la creación del flujo al manejar automáticamente las excepciones y emitir un error en caso de que ocurra alguna. Esto hace que el código sea más limpio y conciso.
Para realizar operaciones más complejas, como obtener una lista de elementos o manejar paginación, podemos aprovechar los operadores de RxJava. Por ejemplo, para obtener todos los usuarios de una API externa que proporciona paginación:
public Observable<Usuario> obtenerTodosLosUsuarios() {
return Observable.create(emitter -> {
int pagina = 1;
boolean hayMasPaginas = true;
while (hayMasPaginas && !emitter.isDisposed()) {
List<Usuario> usuarios = restClient.get()
.uri("https://api.ejemplo.com/usuarios?page={pagina}", pagina)
.retrieve()
.body(new ParameterizedTypeReference<List<Usuario>>() {});
if (usuarios != null && !usuarios.isEmpty()) {
for (Usuario usuario : usuarios) {
emitter.onNext(usuario);
}
pagina++;
} else {
hayMasPaginas = false;
}
}
emitter.onComplete();
}).subscribeOn(Schedulers.io());
}
Aquí, el Observable emite cada Usuario obtenido de las diferentes páginas de la API externa. Se controla la paginación mediante un bucle, y se verifica si el emitter ha sido disposed para evitar emitir después de que el suscriptor se haya desuscrito.
Otra ventaja de utilizar RxJava con RestClient es la posibilidad de combinar múltiples llamadas a API externas de forma eficiente. Por ejemplo, si necesitamos obtener información de varias fuentes y combinar los resultados:
public Single<DatosCombinados> obtenerDatosCombinados(Long idUsuario) {
Single<Usuario> usuarioSingle = obtenerUsuarioPorId(idUsuario);
Single<List<Post>> postsSingle = obtenerPostsPorUsuario(idUsuario);
return usuarioSingle.zipWith(postsSingle, (usuario, posts) ->
new DatosCombinados(usuario, posts)
);
}
En este ejemplo, utilizamos el operador zipWith para combinar las respuestas de dos llamadas diferentes. Esto permite manejar de manera elegante y concisa la unificación de datos provenientes de múltiples servicios.
Además, RxJava facilita el manejo de errores y reintentos en llamadas a servicios externos. Podemos utilizar operadores como retry o onErrorResumeNext para definir comportamientos ante fallos:
return obtenerUsuarioPorId(id)
.retry(3)
.onErrorResumeNext(error -> {
// Lógica alternativa o valor por defecto
return Single.just(new UsuarioPorDefecto());
});
Con retry(3) indicamos que se reintente la operación hasta tres veces en caso de fallo. Si tras los reintentos la operación sigue fallando, onErrorResumeNext permite proporcionar una alternativa, como devolver un usuario por defecto.
Al integrar RxJava con RestClient, es importante también considerar la gestión de recursos y evitar potenciales fugas de memoria. Asegurarse de que las suscripciones se manejen correctamente y que los observers se dispongan cuando ya no sean necesarios es clave para mantener la aplicación eficiente.
Al utilizar RxJava en aplicaciones Spring Boot 3 con Spring Web, potenciamos la capacidad de realizar llamadas a API REST externas de manera reactiva. Esto mejora la eficiencia y la escalabilidad, permitiendo manejar de forma óptima las operaciones asíncronas y la manipulación de flujos de datos.
Ejemplo RxJava: consumir datos de Fake Store API con RestClient
1. Crear la clase Product
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Product {
private int id;
private String title;
private double price;
private String category;
private String description;
private String image;
}
2. Servicio CRUD usando RestClient
y RxJava
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.RestClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import java.util.List;
@Service
public class ProductService {
private final RestClient restClient;
public ProductService() {
this.restClient = RestClient.builder()
.baseUrl("https://fakestoreapi.com/products")
.build();
}
// Obtener todos los productos
public Single<List<Product>> getAllProducts() {
return Single.fromCallable(() ->
restClient.get()
.retrieve()
.bodyToFlux(Product.class)
.collectList()
.block()
).subscribeOn(Schedulers.io());
}
// Obtener un producto por ID
public Single<Product> getProductById(int productId) {
return Single.fromCallable(() ->
restClient.get()
.uri("/{id}", productId)
.retrieve()
.bodyToMono(Product.class)
.block()
).subscribeOn(Schedulers.io());
}
// Crear un nuevo producto
public Single<Product> createProduct(Product product) {
return Single.fromCallable(() ->
restClient.post()
.body(product)
.retrieve()
.bodyToMono(Product.class)
.block()
).subscribeOn(Schedulers.io());
}
// Actualizar un producto por ID (PUT)
public Single<Product> updateProduct(int productId, Product product) {
return Single.fromCallable(() ->
restClient.put()
.uri("/{id}", productId)
.body(product)
.retrieve()
.bodyToMono(Product.class)
.block()
).subscribeOn(Schedulers.io());
}
// Eliminar un producto por ID
public Single<Void> deleteProduct(int productId) {
return Single.fromCallable(() -> {
restClient.delete()
.uri("/{id}", productId)
.retrieve()
.toBodilessEntity()
.block();
return null;
}).subscribeOn(Schedulers.io());
}
}
3. Controlador para exponer las operaciones CRUD
import io.reactivex.rxjava3.core.Single;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@RestController
@RequestMapping("/products")
public class ProductController {
private final ProductService productService;
public ProductController(ProductService productService) {
this.productService = productService;
}
// Obtener todos los productos
@GetMapping
public Single<ResponseEntity<List<Product>>> getAllProducts() {
return productService.getAllProducts()
.map(ResponseEntity::ok);
}
// Obtener un producto por ID
@GetMapping("/{id}")
public Single<ResponseEntity<Product>> getProductById(@PathVariable int id) {
return productService.getProductById(id)
.map(ResponseEntity::ok)
.onErrorReturn(throwable -> ResponseEntity.notFound().build());
}
// Crear un nuevo producto
@PostMapping
public Single<ResponseEntity<Product>> createProduct(@RequestBody Product product) {
return productService.createProduct(product)
.map(ResponseEntity::ok);
}
// Actualizar un producto por ID
@PutMapping("/{id}")
public Single<ResponseEntity<Product>> updateProduct(@PathVariable int id, @RequestBody Product product) {
return productService.updateProduct(id, product)
.map(ResponseEntity::ok);
}
// Eliminar un producto por ID
@DeleteMapping("/{id}")
public Single<ResponseEntity<Void>> deleteProduct(@PathVariable int id) {
return productService.deleteProduct(id)
.map(v -> ResponseEntity.noContent().build());
}
}
4. Prueba de las operaciones
Ejemplo de JSON para crear producto
{
"title": "Producto de ejemplo",
"price": 29.99,
"category": "jewelery",
"description": "Un producto de prueba con descripción",
"image": "https://example.com/product.jpg"
}
Rutas CRUD
- GET
/products
: Lista todos los productos. - GET
/products/{id}
: Obtiene un producto específico por su ID. - POST
/products
: Crea un nuevo producto con los datos enviados en el cuerpo. - PUT
/products/{id}
: Actualiza un producto existente con los datos enviados en el cuerpo. - DELETE
/products/{id}
: Elimina un producto por su ID.
Ejercicios de esta lección RxJava en Spring Web
Evalúa tus conocimientos de esta lección RxJava en Spring Web con nuestros retos de programación de tipo Test, Puzzle, Código y Proyecto con VSCode, guiados por IA.
API Query By Example (QBE)
Identificadores y relaciones JPA
Borrar datos de base de datos
Web y Test Starters
Métodos find en repositorios
Controladores Spring MVC
Inserción de datos
CRUD Customers Spring MVC + Spring Data JPA
Backend API REST con Spring Boot
Controladores Spring REST
Uso de Spring con Thymeleaf
API Specification
Registro de usuarios
Crear entidades JPA
Asociaciones en JPA
Asociaciones de entidades JPA
Integración con Vue
Consultas JPQL
Open API y cómo agregarlo en Spring Boot
Uso de Controladores REST
Repositorios reactivos
Inyección de dependencias
Introducción a Spring Boot
CRUD y JPA Repository
Inyección de dependencias
Vista en Spring MVC con Thymeleaf
Servicios en Spring
Operadores Reactivos
Configuración de Vue
Entidades JPA
Integración con Angular
API Specification
API Query By Example (QBE)
Controladores MVC
Anotaciones y mapeo en JPA
Consultas JPQL con @Query en Spring Data JPA
Repositorios Spring Data
Inyección de dependencias
Data JPA y Mail Starters
Configuración de Angular
Controladores Spring REST
Configuración de Controladores MVC
Consultas JPQL con @Query en Spring Data JPA
Actualizar datos de base de datos
Verificar token JWT en peticiones
Login de usuarios
Integración con React
Configuración de React
Todas las lecciones de SpringBoot
Accede a todas las lecciones de SpringBoot y aprende con ejemplos prácticos de código y ejercicios de programación con IDE web sin instalar nada.
Introducción A Spring Boot
Introducción Y Entorno
Spring Boot Starters
Introducción Y Entorno
Inyección De Dependencias
Introducción Y Entorno
Controladores Spring Mvc
Spring Web
Vista En Spring Mvc Con Thymeleaf
Spring Web
Controladores Spring Rest
Spring Web
Open Api Y Cómo Agregarlo En Spring Boot
Spring Web
Servicios En Spring
Spring Web
Clientes Resttemplate Y Restclient
Spring Web
Rxjava En Spring Web
Spring Web
Crear Entidades Jpa
Persistencia Spring Data
Asociaciones De Entidades Jpa
Persistencia Spring Data
Repositorios Spring Data
Persistencia Spring Data
Métodos Find En Repositorios
Persistencia Spring Data
Inserción De Datos
Persistencia Spring Data
Actualizar Datos De Base De Datos
Persistencia Spring Data
Borrar Datos De Base De Datos
Persistencia Spring Data
Consultas Jpql Con @Query En Spring Data Jpa
Persistencia Spring Data
Api Query By Example (Qbe)
Persistencia Spring Data
Api Specification
Persistencia Spring Data
Repositorios Reactivos
Persistencia Spring Data
Introducción E Instalación De Apache Kafka
Mensajería Asíncrona
Crear Proyecto Con Apache Kafka
Mensajería Asíncrona
Creación De Producers
Mensajería Asíncrona
Creación De Consumers
Mensajería Asíncrona
Kafka Streams En Spring Boot
Mensajería Asíncrona
Introducción A Spring Webflux
Reactividad Webflux
Spring Data R2dbc
Reactividad Webflux
Controlador Rest Reactivo Basado En Anotaciones
Reactividad Webflux
Controlador Rest Reactivo Funcional
Reactividad Webflux
Operadores Reactivos Básicos
Reactividad Webflux
Operadores Reactivos Avanzados
Reactividad Webflux
Cliente Reactivo Webclient
Reactividad Webflux
Introducción A Spring Security
Seguridad Con Spring Security
Seguridad Basada En Formulario En Mvc Con Thymeleaf
Seguridad Con Spring Security
Registro De Usuarios
Seguridad Con Spring Security
Login De Usuarios
Seguridad Con Spring Security
Verificar Token Jwt En Peticiones
Seguridad Con Spring Security
Seguridad Jwt En Api Rest Spring Web
Seguridad Con Spring Security
Seguridad Jwt En Api Rest Reactiva Spring Webflux
Seguridad Con Spring Security
Autenticación Y Autorización Con Anotaciones
Seguridad Con Spring Security
Testing Unitario De Componentes Y Servicios
Testing Con Spring Test
Testing De Repositorios Spring Data Jpa
Testing Con Spring Test
Testing Controladores Spring Mvc Con Thymeleaf
Testing Con Spring Test
Testing Controladores Rest Con Json
Testing Con Spring Test
Testing De Aplicaciones Reactivas Webflux
Testing Con Spring Test
Testing De Seguridad Spring Security
Testing Con Spring Test
Testing Con Apache Kafka
Testing Con Spring Test
Integración Con Angular
Integración Frontend
Integración Con React
Integración Frontend
Integración Con Vue
Integración Frontend
En esta lección
Objetivos de aprendizaje de esta lección
- Comprender qué es la programación reactiva
- Saber añadir la librería RxJava
- Conocer los tipos existentes en RxJava
- Conocer los operadores de RxJava