PySpark

Tutorial PySpark: Trabajo con datos complejos

Descubre cómo manipular datos anidados en PySpark con estructuras, arrays y mapas. Aprende técnicas esenciales para aplanar y transformar información compleja en Apache Spark con Python.

Aprende PySpark GRATIS y certifícate

Manejo de datos anidados (estructuras, arrays, mapas)

El manejo de datos anidados es esencial al trabajar con estructuras complejas en PySpark. Las columnas pueden contener tipos como StructType, ArrayType y MapType, que permiten representar datos jerárquicos y relacionales dentro de un DataFrame.

StructType

Para manejar estructuras (StructType), se pueden acceder a los campos anidados utilizando la notación de punto. Por ejemplo, si tenemos una columna persona con subcampos nombre y edad, se puede seleccionar el nombre así:

df.select(df.persona.nombre.alias("nombre_persona"))

Es posible crear nuevas estructuras combinando varias columnas:

from pyspark.sql.functions import struct

df_con_estructura = df.select(struct("columna1", "columna2").alias("nueva_estructura"))

ArrayType

Los arrays (ArrayType) permiten almacenar listas de elementos en una sola columna. Para acceder a los elementos de un array, se pueden utilizar funciones como explode, que transforman cada elemento del array en una fila individual:

from pyspark.sql.functions import explode

df_exploded = df.select(explode("lista_elementos").alias("elemento"))

También es posible acceder a elementos específicos de un array mediante índices:

df.select(df.lista_elementos[0].alias("primer_elemento"))

MapType

Los mapas (MapType) son útiles para almacenar pares clave-valor. Para interactuar con mapas, se pueden usar funciones como map_keys y map_values:

from pyspark.sql.functions import map_keys, map_values

df.select(map_keys("mi_mapa").alias("claves"), map_values("mi_mapa").alias("valores"))

Para acceder a un valor específico dentro de un mapa, se puede utilizar la sintaxis de corchetes:

df.select(df.mi_mapa["clave_especifica"].alias("valor"))

La manipulación de datos anidados es más eficiente cuando se utiliza la API de expresiones de PySpark, en lugar de funciones de usuario que pueden disminuir el rendimiento. Además, es importante considerar el esquema del DataFrame para entender la estructura de los datos:

df.printSchema()

Esta función muestra una representación en árbol del esquema, ayudando a identificar estructuras anidadas y tipos de datos. Manejar correctamente los tipos de datos complejos permite aprovechar al máximo las capacidades de PySpark en el procesamiento distribuido de grandes volúmenes de información.

Explosión y aplanamiento de datos anidados

Al trabajar con datos anidados en PySpark, es común encontrarse con estructuras como arrays, mapas y estructuras anidadas dentro de columnas de un DataFrame. Para facilitar el análisis y procesamiento, es necesario aplanar estas estructuras, convirtiéndolas en un formato tabular más manejable. La explosión de datos anidados es una técnica que permite transformar elementos anidados en filas individuales.

La función explode es fundamental para descomponer arrays o mapas en múltiples filas. Por ejemplo, si se tiene una columna intereses que es un array de strings, se puede convertir cada interés en una fila separada:

from pyspark.sql.functions import explode

df_exploded = df.select("id", explode("intereses").alias("interes"))

En este caso, por cada elemento en el array intereses, se crea una nueva fila con el valor explotado en la columna interes. Es importante notar que la función explode mantiene las demás columnas, replicándolas en las filas adicionales.

Cuando se trabaja con mapas (**MapType**), explode también puede ser útil. Para separar las claves y valores de un mapa, se puede aplicar:

df_exploded_map = df.select("id", explode("atributos").alias("clave", "valor"))

Esto generará nuevas filas donde cada par clave-valor del mapa atributos se descompone en columnas separadas.

En casos donde se requiere conservar el índice o posición de los elementos en un array, se puede utilizar posexplode. Esta función añade una columna adicional con el índice de posición:

from pyspark.sql.functions import posexplode

df_posexploded = df.select("id", posexplode("intereses").alias("posicion", "interes"))

