Estado global con RxJS Subjects

Avanzado
Angular
Angular
Actualizado: 24/09/2025

Subject de RxJS

Los Subject en RxJS son observables especiales que actúan como emisores y observadores simultáneamente. A diferencia de los observables tradicionales que solo emiten valores, los subjects pueden recibir nuevos valores a través del método next() y distribuirlos a múltiples suscriptores. Esto los convierte en candidatos ideales para gestión de estado global en aplicaciones Angular.

Introducción a Subject vs BehaviorSubject vs ReplaySubject

Existen tres tipos principales de subjects que necesitas conocer para gestión de estado:

  • Subject básico: Emite valores solo a suscriptores activos desde el momento de la emisión
  • BehaviorSubject: Almacena el último valor emitido y lo proporciona inmediatamente a nuevos suscriptores
  • ReplaySubject: Almacena múltiples valores anteriores y los reproduce a nuevos suscriptores

BehaviorSubject para estado actual

El BehaviorSubject es la opción más común para estado global porque siempre mantiene un valor actual. Requiere un valor inicial y garantiza que cualquier nuevo suscriptor reciba inmediatamente el estado actual.

import { Injectable } from '@angular/core';
import { BehaviorSubject } from 'rxjs';

export interface UserState {
  id: number | null;
  name: string;
  email: string;
  isAuthenticated: boolean;
}

@Injectable({
  providedIn: 'root'
})
export class UserStateService {
  private readonly initialState: UserState = {
    id: null,
    name: '',
    email: '',
    isAuthenticated: false
  };

  private userSubject = new BehaviorSubject<UserState>(this.initialState);
  
  // Observable público para que los componentes se suscriban
  public user$ = this.userSubject.asObservable();

  // Getter para obtener el valor actual sin suscripción
  get currentUser(): UserState {
    return this.userSubject.value;
  }

  // Método para actualizar todo el estado
  setUser(user: UserState): void {
    this.userSubject.next(user);
  }

  // Método para login
  login(id: number, name: string, email: string): void {
    const newState: UserState = {
      id,
      name,
      email,
      isAuthenticated: true
    };
    this.userSubject.next(newState);
  }

  // Método para logout
  logout(): void {
    this.userSubject.next(this.initialState);
  }
}

El BehaviorSubject es perfecto para estados donde necesitas:

  • Acceso inmediato al valor actual
  • Que nuevos componentes reciban el estado sin esperar actualizaciones
  • Un estado que siempre tiene un valor válido

Usando BehaviorSubject en componentes

import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subject, takeUntil } from 'rxjs';
import { UserStateService, UserState } from './user-state.service';

@Component({
  selector: 'app-user-profile',
  standalone: true,
  template: `
    @if (user.isAuthenticated) {
      <div class="profile">
        <h2>Bienvenido, {{ user.name }}</h2>
        <p>Email: {{ user.email }}</p>
        <button (click)="logout()">Cerrar sesión</button>
      </div>
    } @else {
      <div class="login">
        <p>No has iniciado sesión</p>
        <button (click)="login()">Iniciar sesión</button>
      </div>
    }
  `
})
export class UserProfileComponent implements OnInit, OnDestroy {
  user: UserState = this.userStateService.currentUser;
  private destroy$ = new Subject<void>();

  constructor(private userStateService: UserStateService) {}

  ngOnInit(): void {
    // Suscripción al estado del usuario
    this.userStateService.user$
      .pipe(takeUntil(this.destroy$))
      .subscribe(user => {
        this.user = user;
      });
  }

  login(): void {
    this.userStateService.login(1, 'Ana García', 'ana@email.com');
  }

