Rust

Rust

Tutorial Rust: Channels y paso de mensajes

Aprende a usar canales MPSC en Rust para comunicación entre hilos y patrones concurrentes seguros con ejemplos prácticos y alternativas a mutex.

Aprende Rust y certifícate

MPSC channels

Los canales (channels) en Rust representan una forma elegante de implementar la comunicación entre hilos siguiendo el principio de "comunicar mediante el paso de mensajes" en lugar de "compartir memoria". Esta filosofía es fundamental en la programación concurrente de Rust, ya que promueve un modelo más seguro y menos propenso a errores.

Un canal MPSC (Multiple Producer, Single Consumer) permite que múltiples hilos productores envíen mensajes a un único hilo consumidor. Esta estructura es particularmente útil en escenarios donde varios procesos generan datos que deben ser procesados por un único componente centralizado.

Para utilizar canales en Rust, necesitamos importar el módulo correspondiente de la biblioteca estándar:

use std::sync::mpsc;
use std::thread;

Creación de un canal MPSC

La creación de un canal MPSC en Rust es sencilla mediante la función mpsc::channel():

fn main() {
    // Creamos un canal que devuelve una tupla (tx, rx)
    let (transmisor, receptor) = mpsc::channel();
    
    println!("Canal creado correctamente");
}

Este código crea un canal y nos devuelve dos extremos:

  • transmisor (tx): El extremo por donde se envían los mensajes
  • receptor (rx): El extremo por donde se reciben los mensajes

Enviando mensajes a través del canal

Para enviar un mensaje a través del canal, utilizamos el método send() del transmisor:

fn main() {
    let (tx, rx) = mpsc::channel();
    
    // Creamos un nuevo hilo
    thread::spawn(move || {
        // Enviamos un mensaje simple
        let mensaje = "¡Hola desde otro hilo!";
        tx.send(mensaje).unwrap();
        
        println!("Mensaje enviado");
    });
    
    // El hilo principal recibe el mensaje
    let mensaje_recibido = rx.recv().unwrap();
    println!("Mensaje recibido: {}", mensaje_recibido);
}

En este ejemplo, creamos un nuevo hilo que envía un mensaje de texto a través del canal. El hilo principal espera y recibe ese mensaje. Observa cómo usamos move para transferir la propiedad del transmisor al closure del nuevo hilo.

Recibiendo mensajes

Existen dos formas principales de recibir mensajes:

  • recv(): Bloquea el hilo hasta que recibe un mensaje o el canal se cierra
  • try_recv(): No bloquea, devuelve inmediatamente un Result indicando si hay un mensaje disponible
fn main() {
    let (tx, rx) = mpsc::channel();
    
    // Enviamos un mensaje desde el hilo principal
    tx.send(42).unwrap();
    
    // Recibimos con recv() - bloqueante
    let valor = rx.recv().unwrap();
    println!("Recibido con recv(): {}", valor);
    
    // Intentamos recibir con try_recv() - no bloqueante
    match rx.try_recv() {
        Ok(valor) => println!("Recibido con try_recv(): {}", valor),
        Err(e) => println!("No hay mensajes disponibles: {:?}", e),
    }
}

La diferencia clave es que recv() esperará indefinidamente hasta que llegue un mensaje, mientras que try_recv() retornará inmediatamente con un error si no hay mensajes disponibles.

Múltiples productores

La "M" en MPSC significa que podemos tener múltiples productores enviando mensajes al mismo canal. Esto se logra clonando el transmisor:

fn main() {
    let (tx, rx) = mpsc::channel();
    
    // Creamos múltiples transmisores clonando el original
    let tx1 = tx.clone();
    
    // Primer hilo productor
    thread::spawn(move || {
        tx.send("Mensaje del productor 1").unwrap();
    });
    
    // Segundo hilo productor
    thread::spawn(move || {
        tx1.send("Mensaje del productor 2").unwrap();
    });
    
    // Recibimos los dos mensajes
    for _ in 0..2 {
        println!("{}", rx.recv().unwrap());
    }
}

