Flow, StateFlow y SharedFlow

Avanzado
Kotlin
Kotlin
Actualizado: 04/05/2026

Diagrama: tutorial-kotlin-flow-state-shared

El tipo Flow de kotlinx.coroutines modela un flujo asíncrono de valores que se emiten y procesan bajo demanda. A diferencia de una lista o una secuencia, los elementos pueden llegar a lo largo del tiempo y las operaciones de transformación son funciones suspendibles, lo que encaja con IO, sensores o interfaces reactivas.

Junto a Flow conviven dos variantes especializadas. StateFlow expone un estado observable que siempre tiene un valor actual. SharedFlow distribuye eventos a varios suscriptores sin necesidad de mantener un estado. Ambos se construyen sobre la misma infraestructura y se integran con el resto de coroutines con naturalidad.

Flujos frios con Flow

Un Flow se define con el builder flow { ... }. Dentro del bloque, emit(valor) envia cada elemento al consumidor. La ejecución del bloque no empieza hasta que alguien llame a collect, por eso se describe como flujo frio.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

fun numeros(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking {
    println("Antes de collect")
    numeros().collect { valor ->
        println("Recibido $valor")
    }
    println("Despues de collect")
}

Cada invocación de collect reinicia la producción desde el principio. El builder es suspendible: se puede usar delay, IO asíncrona o cualquier otra función suspend sin bloquear el hilo.

Los Flow no se ejecutan hasta que hay un consumidor. Esta evaluación perezosa permite componer cadenas de operadores sin miedo a disparar trabajo innecesario.

Operadores intermedios

Los operadores de Flow se parecen a los de colecciones, pero son suspendibles y encadenables. Algunos de los mas usados son map, filter, onEach, take, debounce y distinctUntilChanged.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    (1..10).asFlow()
        .filter { it % 2 == 0 }
        .map { it * it }
        .onEach { println("Procesando $it") }
        .collect()
}

La cadena se construye en orden declarativo y no consume datos hasta que collect inicia la ejecución. onEach es útil para efectos secundarios como registrar eventos sin alterar el flujo.

Transformaciones con funciones suspendibles

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

suspend fun detalleUsuario(id: Int): String {
    delay(200)
    return "Usuario#$id"
}

fun main() = runBlocking {
    flowOf(1, 2, 3)
        .map { detalleUsuario(it) }
        .collect { println(it) }
}

El hecho de que map pueda invocar funciones suspend convierte a Flow en un mecanismo adecuado para orquestar llamadas a APIs, bases de datos o caches sin callbacks ni hilos explícitos.

Control del contexto con flowOn

Cuando la fuente del flujo realiza trabajo pesado de IO o CPU, es conveniente ejecutar esa parte en un Dispatcher distinto del consumidor. El operador flowOn cambia el contexto de las operaciones de la cadena anterior.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun archivosGrandes(): Flow<String> = flow {
    for (i in 1..3) {
        emit("linea $i")
    }
}.flowOn(Dispatchers.IO)

fun main() = runBlocking {
    archivosGrandes().collect { linea ->
        println("[${Thread.currentThread().name}] $linea")
    }
}

La producción del flujo ocurre en el dispatcher de IO mientras que el collect se mantiene en el hilo principal. Este patrón es esencial para no bloquear la interfaz o el hilo de negocio con operaciones intensivas.

Terminal operators

Para ejecutar un flujo se necesita una operación terminal. Además de collect, hay varias formas de consumir resultados:

  • toList(): colecciona todos los elementos en una lista. Adecuada para flujos finitos.
  • first() y firstOrNull(): obtienen solo el primer elemento.
  • reduce { acc, v -> ... } y fold(inicial) { acc, v -> ... }: combinan elementos.
  • launchIn(scope): lanza la recolección en un CoroutineScope y devuelve un Job para cancelarla.
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val total = (1..100).asFlow().fold(0) { acc, v -> acc + v }
    println("Suma: $total")
}

StateFlow: estado observable

Un StateFlow es un flujo caliente que mantiene siempre un valor actual. Cualquier nuevo suscriptor recibe de inmediato el último valor emitido. Esta caracteristica lo hace adecuado para modelar el estado de una pantalla, un caso de uso o una configuración.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class EstadoSesion(val usuario: String? = null, val cargando: Boolean = false)

class GestorSesion(private val scope: CoroutineScope) {
    private val _estado = MutableStateFlow(EstadoSesion())
    val estado: StateFlow<EstadoSesion> = _estado.asStateFlow()

    fun iniciarSesion(nombre: String) {
        scope.launch {
            _estado.value = _estado.value.copy(cargando = true)
            delay(500)
            _estado.value = EstadoSesion(usuario = nombre, cargando = false)
        }
    }
}

fun main() = runBlocking {
    val gestor = GestorSesion(this)
    val trabajo = launch {
        gestor.estado.collect { estado ->
            println("Estado: $estado")
        }
    }
    gestor.iniciarSesion("Ana")
    delay(800)
    trabajo.cancel()
}

El flujo del StateFlow dedupica emisiones identicas: si se asigna un valor igual al actual, no se notifica. Este comportamiento es útil para interfaces reactivas que no deben redibujar sin cambios efectivos.