Si se desea aplanar estructuras anidadas completas, especialmente cuando hay múltiples niveles de anidación, se pueden combinar varias operaciones de select y acceder a los campos utilizando la notación de punto. Por ejemplo:

df_aplanado = df.select(
    "id",
    "nombre",
    "direccion.calle",
    "direccion.ciudad",
    "direccion.codigo_postal"
)

Aquí, la estructura anidada direccion se ha aplanado extrayendo sus campos individuales como columnas separadas. Esto facilita el acceso y análisis de los datos sin la complejidad de las estructuras anidadas.

Para manejar arrays de estructuras, es posible combinar explode con la notación de punto. Si se tiene una columna pedidos que es un array de estructuras con campos producto y cantidad, se puede explotar y aplanar así:

df_pedidos = df.select(
    "id_cliente",
    explode("pedidos").alias("pedido")
).select(
    "id_cliente",
    "pedido.producto",
    "pedido.cantidad"
)

Este proceso permite convertir cada elemento del array de estructuras en una fila individual con columnas separadas para cada campo, facilitando la manipulación de datos.

Cuando se requiere aplanar todas las columnas anidadas de un DataFrame sin especificarlas manualmente, se puede utilizar una función recursiva que recorre el esquema y genera las expresiones necesarias. Esto es especialmente útil con esquemas complejos y desconocidos previamente.

Por último, la función flatten es útil para aplanar arrays anidados dentro de otros arrays, convirtiéndolos en un solo array plano. Por ejemplo:

from pyspark.sql.functions import flatten

df_flattened = df.select(flatten("array_de_arrays").alias("array_aplanado"))

El uso adecuado de estas funciones y técnicas permite manejar de manera eficiente los datos anidados en PySpark, optimizando el rendimiento y simplificando el proceso de análisis y transformación de datos.

Funciones de orden superior (map, filter, aggregate)

Las funciones de orden superior en PySpark permiten realizar transformaciones complejas y eficientes sobre datos estructurados y anidados. Estas funciones operan sobre tipos de datos como arrays y mapas, y aceptan como parámetros otras funciones, facilitando manipular colecciones de una manera declarativa y concisa.

Una de las funciones más utilizadas es transform, que es equivalente a **map** en otros lenguajes funcionales. Permite aplicar una función a cada elemento de un array y devolver un nuevo array con los resultados. Por ejemplo, si se desea incrementar en 1 cada elemento de un array numérico:

from pyspark.sql.functions import expr

df = df.withColumn("array_incrementado", expr("transform(array_columna, x -> x + 1)"))

En este caso, la expresión lambda x -> x + 1 se aplica a cada elemento x del array array_columna, generando un nuevo array array_incrementado.

La función filter es esencial para filtrar elementos dentro de un array según una condición. Si se quiere extraer solo los valores mayores que 5 de un array:

df = df.withColumn("array_filtrado", expr("filter(array_columna, x -> x > 5)"))

Aquí, solo los elementos x que cumplen la condición x > 5 se incluyen en el nuevo array array_filtrado. Este uso de funciones lambda permite expresar operaciones complejas de manera sencilla.

Para operaciones de agregación sobre arrays, la función aggregate es fundamental. Permite reducir un array a un único valor utilizando una función de acumulación. Por ejemplo, para calcular la suma total de los elementos de un array:

df = df.withColumn("suma_array", expr("aggregate(array_columna, 0, (acum, x) -> acum + x)"))

En este caso, 0 es el valor inicial de acumulación, y (acum, x) -> acum + x es la función que suma cada elemento x al acumulador acum. El resultado es un nuevo campo suma_array con la suma total.

Es importante destacar que estas funciones pueden anidarse y combinarse para operaciones más complejas. Por ejemplo, para calcular el promedio de los elementos de un array:

df = df.withColumn("promedio_array", expr("""
    aggregate(
        array_columna,
        (0, 0),
        (acum, x) -> (acum._1 + x, acum._2 + 1),
        acum -> acum._1 / acum._2
    )
"""))