En este ejemplo, creamos dos hilos productores, cada uno con su propia copia del transmisor. Ambos envían mensajes al mismo receptor, que los procesa en el orden en que llegan.

Enviando datos más complejos

Los canales pueden transmitir cualquier tipo de datos que implementen el trait Send, lo que incluye la mayoría de los tipos en Rust:

fn main() {
    // Definimos una estructura para nuestros mensajes
    #[derive(Debug)]
    struct Tarea {
        id: u32,
        descripcion: String,
    }
    
    let (tx, rx) = mpsc::channel();
    
    // Creamos varios hilos trabajadores
    for id in 1..=3 {
        let tx = tx.clone();
        thread::spawn(move || {
            let tarea = Tarea {
                id,
                descripcion: format!("Tarea número {}", id),
            };
            
            // Simulamos algún trabajo
            thread::sleep(std::time::Duration::from_millis(id * 200));
            
            // Enviamos la tarea completada
            tx.send(tarea).unwrap();
        });
    }
    
    // Descartamos el transmisor original para que el canal se cierre
    // cuando todos los hilos terminen
    drop(tx);
    
    // Procesamos todas las tareas recibidas
    for tarea in rx {
        println!("Tarea completada: {:?}", tarea);
    }
}

Este ejemplo muestra cómo:

  1. Enviar estructuras personalizadas a través del canal
  2. Usar el receptor como un iterador que procesa mensajes hasta que el canal se cierra
  3. Utilizar drop(tx) para asegurar que el canal se cierre cuando todos los transmisores sean descartados

Canales como tuberías de procesamiento

Los canales son ideales para implementar pipelines de procesamiento, donde cada etapa procesa datos y los pasa a la siguiente:

fn main() {
    // Creamos dos canales para nuestro pipeline
    let (tx_entrada, rx_entrada) = mpsc::channel();
    let (tx_procesado, rx_procesado) = mpsc::channel();
    
    // Etapa 1: Generador de números
    thread::spawn(move || {
        for i in 1..=5 {
            tx_entrada.send(i).unwrap();
            thread::sleep(std::time::Duration::from_millis(100));
        }
    });
    
    // Etapa 2: Procesador (eleva al cuadrado)
    thread::spawn(move || {
        for valor in rx_entrada {
            let resultado = valor * valor;
            tx_procesado.send(resultado).unwrap();
        }
    });
    
    // Etapa 3: Consumidor final
    for resultado in rx_procesado {
        println!("Resultado procesado: {}", resultado);
    }
}

En este pipeline de tres etapas:

  1. Un hilo genera números y los envía al primer canal
  2. Otro hilo recibe esos números, los procesa (elevándolos al cuadrado) y los envía al segundo canal
  3. El hilo principal recibe y muestra los resultados procesados

Manejo de errores en canales

Es importante manejar correctamente los posibles errores al trabajar con canales:

fn main() {
    let (tx, rx) = mpsc::channel();
    
    thread::spawn(move || {
        // El transmisor se destruye al final de este scope
        // lo que cierra el canal
        if let Err(e) = tx.send(42) {
            println!("Error al enviar: {}", e);
        }
    });
    
    // Esperamos un poco para asegurarnos que el otro hilo termine
    thread::sleep(std::time::Duration::from_millis(100));
    
    // Intentamos recibir después de que el canal se ha cerrado
    match rx.recv() {
        Ok(valor) => println!("Recibido: {}", valor),
        Err(e) => println!("Error al recibir: {}", e),
    }
}

Los errores más comunes son:

  • SendError: Ocurre cuando intentamos enviar a un canal cuyo receptor ha sido descartado
  • RecvError: Ocurre cuando intentamos recibir de un canal cuyos transmisores han sido todos descartados

