FastAPI

FastAPI

Tutorial FastAPI: Conexión de FastAPI con SQLAlchemy

Aprende a conectar FastAPI con SQLAlchemy y MySQL para crear una API REST con operaciones CRUD seguras y optimizadas.

Aprende FastAPI y certifícate

Conexión con SQLAlchemy con MySQL

SQLAlchemy es una biblioteca ORM (Object-Relational Mapping) para Python que facilita la interacción con bases de datos relacionales. Integrar SQLAlchemy con FastAPI nos permite crear aplicaciones web con persistencia de datos de forma elegante y mantenible, separando la lógica de negocio de la capa de acceso a datos.

Preparando el entorno

Antes de comenzar, necesitamos instalar las dependencias necesarias para trabajar con SQLAlchemy y MySQL:

pip install sqlalchemy pymysql cryptography

El paquete pymysql es el driver que permite a SQLAlchemy comunicarse con MySQL, mientras que cryptography proporciona soporte para conexiones seguras.

Estructura del proyecto

Para mantener nuestro código organizado, crearemos una estructura de carpetas adecuada:

mi_aplicacion/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── database.py
│   ├── models/
│   │   ├── __init__.py
│   │   └── models.py
│   └── config.py
└── requirements.txt

Configuración de la conexión

Primero, vamos a crear un archivo config.py para almacenar la configuración de la base de datos:

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    DATABASE_URL: str = "mysql+pymysql://usuario:contraseña@localhost:3306/nombre_db"
    
    class Config:
        env_file = ".env"

settings = Settings()

Este enfoque nos permite definir la configuración con valores predeterminados que pueden ser sobrescritos por variables de entorno o un archivo .env.

Estableciendo la conexión con SQLAlchemy

Ahora, crearemos el archivo database.py para configurar la conexión a MySQL:

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from app.config import settings

# Crear el motor de SQLAlchemy
engine = create_engine(
    settings.DATABASE_URL,
    echo=True  # Muestra las consultas SQL generadas (útil para desarrollo)
)

# Crear una clase de sesión
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Crear una clase base para los modelos
Base = declarative_base()

# Función para obtener una sesión de base de datos
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

En este código:

  • create_engine establece la conexión con MySQL usando la URL de conexión.
  • sessionmaker crea una fábrica de sesiones que generará objetos de sesión para interactuar con la base de datos.
  • declarative_base proporciona una clase base para definir nuestros modelos.
  • La función get_db implementa un patrón de dependencia que FastAPI utilizará para inyectar sesiones de base de datos en nuestros endpoints.

Definiendo modelos

En el archivo models.py, definiremos las clases que representan nuestras tablas en la base de datos:

from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func

from app.database import Base

class Usuario(Base):
    __tablename__ = "usuarios"
    
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(100), unique=True, index=True, nullable=False)
    nombre = Column(String(100))
    hashed_password = Column(String(100), nullable=False)
    is_active = Column(Boolean, default=True)
    fecha_registro = Column(DateTime(timezone=True), server_default=func.now())
    
    # Relación con otras tablas
    productos = relationship("Producto", back_populates="propietario")

class Producto(Base):
    __tablename__ = "productos"
    
    id = Column(Integer, primary_key=True, index=True)
    nombre = Column(String(100), index=True)
    descripcion = Column(String(500))
    precio = Column(Float, nullable=False)
    propietario_id = Column(Integer, ForeignKey("usuarios.id"))
    
    # Relación con otras tablas
    propietario = relationship("Usuario", back_populates="productos")

Cada clase hereda de Base y define una tabla con sus columnas y relaciones. SQLAlchemy proporciona tipos de datos que se mapean a los tipos de MySQL:

  • IntegerINT
  • StringVARCHAR
  • FloatFLOAT
  • DateTimeDATETIME
  • BooleanTINYINT(1)

Creando las tablas

Para crear las tablas en la base de datos, necesitamos ejecutar el siguiente código:

from app.database import engine
from app.models import models

# Crear todas las tablas definidas en los modelos
models.Base.metadata.create_all(bind=engine)

Este código puede incluirse en el archivo main.py o ejecutarse como un script separado durante la inicialización de la aplicación.

Configuración de FastAPI con SQLAlchemy

Ahora, integremos todo en nuestra aplicación FastAPI en el archivo main.py:

from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session

from app.database import engine, get_db
from app.models import models

# Crear las tablas en la base de datos
models.Base.metadata.create_all(bind=engine)

# Crear la aplicación FastAPI
app = FastAPI(title="Mi API con MySQL")