En este ejemplo, el acumulador es una tupla que mantiene la suma parcial y el conteo de elementos. Finalmente, se calcula el promedio dividiendo la suma entre el conteo.

Las funciones de orden superior también soportan trabajar con índices de los arrays. La función transform puede acceder al índice de cada elemento mediante una sintaxis extendida:

df = df.withColumn("array_con_indices", expr("transform(array_columna, (x, i) -> concat(x, '_', i))"))

Aquí, i representa el índice del elemento x en array_columna, y se crea una nueva cadena concatenando ambos.

Además de arrays, estas funciones pueden aplicarse a mapas. Por ejemplo, para incrementar todos los valores en un mapa numérico:

df = df.withColumn("mapa_incrementado", expr("transform_values(mapa_columna, (k, v) -> v + 1)"))

La función transform_values aplica una transformación a cada valor v del mapa mapa_columna, manteniendo las claves k originales.

Para filtrar pares clave-valor en un mapa según una condición, se utiliza filter:

df = df.withColumn("mapa_filtrado", expr("map_filter(mapa_columna, (k, v) -> v > 5)"))

En este caso, solo los pares donde el valor v es mayor que 5 se conservan en mapa_filtrado.

Cuando se trabaja con estructuras anidadas, es posible utilizar estas funciones para acceder y transformar datos internos. Por ejemplo, si se tiene un array de estructuras y se desea modificar uno de los campos de cada estructura:

df = df.withColumn("array_modificado", expr("transform(array_struct, x -> named_struct('campo1', x.campo1, 'campo2', x.campo2 + 1))"))

Aquí, se crea un nuevo array array_modificado donde cada elemento es una estructura con el campo campo2 incrementado en 1.

Las funciones de orden superior en PySpark son herramientas que permiten manipular datos complejos de forma eficiente y expresiva. Al utilizarlas, es fundamental entender la semántica de las funciones lambda y cómo operan sobre cada elemento o par clave-valor en arrays y mapas. Esto facilita realizar transformaciones avanzadas sin recurrir a bucles explícitos o funciones de usuario, optimizando el rendimiento y aprovechando al máximo las capacidades de procesamiento distribuido de PySpark.

Manipulación de fechas y timestamps

El manejo eficiente de fechas y timestamps es crucial en la transformación y análisis de datos con PySpark. Las funciones específicas para trabajar con estos tipos de datos permiten realizar operaciones como conversión de formatos, cálculos de intervalos y extracción de componentes temporales.

Para comenzar, es importante entender los tipos de datos que PySpark utiliza para representar fechas y timestamps. El tipo DateType almacena solo la parte de la fecha (año, mes y día), mientras que TimestampType incluye tanto la fecha como la hora (horas, minutos, segundos y fracciones de segundo).

Conversión de cadenas a fechas y timestamps

Cuando se trabaja con datos importados, es común que las fechas y timestamps se representen como cadenas de texto. Para convertir una cadena a un tipo de dato fecha, se utiliza la función to_date:

from pyspark.sql.functions import to_date

df_con_fecha = df.withColumn("fecha_formateada", to_date("fecha_cadena", "dd/MM/yyyy"))

En este ejemplo, la columna fecha_cadena contiene fechas en formato dd/MM/yyyy. La función to_date convierte las cadenas en objetos de fecha, facilitando operaciones posteriores.

Para convertir cadenas a timestamps, se emplea to_timestamp:

from pyspark.sql.functions import to_timestamp

df_con_timestamp = df.withColumn("timestamp_formateado", to_timestamp("timestamp_cadena", "dd/MM/yyyy HH:mm:ss"))

Aquí, timestamp_cadena incluye tanto fecha como hora, y el formato especifica cómo interpretar la cadena. Es fundamental que el formato proporcionado coincida con la estructura de las cadenas de entrada.

Formateo de fechas y timestamps

Para convertir fechas y timestamps a cadenas con un formato específico, se utiliza date_format:

from pyspark.sql.functions import date_format

df_formateado = df.withColumn("fecha_en_texto", date_format("fecha_columna", "yyyy-MM-dd"))