Los canales MPSC proporcionan una forma segura y eficiente de comunicación entre hilos, eliminando muchos de los problemas asociados con la memoria compartida y los bloqueos. Son especialmente útiles en aplicaciones que siguen patrones de productor-consumidor o que implementan sistemas de procesamiento por etapas.

Send/recv patterns

Los patrones de envío y recepción (send/recv patterns) en Rust representan diferentes estrategias para estructurar la comunicación entre hilos mediante canales. Estos patrones nos permiten diseñar soluciones concurrentes elegantes y adaptadas a distintos escenarios.

Patrón de fan-out (distribución)

El patrón fan-out distribuye trabajo entre múltiples hilos trabajadores, permitiendo el procesamiento paralelo de tareas:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    let rx = std::sync::Arc::new(std::sync::Mutex::new(rx));
    
    // Creamos varios trabajadores
    let num_workers = 4;
    let mut handles = vec![];
    
    for id in 0..num_workers {
        let rx_worker = rx.clone();
        
        let handle = thread::spawn(move || {
            println!("Trabajador {} iniciado", id);
            
            loop {
                // Intentamos obtener una tarea
                let tarea = {
                    let rx_guard = rx_worker.lock().unwrap();
                    rx_guard.try_recv()
                };
                
                match tarea {
                    Ok(num) => {
                        println!("Trabajador {} procesando tarea: {}", id, num);
                        thread::sleep(Duration::from_millis(500));
                    },
                    Err(mpsc::TryRecvError::Empty) => {
                        thread::sleep(Duration::from_millis(100));
                        continue;
                    },
                    Err(mpsc::TryRecvError::Disconnected) => break,
                }
            }
            
            println!("Trabajador {} terminado", id);
        });
        
        handles.push(handle);
    }
    
    // Enviamos tareas
    for i in 0..10 {
        tx.send(i).unwrap();
        thread::sleep(Duration::from_millis(100));
    }
    
    // Cerramos el canal
    drop(tx);
    
    // Esperamos a que todos los trabajadores terminen
    for handle in handles {
        handle.join().unwrap();
    }
}

En este patrón, un único productor envía tareas que son procesadas por múltiples consumidores. Cada trabajador compite por obtener la siguiente tarea disponible, lo que permite distribuir la carga de trabajo de forma eficiente.

Patrón de pipeline (tubería)

El patrón pipeline organiza el procesamiento en etapas secuenciales, donde cada etapa procesa los datos y los pasa a la siguiente:

use std::sync::mpsc;
use std::thread;

fn main() {
    // Creamos los canales para cada etapa del pipeline
    let (tx_entrada, rx_entrada) = mpsc::channel();
    let (tx_filtrado, rx_filtrado) = mpsc::channel();
    let (tx_transformado, rx_transformado) = mpsc::channel();
    
    // Etapa 1: Generador de datos
    thread::spawn(move || {
        for i in 1..=20 {
            tx_entrada.send(i).unwrap();
        }
    });
    
    // Etapa 2: Filtrado (solo números pares)
    thread::spawn(move || {
        for num in rx_entrada {
            if num % 2 == 0 {
                tx_filtrado.send(num).unwrap();
            }
        }
    });
    
    // Etapa 3: Transformación (multiplicar por 10)
    thread::spawn(move || {
        for num in rx_filtrado {
            tx_transformado.send(num * 10).unwrap();
        }
    });
    
    // Etapa 4: Consumidor final
    for resultado in rx_transformado {
        println!("Resultado final: {}", resultado);
    }
}

Este patrón es ideal para procesos que requieren transformaciones secuenciales de datos. Cada etapa puede ejecutarse en paralelo con las demás, procesando elementos tan pronto como estén disponibles de la etapa anterior.

Patrón de solicitud-respuesta

Este patrón implementa una comunicación bidireccional donde un cliente envía una solicitud y espera una respuesta:

