La transformación de datos en PySpark se basa en una combinación de operaciones declarativas y funciones específicas que permiten manipular el contenido de un DataFrame de forma distribuida. Cada transformación produce un nuevo DataFrame sin alterar el original, lo que mantiene la inmutabilidad de los datos y promueve un desarrollo ordenado.
Para realizar estas transformaciones, se suelen emplear métodos como select
, filter
, groupBy
, withColumn
o join
. Estos métodos se encadenan de modo que cada operación tome la salida de la anterior, conformando un flujo de trabajo con pasos fáciles de seguir. A continuación, se muestran algunos enfoques comunes:
- Selección de columnas y creación de columnas derivadas.
- Filtrado de filas en función de criterios numéricos, lógicos o de texto.
- Agrupaciones y agregaciones para resumir la información.
- Uniones entre varios DataFrames, análogas a las uniones de SQL.
- Transformaciones con funciones de usuario (UDF) cuando es necesario aplicar lógica más específica.
En primer lugar, es frecuente utilizar select
y withColumn
para ajustar la estructura de un DataFrame. Por ejemplo, supongamos que se tiene un DataFrame df
con una columna llamada precio y se quiere crear una nueva columna que aplique un descuento:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder \
.appName("TransformacionesPySpark") \
.getOrCreate()
df = spark.read.csv("ruta/ventas.csv", header=True, inferSchema=True)
df_transformado = df.withColumn("precio_descuento", col("precio") * 0.90)
df_transformado.select("producto", "precio", "precio_descuento").show()
En este ejemplo, col ayuda a referenciar la columna precio para generar la nueva precio_descuento. Además de multiplicar valores, se pueden usar funciones incorporadas en PySpark para formatear fechas, manipular cadenas o realizar operaciones matemáticas más complejas.
Para filtrar filas, basta con usar métodos como filter
o where
junto a expresiones lógicas. Por ejemplo, si se desea filtrar los productos con un precio mayor a 100:
df_filtrado = df_transformado.filter(col("precio") > 100)
df_filtrado.show()
Las agrupaciones se implementan con groupBy
, seguido de agregaciones como avg
, count
o sum
. Un caso típico es contar la cantidad de ventas por categoría:
from pyspark.sql.functions import count
df_categorias = df.groupBy("categoria").agg(count("*").alias("total_ventas"))
df_categorias.show()
Las uniones permiten combinar varios DataFrames en una sola estructura. Por ejemplo, si se dispone de un DataFrame con ventas y otro con detalles del producto, se podría realizar un join en la columna que los relaciona:
Guarda tu progreso
Inicia sesión para no perder tu progreso y accede a miles de tutoriales, ejercicios prácticos y nuestro asistente de IA.
Más de 25.000 desarrolladores ya confían en CertiDevs
df_detalles = spark.read.parquet("ruta/detalles_productos/")
df_unido = df.join(df_detalles, df["producto_id"] == df_detalles["id"], "inner")
df_unido.show()
En ocasiones, se requiere lógica compleja que no se cubre con las funciones integradas. En estos casos, se recurre a una UDF (User Defined Function) escrita en Python. Sin embargo, conviene emplear primero las funciones nativas de PySpark, ya que se integran con el optimizador de consultas y suelen ofrecer un rendimiento mayor.
Además de la API de DataFrame, Spark SQL brinda la posibilidad de expresar transformaciones de datos mediante sentencias SQL clásicas. Para ello, se registra el DataFrame como una vista temporal y se emplean sentencias SELECT
, JOIN
, WHERE
y agregaciones con GROUP BY
. Por ejemplo:
df.createOrReplaceTempView("ventas")
consulta = """
SELECT categoria,
AVG(precio) as precio_medio
FROM ventas
GROUP BY categoria
"""
df_sql = spark.sql(consulta)
df_sql.show()
Este método facilita la adopción de PySpark a personas habituadas a SQL y proporciona la misma potencia de ejecución distribuida.
Para flujos de datos en tiempo real, Structured Streaming reutiliza estas transformaciones sobre un DataFrame en streaming. Se define una fuente de entrada (por ejemplo, Kafka), las transformaciones necesarias y, por último, la salida. Aunque el enfoque es similar, cada operación se ejecuta de forma continua o en microbatches, asegurando que las transformaciones se apliquen a medida que llegan los datos.
La transformación de datos en PySpark combina herramientas declarativas, integración con SQL, funciones avanzadas y opciones de escalabilidad. Con estas capacidades, se pueden construir pipelines de procesamiento para filtrar, agregar y limpiar grandes volúmenes de datos, respaldando desde tareas analíticas hasta modelos de aprendizaje automático.
Completa PySpark y certifícate
Únete a nuestra plataforma y accede a miles de tutoriales, ejercicios prácticos, proyectos reales y nuestro asistente de IA personalizado para acelerar tu aprendizaje.
Asistente IA
Resuelve dudas al instante
Ejercicios
Practica con proyectos reales
Certificados
Valida tus conocimientos
Más de 25.000 desarrolladores ya se han certificado con CertiDevs