  logout(): void {
    this.userStateService.logout();
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

ReplaySubject para historial de estado

El ReplaySubject almacena múltiples valores anteriores y los reproduce a nuevos suscriptores. Es útil cuando necesitas mantener un historial de cambios o cuando el orden de las emisiones es importante.

import { Injectable } from '@angular/core';
import { ReplaySubject } from 'rxjs';

export interface Notification {
  id: string;
  message: string;
  type: 'success' | 'error' | 'warning' | 'info';
  timestamp: Date;
}

@Injectable({
  providedIn: 'root'
})
export class NotificationService {
  // ReplaySubject que mantiene las últimas 5 notificaciones
  private notificationSubject = new ReplaySubject<Notification>(5);
  
  public notifications$ = this.notificationSubject.asObservable();

  addNotification(message: string, type: Notification['type']): void {
    const notification: Notification = {
      id: crypto.randomUUID(),
      message,
      type,
      timestamp: new Date()
    };
    
    this.notificationSubject.next(notification);
  }

  addSuccess(message: string): void {
    this.addNotification(message, 'success');
  }

  addError(message: string): void {
    this.addNotification(message, 'error');
  }
}

Casos de uso para cada tipo

Usa BehaviorSubject cuando:

  • Necesites acceso inmediato al estado actual
  • El estado represente una configuración o preferencia
  • Los componentes necesiten el valor actual al inicializarse
  • Manejes estados como: usuario actual, tema de la aplicación, configuración

Usa ReplaySubject cuando:

  • Necesites mantener un historial de valores
  • Los nuevos suscriptores deban recibir múltiples valores anteriores
  • Manejes eventos o notificaciones donde el orden importa
  • Implementes funcionalidades como: logs de actividad, notificaciones, historial de navegación

Patrón de servicio de estado completo

import { Injectable } from '@angular/core';
import { BehaviorSubject, ReplaySubject, Observable } from 'rxjs';

export interface AppSettings {
  theme: 'light' | 'dark';
  language: string;
  notifications: boolean;
}

export interface AppEvent {
  type: string;
  data: any;
  timestamp: Date;
}

@Injectable({
  providedIn: 'root'
})
export class AppStateService {
  // Estado actual con BehaviorSubject
  private settingsSubject = new BehaviorSubject<AppSettings>({
    theme: 'light',
    language: 'es',
    notifications: true
  });

  // Eventos/historial con ReplaySubject
  private eventsSubject = new ReplaySubject<AppEvent>(10);

  // Observables públicos
  settings$: Observable<AppSettings> = this.settingsSubject.asObservable();
  events$: Observable<AppEvent> = this.eventsSubject.asObservable();

  // Getters para valores actuales
  get currentSettings(): AppSettings {
    return this.settingsSubject.value;
  }

  // Métodos para actualizar estado
  updateSettings(settings: Partial<AppSettings>): void {
    const currentSettings = this.settingsSubject.value;
    const newSettings = { ...currentSettings, ...settings };
    this.settingsSubject.next(newSettings);
    
    // Registrar el cambio como evento
    this.logEvent('SETTINGS_UPDATED', settings);
  }

  changeTheme(theme: 'light' | 'dark'): void {
    this.updateSettings({ theme });
  }

  private logEvent(type: string, data: any): void {
    const event: AppEvent = {
      type,
      data,
      timestamp: new Date()
    };
    this.eventsSubject.next(event);
  }
}

La clave del éxito con BehaviorSubject y ReplaySubject está en mantener los subjects privados dentro del servicio y exponer solo observables de lectura. Esto garantiza que el estado solo se modifique a través de métodos controlados, manteniendo la integridad de los datos en tu aplicación Angular.

Patrones de estado con RxJS operators

Una vez que tienes servicios de estado configurados con BehaviorSubject y ReplaySubject, necesitas patrones para combinar, transformar y gestionar múltiples streams de estado de manera eficiente. Los operadores RxJS proporcionan las herramientas necesarias para crear arquitecturas de estado robustas y escalables.

Combinación de múltiples estados con combineLatest

El operador combineLatest es fundamental cuando necesitas crear estado derivado que depende de múltiples fuentes. Emite cada vez que cualquiera de los observables fuente cambia, proporcionando siempre los valores más recientes de todos.

import { Injectable } from '@angular/core';
import { BehaviorSubject, combineLatest, map } from 'rxjs';

@Injectable({
  providedIn: 'root'
})
export class ShoppingStateService {
  private cartItemsSubject = new BehaviorSubject<CartItem[]>([]);
  private discountSubject = new BehaviorSubject<number>(0);
  private taxRateSubject = new BehaviorSubject<number>(0.21);

  public cartItems$ = this.cartItemsSubject.asObservable();
  public discount$ = this.discountSubject.asObservable();
  public taxRate$ = this.taxRateSubject.asObservable();

  // Estado derivado que combina múltiples fuentes
  public cartSummary$ = combineLatest([
    this.cartItems$,
    this.discount$,
    this.taxRate$
  ]).pipe(
    map(([items, discount, taxRate]) => {
      const subtotal = items.reduce((sum, item) => sum + (item.price * item.quantity), 0);
      const discountAmount = subtotal * discount;
      const subtotalWithDiscount = subtotal - discountAmount;
      const taxAmount = subtotalWithDiscount * taxRate;
      const total = subtotalWithDiscount + taxAmount;

      return {
        itemCount: items.length,
        subtotal,
        discountAmount,
        taxAmount,
        total
      };
    })
  );

  addItem(item: CartItem): void {
    const currentItems = this.cartItemsSubject.value;
    this.cartItemsSubject.next([...currentItems, item]);
  }

  setDiscount(discount: number): void {
    this.discountSubject.next(discount);
  }
}

Optimización con distinctUntilChanged

Para evitar emisiones innecesarias cuando el estado no ha cambiado realmente, utiliza distinctUntilChanged. Este operador es especialmente útil con objetos complejos donde necesitas comparación personalizada.

import { Injectable } from '@angular/core';
import { BehaviorSubject, distinctUntilChanged, map } from 'rxjs';

export interface UserPreferences {
  theme: string;
  language: string;
  fontSize: number;
}

@Injectable({
  providedIn: 'root'
})
export class PreferencesService {
  private preferencesSubject = new BehaviorSubject<UserPreferences>({
    theme: 'light',
    language: 'es',
    fontSize: 14
  });

  // Solo emite cuando las preferencias realmente cambian
  public preferences$ = this.preferencesSubject.pipe(
    distinctUntilChanged((prev, curr) => 
      prev.theme === curr.theme && 
      prev.language === curr.language && 
      prev.fontSize === curr.fontSize
    )
  );

  // Observable específico para el tema con optimización
  public theme$ = this.preferences$.pipe(
    map(prefs => prefs.theme),
    distinctUntilChanged()
  );

  updatePreferences(updates: Partial<UserPreferences>): void {
    const current = this.preferencesSubject.value;
    const newPreferences = { ...current, ...updates };
    this.preferencesSubject.next(newPreferences);
  }
}

Fusión de eventos con merge

El operador merge combina múltiples observables en uno solo, emitiendo valores de cualquiera de las fuentes. Es útil para unificar diferentes tipos de eventos que afectan el mismo estado.

import { Injectable } from '@angular/core';
import { BehaviorSubject, Subject, merge, scan, startWith } from 'rxjs';

export interface TodoAction {
  type: 'ADD' | 'REMOVE' | 'TOGGLE' | 'CLEAR_COMPLETED';
  payload?: any;
}

export interface Todo {
  id: number;
  text: string;
  completed: boolean;
}

@Injectable({
  providedIn: 'root'
})
export class TodoStateService {
  private addTodoSubject = new Subject<string>();
  private removeTodoSubject = new Subject<number>();
  private toggleTodoSubject = new Subject<number>();
  private clearCompletedSubject = new Subject<void>();

  // Combinar todos los eventos en un stream unificado
  private actions$ = merge(
    this.addTodoSubject.pipe(
      map(text => ({ type: 'ADD' as const, payload: text }))
    ),
    this.removeTodoSubject.pipe(
      map(id => ({ type: 'REMOVE' as const, payload: id }))
    ),
    this.toggleTodoSubject.pipe(
      map(id => ({ type: 'TOGGLE' as const, payload: id }))
    ),
    this.clearCompletedSubject.pipe(
      map(() => ({ type: 'CLEAR_COMPLETED' as const }))
    )
  );

  // Reducer pattern con scan
  public todos$ = this.actions$.pipe(
    startWith({ type: 'INIT' as const }),
    scan((todos: Todo[], action) => {
      switch (action.type) {
        case 'ADD':
          return [...todos, {
            id: Date.now(),
            text: action.payload,
            completed: false
          }];
        case 'REMOVE':
          return todos.filter(todo => todo.id !== action.payload);
        case 'TOGGLE':
          return todos.map(todo =>
            todo.id === action.payload
              ? { ...todo, completed: !todo.completed }
              : todo
          );
        case 'CLEAR_COMPLETED':
          return todos.filter(todo => !todo.completed);
        default:
          return todos;
      }
    }, [] as Todo[])
  );

  addTodo(text: string): void {
    this.addTodoSubject.next(text);
  }

  removeTodo(id: number): void {
    this.removeTodoSubject.next(id);
  }

  toggleTodo(id: number): void {
    this.toggleTodoSubject.next(id);
  }

  clearCompleted(): void {
    this.clearCompletedSubject.next();
  }
}

Compartir estado con shareReplay

Para evitar múltiples ejecuciones de lógica pesada cuando varios componentes se suscriben al mismo observable, utiliza shareReplay. Este operador cachea los últimos valores y los comparte entre suscriptores.

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { BehaviorSubject, Observable, switchMap, shareReplay, map } from 'rxjs';

export interface ApiData {
  users: User[];
  lastUpdated: Date;
}

@Injectable({
  providedIn: 'root'
})
export class DataStateService {
  private refreshSubject = new BehaviorSubject<void>(undefined);

  // Observable que se ejecuta solo una vez y comparte el resultado
  public data$: Observable<ApiData> = this.refreshSubject.pipe(
    switchMap(() => this.http.get<User[]>('/api/users')),
    map(users => ({
      users,
      lastUpdated: new Date()
    })),
    shareReplay(1) // Cachea el último valor para nuevos suscriptores
  );

  // Estados derivados que reutilizan la misma data
  public activeUsers$ = this.data$.pipe(
    map(data => data.users.filter(user => user.active))
  );

  public userCount$ = this.data$.pipe(
    map(data => data.users.length)
  );

  constructor(private http: HttpClient) {}

  refresh(): void {
    this.refreshSubject.next();
  }
}

Manejo de errores en estado

El manejo de errores es crucial en aplicaciones de producción. Utiliza catchError para recuperarte de errores sin romper el stream de estado.

import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable, catchError, of, retry, delay } from 'rxjs';

export interface LoadingState<T> {
  data: T | null;
  loading: boolean;
  error: string | null;
}

@Injectable({
  providedIn: 'root'
})
export class ResilientStateService {
  private stateSubject = new BehaviorSubject<LoadingState<User[]>>({
    data: null,
    loading: false,
    error: null
  });