use std::sync::mpsc;
use std::thread;

fn main() {
    // Canal para solicitudes
    let (tx_solicitud, rx_solicitud) = mpsc::channel();
    
    // Iniciamos el servidor
    thread::spawn(move || {
        for (valor, tx_respuesta) in rx_solicitud {
            // Procesamos la solicitud (en este caso, calculamos el cuadrado)
            let resultado = valor * valor;
            
            // Enviamos la respuesta
            tx_respuesta.send(resultado).unwrap();
        }
    });
    
    // Función cliente para enviar solicitudes
    let enviar_solicitud = |valor| {
        // Creamos un canal para la respuesta
        let (tx_respuesta, rx_respuesta) = mpsc::channel();
        
        // Enviamos la solicitud junto con el canal de respuesta
        tx_solicitud.send((valor, tx_respuesta)).unwrap();
        
        // Esperamos y devolvemos la respuesta
        rx_respuesta.recv().unwrap()
    };
    
    // Enviamos varias solicitudes
    for i in 1..=5 {
        let respuesta = enviar_solicitud(i);
        println!("El cuadrado de {} es {}", i, respuesta);
    }
}

Este patrón es útil para implementar servicios donde un hilo actúa como servidor, procesando solicitudes de múltiples clientes y enviando respuestas específicas a cada uno.

Patrón de broadcast (difusión)

El patrón broadcast permite enviar el mismo mensaje a múltiples receptores:

use std::sync::mpsc;
use std::thread;

fn main() {
    // Número de receptores
    let num_receptores = 3;
    
    // Creamos canales para cada receptor
    let mut transmisores = Vec::new();
    let mut handles = Vec::new();
    
    for id in 0..num_receptores {
        let (tx, rx) = mpsc::channel();
        transmisores.push(tx);
        
        // Creamos un hilo receptor
        let handle = thread::spawn(move || {
            println!("Receptor {} iniciado", id);
            
            for mensaje in rx {
                println!("Receptor {} recibió: {}", id, mensaje);
            }
            
            println!("Receptor {} terminado", id);
        });
        
        handles.push(handle);
    }
    
    // Enviamos mensajes a todos los receptores
    for i in 1..=5 {
        let mensaje = format!("Mensaje broadcast #{}", i);
        
        for tx in &transmisores {
            tx.send(mensaje.clone()).unwrap();
        }
        
        thread::sleep(std::time::Duration::from_millis(100));
    }
    
    // Cerramos todos los canales
    drop(transmisores);
    
    // Esperamos a que todos los receptores terminen
    for handle in handles {
        handle.join().unwrap();
    }
}

Este patrón es útil para notificaciones o cuando múltiples componentes necesitan recibir las mismas actualizaciones.

Patrón de selección dinámica

Aunque Rust no tiene un operador select nativo como Go, podemos implementar un patrón similar usando try_recv en un bucle:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // Creamos dos canales
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();
    
    // Productor para el primer canal
    thread::spawn(move || {
        for i in 1..=3 {
            thread::sleep(Duration::from_millis(500));
            tx1.send(format!("Canal 1: mensaje {}", i)).unwrap();
        }
    });
    
    // Productor para el segundo canal
    thread::spawn(move || {
        for i in 1..=3 {
            thread::sleep(Duration::from_millis(700));
            tx2.send(format!("Canal 2: mensaje {}", i)).unwrap();
        }
    });
    
    // Simulamos un select monitoreando ambos canales
    let mut mensajes_pendientes = true;
    let mut contador1 = 0;
    let mut contador2 = 0;
    
    while mensajes_pendientes {
        let mut recibido_algo = false;
        
        // Intentamos recibir del primer canal
        match rx1.try_recv() {
            Ok(msg) => {
                println!("Recibido: {}", msg);
                contador1 += 1;
                recibido_algo = true;
            },
            Err(mpsc::TryRecvError::Empty) => {},
            Err(mpsc::TryRecvError::Disconnected) => contador1 = 3,
        }
        
        // Intentamos recibir del segundo canal
        match rx2.try_recv() {
            Ok(msg) => {
                println!("Recibido: {}", msg);
                contador2 += 1;
                recibido_algo = true;
            },
            Err(mpsc::TryRecvError::Empty) => {},
            Err(mpsc::TryRecvError::Disconnected) => contador2 = 3,
        }
        
        // Si no recibimos nada, esperamos un poco
        if !recibido_algo {
            thread::sleep(Duration::from_millis(100));
        }
        
        // Verificamos si hemos recibido todos los mensajes
        if contador1 >= 3 && contador2 >= 3 {
            mensajes_pendientes = false;
        }
    }
    
    println!("Todos los mensajes han sido procesados");
}