@app.get("/")
def read_root():
    return {"message": "API conectada a MySQL con SQLAlchemy"}

@app.get("/db-test")
def test_db(db: Session = Depends(get_db)):
    # Simplemente probamos que la conexión funciona
    return {"message": "Conexión a la base de datos establecida correctamente"}

En este código:

  1. Importamos las dependencias necesarias.
  2. Creamos las tablas en la base de datos.
  3. Inicializamos la aplicación FastAPI.
  4. Definimos un endpoint de prueba que utiliza la inyección de dependencias para obtener una sesión de base de datos.

Opciones avanzadas de conexión

Para entornos de producción, es recomendable configurar opciones adicionales en la conexión:

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    settings.DATABASE_URL,
    echo=False,  # Desactivar en producción
    pool_size=5,  # Número de conexiones permanentes
    max_overflow=10,  # Conexiones adicionales permitidas
    pool_timeout=30,  # Tiempo de espera para obtener una conexión
    pool_recycle=1800,  # Reciclar conexiones cada 30 minutos
    pool_pre_ping=True,  # Verificar conexiones antes de usarlas
    poolclass=QueuePool  # Tipo de pool a utilizar
)

Estas opciones nos permiten:

  • Controlar el tamaño del pool de conexiones.
  • Establecer tiempos de espera y reciclaje para evitar conexiones obsoletas.
  • Activar la verificación de conexiones con pool_pre_ping.

Manejo de migraciones con Alembic

Para proyectos más grandes, es recomendable utilizar Alembic para gestionar las migraciones de la base de datos:

pip install alembic
alembic init migrations

Configuramos Alembic editando el archivo alembic.ini:

# alembic.ini
sqlalchemy.url = mysql+pymysql://usuario:contraseña@localhost:3306/nombre_db

Y modificamos migrations/env.py para importar nuestros modelos:

# migrations/env.py
from app.models import models
from app.database import Base
target_metadata = Base.metadata

Para crear una migración:

alembic revision --autogenerate -m "Crear tablas iniciales"

Y para aplicarla:

alembic upgrade head

Consideraciones de seguridad

Al trabajar con bases de datos en aplicaciones web, es importante considerar:

  1. No almacenar credenciales directamente en el código.
  2. Utilizar variables de entorno o archivos .env (con .gitignore).
  3. Implementar control de acceso adecuado a la base de datos.
  4. Configurar SSL/TLS para conexiones seguras:
engine = create_engine(
    "mysql+pymysql://usuario:contraseña@localhost:3306/nombre_db?ssl_ca=/path/to/ca.pem",
    connect_args={
        "ssl": {
            "ca": "/path/to/ca.pem"
        }
    }
)

Optimización del rendimiento

Para mejorar el rendimiento de la conexión con MySQL:

  1. Utiliza índices adecuados en las columnas frecuentemente consultadas:
class Usuario(Base):
    __tablename__ = "usuarios"
    
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(100), unique=True, index=True)
  1. Configura el pool de conexiones según la carga esperada.
  2. Utiliza consultas optimizadas con joins adecuados.
  3. Implementa caché para consultas frecuentes.

Guardar y recuperar datos de base de datos en API REST

Una vez configurada la conexión con SQLAlchemy y MySQL, el siguiente paso es implementar las operaciones CRUD (Crear, Leer, Actualizar y Eliminar) en nuestra API REST. Estas operaciones nos permitirán interactuar con la base de datos a través de endpoints HTTP.

Esquemas de datos con Pydantic

Antes de implementar los endpoints, necesitamos definir los esquemas de datos que utilizaremos para validar las entradas y formatear las salidas. Crearemos un archivo schemas.py:

from pydantic import BaseModel, EmailStr, Field
from typing import Optional, List
from datetime import datetime

# Esquemas para Producto
class ProductoBase(BaseModel):
    nombre: str = Field(..., min_length=1, max_length=100)
    descripcion: Optional[str] = Field(None, max_length=500)
    precio: float = Field(..., gt=0)

class ProductoCreate(ProductoBase):
    pass

class ProductoUpdate(BaseModel):
    nombre: Optional[str] = Field(None, min_length=1, max_length=100)
    descripcion: Optional[str] = Field(None, max_length=500)
    precio: Optional[float] = Field(None, gt=0)

class ProductoResponse(ProductoBase):
    id: int
    propietario_id: int
    
    class Config:
        from_attributes = True

# Esquemas para Usuario
class UsuarioBase(BaseModel):
    email: EmailStr
    nombre: str = Field(..., min_length=1, max_length=100)

class UsuarioCreate(UsuarioBase):
    password: str = Field(..., min_length=8)

