Bases de datos asíncronas

Intermedio
FastAPI
FastAPI
Actualizado: 01/07/2025

¡Desbloquea el curso completo!

IA
Ejercicios
Certificado
Entrar

Configurar SQLAlchemy asíncrono

SQLAlchemy asíncrono permite que tu aplicación FastAPI maneje múltiples operaciones de base de datos simultáneamente sin bloquear el hilo principal. En lugar de esperar a que cada consulta termine antes de procesar la siguiente, el motor asíncrono puede alternar entre diferentes operaciones mientras esperan respuestas de la base de datos.

La configuración asíncrona requiere componentes específicos que reemplazan sus equivalentes síncronos. El motor asíncrono (create_async_engine) sustituye al motor tradicional, mientras que las sesiones asíncronas (AsyncSession) reemplazan las sesiones normales de SQLAlchemy.

Instalación de dependencias asíncronas

Para trabajar con SQLAlchemy asíncrono necesitas instalar el driver asíncrono correspondiente a tu base de datos. Para PostgreSQL, el más utilizado es asyncpg:

pip install sqlalchemy[asyncio] asyncpg

Si prefieres SQLite para desarrollo, puedes usar aiosqlite:

pip install sqlalchemy[asyncio] aiosqlite

Configuración del motor asíncrono

El motor asíncrono se crea con create_async_engine en lugar de create_engine. La URL de conexión debe incluir el prefijo del driver asíncrono:

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# Para PostgreSQL con asyncpg
DATABASE_URL = "postgresql+asyncpg://usuario:password@localhost/mi_base_datos"

# Para SQLite con aiosqlite (desarrollo)
# DATABASE_URL = "sqlite+aiosqlite:///./test.db"

# Crear el motor asíncrono
engine = create_async_engine(DATABASE_URL, echo=True)

# Configurar el sessionmaker asíncrono
AsyncSessionLocal = sessionmaker(
    engine, 
    class_=AsyncSession, 
    expire_on_commit=False
)

El parámetro echo=True muestra las consultas SQL en la consola, útil durante el desarrollo. La opción expire_on_commit=False evita que los objetos se expiren automáticamente después de hacer commit, permitiendo acceder a sus atributos sin realizar consultas adicionales.

Creación de tablas asíncronas

Para crear las tablas de forma asíncrona, necesitas usar el contexto asíncrono del motor. El método begin() proporciona una conexión asíncrona que puedes usar con await:

from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

async def create_tables():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

# Llamar esta función al iniciar la aplicación
# await create_tables()

El método run_sync() permite ejecutar código síncrono (como create_all()) dentro del contexto asíncrono. Esto es necesario porque algunos métodos de SQLAlchemy aún no tienen equivalentes completamente asíncronos.

Gestión de sesiones asíncronas

La gestión de sesiones asíncronas requiere un enfoque diferente. Puedes crear una función generadora que proporcione sesiones asíncronas a tus endpoints:

async def get_async_session():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

Esta función utiliza el patrón de contexto asíncrono para garantizar que las sesiones se cierren correctamente, incluso si ocurre una excepción. El yield permite que FastAPI inyecte la sesión como dependencia en tus endpoints.

Integración con FastAPI

Para usar la sesión asíncrona en tus endpoints, declárala como dependencia usando Depends:

from fastapi import FastAPI, Depends

app = FastAPI()

@app.get("/usuarios/")
async def obtener_usuarios(session: AsyncSession = Depends(get_async_session)):
    # Aquí realizarás las operaciones CRUD asíncronas
    pass

La palabra clave async en la definición del endpoint es crucial. Sin ella, FastAPI no podrá manejar correctamente las operaciones asíncronas de la base de datos.

Configuración completa de ejemplo

Aquí tienes una configuración completa que puedes usar como base para tu aplicación:

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from fastapi import FastAPI

# Configuración de base de datos
DATABASE_URL = "postgresql+asyncpg://usuario:password@localhost/mi_base_datos"

engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
Base = declarative_base()

# Función para obtener sesión asíncrona
async def get_async_session():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

# Crear tablas al iniciar
async def init_db():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

app = FastAPI()