Este patrón permite monitorear múltiples canales simultáneamente, procesando mensajes a medida que llegan de cualquier fuente.

Patrón de timeout (tiempo de espera)

A veces necesitamos limitar el tiempo que esperamos por un mensaje:

use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

fn main() {
    let (tx, rx) = mpsc::channel();
    
    // Enviamos un mensaje después de cierto tiempo
    thread::spawn(move || {
        thread::sleep(Duration::from_secs(2));
        tx.send("Mensaje tardío").unwrap();
    });
    
    // Esperamos con timeout
    let timeout = Duration::from_secs(1);
    let inicio = Instant::now();
    
    println!("Esperando mensaje con timeout de 1 segundo...");
    
    let resultado = loop {
        match rx.try_recv() {
            Ok(msg) => break Ok(msg),
            Err(mpsc::TryRecvError::Empty) => {
                if inicio.elapsed() >= timeout {
                    break Err("Timeout alcanzado");
                }
                thread::sleep(Duration::from_millis(100));
            },
            Err(mpsc::TryRecvError::Disconnected) => {
                break Err("Canal cerrado");
            }
        }
    };
    
    match resultado {
        Ok(msg) => println!("Recibido: {}", msg),
        Err(err) => println!("Error: {}", err),
    }
    
    // Esperamos un poco más para ver si llega el mensaje tardío
    thread::sleep(Duration::from_secs(2));
    
    // Intentamos recibir de nuevo
    match rx.try_recv() {
        Ok(msg) => println!("Mensaje recibido después del timeout: {}", msg),
        Err(e) => println!("No hay más mensajes: {:?}", e),
    }
}

Este patrón es crucial para sistemas que necesitan responder en tiempo real o que no pueden bloquearse indefinidamente esperando comunicaciones.

Patrón de combinación (join)

Este patrón recolecta resultados de múltiples hilos trabajadores:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    let num_workers = 4;
    
    // Lanzamos varios trabajadores
    for id in 0..num_workers {
        let tx = tx.clone();
        
        thread::spawn(move || {
            // Cada trabajador realiza su tarea
            let resultado = id * 10;
            
            // Enviamos el resultado junto con el ID
            tx.send((id, resultado)).unwrap();
        });
    }
    
    // Descartamos el transmisor original
    drop(tx);
    
    // Recolectamos todos los resultados
    let mut resultados = vec![0; num_workers];
    
    for (id, resultado) in rx {
        println!("Trabajador {} completó con resultado: {}", id, resultado);
        resultados[id] = resultado;
    }
    
    // Procesamos los resultados combinados
    let suma_total: usize = resultados.iter().sum();
    println!("Resultado combinado: {}", suma_total);
}

Este patrón es útil para dividir y conquistar problemas, donde cada trabajador procesa una parte de los datos y luego se combinan los resultados.

Los patrones de envío y recepción nos permiten estructurar nuestras aplicaciones concurrentes de manera clara y segura, aprovechando al máximo las garantías de seguridad que ofrece Rust. Cada patrón tiene sus casos de uso específicos y puede combinarse con otros para crear soluciones más complejas y robustas.