class UsuarioResponse(UsuarioBase):
    id: int
    is_active: bool
    fecha_registro: datetime
    productos: List[ProductoResponse] = []
    
    class Config:
        from_attributes = True

Estos esquemas nos ayudan a:

  • Validar datos de entrada con reglas específicas
  • Serializar objetos de SQLAlchemy a JSON
  • Documentar automáticamente la API con Swagger/OpenAPI

Implementando operaciones CRUD

Ahora implementaremos los endpoints para realizar operaciones CRUD. Organizaremos nuestro código en routers para mantenerlo limpio y modular. Crearemos un directorio routers con archivos para cada entidad.

Router de Productos

Creamos el archivo routers/productos.py:

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List

from app.database import get_db
from app.models import models
from app.schemas import ProductoCreate, ProductoResponse, ProductoUpdate

router = APIRouter(
    prefix="/productos",
    tags=["productos"]
)

@router.post("/", response_model=ProductoResponse, status_code=status.HTTP_201_CREATED)
def crear_producto(producto: ProductoCreate, db: Session = Depends(get_db)):
    # Simulamos un usuario propietario (en una app real usaríamos autenticación)
    propietario_id = 1
    
    # Creamos una instancia del modelo
    db_producto = models.Producto(
        nombre=producto.nombre,
        descripcion=producto.descripcion,
        precio=producto.precio,
        propietario_id=propietario_id
    )
    
    # Guardamos en la base de datos
    db.add(db_producto)
    db.commit()
    db.refresh(db_producto)
    
    return db_producto

@router.get("/", response_model=List[ProductoResponse])
def listar_productos(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    # Consultamos todos los productos con paginación
    productos = db.query(models.Producto).offset(skip).limit(limit).all()
    return productos

@router.get("/{producto_id}", response_model=ProductoResponse)
def obtener_producto(producto_id: int, db: Session = Depends(get_db)):
    # Buscamos el producto por ID
    producto = db.query(models.Producto).filter(models.Producto.id == producto_id).first()
    
    # Si no existe, lanzamos una excepción
    if producto is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Producto con ID {producto_id} no encontrado"
        )
    
    return producto

@router.put("/{producto_id}", response_model=ProductoResponse)
def actualizar_producto(
    producto_id: int, 
    producto_update: ProductoUpdate, 
    db: Session = Depends(get_db)
):
    # Buscamos el producto existente
    db_producto = db.query(models.Producto).filter(models.Producto.id == producto_id).first()
    
    if db_producto is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Producto con ID {producto_id} no encontrado"
        )
    
    # Actualizamos solo los campos proporcionados
    update_data = producto_update.model_dump(exclude_unset=True)
    for key, value in update_data.items():
        setattr(db_producto, key, value)
    
    # Guardamos los cambios
    db.commit()
    db.refresh(db_producto)
    
    return db_producto

@router.delete("/{producto_id}", status_code=status.HTTP_204_NO_CONTENT)
def eliminar_producto(producto_id: int, db: Session = Depends(get_db)):
    # Buscamos el producto
    db_producto = db.query(models.Producto).filter(models.Producto.id == producto_id).first()
    
    if db_producto is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Producto con ID {producto_id} no encontrado"
        )
    
    # Eliminamos el producto
    db.delete(db_producto)
    db.commit()
    
    return None

Router de Usuarios

Ahora creamos el archivo routers/usuarios.py:

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List

from app.database import get_db
from app.models import models
from app.schemas import UsuarioCreate, UsuarioResponse

router = APIRouter(
    prefix="/usuarios",
    tags=["usuarios"]
)

@router.post("/", response_model=UsuarioResponse, status_code=status.HTTP_201_CREATED)
def crear_usuario(usuario: UsuarioCreate, db: Session = Depends(get_db)):
    # Verificamos si el email ya existe
    db_usuario = db.query(models.Usuario).filter(models.Usuario.email == usuario.email).first()
    if db_usuario:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="El email ya está registrado"
        )
    
    # En una aplicación real, hashearíamos la contraseña
    hashed_password = usuario.password + "_hashed"  # Simulación simple
    
    # Creamos el usuario
    db_usuario = models.Usuario(
        email=usuario.email,
        nombre=usuario.nombre,
        hashed_password=hashed_password
    )
    
    # Guardamos en la base de datos
    db.add(db_usuario)
    db.commit()
    db.refresh(db_usuario)
    
    return db_usuario