Un StateFlow siempre tiene un valor. Es el equivalente reactivo de un campo observable que nunca esta vacío.

SharedFlow: eventos sin estado

Un SharedFlow distribuye valores a todos sus suscriptores activos sin mantener un estado actual. Es ideal para eventos puntuales como notificaciones, ticks de reloj o acciones de usuario que no representan un estado persistente.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class BusEventos {
    private val _eventos = MutableSharedFlow<String>()
    val eventos: SharedFlow<String> = _eventos.asSharedFlow()

    suspend fun publicar(evento: String) = _eventos.emit(evento)
}

fun main() = runBlocking {
    val bus = BusEventos()

    val escuchaA = launch {
        bus.eventos.collect { println("A recibio $it") }
    }
    val escuchaB = launch {
        bus.eventos.collect { println("B recibio $it") }
    }

    delay(50)
    bus.publicar("pago-exitoso")
    bus.publicar("envio-iniciado")
    delay(50)
    escuchaA.cancel()
    escuchaB.cancel()
}

El constructor de MutableSharedFlow admite un replay (cuantos eventos recientes se guardan para los nuevos suscriptores) y un extraBufferCapacity que amortigua cuando los consumidores son lentos. Estas opciones permiten afinar el comportamiento según el caso de uso.

El mismo conjunto de operadores se aplica a las tres fuentes. La diferencia esta en la naturaleza del productor: bajo demanda, con estado actual o de tipo evento.

Combinación de flujos

Los operadores combine, zip y flatMapLatest unen dos o mas flujos en uno solo. Son centrales en arquitecturas reactivas donde el estado de la UI depende de varias fuentes simultaneas.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.delay

fun busqueda(): Flow<String> = flow {
    listOf("k", "ko", "kot", "kotl", "kotli", "kotlin").forEach {
        emit(it)
        delay(120)
    }
}

suspend fun buscarEnApi(texto: String): List<String> {
    delay(80)
    return if (texto.length < 3) emptyList() else listOf("$texto-1", "$texto-2")
}

fun main() = runBlocking {
    busqueda()
        .debounce(100)
        .distinctUntilChanged()
        .flatMapLatest { texto ->
            flow { emit(buscarEnApi(texto)) }
        }
        .collect { println("Resultado: $it") }
}

Este patrón, con debounce y flatMapLatest, es la base de buscadores con autocompletado. Solo se lanza una petición cuando el usuario deja de escribir, y si llega texto nuevo antes de que termine la anterior, se cancela la pendiente.

Gestion de errores

Los flujos propagan excepciones como cualquier coroutine. El operador catch permite interceptar errores de la cadena anterior y reaccionar sin interrumpir a los consumidores.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun lecturasSensor(): Flow<Int> = flow {
    emit(1)
    emit(2)
    throw RuntimeException("Sensor desconectado")
}

fun main() = runBlocking {
    lecturasSensor()
        .catch { e -> emit(-1).also { println("Error capturado: ${e.message}") } }
        .collect { println("Valor: $it") }
}

catch solo captura errores aguas arriba, no los producidos por el collect. Para estos últimos, se usa un try/catch tradicional en el consumidor.

Cuando elegir cada variante

La elección depende de la naturaleza de los datos y del consumidor.

  • 1. Flow frio: origen de datos bajo demanda (consultas, transformaciones, iteradores asíncronos).
  • 2. StateFlow: estado observable siempre presente, típico de ViewModels y casos de uso con UI.
  • 3. SharedFlow: eventos puntuales que varios consumidores deben recibir sin guardar estado.
  • 4. Channels: comunicación entre productor y consumidor con semántica "uno a uno" y contrapresión estricta.

callbackFlow es un builder especial para envolver APIs basadas en callbacks (listeners, SDK nativos) y convertirlas en Flow seguros con send y awaitClose. Los Channel no son reactivos en el sentido de Flow: cada valor emitido lo recibe un único consumidor, lo que los convierte en la primitiva ideal para pipelines productor consumidor.

Los flujos se han convertido en el mecanismo por defecto para modelar streams reactivos en Kotlin, especialmente en aplicaciones multiplataforma con Jetpack Compose o Ktor. El uso consistente de Flow junto con coroutines produce un estilo coherente y permite reutilizar los mismos operadores tanto en backend como en cliente.

Alan Sastre - Autor del tutorial

Alan Sastre

Ingeniero de Software y formador, CEO en CertiDevs

Ingeniero de software especializado en Full Stack y en Inteligencia Artificial. Como CEO de CertiDevs, Kotlin es una de sus áreas de expertise. Con más de 15 años programando, 6K seguidores en LinkedIn y experiencia como formador, Alan se dedica a crear contenido educativo de calidad para desarrolladores de todos los niveles.

Más tutoriales de Kotlin

Explora más contenido relacionado con Kotlin y continúa aprendiendo con nuestros tutoriales gratuitos.

Aprendizajes de esta lección

Construir flujos reactivos frios con Flow, gestionar estado observable con StateFlow y eventos con SharedFlow, aplicar operadores de transformación, contrapresión y control del contexto.