Alternativas a mutex

Aunque los mutex son una herramienta fundamental para la sincronización en programación concurrente, no siempre representan la solución óptima para todos los escenarios. Rust ofrece varias alternativas que pueden resultar más adecuadas dependiendo del contexto, proporcionando mejor rendimiento, mayor seguridad o código más legible.

Los canales que hemos explorado anteriormente constituyen una de las alternativas más potentes a los mutex, pero existen otras opciones que vale la pena conocer.

Tipos atómicos

Los tipos atómicos permiten operaciones que se ejecutan de manera indivisible sin necesidad de bloqueos explícitos:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::sync::Arc;

fn main() {
    // Creamos un contador atómico compartido
    let contador = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];
    
    // Lanzamos 10 hilos que incrementarán el contador
    for _ in 0..10 {
        let contador_clon = contador.clone();
        
        let handle = thread::spawn(move || {
            for _ in 0..100 {
                // Incrementamos el contador de forma atómica
                contador_clon.fetch_add(1, Ordering::SeqCst);
            }
        });
        
        handles.push(handle);
    }
    
    // Esperamos a que todos los hilos terminen
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("Valor final del contador: {}", contador.load(Ordering::SeqCst));
}

Los tipos atómicos son más eficientes que los mutex para operaciones simples como incrementos, decrementos o intercambios de valores. La biblioteca estándar de Rust ofrece varios tipos atómicos como AtomicBool, AtomicUsize, AtomicIsize y AtomicPtr.

El parámetro Ordering especifica el nivel de garantías de memoria que necesitamos:

  • Relaxed: Proporciona la operación atómica pero con mínimas garantías de ordenamiento
  • SeqCst (Sequential Consistency): Ofrece el ordenamiento más estricto y predecible
  • Acquire/Release: Proporcionan garantías intermedias útiles para patrones específicos

Read-Write Lock (RwLock)

Cuando tenemos un patrón donde las lecturas son mucho más frecuentes que las escrituras, un RwLock puede ofrecer mejor rendimiento que un mutex regular:

use std::sync::RwLock;
use std::thread;
use std::sync::Arc;
use std::time::Duration;

fn main() {
    // Datos protegidos por un RwLock
    let datos = Arc::new(RwLock::new(vec![1, 2, 3, 4]));
    let mut handles = vec![];
    
    // Creamos varios hilos lectores
    for id in 0..3 {
        let datos_clon = datos.clone();
        
        let handle = thread::spawn(move || {
            for _ in 0..5 {
                // Adquirimos acceso de lectura
                let valores = datos_clon.read().unwrap();
                
                println!("Lector {}: valores actuales: {:?}", id, *valores);
                
                // Simulamos algún procesamiento
                thread::sleep(Duration::from_millis(50));
                
                // El lock de lectura se libera automáticamente al final del scope
            }
        });
        
        handles.push(handle);
    }
    
    // Creamos un hilo escritor
    let datos_clon = datos.clone();
    let handle = thread::spawn(move || {
        for i in 0..2 {
            // Esperamos un poco antes de escribir
            thread::sleep(Duration::from_millis(200));
            
            // Adquirimos acceso de escritura
            let mut valores = datos_clon.write().unwrap();
            valores.push(5 + i);
            
            println!("Escritor: añadido nuevo valor, ahora: {:?}", *valores);
            
            // El lock de escritura se libera automáticamente al final del scope
        }
    });
    
    handles.push(handle);
    
    // Esperamos a que todos los hilos terminen
    for handle in handles {
        handle.join().unwrap();
    }
}

Un RwLock permite que múltiples lectores accedan simultáneamente a los datos, pero garantiza acceso exclusivo cuando un escritor necesita modificarlos. Esto puede mejorar significativamente el rendimiento en escenarios donde las lecturas son predominantes.

Once Cell