@router.get("/", response_model=List[UsuarioResponse])
def listar_usuarios(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    usuarios = db.query(models.Usuario).offset(skip).limit(limit).all()
    return usuarios

@router.get("/{usuario_id}", response_model=UsuarioResponse)
def obtener_usuario(usuario_id: int, db: Session = Depends(get_db)):
    usuario = db.query(models.Usuario).filter(models.Usuario.id == usuario_id).first()
    
    if usuario is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Usuario con ID {usuario_id} no encontrado"
        )
    
    return usuario

Integrando los routers en la aplicación principal

Ahora, actualizamos nuestro archivo main.py para incluir los routers:

from fastapi import FastAPI
from app.database import engine
from app.models import models
from app.routers import productos, usuarios

# Crear las tablas en la base de datos
models.Base.metadata.create_all(bind=engine)

# Crear la aplicación FastAPI
app = FastAPI(
    title="API de Tienda",
    description="API REST con FastAPI y MySQL",
    version="0.1.0"
)

# Incluir los routers
app.include_router(productos.router)
app.include_router(usuarios.router)

@app.get("/")
def read_root():
    return {"message": "Bienvenido a la API de Tienda"}

Consultas avanzadas con SQLAlchemy

SQLAlchemy nos permite realizar consultas complejas de forma elegante. Veamos algunos ejemplos:

Filtrado y ordenación

Podemos implementar un endpoint para buscar productos con filtros:

@router.get("/buscar/", response_model=List[ProductoResponse])
def buscar_productos(
    nombre: Optional[str] = None,
    precio_min: Optional[float] = None,
    precio_max: Optional[float] = None,
    ordenar_por: Optional[str] = "id",
    db: Session = Depends(get_db)
):
    # Iniciamos la consulta
    query = db.query(models.Producto)
    
    # Aplicamos filtros si se proporcionan
    if nombre:
        query = query.filter(models.Producto.nombre.ilike(f"%{nombre}%"))
    
    if precio_min is not None:
        query = query.filter(models.Producto.precio >= precio_min)
    
    if precio_max is not None:
        query = query.filter(models.Producto.precio <= precio_max)
    
    # Aplicamos ordenación
    if ordenar_por == "precio_asc":
        query = query.order_by(models.Producto.precio.asc())
    elif ordenar_por == "precio_desc":
        query = query.order_by(models.Producto.precio.desc())
    elif ordenar_por == "nombre":
        query = query.order_by(models.Producto.nombre)
    else:
        query = query.order_by(models.Producto.id)
    
    # Ejecutamos la consulta
    productos = query.all()
    return productos

Consultas con relaciones

Podemos obtener los productos de un usuario específico:

@router.get("/usuario/{usuario_id}/productos", response_model=List[ProductoResponse])
def obtener_productos_usuario(usuario_id: int, db: Session = Depends(get_db)):
    # Verificamos que el usuario existe
    usuario = db.query(models.Usuario).filter(models.Usuario.id == usuario_id).first()
    if usuario is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Usuario con ID {usuario_id} no encontrado"
        )
    
    # Obtenemos sus productos
    productos = db.query(models.Producto).filter(
        models.Producto.propietario_id == usuario_id
    ).all()
    
    return productos

Transacciones y manejo de errores

Las transacciones son fundamentales para mantener la integridad de los datos. SQLAlchemy maneja automáticamente las transacciones con los métodos commit() y rollback().

Implementemos un endpoint que realiza múltiples operaciones en una transacción:

@router.post("/transferir-producto/{producto_id}/{nuevo_propietario_id}", response_model=ProductoResponse)
def transferir_producto(
    producto_id: int,
    nuevo_propietario_id: int,
    db: Session = Depends(get_db)
):
    try:
        # Verificamos que el producto existe
        producto = db.query(models.Producto).filter(models.Producto.id == producto_id).first()
        if not producto:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Producto con ID {producto_id} no encontrado"
            )
        
        # Verificamos que el nuevo propietario existe
        nuevo_propietario = db.query(models.Usuario).filter(
            models.Usuario.id == nuevo_propietario_id
        ).first()
        if not nuevo_propietario:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Usuario con ID {nuevo_propietario_id} no encontrado"
            )
        
        # Guardamos el propietario anterior para el registro
        propietario_anterior_id = producto.propietario_id
        
        # Actualizamos el propietario
        producto.propietario_id = nuevo_propietario_id
        
        # Creamos un registro de la transferencia (ejemplo)
        # En una aplicación real, tendríamos un modelo para esto
        print(f"Transferencia: Producto {producto_id} del usuario {propietario_anterior_id} al {nuevo_propietario_id}")
        
        # Confirmamos la transacción
        db.commit()
        
        return producto
        
    except Exception as e:
        # Si ocurre cualquier error, revertimos la transacción
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error al transferir el producto: {str(e)}"
        )