Esta función permite representar fechas en el formato deseado, facilitando su presentación o integración con otros sistemas.

Extracción de componentes temporales

PySpark ofrece una serie de funciones para extraer elementos específicos de fechas y timestamps. Algunas de las más utilizadas son:

  • year("fecha_columna"): extrae el año.
  • month("fecha_columna"): obtiene el mes.
  • dayofmonth("fecha_columna"): retorna el día del mes.
  • hour("timestamp_columna"): extrae la hora.
  • minute("timestamp_columna"): obtiene los minutos.
  • second("timestamp_columna"): retorna los segundos.

Por ejemplo, para obtener el año y mes de una fecha:

from pyspark.sql.functions import year, month

df_componentes = df.withColumn("año", year("fecha_columna")) \
                   .withColumn("mes", month("fecha_columna"))

Esta extracción es útil para realizar agrupaciones o filtrados basados en componentes temporales.

Operaciones aritméticas con fechas y timestamps

Para efectuar cálculos de diferencias o agregar intervalos de tiempo, PySpark proporciona funciones como datediff, date_add y date_sub.

Para calcular la diferencia en días entre dos fechas:

from pyspark.sql.functions import datediff

df_diferencia = df.withColumn("dias_diferencia", datediff("fecha_final", "fecha_inicial"))

Para sumar o restar días a una fecha:

from pyspark.sql.functions import date_add, date_sub

df_suma = df.withColumn("fecha_mas_7_dias", date_add("fecha_columna", 7))
df_resta = df.withColumn("fecha_menos_7_dias", date_sub("fecha_columna", 7))

Estas operaciones permiten manipular fechas para ajustes temporales o cálculos de intervalos.

Manejo de timestamps y zonas horarias

Al trabajar con timestamps, es importante considerar las zonas horarias. Por defecto, PySpark utiliza la zona horaria UTC. Para cambiar la zona horaria de un timestamp, se puede usar from_utc_timestamp y to_utc_timestamp:

from pyspark.sql.functions import from_utc_timestamp, to_utc_timestamp

df_zona_horaria = df.withColumn("timestamp_local", from_utc_timestamp("timestamp_columna", "Europe/Madrid"))

Esta función convierte un timestamp en UTC a la zona horaria especificada, ajustando la hora local adecuadamente.

Funciones actuales de fecha y hora

Para obtener la fecha o timestamp actual, PySpark ofrece current_date y current_timestamp:

from pyspark.sql.functions import current_date, current_timestamp

df_actual = df.withColumn("fecha_actual", current_date()) \
              .withColumn("timestamp_actual", current_timestamp())

Estas funciones son útiles para operaciones que requieren el tiempo presente, como calcular antigüedades o marcar registros con la fecha de procesamiento.

Truncado y redondeo de fechas

En ocasiones, es necesario truncar una fecha a una unidad temporal específica, eliminando componentes de menor precisión. La función date_trunc permite truncar timestamps a niveles como año, mes o día:

from pyspark.sql.functions import date_trunc

df_truncado = df.withColumn("inicio_mes", date_trunc("month", "fecha_columna"))

Este ejemplo establece la fecha al inicio del mes, fijando el día al primero y las horas a cero.

Manipulación de fechas complejas

Para operaciones más avanzadas, como añadir meses o calcular la diferencia en meses entre fechas, se utilizan add_months y months_between:

from pyspark.sql.functions import add_months, months_between

df_meses = df.withColumn("fecha_futura", add_months("fecha_columna", 6)) \
             .withColumn("meses_diferencia", months_between("fecha_final", "fecha_inicial"))

Estas funciones facilitan el manejo de periodos mensuales, considerando la variabilidad en la duración de los meses.

Manejo de formatos personalizados

Cuando se trabaja con formatos de fecha y hora no estándar, es posible definir patrones personalizados utilizando símbolos de formato según las convenciones de Java SimpleDateFormat.

Por ejemplo, para convertir una cadena con formato complejo:

df_personalizado = df.withColumn("fecha_personalizada", to_date("fecha_cadena", "dd-MMM-yyyy HH.mm.ss.SSS"))