Para inicialización perezosa pero thread-safe, std::sync::OnceLock (estable desde Rust 1.70) proporciona una solución elegante:

use std::sync::OnceLock;
use std::thread;

// Una estructura costosa de inicializar
struct DatosComplejos {
    valores: Vec<u64>,
}

impl DatosComplejos {
    fn new() -> Self {
        println!("Inicializando datos complejos (operación costosa)...");
        // Simulamos una inicialización costosa
        thread::sleep(std::time::Duration::from_millis(500));
        
        Self {
            valores: vec![100, 200, 300, 400],
        }
    }
}

// Singleton global inicializado perezosamente
static DATOS_GLOBALES: OnceLock<DatosComplejos> = OnceLock::new();

fn obtener_datos() -> &'static DatosComplejos {
    DATOS_GLOBALES.get_or_init(|| DatosComplejos::new())
}

fn main() {
    let mut handles = vec![];
    
    // Varios hilos intentarán acceder a los datos
    for id in 0..5 {
        let handle = thread::spawn(move || {
            println!("Hilo {} accediendo a los datos...", id);
            let datos = obtener_datos();
            println!("Hilo {} obtuvo datos: {:?}", id, datos.valores);
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    // Verificamos que la inicialización solo ocurrió una vez
    println!("Accediendo de nuevo a los datos desde el hilo principal");
    let datos = obtener_datos();
    println!("Valores: {:?}", datos.valores);
}

OnceLock garantiza que la inicialización ocurra exactamente una vez, incluso si múltiples hilos intentan inicializar simultáneamente. Es ideal para recursos globales o configuraciones que deben cargarse bajo demanda.

Barrier (Barrera)

Cuando necesitamos sincronizar múltiples hilos para que comiencen o continúen su ejecución al mismo tiempo, podemos usar una barrera:

use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;

fn main() {
    let num_hilos = 5;
    // Creamos una barrera que esperará por 5 hilos
    let barrera = Arc::new(Barrier::new(num_hilos));
    let mut handles = vec![];
    
    for id in 0..num_hilos {
        let barrera = barrera.clone();
        
        let handle = thread::spawn(move || {
            println!("Hilo {} iniciando fase de preparación", id);
            
            // Simulamos trabajo de preparación con duración variable
            let prep_time = Duration::from_millis(id * 100 + 500);
            thread::sleep(prep_time);
            
            println!("Hilo {} listo, esperando a los demás...", id);
            
            // Esperamos a que todos los hilos lleguen a este punto
            barrera.wait();
            
            println!("Hilo {} comenzando fase de ejecución sincronizada", id);
            
            // Ahora todos los hilos continúan simultáneamente
            // Simulamos el trabajo principal
            thread::sleep(Duration::from_millis(300));
            
            println!("Hilo {} completó su trabajo", id);
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Una Barrier es útil para algoritmos paralelos que requieren sincronización en puntos específicos, como cálculos por fases donde cada fase depende de que todos los hilos hayan completado la fase anterior.

Lazy Cell

Para inicialización perezosa en un contexto de un solo hilo, std::cell::OnceCell (o std::cell::LazyCell en versiones más recientes) ofrece una alternativa más ligera:

use std::cell::OnceCell;

struct Configuracion {
    max_conexiones: usize,
    timeout: u64,
}

fn main() {
    // Creamos una celda que se inicializará perezosamente
    let config = OnceCell::new();
    
    // La primera vez que necesitamos la configuración, la inicializamos
    let valor = config.get_or_init(|| {
        println!("Inicializando configuración...");
        Configuracion {
            max_conexiones: 100,
            timeout: 30,
        }
    });
    
    println!("Max conexiones: {}", valor.max_conexiones);
    
    // Las siguientes veces, se reutiliza el valor ya inicializado
    let mismo_valor = config.get_or_init(|| {
        println!("Esto no debería imprimirse");
        Configuracion {
            max_conexiones: 50,  // Estos valores no se usarán
            timeout: 10,
        }
    });
    
    println!("Timeout: {}", mismo_valor.timeout);
}

A diferencia de OnceLock, esta versión no es thread-safe pero tiene menos sobrecarga, lo que la hace ideal para contextos donde sabemos que solo un hilo accederá a los datos.

RefCell

Cuando necesitamos mutabilidad interior en un contexto de un solo hilo, RefCell proporciona comprobaciones de préstamo en tiempo de ejecución:

use std::cell::RefCell;

fn main() {
    // Creamos un RefCell con un valor inicial
    let contador = RefCell::new(0);
    
    // Podemos tomar prestado el valor inmutablemente
    {
        let valor = contador.borrow();
        println!("Valor actual: {}", *valor);
    }
    
    // O podemos tomar prestado el valor mutablemente
    {
        let mut valor_mut = contador.borrow_mut();
        *valor_mut += 1;
        println!("Valor incrementado: {}", *valor_mut);
    }
    
    // Podemos tener múltiples préstamos inmutables
    let valor1 = contador.borrow();
    let valor2 = contador.borrow();
    
    println!("Múltiples lectores: {} y {}", *valor1, *valor2);
    
    // Pero no podemos tener préstamos mutables e inmutables simultáneamente
    // El siguiente código causaría un panic en tiempo de ejecución:
    // let valor_mut = contador.borrow_mut(); // ¡Esto causaría un panic!
}

RefCell es útil cuando necesitamos mutabilidad interior en estructuras que aparentan ser inmutables desde fuera. A diferencia de Mutex, las comprobaciones ocurren en tiempo de ejecución en lugar de ser garantizadas por el sistema de tipos.

Comparación de alternativas

Cada alternativa tiene sus casos de uso óptimos:

  • Canales MPSC: Ideales para comunicación entre hilos y patrones productor-consumidor.
  • Tipos atómicos: Perfectos para contadores compartidos y operaciones simples sin bloqueo.
  • RwLock: Óptimo cuando las lecturas son mucho más frecuentes que las escrituras.
  • OnceLock: Excelente para inicialización perezosa thread-safe de recursos globales.
  • Barrier: Útil para sincronizar múltiples hilos en puntos específicos de ejecución.
  • RefCell/OnceCell: Apropiados para contextos de un solo hilo con necesidades de mutabilidad interior.

La elección entre estas alternativas depende de factores como:

  • El patrón de acceso a los datos compartidos (lecturas vs. escrituras)
  • La granularidad de la sincronización necesaria
  • Los requisitos de rendimiento de la aplicación
  • La complejidad del código resultante

En general, es recomendable seguir el principio de Rust de "comunicar mediante el paso de mensajes" utilizando canales cuando sea posible, y recurrir a las otras alternativas cuando tengamos necesidades específicas que los canales no satisfagan adecuadamente.

Aprende Rust online

Otras lecciones de Rust

Accede a todas las lecciones de Rust y aprende con ejemplos prácticos de código y ejercicios de programación con IDE web sin instalar nada.

Accede GRATIS a Rust y certifícate

Ejercicios de programación de Rust

Evalúa tus conocimientos de esta lección Channels y paso de mensajes con nuestros retos de programación de tipo Test, Puzzle, Código y Proyecto con VSCode, guiados por IA.

En esta lección

Objetivos de aprendizaje de esta lección

  • Comprender el funcionamiento y creación de canales MPSC en Rust para comunicación entre hilos.
  • Aprender a enviar y recibir mensajes, incluyendo datos complejos, a través de canales.
  • Conocer patrones comunes de envío y recepción para estructurar aplicaciones concurrentes.
  • Explorar alternativas a mutex para sincronización y mutabilidad en entornos concurrentes.
  • Identificar casos de uso adecuados para cada mecanismo de sincronización y comunicación.