@app.on_event("startup")
async def startup_event():
    await init_db()

Esta configuración establece todos los componentes necesarios para trabajar con SQLAlchemy asíncrono en FastAPI. El evento startup garantiza que las tablas se creen automáticamente cuando la aplicación se inicia, simplificando el proceso de desarrollo.

¿Te está gustando esta lección?

Inicia sesión para no perder tu progreso y accede a miles de tutoriales, ejercicios prácticos y nuestro asistente de IA.

Progreso guardado
Asistente IA
Ejercicios
Iniciar sesión gratis

Más de 25.000 desarrolladores ya confían en CertiDevs

Operaciones CRUD asíncronas básicas

Las operaciones CRUD asíncronas siguen los mismos principios que las síncronas, pero requieren el uso de await y métodos específicos para el contexto asíncrono. La principal diferencia radica en cómo se ejecutan las consultas y se manejan los resultados.

Definición del modelo

Antes de realizar operaciones CRUD, necesitas un modelo SQLAlchemy. Aquí tienes un ejemplo básico de un modelo Usuario:

from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Usuario(Base):
    __tablename__ = "usuarios"
    
    id = Column(Integer, primary_key=True, index=True)
    nombre = Column(String, index=True)
    email = Column(String, unique=True, index=True)
    activo = Column(Boolean, default=True)

Crear registros (Create)

Para crear registros de forma asíncrona, añades el objeto a la sesión y usas await con commit():

from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession

@app.post("/usuarios/")
async def crear_usuario(
    nombre: str, 
    email: str, 
    session: AsyncSession = Depends(get_async_session)
):
    nuevo_usuario = Usuario(nombre=nombre, email=email)
    session.add(nuevo_usuario)
    await session.commit()
    await session.refresh(nuevo_usuario)
    return nuevo_usuario

El método refresh() es asíncrono y actualiza el objeto con los datos generados por la base de datos, como el ID auto-incrementado. Sin await session.refresh(), el objeto no tendría el ID asignado por la base de datos.

Leer registros (Read)

Las consultas de lectura utilizan métodos asíncronos específicos. Para obtener un registro por ID:

from sqlalchemy import select

@app.get("/usuarios/{usuario_id}")
async def obtener_usuario(
    usuario_id: int, 
    session: AsyncSession = Depends(get_async_session)
):
    stmt = select(Usuario).where(Usuario.id == usuario_id)
    result = await session.execute(stmt)
    usuario = result.scalar_one_or_none()
    
    if not usuario:
        raise HTTPException(status_code=404, detail="Usuario no encontrado")
    
    return usuario

El método scalar_one_or_none() devuelve un único resultado o None si no encuentra coincidencias. Para obtener múltiples registros:

@app.get("/usuarios/")
async def listar_usuarios(
    skip: int = 0, 
    limit: int = 10,
    session: AsyncSession = Depends(get_async_session)
):
    stmt = select(Usuario).offset(skip).limit(limit)
    result = await session.execute(stmt)
    usuarios = result.scalars().all()
    return usuarios

El método scalars().all() devuelve una lista con todos los resultados de la consulta. La diferencia entre scalar() y scalars() es que el primero devuelve un único valor, mientras que el segundo devuelve una secuencia de valores.

Actualizar registros (Update)

Para actualizar registros, primero obtienes el objeto, modificas sus atributos y confirmas los cambios:

@app.put("/usuarios/{usuario_id}")
async def actualizar_usuario(
    usuario_id: int,
    nombre: str = None,
    email: str = None,
    session: AsyncSession = Depends(get_async_session)
):
    stmt = select(Usuario).where(Usuario.id == usuario_id)
    result = await session.execute(stmt)
    usuario = result.scalar_one_or_none()
    
    if not usuario:
        raise HTTPException(status_code=404, detail="Usuario no encontrado")
    
    if nombre:
        usuario.nombre = nombre
    if email:
        usuario.email = email
    
    await session.commit()
    await session.refresh(usuario)
    return usuario

SQLAlchemy rastrea automáticamente los cambios en los objetos cargados en la sesión. No necesitas llamar a session.add() para objetos que ya existen en la sesión, solo modificar sus atributos y hacer commit().

