PySpark
Tutorial PySpark: Manipulación y análisis de datos con PySpark
Domina la manipulación y análisis de datos con PySpark en Python, aplica técnicas avanzadas y optimiza el rendimiento en tus proyectos de big data de manera eficiente con Apache Spark.
Aprende PySpark GRATIS y certifícateLectura y escritura de datos en diversos formatos (CSV, JSON, Parquet)
La manipulación de datos en PySpark comienza con la habilidad de leer y escribir datos en diversos formatos. Entre los formatos más comunes se encuentran CSV, JSON y Parquet, cada uno con sus particularidades y beneficios en el procesamiento de grandes volúmenes de datos.
Para leer archivos CSV, PySpark proporciona el método spark.read.csv()
. Este método permite cargar datos desde archivos CSV y convertirlos en un DataFrame para su posterior análisis. Es importante especificar parámetros como header
y inferSchema
para garantizar una lectura correcta de los datos:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LecturaCSV").getOrCreate()
df_csv = spark.read.csv("ruta/al/archivo.csv", header=True, inferSchema=True)
En este ejemplo, header=True
indica que la primera fila del CSV contiene los nombres de las columnas, mientras que inferSchema=True
permite a PySpark deducir automáticamente los tipos de datos de cada columna. Estos parámetros son esenciales para trabajar con datos estructurados de manera eficiente.
Para escribir un DataFrame en formato CSV, se utiliza el método df.write.csv()
. Este método permite guardar los datos procesados en un archivo CSV, facilitando su uso en otras aplicaciones o sistemas:
df_csv.write.csv("ruta/de/salida.csv", header=True, mode="overwrite")
Aquí, mode="overwrite"
asegura que si el archivo de salida ya existe, será reemplazado por el nuevo. El parámetro header=True
incluye los nombres de las columnas en el archivo de salida, manteniendo la consistencia en la estructura de los datos.
El formato JSON es ampliamente utilizado para el intercambio de datos debido a su naturaleza flexible y legible por humanos. Para leer archivos JSON, PySpark ofrece el método spark.read.json()
:
df_json = spark.read.json("ruta/al/archivo.json")
Si los archivos JSON contienen estructuras anidadas, PySpark es capaz de manejar estos datos y representarlos en columnas estructuradas dentro del DataFrame. Esto es especialmente útil al trabajar con datos provenientes de APIs o servicios web.
Al escribir datos en formato JSON, el procedimiento es similar:
df_json.write.json("ruta/de/salida.json", mode="overwrite")
Este método guarda el DataFrame en uno o varios archivos JSON, dependiendo de la partición de los datos. Es importante considerar la organización de los datos y el número de particiones para optimizar el rendimiento y el almacenamiento.
El formato Parquet es un formato de almacenamiento columna altamente eficiente, diseñado para manejar cargas de trabajo de big data. Parquet utiliza compresión y codificación eficientes que mejoran el rendimiento de lectura y escritura. Para leer archivos Parquet, se utiliza el método spark.read.parquet()
:
df_parquet = spark.read.parquet("ruta/al/archivo.parquet")
Uno de los beneficios clave del formato Parquet es su capacidad para almacenar metadatos, lo que permite a PySpark leer los tipos de datos y la estructura del archivo sin necesidad de inferirlos o especificarlos manualmente.
Para escribir un DataFrame en formato Parquet, se emplea el método df.write.parquet()
:
df_parquet.write.parquet("ruta/de/salida.parquet", mode="overwrite")
El uso de Parquet es altamente recomendable al trabajar con grandes conjuntos de datos y cuando se busca optimizar las operaciones de entrada y salida. La compresión y la alineación de datos por columnas permiten un acceso más rápido y eficiente a los datos necesarios durante el procesamiento.
Además de los métodos básicos, PySpark ofrece opciones avanzadas para personalizar la lectura y escritura de datos. Al leer archivos, se pueden especificar esquemas personalizados utilizando StructType
y StructField
para definir la estructura exacta de los datos:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
esquema = StructType([
StructField("nombre", StringType(), True),
StructField("edad", IntegerType(), True),
StructField("ciudad", StringType(), True)
])
df_personalizado = spark.read.csv("ruta/al/archivo.csv", header=True, schema=esquema)
Esta práctica es útil cuando se conoce de antemano la estructura de los datos y se desea evitar la inferencia automática, lo que puede mejorar el rendimiento y la consistencia del esquema.
Al escribir datos, es posible seleccionar el modo de compresión para reducir el espacio en disco. Por ejemplo, al escribir en formato Parquet con compresión Snappy:
df_parquet.write.parquet("ruta/de/salida.parquet", mode="overwrite", compression="snappy")
La elección del algoritmo de compresión puede tener un impacto significativo en el rendimiento y el tamaño de los archivos resultantes.
Al manipular diferentes formatos de datos, es crucial manejar las particiones adecuadamente. PySpark permite controlar el número de particiones al leer o escribir datos, lo que afecta el paralelismo y el rendimiento del procesamiento:
# Al leer datos y re-particionar
df_csv = spark.read.csv("ruta/al/archivo.csv", header=True, inferSchema=True).repartition(10)
# Al escribir datos con particiones
df_csv.write.csv("ruta/de/salida.csv", header=True, mode="overwrite").partitionBy("city")
El uso de partitionBy
al escribir datos permite dividir el conjunto de datos en directorios basados en los valores de una o más columnas, facilitando las consultas posteriores y mejorando la eficiencia en operaciones de filtrado.
Finalmente, es importante manejar correctamente los datos faltantes o nulos al leer diferentes formatos. PySpark ofrece múltiples opciones para tratar valores nulos, como na.fill()
o na.drop()
, que permiten limpiar el DataFrame antes de realizar análisis adicionales.
# Rellenar valores nulos
df_sin_nulos = df_csv.na.fill({"edad": 0, "ciudad": "Desconocida"})
# Eliminar filas con valores nulos
df_limpio = df_csv.na.drop()
El tratamiento adecuado de valores nulos es esencial para mantener la integridad de los análisis y evitar errores durante las transformaciones y agregaciones.
Exploración y perfilamiento de datos
La exploración de datos es un paso esencial en cualquier proyecto de análisis, ya que permite comprender la estructura y las características de los datos antes de proceder con transformaciones o modelos de aprendizaje automático. En PySpark, los DataFrames ofrecen métodos eficientes para realizar este perfilamiento de datos en grandes conjuntos distribuidos.
Para comenzar, es fundamental conocer la estructura del DataFrame. El método printSchema()
muestra el esquema de los datos, incluyendo los nombres de las columnas y sus tipos:
df_limpio.printSchema()
Este comando proporciona una visión detallada de las columnas y ayuda a identificar el tipo de datos de cada una, como IntegerType, StringType o DoubleType. Conocer los tipos es crucial para evitar errores en operaciones posteriores y garantizar que las transformaciones sean adecuadas.
Otra práctica común es visualizar las primeras filas del DataFrame para tener una idea del contenido. El método show()
permite mostrar un número específico de filas:
df_limpio.show(5)
Con este método, es posible inspeccionar las primeras cinco filas y verificar si los datos se han cargado correctamente. Además, show()
admite el parámetro truncate=False
para evitar que las cadenas largas se corten:
df_limpio.show(5, truncate=False)
Para obtener un recuento rápido del número total de registros, se utiliza el método count()
:
total_registros = df_limpio.count()
print(f"Total de registros: {total_registros}")
Conocer el número de registros ayuda a dimensionar el volumen de datos y planificar el procesamiento adecuado.
El método describe()
proporciona estadísticas básicas de las columnas numéricas, como el promedio, la desviación estándar, los valores mínimo y máximo:
df_limpio.describe().show()
Si se desea obtener estas estadísticas para columnas específicas, es posible especificarlas:
df_limpio.describe('edad', 'salario').show()
Para un análisis más detallado, el método summary()
ofrece estadísticas adicionales, como los percentiles:
df_limpio.summary().show()
Este método es útil para identificar valores atípicos y comprender la distribución de los datos.
La detección de valores nulos es fundamental en el perfilamiento de datos. Para contar los valores nulos en cada columna, se puede utilizar el siguiente enfoque:
from pyspark.sql.functions import col, sum as suma
df_limpio.select([suma(col(c).isNull().cast("int")).alias(c) for c in df_limpio.columns]).show()
Este código calcula la cantidad de valores nulos por columna, lo que permite determinar si es necesario realizar imputaciones o eliminar registros con datos faltantes.
El método distinct()
permite identificar valores únicos en una columna, lo cual es útil para entender la variabilidad y detectar posibles errores:
df_limpio.select('city').distinct().show()
Para conocer la distribución de frecuencias de una columna categórica, es posible utilizar groupBy()
junto con count()
:
df_limpio.groupBy('city').count().orderBy('count', ascending=False).show()
Este código muestra las ciudades con mayor número de registros, ayudando a identificar concentraciones o desequilibrios en los datos.
Las funciones de agregación permiten calcular métricas como sumas, promedios y máximos. Por ejemplo, para obtener el salario promedio por departamento:
df_limpio.groupBy('departamento').avg('salario').show()
En ocasiones, es útil obtener una muestra de los datos para análisis locales. El método sample()
permite extraer una fracción aleatoria del DataFrame:
df_muestra = df_limpio.sample(fraction=0.1, seed=42)
df_muestra.show()
La exploración de correlaciones entre variables numéricas es posible utilizando el método corr()
:
correlacion = df_limpio.stat.corr('edad', 'salario')
print(f"Correlación entre edad y salario: {correlacion}")
Adicionalmente, el método cov()
permite calcular la covarianza entre dos columnas:
covarianza = df_limpio.stat.cov('edad', 'salario')
print(f"Covarianza entre edad y salario: {covarianza}")
Para columnas categóricas, es útil crear tablas de contingencia con el método crosstab()
:
df_limpio.stat.crosstab('género', 'departamento').show()
Esta tabla muestra la frecuencia conjunta entre género y departamento, lo que ayuda a detectar patrones o relación entre variables categóricas.
En el caso de necesitar estadísticas aproximadas en conjuntos de datos muy grandes, PySpark ofrece el método approxQuantile()
para calcular cuantiles Q1, Q2, Q3 de manera eficiente:
cuantiles = df_limpio.approxQuantile('rating', [0.25, 0.5, 0.75], 0.01)
print(f"Cuantiles de calificaciones: {cuantiles}")
El parámetro 0.01
indica el error relativo permitido en el cálculo, lo que acelera el procesamiento.
Para mejorar el rendimiento en la exploración, es posible utilizar cache para almacenar el DataFrame en memoria:
df_limpio.cache()
Esto es especialmente beneficioso cuando se realizan múltiples operaciones sobre el mismo DataFrame, reduciendo el tiempo de lectura desde disco.
La detección de valores atípicos es esencial en el perfilamiento. Se pueden utilizar funciones para calcular los límites basados en el rango intercuartílico:
q1, q3 = df_limpio.approxQuantile('quantity', [0.25, 0.75], 0.0)
iqr = q3 - q1
limite_inferior = q1 - 1.5 * iqr
limite_superior = q3 + 1.5 * iqr
df_limpio.filter((df_limpio['quantity'] < limite_inferior) | (df_limpio['quantity'] > limite_superior)).show()
Este código identifica los registros con salarios atípicos, permitiendo un análisis más profundo de posibles anomalías.
Para visualizar la distribución de una variable, aunque PySpark no es una herramienta de visualización, es posible exportar una muestra a herramientas como matplotlib:
muestra_pandas = df_muestra.toPandas()
muestra_pandas['Quantity'].hist()
Esta integración facilita la creación de gráficos y complementa el análisis exploratorio.
Es importante recordar que, al trabajar con grandes volúmenes de datos, las operaciones deben ser optimizadas. El uso de select()
para limitar las columnas procesadas y el filtrado temprano de datos irrelevantes mejora la eficiencia:
df_seleccionado = df.select('edad', 'salario', 'departamento').filter(df['salario'].isNotNull())
De esta manera, se reduce la cantidad de datos en tránsito y se acelera el procesamiento.
Por último, el uso de funciones estadísticas avanzadas está disponible a través de paquetes adicionales como MLlib. Aunque MLlib está orientado al aprendizaje automático, ofrece utilidades para el análisis estadístico que pueden ser aprovechadas en el perfilamiento de datos.
Operaciones de selección, filtrado y ordenamiento
La manipulación eficiente de datos en PySpark requiere un dominio de las operaciones de selección, filtrado y ordenamiento. Estas operaciones son fundamentales para extraer información relevante de grandes conjuntos de datos y preparar los datos para análisis más profundos o para su uso en modelos de aprendizaje automático.
Para seleccionar columnas específicas de un DataFrame, se utiliza el método select()
. Este método permite reducir el conjunto de datos a las columnas de interés, mejorando la eficiencia y la legibilidad del código:
# Seleccionar columnas específicas
df_seleccion = df.select('nombre', 'edad', 'ciudad')
En este ejemplo, se crea un nuevo DataFrame que contiene únicamente las columnas nombre, edad y ciudad. Es posible también renombrar columnas directamente dentro de select()
utilizando expresiones:
from pyspark.sql.functions import col
# Renombrar columnas al seleccionar
df_renombrado = df.select(col('nombre').alias('nombre_completo'), 'edad', 'ciudad')
La función col()
facilita la manipulación de columnas y el uso de alias mejora la claridad al referirse a los datos.
Cuando se requiere realizar una transformación o crear una nueva columna basada en cálculos, withColumn()
es el método adecuado:
from pyspark.sql.functions import when
# Crear una nueva columna basada en una condición
df_nuevo = df.withColumn('mayor_de_edad', when(df["edad"] >= 18, True).otherwise(False))
Aquí se añade la columna mayor_de_edad, que indica si una persona es mayor de edad según su valor en la columna edad. El uso de funciones como when()
permite implementar lógica condicional de manera eficiente.
Para excluir columnas no deseadas, el método drop()
resulta útil:
# Eliminar columnas innecesarias
df_reducido = df.drop('telefono', 'email')
Eliminar columnas irrelevantes ayuda a reducir el consumo de recursos y simplifica el DataFrame para análisis posteriores.
El filtrado de filas basado en condiciones se logra con filter()
o su sinónimo where()
. Estas funciones permiten extraer únicamente los registros que cumplen ciertos criterios:
# Filtrar filas donde la edad es mayor a 30
df_filtrado = df.filter(df.edad > 30)
El uso de operadores como >
, <
, ==
, !=
, &
(and), |
(or) facilita la construcción de condiciones lógicas complejas. Por ejemplo:
# Filtrar filas donde la edad está entre 25 y 40, y la ciudad es Madrid
df_condicional = df.filter((df.edad >= 25) & (df.edad <= 40) & (df.ciudad == 'Madrid'))
Es importante utilizar paréntesis para delimitar correctamente cada condición y asegurar que las operaciones lógicas se evalúen como se espera.
Para manejar valores nulos durante el filtrado, se dispone de funciones como isNull()
e isNotNull()
:
# Filtrar filas donde el salario no es nulo
df_sin_nulos = df.filter(df.salario.isNotNull())
El tratamiento adecuado de valores nulos es esencial para mantener la integridad de los datos y evitar errores en cálculos posteriores.
El método filter()
también admite expresiones en cadena de texto utilizando la sintaxis SQL:
# Filtrar usando una expresión SQL
df_sql = df.filter("edad > 30 AND ciudad = 'Barcelona'")
Esta opción puede ser conveniente para quienes están familiarizados con las consultas SQL y prefieren esta sintaxis para las condiciones.
El ordenamiento de datos se realiza con orderBy()
o su alias sort()
. Estos métodos permiten ordenar el DataFrame según una o más columnas, en orden ascendente o descendente:
# Ordenar por edad en orden ascendente
df_ordenado = df.orderBy('edad')
# Ordenar por salario en orden descendente
df_ordenado_desc = df.orderBy(df.salario.desc())
Al utilizar desc()
o asc()
, se controla el orden de manera explícita. Es posible ordenar por múltiples columnas especificando una lista:
# Ordenar por departamento y luego por salario descendente
df_multiorden = df.orderBy('departamento', df.salario.desc())
Este ordenamiento jerárquico primero organiza los datos por departamento y, dentro de cada uno, ordena por salario de mayor a menor.
Para mejorar el rendimiento al ordenar grandes conjuntos de datos, PySpark utiliza el concepto de particiones. Sin embargo, a veces es necesario reorganizar las particiones para optimizar el procesamiento. El método repartition()
permite redistribuir los datos:
# Reparticionar antes de ordenar
df_repart = df.repartition(4).orderBy('fecha')
Ajustar el número de particiones puede ayudar a equilibrar la carga entre nodos y acelerar las operaciones de ordenamiento.
Cuando se trabaja con datos donde el orden relativo es importante, como filas consecutivas, se pueden utilizar ventanas con la clase Window
y funciones como row_number()
:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# Añadir un número de fila ordenado por fecha
window = Window.orderBy(df.fecha)
df_con_fila = df.withColumn('numero_fila', row_number().over(window))
Las ventanas permiten aplicar funciones de agregación y generar nuevas columnas que dependen del orden de los datos.
En escenarios donde se necesita muestrear datos ordenados, es posible combinar orderBy()
con limit()
:
# Obtener los 10 empleados con mayor salario
top_empleados = df.orderBy(df.salario.desc()).limit(10)
El método limit()
extrae un número específico de filas del DataFrame ordenado, útil para análisis de máximos o mínimos.
Para seleccionar datos de manera dinámica, selectExpr()
acepta expresiones SQL y es útil para operaciones más complejas:
# Seleccionar con una expresión SQL
df_expr = df.selectExpr("nombre", "salario * 1.1 as salario_incrementado")
En este caso, se calcula un nuevo valor de salario_incrementado aplicando un aumento del 10% al salario original.
La combinación de selección, filtrado y ordenamiento permite construir consultas poderosas y extraer insights significativos de los datos. Por ejemplo, para obtener los tres productos más vendidos en una categoría específica:
# Filtrar por categoría, agrupar y ordenar por ventas
from pyspark.sql.functions import sum as suma
df_resultado = (df.filter(df.categoria == 'Electrónica')
.groupBy('producto')
.agg(suma('ventas').alias('total_ventas'))
.orderBy('total_ventas', ascending=False)
.limit(3))
Este código primero filtra los productos de la categoría "Electrónica", luego agrupa por producto, suma las ventas y finalmente ordena los resultados para obtener los tres productos con mayores ventas.
Es esencial comprender que las operaciones en PySpark son inmutables, es decir, cada transformación produce un nuevo DataFrame. Esto permite encadenar métodos sin modificar los datos originales:
# Encadenamiento de operaciones
df_procesado = (df.select('nombre', 'edad', 'ciudad', 'salario')
.filter(df.edad >= 18)
.withColumn('salario_anual', df.salario * 12)
.orderBy('salario_anual', ascending=False))
El uso de paréntesis y saltos de línea mejora la legibilidad cuando se realizan múltiples transformaciones.
Para mejorar la eficiencia, PySpark utiliza la evaluación diferida. Las operaciones no se ejecutan hasta que se requiere una acción, como show()
, count()
o write()
. Esto permite optimizar el plan de ejecución y reducir el tiempo de procesamiento.
Al filtrar datos, es importante considerar el uso de broadcast joins si se combinan con otras tablas. Aunque este tema se profundiza en secciones posteriores, es relevante notar que el filtrado adecuado puede minimizar la necesidad de operaciones costosas.
En algunas situaciones, se puede necesitar ordenar los datos dentro de cada grupo. Esto se logra combinando Window
con partitionBy()
:
from pyspark.sql.functions import rank
# Ordenar ventas dentro de cada región
window_region = Window.partitionBy('region').orderBy(df.ventas.desc())
df_ranked = df.withColumn('ranking', rank().over(window_region))
Este enfoque permite analizar el rendimiento relativo dentro de subgrupos de datos, como regiones o categorías.
Para realizar comparaciones entre columnas, las operaciones de selección pueden incluir expresiones lógicas:
# Seleccionar empleados con salario por encima del promedio
salario_promedio = df.agg({'salario': 'avg'}).collect()[0][0]
df_superior = df.filter(df.salario > salario_promedio)
Aquí se calcula el salario promedio y se filtran los empleados que están por encima de ese valor.
Es posible también utilizar funciones personalizadas dentro de las operaciones de selección y filtrado. Las User Defined Functions (UDFs) permiten incorporar lógica específica:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Definir una UDF para categorizar edades
def categorizar_edad(edad):
if edad < 18:
return 'Menor'
elif 18 <= edad < 65:
return 'Adulto'
else:
return 'Senior'
categorizar_edad_udf = udf(categorizar_edad, StringType())
# Usar la UDF en una selección
df_categorizado = df.withColumn('categoria_edad', categorizar_edad_udf(df.edad))
El uso de UDFs debe manejarse con cuidado, ya que pueden afectar el rendimiento al romper la optimización de PySpark.
Transformaciones avanzadas con funciones lambda
Gestionar correctamente las transformaciones avanzadas en PySpark es fundamental para el procesamiento eficiente de grandes volúmenes de datos. Las funciones lambda ofrecen una manera concisa y potente de definir operaciones personalizadas sobre los datos dentro de un DataFrame o RDD. A través de ellas, es posible aplicar lógica específica sin la necesidad de definir funciones nombradas, lo que agiliza el desarrollo y mejora la legibilidad del código.
En PySpark, la función map()
es una de las transformaciones más utilizadas para aplicar una función a cada elemento de un RDD. Por ejemplo, para transformar una lista de números incrementando cada uno en 1:
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
rdd_incrementado = rdd.map(lambda x: x + 1)
print(rdd_incrementado.collect())
Este código generará la salida [2, 3, 4, 5, 6]
, demostrando cómo la función lambda aplicada incrementa cada elemento del RDD. La utilización de funciones anónimas como las lambdas permite escribir transformaciones de manera más concisa.
Otra transformación esencial es filter()
, que permite filtrar elementos según una condición. Por ejemplo, para obtener solo los números pares de una lista:
rdd_pares = rdd.filter(lambda x: x % 2 == 0)
print(rdd_pares.collect())
La salida será [2, 4]
, mostrando que la condición lambda ha filtrado correctamente los elementos. Las funciones lambda en el filtrado son especialmente útiles para implementar condiciones personalizadas de una manera elegante.
Las transformaciones no se limitan a operaciones simples. Se pueden combinar múltiples operaciones con funciones lambda para realizar tareas más complejas. Por ejemplo, supongamos que tenemos un DataFrame con información de usuarios y queremos calcular la longitud del nombre de cada usuario:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
data = [("Ana", 23), ("Juan", 34), ("María", 29)]
df = spark.createDataFrame(data, ["nombre", "edad"])
longitud_nombre = udf(lambda x: len(x), IntegerType())
df_con_longitud = df.withColumn("longitud_nombre", longitud_nombre(df.nombre))
df_con_longitud.show()
Este código añade una nueva columna longitud_nombre al DataFrame, calculada mediante una UDF (User Defined Function) basada en una función lambda. Las UDFs permiten aplicar funciones personalizadas que no están disponibles en las funciones integradas de PySpark.
Es importante destacar que las UDFs pueden ser un cuello de botella en el rendimiento, ya que pueden desactivar algunas optimizaciones de PySpark. Para mitigar este problema, se recomienda utilizar las funciones incorporadas en PySpark siempre que sea posible. Por ejemplo, en lugar de usar una UDF para calcular la longitud de una cadena, se puede utilizar la función length()
:
from pyspark.sql.functions import length
df_con_longitud = df.withColumn("longitud_nombre", length(df.nombre))
df_con_longitud.show()
Este enfoque aprovecha las funciones integradas de PySpark, que están optimizadas para el procesamiento distribuido y mejoran la eficiencia.
Las funciones lambda también pueden utilizarse en conjunción con reduce()
para realizar agregaciones. Por ejemplo, para sumar todos los elementos de un RDD:
suma_total = rdd.reduce(lambda x, y: x + y)
print(suma_total)
La salida será 15
, que es la suma de todos los números del RDD. El uso de reduce()
con funciones lambda permite realizar agregaciones personalizadas sin necesidad de funciones adicionales.
En el contexto de DataFrames, las transformaciones complejas pueden lograrse utilizando transform()
junto con funciones lambda. Por ejemplo, si tenemos una columna de listas y queremos transformar cada elemento de la lista:
from pyspark.sql.functions import col, transform
data = [("Alice", [1, 2, 3]), ("Bob", [4, 5, 6])]
df = spark.createDataFrame(data, ["nombre", "valores"])
df_transformado = df.withColumn("valores_incrementados", transform(col("valores"), lambda x: x + 1))
df_transformado.show()
La salida mostrará que para cada fila, los valores en la lista han sido incrementados en 1. La función transform()
aplicada sobre la columna de tipo array permite la manipulación elemento a elemento utilizando funciones lambda.
Asimismo, las funciones de orden superior como filter()
, exists()
, y aggregate()
pueden utilizarse en columnas de tipo array con funciones lambda. Por ejemplo, para filtrar elementos pares en una columna array:
from pyspark.sql.functions import filter
df_filtrado = df.withColumn("valores_pares", filter(col("valores"), lambda x: x % 2 == 0))
df_filtrado.show()
Este código producirá una nueva columna valores_pares que contiene solo los números pares de la lista original. Las funciones lambda proporcionan una manera flexible de aplicar lógica condicional a los elementos de estructuras complejas.
Para combinar y transformar datos de manera más sofisticada, es posible utilizar mapPartitions()
, que aplica una función a cada partición del RDD. Esto puede mejorar el rendimiento al reducir la sobrecarga de llamada por elemento:
def procesar_particion(iterator):
for x in iterator:
yield x * 2
rdd_dobles = rdd.mapPartitions(procesar_particion)
print(rdd_dobles.collect())
La salida será [2, 4, 6, 8, 10]
, mostrando que cada elemento ha sido multiplicado por 2. Al operar a nivel de partición, se pueden optimizar tareas que requieren acceso a los datos de una partición completa.
En situaciones donde se necesita acceso tanto a los índices como a los valores, mapWithIndex()
es útil. Sin embargo, es más común en RDDs y requiere una gestión cuidadosa para mantener la coherencia de los datos en un entorno distribuido.
Las funciones lambda también se emplean en flatMap()
, que es similar a map()
pero cada elemento de entrada puede generar cero o más elementos de salida. Por ejemplo, para dividir frases en palabras:
rdd_frases = spark.sparkContext.parallelize(["Hola mundo", "Aprendiendo PySpark"])
rdd_palabras = rdd_frases.flatMap(lambda frase: frase.split(" "))
print(rdd_palabras.collect())
La salida será ['Hola', 'mundo', 'Aprendiendo', 'PySpark']
, demostrando cómo flatMap()
expande cada frase en sus palabras constituyentes. Este enfoque es útil para tareas de procesamiento de texto y análisis lingüístico.
En el contexto de DataFrames, si se requiere aplicar una transformación compleja que involucra múltiples columnas, se puede utilizar withColumn()
junto con funciones lambda y when()
. Por ejemplo, para categorizar edades:
from pyspark.sql.functions import when
df_categorizado = df.withColumn("categoria_edad", when(df.edad < 18, "Menor")
.when(df.edad < 65, "Adulto")
.otherwise("Senior"))
df_categorizado.show()
Este código asigna una categoría a cada individuo basado en su edad, utilizando una serie de condiciones. Aunque no es una función lambda per se, demuestra cómo aplicar lógica compleja en transformaciones de columnas.
Para operaciones que requieren evaluar una expresión en tiempo de ejecución, expr()
puede ser combinado con funciones lambda. Por ejemplo:
from pyspark.sql.functions import expr
columna_dinamica = "edad * 2"
df_transformado = df.withColumn("edad_doble", expr(columna_dinamica))
df_transformado.show()
Este enfoque permite definir expresiones dinámicas que pueden ser modificadas sin cambiar el código, proporcionando flexibilidad en las transformaciones.
Es importante tener en cuenta que, aunque las funciones lambda ofrecen una gran potencia, deben utilizarse con precaución. Un uso excesivo o inapropiado puede conducir a código difícil de leer y mantener. Siempre que sea posible, se deben preferir las funciones integradas y las expresiones claras.
Para finalizar, las transformaciones avanzadas con funciones lambda en PySpark expanden las capacidades de manipulación de datos, permitiendo aplicar transformaciones personalizadas y eficientes en grandes conjuntos de datos. Su correcta aplicación facilita la implementación de lógica específica del dominio y mejora la productividad en el desarrollo de aplicaciones de procesamiento de datos.
Agregaciones y operaciones de grupo
En el análisis de datos con PySpark, las agregaciones y operaciones de grupo son esenciales para obtener resúmenes y estadísticas significativas de grandes conjuntos de datos. Estas operaciones permiten agrupar los datos en función de una o varias columnas y aplicar funciones de agregación para extraer información valiosa.
El método fundamental para agrupar datos es groupBy()
. Este método se utiliza para dividir el DataFrame en grupos basados en los valores de una o más columnas. Una vez agrupados, es posible aplicar funciones de agregación como sum()
, avg()
, min()
, max()
y count()
.
Por ejemplo, supongamos un DataFrame llamado ventas
con las columnas producto
, categoria
y cantidad
. Para obtener el total de ventas por categoría, se utiliza:
ventas_por_categoria = ventas.groupBy('categoria').sum('cantidad')
ventas_por_categoria.show()
En este código, groupBy('categoria')
agrupa las filas por la columna categoria, y sum('cantidad')
calcula la suma de la columna cantidad para cada grupo. De esta manera, se obtiene el total de ventas para cada categoría específica.
Además de sum()
, PySpark ofrece diversas funciones de agregación integradas. Para calcular el promedio de ventas por producto, se emplea:
ventas_por_producto = ventas.groupBy('producto').avg('cantidad')
ventas_por_producto.show()
Aquí, avg('cantidad')
calcula el promedio de la columna cantidad para cada producto, proporcionando una visión detallada del rendimiento de cada artículo.
Cuando se requieren múltiples agregaciones simultáneamente, el método agg()
es la opción adecuada. Este método permite aplicar varias funciones de agregación al mismo tiempo:
from pyspark.sql.functions import sum, avg
ventas_agrupadas = ventas.groupBy('categoria').agg(
sum('cantidad').alias('total_cantidad'),
avg('cantidad').alias('promedio_cantidad')
)
ventas_agrupadas.show()
En este caso, se importan las funciones sum
y avg
de pyspark.sql.functions
y se aplican en agg()
. La utilización de alias()
renombra las columnas resultantes para mejorar la claridad de los datos agregados.
Es posible agrupar por múltiples columnas especificando todas en groupBy()
. Por ejemplo, para obtener el total de ventas por producto y categoría:
ventas_detalladas = ventas.groupBy('categoria', 'producto').sum('cantidad')
ventas_detalladas.show()
Este enfoque proporciona un análisis más granular, permitiendo entender cómo cada producto contribuye dentro de su categoría.
Las funciones de ventana o window functions son herramientas avanzadas que permiten realizar cálculos agregados sin alterar el nivel de detalle de los datos. Estas funciones utilizan la cláusula over(Window)
para especificar el ámbito de la agregación:
from pyspark.sql.window import Window
from pyspark.sql.functions import sum
ventana = Window.partitionBy('categoria').orderBy('cantidad').rowsBetween(Window.unboundedPreceding, 0)
ventas_acumuladas = ventas.withColumn('ventas_acumuladas', sum('cantidad').over(ventana))
ventas_acumuladas.show()
En este ejemplo, se calcula el total acumulado de ventas para cada categoría hasta la fecha actual. La ventana está definida por partitionBy('categoria')
y orderBy('fecha')
, lo que garantiza que el cálculo acumulado se reinicie para cada categoría.
La creación de tablas dinámicas o pivotes se logra con el método pivot()
. Esta técnica es útil para reorganizar los datos y facilitar el análisis de múltiples dimensiones:
ventas_pivot = ventas.groupBy('producto').pivot('cantidad').sum('cantidad')
ventas_pivot.show()
Utilizando pivot('año')
, los valores únicos de la columna año se convierten en columnas individuales. Luego, sum('cantidad')
calcula la suma de ventas para cada producto en cada año.
Al trabajar con agregaciones, es crucial manejar los valores nulos adecuadamente. Las funciones de agregación suelen ignorar los nulos, pero es recomendable verificar su presencia:
from pyspark.sql.functions import col
registros_totales = ventas.count()
registros_con_nulos = ventas.filter(col('cantidad').isNull()).count()
print(f"Total de registros: {registros_totales}")
print(f"Registros con cantidad nula: {registros_con_nulos}")
Este análisis permite identificar si los datos faltantes pueden afectar los resultados y si es necesario aplicar técnicas de imputación o limpieza.
La optimización del rendimiento es esencial al realizar agregaciones en datasets grandes. Las operaciones de groupBy()
pueden ser costosas si no se gestionan correctamente las particiones. Utilizar repartition()
puede ayudar a distribuir los datos de manera más eficiente:
ventas_reparticionadas = ventas.repartition('categoria')
ventas_por_categoria = ventas_reparticionadas.groupBy('categoria').sum('cantidad')
ventas_por_categoria.show()
Al reparticionar por la columna categoria, se mejora la paralelización de la operación de agrupamiento, reduciendo el tiempo de ejecución.
Para cálculos más complejos, PySpark permite utilizar agregaciones personalizadas. Aunque la creación de UDAFs (User Defined Aggregate Functions) en PySpark es limitada, se pueden combinar funciones existentes para lograr el resultado deseado:
from pyspark.sql.functions import expr
ventas_percentil = ventas.groupBy('categoria').agg(
expr('percentile_approx(cantidad, 0.5)').alias('mediana_cantidad')
)
ventas_percentil.show()
En este ejemplo, se calcula la mediana de las ventas por categoría utilizando percentile_approx
, que estima los percentiles de manera eficiente en grandes conjuntos de datos.
Las agregaciones también pueden aplicarse sin agrupar, utilizando el método agg()
directamente sobre el DataFrame:
from pyspark.sql.functions import sum, avg, min, max
estadisticas_ventas = ventas.agg(
sum('cantidad').alias('total_cantidad'),
avg('cantidad').alias('promedio_cantidad'),
min('cantidad').alias('min_cantidad'),
max('cantidad').alias('max_cantidad')
)
estadisticas_ventas.show()
Este enfoque proporciona una visión general de las estadísticas globales del dataset, útil para el perfilamiento inicial de datos.
Cuando se requiere flexibilidad en las agrupaciones, el método rollup()
permite crear subtotales y totales acumulados:
ventas_rollup = ventas.groupBy('categoria', 'producto').rollup('fecha').sum('cantidad')
ventas_rollup.orderBy('categoria', 'producto', 'fecha').show()
Con rollup('fecha')
, se generan agregaciones en diferentes niveles de detalle, incluyendo totales por producto y categoría sin tener en cuenta la fecha.
Por otro lado, cube()
crea una hipercubo de agregaciones, generando todas las combinaciones posibles entre las columnas especificadas:
ventas_cubo = ventas.groupBy('categoria', 'producto').cube('año', 'mes').sum('cantidad')
ventas_cubo.show()
Este método es útil para análisis multidimensionales, aunque puede aumentar significativamente el volumen de datos resultantes.
Para asegurar que las agregaciones sean precisas y eficientes, es recomendable utilizar las funciones integradas de PySpark y evitar las UDFs cuando sea posible. Las funciones integradas están optimizadas para el procesamiento distribuido y aprovechan al máximo el Catalyst Optimizer de Spark.
Finalmente, es importante recordar que las agregaciones y operaciones de grupo son herramientas poderosas que, utilizadas correctamente, permiten extraer insights profundos de los datos. Mediante el uso eficiente de groupBy()
, funciones de agregación y técnicas avanzadas como ventanas y pivotes, se puede abordar una amplia gama de problemas analíticos en entornos de big data.
Joins y combinaciones de DataFrames
En PySpark, la operación de join es fundamental para combinar conjuntos de datos y enriquecer la información disponible. Los DataFrames permiten realizar varias formas de combinaciones, facilitando el análisis y manipulación de grandes volúmenes de datos distribuidos.
La función principal para realizar joins en PySpark es join()
, que permite combinar dos DataFrames basándose en una o más columnas comunes. La sintaxis básica es:
df_resultante = df1.join(df2, on='columna_común', how='tipo_de_join')
El argumento how
especifica el tipo de join a realizar. Los tipos más comunes son:
- Inner Join (
how='inner'
): Devuelve las filas que tienen coincidencias en ambos DataFrames. - Left Outer Join (
how='left'
): Devuelve todas las filas del DataFrame izquierdo y las filas coincidentes del derecho. - Right Outer Join (
how='right'
): Devuelve todas las filas del DataFrame derecho y las filas coincidentes del izquierdo. - Full Outer Join (
how='outer'
): Devuelve todas las filas cuando hay coincidencia en al menos uno de los DataFrames. - Cross Join (
how='cross'
): Calcula el producto cartesiano de los DataFrames.
Por ejemplo, para realizar un inner join entre dos DataFrames basándose en la columna nombre
:
df_join = df1.join(df2, on='nombre', how='inner')
Este código combina los registros de ambos DataFrames donde el nombre
coincide, permitiendo acceder a la información de empleados junto con los detalles de su departamento.
Es común que los DataFrames tengan columnas con el mismo nombre pero diferentes contenidos. Para evitar conflictos, es recomendable renombrar las columnas antes de realizar el join o utilizar el parámetro suffixes
para añadir sufijos a las columnas duplicadas:
df1_renombrado = df1.withColumnRenamed('nombre', 'nombres')
df2_renombrado = df2.withColumnRenamed('nombre', 'nombres')
df_resultante = df1_renombrado.join(df2_renombrado, df1_renombrado.nombres == df2_renombrado.nombres, 'inner')
En este ejemplo, se renombraron las columnas para distinguirlas, y se especificó la condición de join de manera explícita utilizando df1_renombrado.nombres == df2_renombrado.nombres
.
Para casos donde es necesario unir DataFrames basándose en múltiples columnas, se puede pasar una lista de columnas al parámetro on
:
columnas_comunes = ['columna1', 'columna2']
df_multiple_join = df1.join(df2, on=columnas_comunes, how='left')
Esto realiza un left join utilizando columna1
y columna2
como claves para la unión.
Las joins externas permiten manejar registros que no tienen coincidencias en el otro DataFrame. Por ejemplo, un left outer join retiene todos los registros del DataFrame izquierdo, añadiendo valores nulos donde no hay coincidencias en el derecho:
df_left_outer = df_empleados.join(df_departamentos, on='departamento_id', how='left')
De esta forma, se incluyen todos los empleados, incluso aquellos que no están asignados a ningún departamento.
En situaciones donde se desea obtener registros que están en un DataFrame pero no en el otro, se utilizan los left anti join y left semi join:
- Left Anti Join (
how='left_anti'
): Devuelve las filas del DataFrame izquierdo que no tienen coincidencia en el derecho. - Left Semi Join (
how='left_semi'
): Devuelve las filas del DataFrame izquierdo que tienen coincidencia en el derecho, pero solo las columnas del izquierdo.
Por ejemplo, para obtener los empleados que no están asignados a ningún departamento:
df_sin_departamento = df_empleados.join(df_departamentos, on='departamento_id', how='left_anti')
El broadcast join es una técnica para optimizar el rendimiento cuando uno de los DataFrames es pequeño. Al transmitir (broadcast) el DataFrame pequeño a todos los nodos, se reduce la necesidad de mover datos a través de la red:
from pyspark.sql.functions import broadcast
df_optimizado = df_grande.join(broadcast(df_pequeño), on='clave', how='inner')
El uso de broadcast()
indica a PySpark que distribuya el DataFrame df_pequeño
a todos los nodos, mejorando la eficiencia del join.
Para combinar DataFrames verticalmente, se utilizan operaciones como union()
y unionByName()
. La función union()
requiere que los DataFrames tengan el mismo esquema y orden de columnas:
df_union = df1.union(df2)
Si los DataFrames tienen las mismas columnas pero en distinto orden, unionByName()
es más apropiado:
df_union_by_name = df1.unionByName(df2)
Esta operación alinea las columnas por nombre, permitiendo combinar DataFrames con esquemas similares pero columnas en diferente orden.
Al realizar joins, es crucial manejar correctamente los valores nulos. Si las columnas clave contienen nulos, las coincidencias pueden no ocurrir como se espera. Es recomendable filtrar o imputar valores nulos antes de realizar el join:
df_empleados_limpio = df_empleados.filter(df_empleados.departamento_id.isNotNull())
df_join = df_empleados_limpio.join(df_departamentos, on='departamento_id', how='inner')
Esta práctica asegura que solo se unan registros con un departamento_id
válido.
En situaciones donde las claves para unir los DataFrames tienen nombres diferentes, se puede especificar la condición del join usando expresiones:
df_join = df1.join(df2, df1.id == df2.identificador, how='inner')
Aquí, df1.id
se une con df2.identificador
, a pesar de tener nombres diferentes.
Para evitar errores al trabajar con columnas homónimas, se puede utilizar la propiedad df.colName
o el método col()
:
from pyspark.sql.functions import col
df_join = df1.alias('df1').join(df2.alias('df2'), col('df1.id') == col('df2.id'), how='inner')
El uso de alias()
permite referirse a cada DataFrame con un nombre, evitando conflictos de nombres y mejorando la legibilidad.
Las joins cruzadas o cross joins generan el producto cartesiano de los DataFrames. Se utilizan con precaución debido al aumento exponencial del número de registros:
df_cross = df1.crossJoin(df2)
Esta operación resulta en un DataFrame donde cada fila de df1
se combina con cada fila de df2
.
Para operaciones de conjunto como intersección y diferencia, PySpark proporciona métodos específicos:
- Intersección:
intersect()
df_interseccion = df1.intersect(df2)
- Diferencia:
subtract()
df_diferencia = df1.subtract(df2)
Estas operaciones permiten comparar DataFrames y extraer registros comunes o distintos.
Al unirse DataFrames con estructuras complejas como arrays o estructuras anidadas, es posible necesitar explotar o aplanar estas estructuras antes de realizar el join:
from pyspark.sql.functions import explode
df_exploded = df.withColumn('elemento', explode(df.array_columna))
Después de expandir los elementos del array, se puede realizar el join utilizando la nueva columna elemento
.
Es importante considerar el rendimiento y la optimización al realizar joins en entornos de big data. Algunas prácticas recomendadas incluyen:
- Reparticionar DataFrames para equilibrar la carga:
df_reparticionado = df.repartition('columna_clave')
Evitar shuffles innecesarios al unirse por columnas particionadas.
Reducir el tamaño de los DataFrames seleccionando solo las columnas necesarias antes del join:
df_reducido = df.select('columna_clave', 'columna_relevante')
Estas estrategias ayudan a minimizar el costo computacional y a mejorar la eficiencia de las operaciones.
Cuando se trabaja con múltiples joins, es crítico planificar el orden de las operaciones. Un mal orden puede resultar en tiempos de ejecución prolongados. Analizar el plan de ejecución con explain()
puede proporcionar información valiosa:
df_join.explain()
El plan muestra cómo PySpark ejecutará las operaciones, permitiendo identificar cuellos de botella y optimizar el código.
Para realizar joins basados en condiciones complejas, se pueden utilizar expresiones lógicas en la condición del join:
df_condicional = df1.join(df2, (df1.columna1 == df2.columna2) & (df1.columna3 > df2.columna4), how='inner')
Esta flexibilidad permite unir DataFrames utilizando condiciones más elaboradas que simples igualdades.
Finalmente, es esencial asegurar la calidad de los datos antes y después de los joins. Verificar el número de registros, la presencia de duplicados y la coherencia de los datos garantiza resultados fiables:
registros_antes = df1.count()
df_resultante = df1.join(df2, on='clave', how='inner')
registros_después = df_resultante.count()
print(f"Registros antes del join: {registros_antes}")
print(f"Registros después del join: {registros_después}")
Este tipo de comprobaciones ayuda a detectar posibles problemas y a validar que las operaciones de join se han realizado correctamente.
Todas las lecciones de PySpark
Accede a todas las lecciones de PySpark y aprende con ejemplos prácticos de código y ejercicios de programación con IDE web sin instalar nada.
Introducción A Pyspark
Introducción Y Entorno
Instalación De Pyspark
Introducción Y Entorno
Fundamentos De Pyspark
Introducción Y Entorno
Manipulación Y Análisis De Datos Con Pyspark
Transformación De Datos
Pyspark Sql
Transformación De Datos
Trabajo Con Datos Complejos
Transformación De Datos
Introducción A Mllib
Aprendizaje Automático
Preparación De Datos Para Mllib
Aprendizaje Automático
Regresión Con Mllib
Aprendizaje Automático
Clasificación Con Mllib
Aprendizaje Automático
Modelos De Clustering
Aprendizaje Automático
Reducción De La Dimensionalidad
Aprendizaje Automático
Recomendación
Aprendizaje Automático
Pipelines
Aprendizaje Automático
Mllib Con Scikit Learn
Integraciones
Mllib Con Tensorflow
Integraciones
En esta lección
Objetivos de aprendizaje de esta lección
- Entender cómo leer y escribir datos en distintos formatos como CSV, JSON y Parquet.
- Aplicar técnicas de exploración y perfilamiento de datos usando PySpark.
- Realizar operaciones de selección, filtrado y ordenamiento sobre grandes conjuntos de datos.
- Utilizar funciones lambda para llevar a cabo transformaciones avanzadas.
- Realizar agregaciones y operaciones de grupo para obtener estadísticas detalladas.
- Comprender y aplicar diferentes tipos de joins para combinar DataFrames eficientemente.