FastAPI
Tutorial FastAPI: Conexión de FastAPI con SQLAlchemy
Aprende a conectar FastAPI con SQLAlchemy y MySQL para crear una API REST con operaciones CRUD seguras y optimizadas.
Aprende FastAPI y certifícateConexión con SQLAlchemy con MySQL
SQLAlchemy es una biblioteca ORM (Object-Relational Mapping) para Python que facilita la interacción con bases de datos relacionales. Integrar SQLAlchemy con FastAPI nos permite crear aplicaciones web con persistencia de datos de forma elegante y mantenible, separando la lógica de negocio de la capa de acceso a datos.
Preparando el entorno
Antes de comenzar, necesitamos instalar las dependencias necesarias para trabajar con SQLAlchemy y MySQL:
pip install sqlalchemy pymysql cryptography
El paquete pymysql
es el driver que permite a SQLAlchemy comunicarse con MySQL, mientras que cryptography
proporciona soporte para conexiones seguras.
Estructura del proyecto
Para mantener nuestro código organizado, crearemos una estructura de carpetas adecuada:
mi_aplicacion/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── database.py
│ ├── models/
│ │ ├── __init__.py
│ │ └── models.py
│ └── config.py
└── requirements.txt
Configuración de la conexión
Primero, vamos a crear un archivo config.py
para almacenar la configuración de la base de datos:
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
DATABASE_URL: str = "mysql+pymysql://usuario:contraseña@localhost:3306/nombre_db"
class Config:
env_file = ".env"
settings = Settings()
Este enfoque nos permite definir la configuración con valores predeterminados que pueden ser sobrescritos por variables de entorno o un archivo .env
.
Estableciendo la conexión con SQLAlchemy
Ahora, crearemos el archivo database.py
para configurar la conexión a MySQL:
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.config import settings
# Crear el motor de SQLAlchemy
engine = create_engine(
settings.DATABASE_URL,
echo=True # Muestra las consultas SQL generadas (útil para desarrollo)
)
# Crear una clase de sesión
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Crear una clase base para los modelos
Base = declarative_base()
# Función para obtener una sesión de base de datos
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
En este código:
create_engine
establece la conexión con MySQL usando la URL de conexión.sessionmaker
crea una fábrica de sesiones que generará objetos de sesión para interactuar con la base de datos.declarative_base
proporciona una clase base para definir nuestros modelos.- La función
get_db
implementa un patrón de dependencia que FastAPI utilizará para inyectar sesiones de base de datos en nuestros endpoints.
Definiendo modelos
En el archivo models.py
, definiremos las clases que representan nuestras tablas en la base de datos:
from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.database import Base
class Usuario(Base):
__tablename__ = "usuarios"
id = Column(Integer, primary_key=True, index=True)
email = Column(String(100), unique=True, index=True, nullable=False)
nombre = Column(String(100))
hashed_password = Column(String(100), nullable=False)
is_active = Column(Boolean, default=True)
fecha_registro = Column(DateTime(timezone=True), server_default=func.now())
# Relación con otras tablas
productos = relationship("Producto", back_populates="propietario")
class Producto(Base):
__tablename__ = "productos"
id = Column(Integer, primary_key=True, index=True)
nombre = Column(String(100), index=True)
descripcion = Column(String(500))
precio = Column(Float, nullable=False)
propietario_id = Column(Integer, ForeignKey("usuarios.id"))
# Relación con otras tablas
propietario = relationship("Usuario", back_populates="productos")
Cada clase hereda de Base
y define una tabla con sus columnas y relaciones. SQLAlchemy proporciona tipos de datos que se mapean a los tipos de MySQL:
Integer
→INT
String
→VARCHAR
Float
→FLOAT
DateTime
→DATETIME
Boolean
→TINYINT(1)
Creando las tablas
Para crear las tablas en la base de datos, necesitamos ejecutar el siguiente código:
from app.database import engine
from app.models import models
# Crear todas las tablas definidas en los modelos
models.Base.metadata.create_all(bind=engine)
Este código puede incluirse en el archivo main.py
o ejecutarse como un script separado durante la inicialización de la aplicación.
Configuración de FastAPI con SQLAlchemy
Ahora, integremos todo en nuestra aplicación FastAPI en el archivo main.py
:
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session
from app.database import engine, get_db
from app.models import models
# Crear las tablas en la base de datos
models.Base.metadata.create_all(bind=engine)
# Crear la aplicación FastAPI
app = FastAPI(title="Mi API con MySQL")
@app.get("/")
def read_root():
return {"message": "API conectada a MySQL con SQLAlchemy"}
@app.get("/db-test")
def test_db(db: Session = Depends(get_db)):
# Simplemente probamos que la conexión funciona
return {"message": "Conexión a la base de datos establecida correctamente"}
En este código:
- Importamos las dependencias necesarias.
- Creamos las tablas en la base de datos.
- Inicializamos la aplicación FastAPI.
- Definimos un endpoint de prueba que utiliza la inyección de dependencias para obtener una sesión de base de datos.
Opciones avanzadas de conexión
Para entornos de producción, es recomendable configurar opciones adicionales en la conexión:
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
settings.DATABASE_URL,
echo=False, # Desactivar en producción
pool_size=5, # Número de conexiones permanentes
max_overflow=10, # Conexiones adicionales permitidas
pool_timeout=30, # Tiempo de espera para obtener una conexión
pool_recycle=1800, # Reciclar conexiones cada 30 minutos
pool_pre_ping=True, # Verificar conexiones antes de usarlas
poolclass=QueuePool # Tipo de pool a utilizar
)
Estas opciones nos permiten:
- Controlar el tamaño del pool de conexiones.
- Establecer tiempos de espera y reciclaje para evitar conexiones obsoletas.
- Activar la verificación de conexiones con
pool_pre_ping
.
Manejo de migraciones con Alembic
Para proyectos más grandes, es recomendable utilizar Alembic para gestionar las migraciones de la base de datos:
pip install alembic
alembic init migrations
Configuramos Alembic editando el archivo alembic.ini
:
# alembic.ini
sqlalchemy.url = mysql+pymysql://usuario:contraseña@localhost:3306/nombre_db
Y modificamos migrations/env.py
para importar nuestros modelos:
# migrations/env.py
from app.models import models
from app.database import Base
target_metadata = Base.metadata
Para crear una migración:
alembic revision --autogenerate -m "Crear tablas iniciales"
Y para aplicarla:
alembic upgrade head
Consideraciones de seguridad
Al trabajar con bases de datos en aplicaciones web, es importante considerar:
- No almacenar credenciales directamente en el código.
- Utilizar variables de entorno o archivos
.env
(con.gitignore
). - Implementar control de acceso adecuado a la base de datos.
- Configurar SSL/TLS para conexiones seguras:
engine = create_engine(
"mysql+pymysql://usuario:contraseña@localhost:3306/nombre_db?ssl_ca=/path/to/ca.pem",
connect_args={
"ssl": {
"ca": "/path/to/ca.pem"
}
}
)
Optimización del rendimiento
Para mejorar el rendimiento de la conexión con MySQL:
- Utiliza índices adecuados en las columnas frecuentemente consultadas:
class Usuario(Base):
__tablename__ = "usuarios"
id = Column(Integer, primary_key=True, index=True)
email = Column(String(100), unique=True, index=True)
- Configura el pool de conexiones según la carga esperada.
- Utiliza consultas optimizadas con joins adecuados.
- Implementa caché para consultas frecuentes.
Guardar y recuperar datos de base de datos en API REST
Una vez configurada la conexión con SQLAlchemy y MySQL, el siguiente paso es implementar las operaciones CRUD (Crear, Leer, Actualizar y Eliminar) en nuestra API REST. Estas operaciones nos permitirán interactuar con la base de datos a través de endpoints HTTP.
Esquemas de datos con Pydantic
Antes de implementar los endpoints, necesitamos definir los esquemas de datos que utilizaremos para validar las entradas y formatear las salidas. Crearemos un archivo schemas.py
:
from pydantic import BaseModel, EmailStr, Field
from typing import Optional, List
from datetime import datetime
# Esquemas para Producto
class ProductoBase(BaseModel):
nombre: str = Field(..., min_length=1, max_length=100)
descripcion: Optional[str] = Field(None, max_length=500)
precio: float = Field(..., gt=0)
class ProductoCreate(ProductoBase):
pass
class ProductoUpdate(BaseModel):
nombre: Optional[str] = Field(None, min_length=1, max_length=100)
descripcion: Optional[str] = Field(None, max_length=500)
precio: Optional[float] = Field(None, gt=0)
class ProductoResponse(ProductoBase):
id: int
propietario_id: int
class Config:
from_attributes = True
# Esquemas para Usuario
class UsuarioBase(BaseModel):
email: EmailStr
nombre: str = Field(..., min_length=1, max_length=100)
class UsuarioCreate(UsuarioBase):
password: str = Field(..., min_length=8)
class UsuarioResponse(UsuarioBase):
id: int
is_active: bool
fecha_registro: datetime
productos: List[ProductoResponse] = []
class Config:
from_attributes = True
Estos esquemas nos ayudan a:
- Validar datos de entrada con reglas específicas
- Serializar objetos de SQLAlchemy a JSON
- Documentar automáticamente la API con Swagger/OpenAPI
Implementando operaciones CRUD
Ahora implementaremos los endpoints para realizar operaciones CRUD. Organizaremos nuestro código en routers para mantenerlo limpio y modular. Crearemos un directorio routers
con archivos para cada entidad.
Router de Productos
Creamos el archivo routers/productos.py
:
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from app.database import get_db
from app.models import models
from app.schemas import ProductoCreate, ProductoResponse, ProductoUpdate
router = APIRouter(
prefix="/productos",
tags=["productos"]
)
@router.post("/", response_model=ProductoResponse, status_code=status.HTTP_201_CREATED)
def crear_producto(producto: ProductoCreate, db: Session = Depends(get_db)):
# Simulamos un usuario propietario (en una app real usaríamos autenticación)
propietario_id = 1
# Creamos una instancia del modelo
db_producto = models.Producto(
nombre=producto.nombre,
descripcion=producto.descripcion,
precio=producto.precio,
propietario_id=propietario_id
)
# Guardamos en la base de datos
db.add(db_producto)
db.commit()
db.refresh(db_producto)
return db_producto
@router.get("/", response_model=List[ProductoResponse])
def listar_productos(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
# Consultamos todos los productos con paginación
productos = db.query(models.Producto).offset(skip).limit(limit).all()
return productos
@router.get("/{producto_id}", response_model=ProductoResponse)
def obtener_producto(producto_id: int, db: Session = Depends(get_db)):
# Buscamos el producto por ID
producto = db.query(models.Producto).filter(models.Producto.id == producto_id).first()
# Si no existe, lanzamos una excepción
if producto is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Producto con ID {producto_id} no encontrado"
)
return producto
@router.put("/{producto_id}", response_model=ProductoResponse)
def actualizar_producto(
producto_id: int,
producto_update: ProductoUpdate,
db: Session = Depends(get_db)
):
# Buscamos el producto existente
db_producto = db.query(models.Producto).filter(models.Producto.id == producto_id).first()
if db_producto is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Producto con ID {producto_id} no encontrado"
)
# Actualizamos solo los campos proporcionados
update_data = producto_update.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(db_producto, key, value)
# Guardamos los cambios
db.commit()
db.refresh(db_producto)
return db_producto
@router.delete("/{producto_id}", status_code=status.HTTP_204_NO_CONTENT)
def eliminar_producto(producto_id: int, db: Session = Depends(get_db)):
# Buscamos el producto
db_producto = db.query(models.Producto).filter(models.Producto.id == producto_id).first()
if db_producto is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Producto con ID {producto_id} no encontrado"
)
# Eliminamos el producto
db.delete(db_producto)
db.commit()
return None
Router de Usuarios
Ahora creamos el archivo routers/usuarios.py
:
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from app.database import get_db
from app.models import models
from app.schemas import UsuarioCreate, UsuarioResponse
router = APIRouter(
prefix="/usuarios",
tags=["usuarios"]
)
@router.post("/", response_model=UsuarioResponse, status_code=status.HTTP_201_CREATED)
def crear_usuario(usuario: UsuarioCreate, db: Session = Depends(get_db)):
# Verificamos si el email ya existe
db_usuario = db.query(models.Usuario).filter(models.Usuario.email == usuario.email).first()
if db_usuario:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="El email ya está registrado"
)
# En una aplicación real, hashearíamos la contraseña
hashed_password = usuario.password + "_hashed" # Simulación simple
# Creamos el usuario
db_usuario = models.Usuario(
email=usuario.email,
nombre=usuario.nombre,
hashed_password=hashed_password
)
# Guardamos en la base de datos
db.add(db_usuario)
db.commit()
db.refresh(db_usuario)
return db_usuario
@router.get("/", response_model=List[UsuarioResponse])
def listar_usuarios(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
usuarios = db.query(models.Usuario).offset(skip).limit(limit).all()
return usuarios
@router.get("/{usuario_id}", response_model=UsuarioResponse)
def obtener_usuario(usuario_id: int, db: Session = Depends(get_db)):
usuario = db.query(models.Usuario).filter(models.Usuario.id == usuario_id).first()
if usuario is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Usuario con ID {usuario_id} no encontrado"
)
return usuario
Integrando los routers en la aplicación principal
Ahora, actualizamos nuestro archivo main.py
para incluir los routers:
from fastapi import FastAPI
from app.database import engine
from app.models import models
from app.routers import productos, usuarios
# Crear las tablas en la base de datos
models.Base.metadata.create_all(bind=engine)
# Crear la aplicación FastAPI
app = FastAPI(
title="API de Tienda",
description="API REST con FastAPI y MySQL",
version="0.1.0"
)
# Incluir los routers
app.include_router(productos.router)
app.include_router(usuarios.router)
@app.get("/")
def read_root():
return {"message": "Bienvenido a la API de Tienda"}
Consultas avanzadas con SQLAlchemy
SQLAlchemy nos permite realizar consultas complejas de forma elegante. Veamos algunos ejemplos:
Filtrado y ordenación
Podemos implementar un endpoint para buscar productos con filtros:
@router.get("/buscar/", response_model=List[ProductoResponse])
def buscar_productos(
nombre: Optional[str] = None,
precio_min: Optional[float] = None,
precio_max: Optional[float] = None,
ordenar_por: Optional[str] = "id",
db: Session = Depends(get_db)
):
# Iniciamos la consulta
query = db.query(models.Producto)
# Aplicamos filtros si se proporcionan
if nombre:
query = query.filter(models.Producto.nombre.ilike(f"%{nombre}%"))
if precio_min is not None:
query = query.filter(models.Producto.precio >= precio_min)
if precio_max is not None:
query = query.filter(models.Producto.precio <= precio_max)
# Aplicamos ordenación
if ordenar_por == "precio_asc":
query = query.order_by(models.Producto.precio.asc())
elif ordenar_por == "precio_desc":
query = query.order_by(models.Producto.precio.desc())
elif ordenar_por == "nombre":
query = query.order_by(models.Producto.nombre)
else:
query = query.order_by(models.Producto.id)
# Ejecutamos la consulta
productos = query.all()
return productos
Consultas con relaciones
Podemos obtener los productos de un usuario específico:
@router.get("/usuario/{usuario_id}/productos", response_model=List[ProductoResponse])
def obtener_productos_usuario(usuario_id: int, db: Session = Depends(get_db)):
# Verificamos que el usuario existe
usuario = db.query(models.Usuario).filter(models.Usuario.id == usuario_id).first()
if usuario is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Usuario con ID {usuario_id} no encontrado"
)
# Obtenemos sus productos
productos = db.query(models.Producto).filter(
models.Producto.propietario_id == usuario_id
).all()
return productos
Transacciones y manejo de errores
Las transacciones son fundamentales para mantener la integridad de los datos. SQLAlchemy maneja automáticamente las transacciones con los métodos commit()
y rollback()
.
Implementemos un endpoint que realiza múltiples operaciones en una transacción:
@router.post("/transferir-producto/{producto_id}/{nuevo_propietario_id}", response_model=ProductoResponse)
def transferir_producto(
producto_id: int,
nuevo_propietario_id: int,
db: Session = Depends(get_db)
):
try:
# Verificamos que el producto existe
producto = db.query(models.Producto).filter(models.Producto.id == producto_id).first()
if not producto:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Producto con ID {producto_id} no encontrado"
)
# Verificamos que el nuevo propietario existe
nuevo_propietario = db.query(models.Usuario).filter(
models.Usuario.id == nuevo_propietario_id
).first()
if not nuevo_propietario:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Usuario con ID {nuevo_propietario_id} no encontrado"
)
# Guardamos el propietario anterior para el registro
propietario_anterior_id = producto.propietario_id
# Actualizamos el propietario
producto.propietario_id = nuevo_propietario_id
# Creamos un registro de la transferencia (ejemplo)
# En una aplicación real, tendríamos un modelo para esto
print(f"Transferencia: Producto {producto_id} del usuario {propietario_anterior_id} al {nuevo_propietario_id}")
# Confirmamos la transacción
db.commit()
return producto
except Exception as e:
# Si ocurre cualquier error, revertimos la transacción
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error al transferir el producto: {str(e)}"
)
Paginación y optimización de consultas
Para manejar grandes conjuntos de datos, es importante implementar paginación:
@router.get("/paginados/", response_model=dict)
def listar_productos_paginados(
pagina: int = 1,
items_por_pagina: int = 10,
db: Session = Depends(get_db)
):
# Calculamos el offset
offset = (pagina - 1) * items_por_pagina
# Obtenemos el total de productos
total = db.query(models.Producto).count()
# Obtenemos los productos de la página actual
productos = db.query(models.Producto).offset(offset).limit(items_por_pagina).all()
# Calculamos el total de páginas
total_paginas = (total + items_por_pagina - 1) // items_por_pagina
return {
"total": total,
"pagina_actual": pagina,
"total_paginas": total_paginas,
"items_por_pagina": items_por_pagina,
"productos": productos
}
Mejores prácticas para operaciones CRUD
Al implementar operaciones CRUD en una API REST, es importante seguir estas mejores prácticas:
- Validación de datos: Utiliza Pydantic para validar todas las entradas.
- Códigos de estado HTTP apropiados:
- 201 Created para creaciones exitosas
- 200 OK para lecturas y actualizaciones
- 204 No Content para eliminaciones
- 400 Bad Request para errores de validación
- 404 Not Found para recursos inexistentes
- 500 Internal Server Error para errores del servidor
- Manejo de excepciones: Captura y maneja adecuadamente las excepciones de la base de datos.
- Transacciones: Utiliza transacciones para operaciones que modifican múltiples registros.
- Paginación: Implementa paginación para conjuntos de datos grandes.
- Filtrado y ordenación: Permite a los clientes filtrar y ordenar los resultados.
Optimización del rendimiento
Para mejorar el rendimiento de las operaciones de base de datos:
- Utiliza consultas específicas en lugar de cargar objetos completos cuando solo necesitas algunos campos:
# En lugar de:
producto = db.query(models.Producto).filter(models.Producto.id == producto_id).first()
# Usa:
producto_nombre = db.query(models.Producto.nombre).filter(
models.Producto.id == producto_id
).scalar()
- Implementa carga diferida (lazy loading) o carga ansiosa (eager loading) según sea necesario:
# Carga ansiosa de relaciones
productos_con_propietarios = db.query(models.Producto).options(
joinedload(models.Producto.propietario)
).all()
- Utiliza índices en las columnas frecuentemente consultadas.
- Considera implementar caché para consultas frecuentes y que no cambian a menudo.
Con estas implementaciones, hemos creado una API REST completa que permite guardar y recuperar datos de una base de datos MySQL utilizando SQLAlchemy como ORM y FastAPI como framework web. Esta arquitectura proporciona una base sólida para desarrollar aplicaciones web escalables y mantenibles.
Ejercicios de esta lección Conexión de FastAPI con SQLAlchemy
Evalúa tus conocimientos de esta lección Conexión de FastAPI con SQLAlchemy con nuestros retos de programación de tipo Test, Puzzle, Código y Proyecto con VSCode, guiados por IA.
Todas las lecciones de FastAPI
Accede a todas las lecciones de FastAPI y aprende con ejemplos prácticos de código y ejercicios de programación con IDE web sin instalar nada.
Introducción A Fastapi Y Configuración
Introducción Y Entorno
Respuestas Y Códigos De Estado
Api Rest
Validación De Datos Con Pydantic 2
Api Rest
Rutas Y Parámetros
Api Rest
Conexión De Fastapi Con Sqlalchemy
Persistencia
En esta lección
Objetivos de aprendizaje de esta lección
- Comprender la configuración y conexión de SQLAlchemy con MySQL en un proyecto FastAPI.
- Definir modelos de base de datos utilizando SQLAlchemy ORM y mapearlos a tablas MySQL.
- Implementar esquemas Pydantic para validación y serialización de datos en la API.
- Desarrollar endpoints CRUD para gestionar recursos en la base de datos mediante FastAPI.
- Aplicar buenas prácticas de manejo de transacciones, paginación, consultas avanzadas y seguridad en la conexión a la base de datos.