  public state$ = this.stateSubject.asObservable();

  constructor(private http: HttpClient) {}

  loadUsers(): void {
    // Establecer estado de carga
    this.stateSubject.next({
      data: this.stateSubject.value.data,
      loading: true,
      error: null
    });

    this.http.get<User[]>('/api/users').pipe(
      retry(3), // Reintentar 3 veces
      delay(1000), // Esperar 1 segundo entre reintentos
      catchError(error => {
        // En caso de error, actualizar estado con error
        this.stateSubject.next({
          data: this.stateSubject.value.data,
          loading: false,
          error: 'Error al cargar usuarios. Inténtalo de nuevo.'
        });
        return of([]); // Retornar array vacío para mantener el stream
      })
    ).subscribe(users => {
      // Éxito: actualizar estado con datos
      this.stateSubject.next({
        data: users,
        loading: false,
        error: null
      });
    });
  }
}

Completion y cleanup de streams

Para evitar memory leaks, es importante completar adecuadamente los streams cuando ya no son necesarios. Implementa patrones de cleanup en tus servicios de estado.

import { Injectable, OnDestroy } from '@angular/core';
import { Subject, BehaviorSubject, takeUntil, timer } from 'rxjs';

@Injectable({
  providedIn: 'root'
})
export class TimedStateService implements OnDestroy {
  private destroy$ = new Subject<void>();
  private dataSubject = new BehaviorSubject<any>(null);
  