Paginación y optimización de consultas

Para manejar grandes conjuntos de datos, es importante implementar paginación:

@router.get("/paginados/", response_model=dict)
def listar_productos_paginados(
    pagina: int = 1,
    items_por_pagina: int = 10,
    db: Session = Depends(get_db)
):
    # Calculamos el offset
    offset = (pagina - 1) * items_por_pagina
    
    # Obtenemos el total de productos
    total = db.query(models.Producto).count()
    
    # Obtenemos los productos de la página actual
    productos = db.query(models.Producto).offset(offset).limit(items_por_pagina).all()
    
    # Calculamos el total de páginas
    total_paginas = (total + items_por_pagina - 1) // items_por_pagina
    
    return {
        "total": total,
        "pagina_actual": pagina,
        "total_paginas": total_paginas,
        "items_por_pagina": items_por_pagina,
        "productos": productos
    }

Mejores prácticas para operaciones CRUD

Al implementar operaciones CRUD en una API REST, es importante seguir estas mejores prácticas:

  • Validación de datos: Utiliza Pydantic para validar todas las entradas.
  • Códigos de estado HTTP apropiados:
  • 201 Created para creaciones exitosas
  • 200 OK para lecturas y actualizaciones
  • 204 No Content para eliminaciones
  • 400 Bad Request para errores de validación
  • 404 Not Found para recursos inexistentes
  • 500 Internal Server Error para errores del servidor
  • Manejo de excepciones: Captura y maneja adecuadamente las excepciones de la base de datos.
  • Transacciones: Utiliza transacciones para operaciones que modifican múltiples registros.
  • Paginación: Implementa paginación para conjuntos de datos grandes.
  • Filtrado y ordenación: Permite a los clientes filtrar y ordenar los resultados.

Optimización del rendimiento

Para mejorar el rendimiento de las operaciones de base de datos:

  • Utiliza consultas específicas en lugar de cargar objetos completos cuando solo necesitas algunos campos:
# En lugar de:
producto = db.query(models.Producto).filter(models.Producto.id == producto_id).first()

# Usa:
producto_nombre = db.query(models.Producto.nombre).filter(
    models.Producto.id == producto_id
).scalar()
  • Implementa carga diferida (lazy loading) o carga ansiosa (eager loading) según sea necesario:
# Carga ansiosa de relaciones
productos_con_propietarios = db.query(models.Producto).options(
    joinedload(models.Producto.propietario)
).all()
  • Utiliza índices en las columnas frecuentemente consultadas.
  • Considera implementar caché para consultas frecuentes y que no cambian a menudo.

Con estas implementaciones, hemos creado una API REST completa que permite guardar y recuperar datos de una base de datos MySQL utilizando SQLAlchemy como ORM y FastAPI como framework web. Esta arquitectura proporciona una base sólida para desarrollar aplicaciones web escalables y mantenibles.

CONSTRUYE TU CARRERA EN IA Y PROGRAMACIÓN SOFTWARE

Accede a +1000 lecciones y cursos con certificado. Mejora tu portfolio con certificados de superación para tu CV.

Plan mensual

19.00 € /mes

Precio normal mensual: 19 €
47 % DE DESCUENTO

Plan anual

10.00 € /mes

Ahorras 108 € al año
Precio normal anual: 120 €
Aprende FastAPI online

Ejercicios de esta lección Conexión de FastAPI con SQLAlchemy

Evalúa tus conocimientos de esta lección Conexión de FastAPI con SQLAlchemy con nuestros retos de programación de tipo Test, Puzzle, Código y Proyecto con VSCode, guiados por IA.

Todas las lecciones de FastAPI

Accede a todas las lecciones de FastAPI y aprende con ejemplos prácticos de código y ejercicios de programación con IDE web sin instalar nada.

Accede GRATIS a FastAPI y certifícate

En esta lección

Objetivos de aprendizaje de esta lección

  • Comprender la configuración y conexión de SQLAlchemy con MySQL en un proyecto FastAPI.
  • Definir modelos de base de datos utilizando SQLAlchemy ORM y mapearlos a tablas MySQL.
  • Implementar esquemas Pydantic para validación y serialización de datos en la API.
  • Desarrollar endpoints CRUD para gestionar recursos en la base de datos mediante FastAPI.
  • Aplicar buenas prácticas de manejo de transacciones, paginación, consultas avanzadas y seguridad en la conexión a la base de datos.