Es esencial que el patrón de formato coincida exactamente con la estructura de la cadena, incluyendo separadores y especificadores de fecha y hora.

Uso de UDFs con fechas y timestamps

Aunque PySpark ofrece una amplia gama de funciones integradas, en ocasiones puede ser necesario definir funciones definidas por el usuario (UDFs) para operaciones específicas. Sin embargo, es recomendable priorizar las funciones nativas de PySpark para optimizar el rendimiento.

Si es imprescindible usar una UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import DateType

def mi_funcion_personal(fecha):
    # Lógica personalizada aquí
    return nueva_fecha

mi_udf = udf(mi_funcion_personal, DateType())

df_udf = df.withColumn("fecha_modificada", mi_udf("fecha_columna"))

Se debe tener en cuenta que las UDFs pueden impactar en el rendimiento, ya que las operaciones salen del ámbito optimizado de PySpark.

Consideraciones sobre el rendimiento

El manejo eficiente de fechas y timestamps implica utilizar las funciones integradas siempre que sea posible. Estas funciones están optimizadas para ejecutarse en paralelo y aprovechan el Catalyst Optimizer de PySpark, mejorando la performance de las operaciones.

También es importante asegurar que los tipos de datos de las columnas sean correctos. Antes de realizar operaciones, verificar el esquema del DataFrame:

df.printSchema()

Si las columnas de fecha aparecen como StringType, es recomendable convertirlas al tipo adecuado (DateType o TimestampType) para evitar errores y mejorar el rendimiento.

Tratamiento de datos faltantes y valores nulos

Al trabajar con datos complejos en PySpark, es esencial manejar adecuadamente los valores faltantes y nulos para garantizar la integridad y calidad del análisis. Los valores nulos pueden aparecer en cualquier tipo de dato, incluidas estructuras anidadas, arrays y mapas, lo que requiere técnicas específicas para su tratamiento en cada caso.

Para identificar los valores nulos en un DataFrame, se pueden utilizar las funciones isNull e isNotNull junto con operaciones de filtrado. Por ejemplo, para filtrar registros donde el campo direccion.ciudad es nulo:

from pyspark.sql.functions import col

df_nulos = df.filter(col("direccion.ciudad").isNull())

En el caso de arrays y mapas, la detección de valores nulos puede realizarse mediante funciones de orden superior como exists. Si se desea encontrar registros donde al menos un elemento del array telefonos es nulo:

from pyspark.sql.functions import exists

df_telefonos_nulos = df.filter(exists("telefonos", lambda x: x.isNull()))

Para el tratamiento de valores faltantes, PySpark ofrece el objeto DataFrameNaFunctions a través de df.na, que proporciona métodos como drop, fill y replace. Para eliminar filas que contienen valores nulos en columnas específicas:

df_sin_nulos = df.na.drop(subset=["nombre", "edad"])

Si se prefiere reemplazar los valores nulos por un valor específico, se puede utilizar fill. Por ejemplo, para reemplazar nulos en la columna edad por cero y en ciudad por "Desconocido":

df_rellenado = df.na.fill({"edad": 0, "ciudad": "Desconocido"})

Cuando se trata de estructuras anidadas, es posible acceder y reemplazar valores nulos en campos específicos utilizando la notación de punto. Para reemplazar los valores nulos en direccion.codigo_postal por "00000":

from pyspark.sql.functions import col, when, lit

df_rellenado = df.withColumn(
    "direccion",
    when(
        col("direccion.codigo_postal").isNull(),
        col("direccion").withField("codigo_postal", lit("00000"))
    ).otherwise(col("direccion"))
)

En arrays, se pueden utilizar funciones de orden superior como transform para manejar valores nulos dentro de los elementos. Por ejemplo, para reemplazar valores nulos en un array de números por la media de los elementos no nulos:

from pyspark.sql.functions import transform, avg, lit, array

media_valores = df_empleados.select(explode("numeros").alias("num")).agg(avg("num")).first()[0]

