Consultas y filtros
flowchart TD
QUERY["coleccion.find(filtro)"] --> OPS["Operadores"]
OPS --> CMP["$gt, $lt, $eq, $ne"]
OPS --> LOG["$and, $or, $not, $in"]
OPS --> ARR["$elemMatch, $all, $size"]
OPS --> TXT["$regex, $text"]
QUERY --> PROJ["Proyección: campos a devolver"]
QUERY --> SORT["sort + skip + limit (paginación)"]
AGG["Pipeline de agregación"] --> STAGES["$match, $group, $project, $unwind, $lookup"]
STAGES --> IDX["Índices: rendimiento"]
Las consultas y filtros son el núcleo de la interacción con MongoDB, permitiéndonos recuperar exactamente los datos que necesitamos. PyMongo proporciona una API intuitiva que refleja la sintaxis de consulta nativa de MongoDB, facilitando la construcción de consultas desde simples hasta altamente complejas.
Consultas básicas
Para realizar consultas en MongoDB, utilizamos principalmente los métodos find() y find_one(). La diferencia fundamental es que find() devuelve un cursor iterable con todos los documentos coincidentes, mientras que find_one() devuelve un único documento.
from pymongo import MongoClient
# Conexión a la base de datos
client = MongoClient('mongodb://localhost:27017/')
db = client['biblioteca']
coleccion = db['libros']
# Consulta básica - todos los documentos
todos_libros = coleccion.find()
# Consulta con filtro simple
libros_python = coleccion.find({"tema": "Python"})
# Obtener un único documento
primer_libro = coleccion.find_one({"titulo": "Aprende Python en 24 horas"})
Operadores de comparación
MongoDB ofrece varios operadores de comparación que podemos utilizar en nuestras consultas:
# Libros con precio mayor que 20
libros_caros = coleccion.find({"precio": {"$gt": 20}})
# Libros publicados después de 2020
libros_recientes = coleccion.find({"año_publicacion": {"$gte": 2020}})
# Libros que no son de programación
libros_no_programacion = coleccion.find({"tema": {"$ne": "Programación"}})
# Libros con precio entre 15 y 30
rango_precio = coleccion.find({
"precio": {
"$gte": 15,
"$lte": 30
}
})
Los operadores de comparación más comunes son:
- $eq: Igual a (=)
- $ne: No igual a
- $gt: Mayor que (>)
- $gte: Mayor o igual que
- $lt: Menor que (<)
- $lte: Menor o igual que
Operadores lógicos
Para construir consultas más complejas, podemos combinar condiciones utilizando operadores lógicos:
# Libros de Python O JavaScript
libros_programacion = coleccion.find({
"$or": [
{"tema": "Python"},
{"tema": "JavaScript"}
]
})
# Libros de Python que cuestan menos de 25
libros_python_baratos = coleccion.find({
"$and": [
{"tema": "Python"},
{"precio": {"$lt": 25}}
]
})
# Forma simplificada del AND (implícito)
libros_python_baratos = coleccion.find({
"tema": "Python",
"precio": {"$lt": 25}
})
# Libros que NO son ni de Python ni de JavaScript
otros_libros = coleccion.find({
"$nor": [
{"tema": "Python"},
{"tema": "JavaScript"}
]
})
Los operadores lógicos principales son:
- $and: Todas las condiciones deben cumplirse
- $or: Al menos una condición debe cumplirse
- $nor: Ninguna de las condiciones debe cumplirse
- $not: Niega una condición
Consultas en arrays
MongoDB permite realizar consultas sofisticadas en campos de tipo array:
# Libros que tienen exactamente las etiquetas "programación" y "principiantes"
libros_exactos = coleccion.find({
"etiquetas": ["programación", "principiantes"]
})
# Libros que contienen la etiqueta "programación" (en cualquier posición del array)
libros_programacion = coleccion.find({
"etiquetas": "programación"
})
# Libros que contienen AMBAS etiquetas (en cualquier orden)
libros_ambas = coleccion.find({
"etiquetas": {
"$all": ["programación", "principiantes"]
}
})
# Libros con exactamente 3 etiquetas
libros_tres_etiquetas = coleccion.find({
"etiquetas": {
"$size": 3
}
})
# Libros donde al menos una etiqueta cumple una condición
libros_etiqueta_larga = coleccion.find({
"etiquetas": {
"$elemMatch": {
"$regex": "^.{10,}$" # Etiquetas con 10 o más caracteres
}
}
})
Consultas en documentos anidados
Para consultar campos dentro de documentos anidados, utilizamos la notación de punto:
# Libros con un editor específico
libros_editor = coleccion.find({
"detalles_publicacion.editor": "TechBooks"
})
# Libros publicados en España
libros_espana = coleccion.find({
"detalles_publicacion.pais": "España"
})
# Libros con más de 5 reseñas positivas
libros_populares = coleccion.find({
"estadisticas.resenas.positivas": {"$gt": 5}
})
Expresiones regulares
PyMongo permite utilizar expresiones regulares para búsquedas de texto más flexibles:
import re
# Búsqueda con expresión regular (case sensitive)
libros_python = coleccion.find({
"titulo": {"$regex": "Python"}
})
# Búsqueda insensible a mayúsculas/minúsculas
libros_python = coleccion.find({
"titulo": {"$regex": "python", "$options": "i"}
})
# Alternativa usando el módulo re de Python
patron = re.compile("python", re.IGNORECASE)
libros_python = coleccion.find({
"titulo": patron
})
# Títulos que comienzan con "Aprende"
libros_aprende = coleccion.find({
"titulo": {"$regex": "^Aprende"}
})
# Títulos que terminan con "Python"
libros_python_final = coleccion.find({
"titulo": {"$regex": "Python$"}
})
Consultas de texto completo
MongoDB ofrece capacidades de búsqueda de texto completo que podemos aprovechar con PyMongo:
# Primero, necesitamos crear un índice de texto (solo una vez)
coleccion.create_index([("titulo", "text"), ("descripcion", "text")])
# Búsqueda de texto
resultados = coleccion.find({
"$text": {"$search": "python programación"}
})
# Búsqueda de texto con puntuación de relevancia
resultados = coleccion.find(
{"$text": {"$search": "python programación"}},
{"score": {"$meta": "textScore"}}
).sort([("score", {"$meta": "textScore"})])
La búsqueda de texto completo es ideal para implementar funcionalidades de búsqueda en aplicaciones donde los usuarios necesitan encontrar documentos basados en su contenido textual.
Proyecciones
Las proyecciones nos permiten especificar qué campos queremos incluir o excluir en los resultados:
# Solo incluir título y precio (y _id por defecto)
libros_resumidos = coleccion.find(
{"tema": "Python"},
{"titulo": 1, "precio": 1}
)
# Excluir campos específicos
libros_sin_detalles = coleccion.find(
{"tema": "Python"},
{"detalles_publicacion": 0, "estadisticas": 0}
)
# Excluir _id e incluir solo ciertos campos
libros_custom = coleccion.find(
{"tema": "Python"},
{"_id": 0, "titulo": 1, "autor": 1, "precio": 1}
)
Ordenación, límite y salto
Estos métodos nos permiten controlar cómo se presentan los resultados:
# Ordenar por precio (ascendente)
libros_ordenados = coleccion.find().sort("precio", 1)
# Ordenar por precio (descendente)
libros_caros_primero = coleccion.find().sort("precio", -1)
# Ordenar por múltiples campos
libros_ordenados = coleccion.find().sort([
("tema", 1), # Primero por tema (ascendente)
("precio", -1) # Luego por precio (descendente)
])
# Limitar resultados (los 5 primeros)
libros_top = coleccion.find().limit(5)
# Saltar resultados (paginación)
# Página 1 (primeros 10)
pagina_1 = coleccion.find().skip(0).limit(10)
# Página 2 (siguientes 10)
pagina_2 = coleccion.find().skip(10).limit(10)
Optimización de consultas con índices
Para mejorar el rendimiento de las consultas, es fundamental crear índices adecuados:
# Crear un índice simple
coleccion.create_index("titulo")
# Crear un índice compuesto
coleccion.create_index([("autor", 1), ("año_publicacion", -1)])
# Crear un índice único
coleccion.create_index("isbn", unique=True)
# Crear un índice en segundo plano (no bloquea operaciones)
coleccion.create_index("tema", background=True)
# Verificar índices existentes
indices = coleccion.index_information()
print(indices)
Consultas con count y distinct
Para contar documentos o encontrar valores únicos:
# Contar todos los documentos
total_libros = coleccion.count_documents({})
# Contar con filtro
libros_python = coleccion.count_documents({"tema": "Python"})
# Valores únicos de un campo
temas_unicos = coleccion.distinct("tema")
# Valores únicos con filtro
autores_python = coleccion.distinct("autor", {"tema": "Python"})
Ejemplo práctico: Sistema de búsqueda avanzada
Veamos un ejemplo completo que combina varias técnicas de consulta:
def buscar_libros(
db,
texto=None,
tema=None,
precio_min=None,
precio_max=None,
ano_desde=None,
ordenar_por="relevancia",
pagina=1,
por_pagina=10
):
"""
Función de búsqueda avanzada para libros.
"""
# Construir el filtro base
filtro = {}
# Añadir condiciones según los parámetros
if texto:
filtro["$text"] = {"$search": texto}
if tema:
filtro["tema"] = tema
# Condiciones de precio
if precio_min is not None or precio_max is not None:
filtro["precio"] = {}
if precio_min is not None:
filtro["precio"]["$gte"] = precio_min
if precio_max is not None:
filtro["precio"]["$lte"] = precio_max
# Condición de año
if ano_desde:
filtro["año_publicacion"] = {"$gte": ano_desde}
# Configurar proyección y ordenación
proyeccion = {
"titulo": 1,
"autor": 1,
"tema": 1,
"precio": 1,
"año_publicacion": 1
}
# Añadir puntuación si hay búsqueda de texto
if texto:
proyeccion["score"] = {"$meta": "textScore"}
# Configurar ordenación
if ordenar_por == "relevancia" and texto:
orden = [("score", {"$meta": "textScore"})]
elif ordenar_por == "precio_asc":
orden = [("precio", 1)]
elif ordenar_por == "precio_desc":
orden = [("precio", -1)]
elif ordenar_por == "año_desc":
orden = [("año_publicacion", -1)]
else:
orden = [("titulo", 1)]
# Calcular salto para paginación
salto = (pagina - 1) * por_pagina
# Ejecutar consulta
cursor = db.libros.find(
filtro,
proyeccion
).sort(orden).skip(salto).limit(por_pagina)
# Contar total de resultados (para paginación)
total = db.libros.count_documents(filtro)
# Convertir cursor a lista
resultados = list(cursor)
return {
"resultados": resultados,
"total": total,
"pagina": pagina,
"paginas_totales": (total + por_pagina - 1) // por_pagina
}
# Ejemplo de uso
resultados = buscar_libros(
db,
texto="python avanzado",
precio_min=20,
precio_max=50,
ano_desde=2018,
ordenar_por="relevancia",
pagina=1
)
print(f"Encontrados {resultados['total']} libros")
for libro in resultados['resultados']:
print(f"{libro['titulo']} - {libro['autor']} - ${libro['precio']}")
Este ejemplo muestra cómo construir un sistema de búsqueda flexible que combina múltiples criterios, ordenación y paginación, demostrando el poder de las consultas y filtros en MongoDB con PyMongo.
Agregaciones básicas
Las agregaciones en MongoDB representan una de las características más útiles para el análisis y transformación de datos. A diferencia de las consultas simples, el framework de agregación permite procesar múltiples documentos y realizar operaciones complejas en ellos, similar a cómo funcionaría un pipeline de procesamiento de datos.
PyMongo proporciona acceso completo a esta funcionalidad a través del método aggregate(), que nos permite construir pipelines de agregación para realizar análisis avanzados sobre nuestros datos.
Concepto de pipeline de agregación
Un pipeline de agregación es una secuencia de etapas de procesamiento donde la salida de cada etapa se convierte en la entrada de la siguiente. Cada etapa transforma los documentos a medida que pasan por el pipeline.
from pymongo import MongoClient
# Conexión a MongoDB
client = MongoClient('mongodb://localhost:27017/')
db = client['tienda']
coleccion = db['ventas']
# Pipeline de agregación básico
pipeline = [
{"$match": {"estado": "completado"}},
{"$group": {"_id": "$categoria", "total": {"$sum": "$importe"}}}
]
# Ejecutar la agregación
resultados = coleccion.aggregate(pipeline)
# Iterar sobre los resultados
for resultado in resultados:
print(f"Categoría: {resultado['_id']}, Total: {resultado['total']}")
En este ejemplo, el pipeline consta de dos etapas:
$match: Filtra los documentos para incluir solo ventas completadas$group: Agrupa los documentos por categoría y calcula la suma de los importes
Etapas comunes de agregación
Etapa $match
La etapa $match filtra los documentos, similar a cómo funciona el método find():
# Filtrar productos con stock bajo
pipeline = [
{"$match": {"stock": {"$lt": 10}}}
]
productos_stock_bajo = db.productos.aggregate(pipeline)
Esta etapa es más eficiente cuando se coloca al inicio del pipeline, ya que reduce la cantidad de documentos que pasan a las siguientes etapas.
Etapa $group
La etapa $group agrupa documentos por una clave especificada y permite aplicar operadores de acumulación:
# Calcular ventas totales por país
pipeline = [
{"$group": {
"_id": "$pais",
"ventas_totales": {"$sum": "$importe"},
"num_ventas": {"$sum": 1},
"importe_promedio": {"$avg": "$importe"},
"importe_maximo": {"$max": "$importe"},
"importe_minimo": {"$min": "$importe"}
}}
]
estadisticas_por_pais = db.ventas.aggregate(pipeline)
Los operadores de acumulación más comunes son:
- $sum: Suma valores o cuenta documentos (usando 1)
- $avg: Calcula el promedio
- $min y $max: Encuentran valores mínimos y máximos
- $first y $last: Obtienen el primer o último valor del grupo
Etapa $project
La etapa $project permite remodelar documentos, incluyendo, excluyendo o transformando campos:
# Transformar documentos para mostrar solo información relevante
pipeline = [
{"$match": {"categoria": "electrónica"}},
{"$project": {
"_id": 0,
"nombre": 1,
"precio_con_iva": {"$multiply": ["$precio", 1.21]},
"descuento_aplicable": {"$cond": [{"$gt": ["$precio", 100]}, 10, 5]},
"disponibilidad": {"$switch": {
"branches": [
{"case": {"$eq": ["$stock", 0]}, "then": "Agotado"},
{"case": {"$lt": ["$stock", 5]}, "then": "Últimas unidades"}
],
"default": "Disponible"
}}
}}
]
productos_transformados = db.productos.aggregate(pipeline)
En este ejemplo:
- Excluimos el campo
_idy mantenemosnombre - Calculamos un nuevo campo
precio_con_ivamultiplicando el precio por 1.21 - Creamos un campo
descuento_aplicableusando una condición - Generamos un campo
disponibilidadusando una estructura switch-case
Etapa $sort, $limit y $skip
Estas etapas ordenan y paginan los documentos del pipeline:
# Ordenar productos por precio descendente
pipeline = [
{"$sort": {"precio": -1}} # 1 para ascendente, -1 para descendente
]
# Implementar paginación
pagina = 2
elementos_por_pagina = 10
pipeline = [
{"$sort": {"fecha_creacion": -1}},
{"$skip": (pagina - 1) * elementos_por_pagina},
{"$limit": elementos_por_pagina}
]
productos_paginados = db.productos.aggregate(pipeline)
Operaciones de agregación avanzadas
Etapa $unwind
La etapa $unwind descompone arrays en documentos individuales, lo que facilita el análisis de datos en arrays:
# Descomponer productos por etiquetas
pipeline = [
{"$unwind": "$etiquetas"},
{"$group": {
"_id": "$etiquetas",
"cantidad": {"$sum": 1},
"productos": {"$push": "$nombre"}
}},
{"$sort": {"cantidad": -1}}
]
etiquetas_populares = db.productos.aggregate(pipeline)
Este pipeline:
- Descompone cada documento por su array de etiquetas
- Agrupa por etiqueta y cuenta productos
- Crea un array con los nombres de productos para cada etiqueta
- Ordena por cantidad descendente
Etapa $lookup
La etapa $lookup realiza una operación similar a un JOIN en SQL, permitiendo combinar documentos de diferentes colecciones:
# Combinar pedidos con información de clientes
pipeline = [
{"$match": {"estado": "pendiente"}},
{"$lookup": {
"from": "clientes",
"localField": "cliente_id",
"foreignField": "_id",
"as": "info_cliente"
}},
{"$project": {
"numero_pedido": 1,
"importe": 1,
"fecha": 1,
"cliente": {"$arrayElemAt": ["$info_cliente", 0]}
}}
]
pedidos_con_clientes = db.pedidos.aggregate(pipeline)
En este ejemplo:
- Filtramos pedidos pendientes
- Realizamos un lookup para obtener información del cliente
- Proyectamos los campos necesarios, extrayendo el primer elemento del array
info_cliente
Etapa $facet
La etapa $facet permite realizar múltiples agregaciones en paralelo dentro del mismo pipeline:
# Obtener múltiples métricas en una sola consulta
pipeline = [
{"$match": {"fecha": {"$gte": datetime(2023, 1, 1)}}},
{"$facet": {
"ventas_por_categoria": [
{"$group": {"_id": "$categoria", "total": {"$sum": "$importe"}}}
],
"ventas_por_mes": [
{"$group": {
"_id": {"$dateToString": {"format": "%Y-%m", "date": "$fecha"}},
"total": {"$sum": "$importe"}
}},
{"$sort": {"_id": 1}}
],
"top_productos": [
{"$group": {"_id": "$producto_id", "unidades": {"$sum": "$cantidad"}}},
{"$sort": {"unidades": -1}},
{"$limit": 5}
]
}}
]
dashboard = db.ventas.aggregate(pipeline)
Esta agregación genera un dashboard completo con tres secciones diferentes en una sola consulta.
Ejemplo práctico: Análisis de ventas
Veamos un ejemplo completo que combina varias etapas para realizar un análisis de ventas:
def analisis_ventas(db, ano, mes=None):
"""
Realiza un análisis completo de ventas para un período específico.
"""
# Construir filtro de fecha
match_fecha = {"$match": {"fecha": {"$gte": datetime(ano, 1, 1)}}}
if mes:
primer_dia = datetime(ano, mes, 1)
if mes == 12:
ultimo_dia = datetime(ano + 1, 1, 1)
else:
ultimo_dia = datetime(ano, mes + 1, 1)
match_fecha = {"$match": {"fecha": {"$gte": primer_dia, "$lt": ultimo_dia}}}
# Pipeline de agregación
pipeline = [
match_fecha,
{"$facet": {
"resumen": [
{"$group": {
"_id": None,
"total_ventas": {"$sum": "$importe"},
"num_transacciones": {"$sum": 1},
"ticket_promedio": {"$avg": "$importe"},
"ticket_maximo": {"$max": "$importe"}
}}
],
"ventas_por_categoria": [
{"$group": {
"_id": "$categoria",
"total": {"$sum": "$importe"},
"porcentaje": {"$sum": "$importe"}
}},
{"$sort": {"total": -1}}
],
"ventas_diarias": [
{"$group": {
"_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$fecha"}},
"total": {"$sum": "$importe"},
"transacciones": {"$sum": 1}
}},
{"$sort": {"_id": 1}}
],
"top_productos": [
{"$group": {
"_id": "$producto_id",
"nombre": {"$first": "$producto_nombre"},
"unidades": {"$sum": "$cantidad"},
"ingresos": {"$sum": "$importe"}
}},
{"$sort": {"ingresos": -1}},
{"$limit": 10}
]
}}
]
# Ejecutar agregación
resultado = list(db.ventas.aggregate(pipeline))[0]
# Calcular porcentajes para ventas por categoría
if resultado["ventas_por_categoria"] and resultado["resumen"]:
total_general = resultado["resumen"][0]["total_ventas"]
for categoria in resultado["ventas_por_categoria"]:
categoria["porcentaje"] = round((categoria["total"] / total_general) * 100, 2)
return resultado
# Ejemplo de uso
analisis = analisis_ventas(db, 2023, 3) # Análisis de marzo 2023
print(f"Total ventas: ${analisis['resumen'][0]['total_ventas']:,.2f}")
print(f"Ticket promedio: ${analisis['resumen'][0]['ticket_promedio']:,.2f}")
print("\nVentas por categoría:")
for cat in analisis['ventas_por_categoria']:
print(f"{cat['_id']}: ${cat['total']:,.2f} ({cat['porcentaje']}%)")
print("\nTop 3 productos:")
for i, prod in enumerate(analisis['top_productos'][:3], 1):
print(f"{i}. {prod['nombre']}: {prod['unidades']} unidades (${prod['ingresos']:,.2f})")
Exportar resultados de agregación
A menudo necesitamos exportar los resultados de una agregación para su posterior análisis:
import csv
import json
from bson import json_util
def exportar_resultados(resultados, formato, archivo):
"""
Exporta resultados de agregación a CSV o JSON.
"""
if formato.lower() == 'csv':
# Asumimos que todos los documentos tienen la misma estructura
with open(archivo, 'w', newline='', encoding='utf-8') as f:
if not resultados:
return False
# Obtener encabezados del primer documento
encabezados = resultados[0].keys()
writer = csv.DictWriter(f, fieldnames=encabezados)
writer.writeheader()
writer.writerows(resultados)
elif formato.lower() == 'json':
# Convertir a JSON manejando tipos especiales de MongoDB
with open(archivo, 'w', encoding='utf-8') as f:
json.dump(resultados, f, default=json_util.default, indent=2)
else:
raise ValueError("Formato no soportado. Use 'csv' o 'json'")
return True
# Ejemplo: Exportar ventas mensuales
pipeline = [
{"$group": {
"_id": {
"año": {"$year": "$fecha"},
"mes": {"$month": "$fecha"}
},
"total": {"$sum": "$importe"},
"transacciones": {"$sum": 1}
}},
{"$sort": {"_id.año": 1, "_id.mes": 1}},
{"$project": {
"_id": 0,
"año": "$_id.año",
"mes": "$_id.mes",
"total": 1,
"transacciones": 1
}}
]
resultados = list(db.ventas.aggregate(pipeline))
exportar_resultados(resultados, 'csv', 'ventas_mensuales.csv')
Optimización de agregaciones
Las agregaciones pueden ser operaciones intensivas. Algunas técnicas para optimizarlas:
# Usar allowDiskUse para operaciones que exceden el límite de memoria
pipeline = [] # pipeline complejo
resultados = db.coleccion.aggregate(pipeline, allowDiskUse=True)
# Añadir índices para campos usados en $match y $sort
db.ventas.create_index([("fecha", 1)])
db.ventas.create_index([("categoria", 1), ("importe", -1)])
# Explicar el plan de ejecución
pipeline_analizar = [] # pipeline a analizar
explicacion = db.ventas.aggregate(pipeline_analizar, explain=True)
print(json.dumps(explicacion, indent=2, default=json_util.default))
Las agregaciones en MongoDB con PyMongo proporcionan un sistema flexible y útil para analizar datos, generar informes y transformar información. Dominando estas técnicas, podrás extraer conocimientos valiosos de tus datos y construir funcionalidades avanzadas en tus aplicaciones.
Fuentes y referencias
Documentación oficial y recursos externos para profundizar en Python
Documentación oficial de Python
Alan Sastre
Ingeniero de Software y formador, CEO en CertiDevs
Ingeniero de software especializado en Full Stack y en Inteligencia Artificial. Como CEO de CertiDevs, Python es una de sus áreas de expertise. Con más de 15 años programando, 6K seguidores en LinkedIn y experiencia como formador, Alan se dedica a crear contenido educativo de calidad para desarrolladores de todos los niveles.
Más tutoriales de Python
Explora más contenido relacionado con Python y continúa aprendiendo con nuestros tutoriales gratuitos.
Aprendizajes de esta lección
Construir consultas y filtros complejos con operadores de comparación, lógicos, de array y de texto. Usar proyecciones, ordenación y paginación eficiente con sort, skip y limit. Crear índices para optimizar el rendimiento de las consultas. Construir pipelines de agregación con etapas $match, $group, $project, $unwind y $lookup. Implementar análisis de ventas completos y exportar resultados a CSV o JSON.