  public data$ = this.dataSubject.asObservable();

  constructor() {
    // Auto-refresh cada 30 segundos
    timer(0, 30000).pipe(
      takeUntil(this.destroy$)
    ).subscribe(() => {
      this.refreshData();
    });
  }

  private refreshData(): void {
    // Lógica de actualización
    console.log('Actualizando datos...');
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
    this.dataSubject.complete();
  }
}

Estos patrones con operadores RxJS te permiten crear arquitecturas de estado sofisticadas y maintener un control granular sobre cómo fluyen y se transforman los datos en tu aplicación Angular. La clave está en combinar operators de manera eficiente para evitar suscripciones innecesarias y mantener la performance óptima.

NgRx y alternativas para la gestión de estado

La gestión de estado en una aplicación puede complicarse conforme esta crece. Aquí mostramos recomendaciones de qué sistema de estado utilizar en función de la complejidad.

Para aplicaciones pequeñas-medianas:

  • RxJS Subjects en servicios
  • Angular Signals nativos (lo vemos en la siguiente lección)

Para aplicaciones complejas:

  • NGXS
  • NgRx Store
  • NgRx Signals
Alan Sastre - Autor del tutorial

Alan Sastre

Ingeniero de Software y formador, CEO en CertiDevs

Ingeniero de software especializado en Full Stack y en Inteligencia Artificial. Como CEO de CertiDevs, Angular es una de sus áreas de expertise. Con más de 15 años programando, 6K seguidores en LinkedIn y experiencia como formador, Alan se dedica a crear contenido educativo de calidad para desarrolladores de todos los niveles.

Más tutoriales de Angular

Explora más contenido relacionado con Angular y continúa aprendiendo con nuestros tutoriales gratuitos.

Aprendizajes de esta lección

  • Comprender las diferencias entre Subject, BehaviorSubject y ReplaySubject.
  • Implementar BehaviorSubject para mantener y distribuir el estado actual en servicios Angular.
  • Utilizar ReplaySubject para conservar y emitir un historial de estados o eventos.
  • Aplicar operadores RxJS como combineLatest, distinctUntilChanged, merge y shareReplay para gestionar y optimizar flujos de estado.
  • Implementar patrones de manejo de errores, limpieza de streams y optimización de suscripciones en servicios de estado.