df_array_rellenado = df.withColumn(
    "numeros_rellenados",
    when(col("numeros").isNotNull(), transform("numeros", lambda x: when(x.isNotNull(), x).otherwise(lit(media_valores))))
    .otherwise(array(lit(media_valores))))

Para mapas, la función transform_values permite reemplazar valores nulos dentro de los valores del mapa. Por ejemplo, para reemplazar valores nulos por "No disponible":

from pyspark.sql.functions import transform_values, lit

df_mapa_rellenado = df.withColumn(
    "atributos_rellenados",
     when(col("atributos").isNotNull(),
    transform_values("atributos", lambda k, v: when(v.isNotNull(), v).otherwise(lit("No disponible"))))
)

En el contexto del aprendizaje automático, es común utilizar la clase Imputer de pyspark.ml.feature para imputar valores faltantes en columnas numéricas. Para reemplazar los valores nulos de la columna ingresos con la mediana:

from pyspark.ml.feature import Imputer

imputador = Imputer(
    inputCols=["ingresos"],
    outputCols=["ingresos_imputados"],
    strategy="median"
)

modelo_imputador = imputador.fit(df)
df_imputado = modelo_imputador.transform(df)

Es importante destacar que el Imputer funciona únicamente con columnas numéricas de tipo DoubleType o FloatType. Para utilizarlo en arrays de números, se requiere transformar el array en columnas individuales, aplicar el imputador y luego reconvertirlas en un array.

Al manipular fechas y timestamps con valores nulos, se aplican las mismas estrategias. Para reemplazar valores nulos en una columna de fechas por una fecha específica:

from pyspark.sql.functions import to_date, lit

fecha_defecto = "2024-01-01"
df_fechas_rellenadas = df.fillna({"fecha_evento": fecha_defecto})

Cuando se realizan agregaciones, es crucial considerar cómo los valores nulos pueden afectar los resultados. Por defecto, las funciones de agregación como sum y avg ignoran los valores nulos. Sin embargo, si se desea contar los valores nulos, se puede utilizar count y countDistinct junto con expresiones condicionales:

from pyspark.sql.functions import sum, when, count

total_nulos = df.select(
    sum(when(col("valor").isNull(), 1).otherwise(0)).alias("nulos_en_valor")
)

Para visualizar la distribución de valores nulos en todo el DataFrame, la función summary proporciona estadísticas descriptivas que ayudan a identificar columnas con valores faltantes:

df.summary().show()

En situaciones donde los valores nulos son significativos para el análisis, se pueden etiquetar o marcar utilizando funciones como when y otherwise. Por ejemplo, para crear una nueva columna que indique si el valor es nulo:

from pyspark.sql.functions import when

df_etiquetado = df.withColumn(
    "es_nulo",
    when(col("cantidad").isNull(), True).otherwise(False)
)

Es recomendable adoptar una estrategia coherente para el manejo de datos faltantes desde el inicio del proceso de análisis, evitando problemas en etapas posteriores como el entrenamiento de modelos o la generación de informes. Esto incluye documentar las decisiones tomadas y los métodos utilizados para garantizar la reproducibilidad y comprensión del flujo de trabajo.

Finalmente, es esencial considerar el impacto de los datos faltantes en el contexto específico del proyecto, evaluando si es más apropiado eliminar registros, imputar valores o aplicar técnicas más avanzadas como modelos de imputación múltiple, siempre alineado con los objetivos del análisis y las características de los datos complejos.

Aprende PySpark GRATIS online

Todas las lecciones de PySpark

Accede a todas las lecciones de PySpark y aprende con ejemplos prácticos de código y ejercicios de programación con IDE web sin instalar nada.

Accede GRATIS a PySpark y certifícate

En esta lección

Objetivos de aprendizaje de esta lección

  • Dominar el uso de StructType, ArrayType y MapType en PySpark.
  • Aplicar funciones para acceder y manipular datos anidados.
  • Utilizar explode y flatten para transformar y aplanar estructuras.
  • Implementar funciones de orden superior como map, filter y aggregate.
  • Realizar operaciones avanzadas con fechas y timestamps.
  • Gestionar valores nulos y ausentes en datos complejos.