Eliminar registros (Delete)

Para eliminar registros, obtienes el objeto y usas session.delete():

@app.delete("/usuarios/{usuario_id}")
async def eliminar_usuario(
    usuario_id: int,
    session: AsyncSession = Depends(get_async_session)
):
    stmt = select(Usuario).where(Usuario.id == usuario_id)
    result = await session.execute(stmt)
    usuario = result.scalar_one_or_none()
    
    if not usuario:
        raise HTTPException(status_code=404, detail="Usuario no encontrado")
    
    await session.delete(usuario)
    await session.commit()
    return {"mensaje": "Usuario eliminado correctamente"}

Consultas con filtros

Las consultas con filtros utilizan la misma sintaxis que SQLAlchemy síncrono, pero con await en la ejecución:

@app.get("/usuarios/activos/")
async def obtener_usuarios_activos(
    session: AsyncSession = Depends(get_async_session)
):
    stmt = select(Usuario).where(Usuario.activo == True)
    result = await session.execute(stmt)
    usuarios_activos = result.scalars().all()
    return usuarios_activos

@app.get("/usuarios/buscar/")
async def buscar_usuarios(
    nombre: str,
    session: AsyncSession = Depends(get_async_session)
):
    stmt = select(Usuario).where(Usuario.nombre.contains(nombre))
    result = await session.execute(stmt)
    usuarios = result.scalars().all()
    return usuarios

Manejo de errores en operaciones asíncronas

El manejo de errores en operaciones asíncronas requiere capturar excepciones específicas de SQLAlchemy:

from sqlalchemy.exc import IntegrityError
from fastapi import HTTPException

@app.post("/usuarios/")
async def crear_usuario_seguro(
    nombre: str,
    email: str,
    session: AsyncSession = Depends(get_async_session)
):
    try:
        nuevo_usuario = Usuario(nombre=nombre, email=email)
        session.add(nuevo_usuario)
        await session.commit()
        await session.refresh(nuevo_usuario)
        return nuevo_usuario
    except IntegrityError:
        await session.rollback()
        raise HTTPException(
            status_code=400, 
            detail="El email ya está registrado"
        )

La llamada a rollback() es asíncrona y revierte cualquier cambio pendiente en la sesión. Esto es especialmente importante cuando ocurren errores de integridad de datos, como violaciones de restricciones únicas.

Operaciones en lote

Para operaciones en lote, puedes procesar múltiples registros en una sola transacción:

@app.post("/usuarios/lote/")
async def crear_usuarios_lote(
    usuarios_data: list[dict],
    session: AsyncSession = Depends(get_async_session)
):
    usuarios_creados = []
    
    for data in usuarios_data:
        usuario = Usuario(nombre=data["nombre"], email=data["email"])
        session.add(usuario)
        usuarios_creados.append(usuario)
    
    await session.commit()
    
    # Refrescar todos los usuarios para obtener sus IDs
    for usuario in usuarios_creados:
        await session.refresh(usuario)
    
    return usuarios_creados

Este enfoque es más eficiente que crear usuarios uno por uno, ya que utiliza una sola transacción para todas las operaciones. SQLAlchemy optimiza automáticamente las consultas cuando trabajas con múltiples objetos en la misma sesión.

Aprendizajes de esta lección

  • Comprender la configuración y uso del motor y sesiones asíncronas en SQLAlchemy.
  • Aprender a crear y gestionar tablas de forma asíncrona.
  • Implementar operaciones CRUD asíncronas básicas en FastAPI.
  • Manejar errores y transacciones en un contexto asíncrono.
  • Optimizar operaciones con consultas filtradas y en lote usando SQLAlchemy asíncrono.

Completa FastAPI y certifícate

Únete a nuestra plataforma y accede a miles de tutoriales, ejercicios prácticos, proyectos reales y nuestro asistente de IA personalizado para acelerar tu aprendizaje.

Asistente IA

Resuelve dudas al instante

Ejercicios

Practica con proyectos reales

Certificados

Valida tus conocimientos

Más de 25.000 desarrolladores ya se han certificado con CertiDevs

⭐⭐⭐⭐⭐
4.9/5 valoración