PySpark

Tutorial PySpark: Preparación de datos para MLlib

En esta lección de PySpark, aprende cómo manejar valores faltantes con MLlib, mejorando tus modelos de Machine Learning de manera eficiente con Apache Spark.

Aprende PySpark GRATIS y certifícate

Limpieza de valores faltantes

La limpieza de valores faltantes es un paso esencial en la preparación de datos para Machine Learning, ya que los valores ausentes pueden afectar negativamente el rendimiento de los modelos y conducir a conclusiones erróneas.

En PySpark MLlib, existen diversas técnicas para manejar los valores faltantes en un DataFrame. Una de las más comunes es el imputado, que consiste en reemplazar los valores nulos con estimaciones apropiadas.

Por ejemplo, supongamos que tenemos un DataFrame con valores faltantes en la columna edad:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LimpiezaValoresFaltantes").getOrCreate()

data = [("Ana", 25), ("Luis", None), ("María", 29), ("Juan", None)]
columns = ["nombre", "edad"]

df = spark.createDataFrame(data, columns)
df.show()

Para imputar los valores faltantes con la media de la columna edad, utilizamos el transformador Imputer de MLlib:

from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=["edad"], outputCols=["edad_imputada"]).setStrategy("mean")
df_imputado = imputer.fit(df).transform(df)
df_imputado.show()

Esta técnica reemplaza los valores nulos por la media de los valores existentes, permitiendo mantener la integridad del conjunto de datos.

Otra opción es eliminar las filas con valores faltantes usando el método dropna():

df_sin_nulos = df.dropna()
df_sin_nulos.show()

Si bien esta estrategia elimina los valores nulos, también puede reducir significativamente el tamaño del conjunto de datos, lo cual podría no ser deseable si se pierden muestras importantes.

Para casos en los que se quiera reemplazar los valores faltantes por un valor constante, como cero o una cadena específica, se utiliza el método fillna():

# Reemplazar valores nulos en columnas numéricas
df_relleno_num = df.fillna({'edad': 0})
df_relleno_num.show()

# Reemplazar valores nulos en columnas de tipo string
df_relleno_str = df.fillna({'nombre': 'Desconocido'})
df_relleno_str.show()

Es importante destacar que el método de limpieza elegido debe ser coherente con la naturaleza de los datos y el objetivo del análisis. Por ejemplo, utilizar la mediana en lugar de la media puede ser más apropiado en distribuciones sesgadas:

imputer_mediana = Imputer(inputCols=["edad"], outputCols=["edad_imputada"]).setStrategy("median")
df_imputado_mediana = imputer_mediana.fit(df).transform(df)
df_imputado_mediana.show()

Además, conviene realizar un análisis exploratorio previo para entender la distribución de los valores faltantes y decidir la mejor estrategia. Esto puede incluir calcular el porcentaje de valores nulos por columna o visualizar patrones de ausencia de datos.

Ingeniería de características y encoding

La ingeniería de características es un paso crucial en el proceso de Machine Learning, ya que las características adecuadas pueden mejorar sustancialmente el rendimiento de los modelos. En PySpark MLlib, disponemos de diversas herramientas para transformar y codificar los datos de manera eficiente y escalable.

Una tarea común es el manejo de variables categóricas, que deben ser convertidas a un formato numérico para ser interpretadas por los algoritmos de Machine Learning. Para ello, se utiliza el encoding de variables categóricas.

Una herramienta fundamental es StringIndexer, que asigna un índice numérico a cada categoría de una columna string:

from pyspark.ml.feature import StringIndexer

data = [("rojo",), ("azul",), ("verde",), ("azul",), ("rojo",)]
columns = ["color"]
df = spark.createDataFrame(data, columns)

indexer = StringIndexer(inputCol="color", outputCol="color_indexed")
df_indexed = indexer.fit(df).transform(df)
df_indexed.show()

En este ejemplo, hemos convertido la columna categórica color en una columna numérica color_indexed mediante StringIndexer. Sin embargo, los índices numéricos pueden sugerir un orden inexistente en las categorías. Para evitar este problema, se utiliza OneHotEncoder:

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCols=["color_indexed"], outputCols=["color_vector"])
df_encoded = encoder.fit(df_indexed).transform(df_indexed)
df_encoded.show()

Ahora, la columna color_vector representa las categorías como vectores one-hot, eliminando cualquier interpretación ordinal.

Otra técnica útil es la creación de features polinómicas para capturar relaciones no lineales:

from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

data = [(Vectors.dense([1.0, 2.0]),),
        (Vectors.dense([0.5, 1.5]),),
        (Vectors.dense([1.5, 1.0]),)]
columns = ["features"]

df_poly = spark.createDataFrame(data, columns)

polyExpansion = PolynomialExpansion(degree=2, inputCol="features", outputCol="poly_features")
df_expanded = polyExpansion.transform(df_poly)
df_expanded.show(truncate=False)

Aquí, hemos expandido las características originales para incluir términos polinómicos de grado 2, potenciando la capacidad del modelo para capturar relaciones complejas.

Para procesar texto, PySpark MLlib ofrece el transformador Tokenizer, que convierte una columna de texto en palabras individuales:

from pyspark.ml.feature import Tokenizer

sentenceData = spark.createDataFrame([
    (0, "Hola, bienvenidos al curso de PySpark."),
    (1, "La ingeniería de características es fundamental.")
], ["id", "frase"])

tokenizer = Tokenizer(inputCol="frase", outputCol="palabras")
tokenized = tokenizer.transform(sentenceData)
tokenized.show(truncate=False)

Si queremos eliminar palabras vacías (stop words), utilizamos StopWordsRemover:

from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol="palabras", outputCol="palabras_filtradas")
filtered = remover.transform(tokenized)
filtered.select("id", "palabras_filtradas").show(truncate=False)

Además, para convertir las palabras en una representación numérica, se emplea CountVectorizer o TF-IDF mediante HashingTF y IDF:

from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol="palabras_filtradas", outputCol="rawFeatures")
featurizedData = hashingTF.transform(filtered)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("id", "features").show(truncate=False)

La representación TF-IDF asigna pesos a las palabras basados en su frecuencia en el documento y en el corpus, proporcionando una medida más informativa para los modelos de Machine Learning.

Para combinar múltiples características en un solo vector de características, se utiliza VectorAssembler:

from pyspark.ml.feature import VectorAssembler

df_final = df_encoded.withColumn("feature1", df_encoded["color_indexed"] * 2)
assembler = VectorAssembler(inputCols=["color_vector", "feature1"], outputCol="features")
output = assembler.transform(df_final)
output.select("features").show(truncate=False)

La capacidad de combinar múltiples columnas en una única columna de características es esencial para preparar los datos antes de entrenar un modelo.

Es importante tener en cuenta que todos estos transformadores pueden ser encadenados en un Pipeline, lo cual facilita la aplicación secuencial de múltiples transformaciones y mantiene un proceso reproducible y ordenado.

Escalado y normalización de datos

El escalado y la normalización de datos son procesos esenciales en el preprocesamiento de datos para Machine Learning. Muchos algoritmos son sensibles a la escala de las características y pueden rendir mejor cuando los datos están adecuadamente escalados o normalizados.

En PySpark MLlib, disponemos de varios transformadores que facilitan estas tareas. Uno de los más comunes es StandardScaler, que estandariza las características restando la media y dividiendo por la desviación estándar, produciendo una distribución con media cero y varianza uno.

Por ejemplo, supongamos que tenemos un DataFrame con características numéricas:

from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("EscaladoNormalizacion").getOrCreate()

data = [(0, Vectors.dense([1.0, 0.5, -1.0])),
        (1, Vectors.dense([2.0, 1.0, 1.0])),
        (2, Vectors.dense([4.0, 10.0, 2.0]))]

columns = ["id", "características"]
df = spark.createDataFrame(data, columns)
df.show()

Para aplicar StandardScaler, procedemos de la siguiente manera:

from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="características", outputCol="características_escaladas", withMean=True, withStd=True)
scalerModel = scaler.fit(df)
df_escalado = scalerModel.transform(df)
df_escalado.select("características_escaladas").show(truncate=False)

Con withMean=True y withStd=True, estamos centrándonos y escalando los datos para obtener una distribución estandarizada.

Otra opción es utilizar MinMaxScaler, que escala los datos a un rango específico, generalmente entre 0 y 1. Esto es útil cuando se desea conservar la interpretación de los datos y mantener la proporcionalidad entre las variables:

from pyspark.ml.feature import MinMaxScaler

scaler = MinMaxScaler(inputCol="características", outputCol="características_minmax")
scalerModel = scaler.fit(df)
df_minmax = scalerModel.transform(df)
df_minmax.select("características_minmax").show(truncate=False)

El resultado es que cada característica se ajusta al rango definido, permitiendo que todas las variables contribuyan de manera equitativa en los algoritmos sensibles a la escala.

Si estamos trabajando con datos donde las variables pueden tener valores enormes o muy pequeños, MaxAbsScaler es conveniente, ya que escala cada característica dividiéndola por su valor absoluto máximo, manteniendo los datos originales en el rango [-1, 1]:

from pyspark.ml.feature import MaxAbsScaler

scaler = MaxAbsScaler(inputCol="características", outputCol="características_maxabs")
scalerModel = scaler.fit(df)
df_maxabs = scalerModel.transform(df)
df_maxabs.select("características_maxabs").show(truncate=False)

Además del escalado, la normalización es otro proceso que ajusta la magnitud de los vectores para que tengan una norma unitaria. Esto es particularmente útil en algoritmos como regresión logística o redes neuronales, donde es importante que las características estén en una escala similar.

Para normalizar los datos, utilizamos el transformador Normalizer:

from pyspark.ml.feature import Normalizer

normalizer = Normalizer(inputCol="características", outputCol="características_normalizadas", p=2.0)
df_normalizado = normalizer.transform(df)
df_normalizado.select("características_normalizadas").show(truncate=False)

En este ejemplo, utilizamos la norma L² (p=2.0) para normalizar los vectores, aunque también es posible utilizar otras normas, como L¹ (p=1.0).

Es importante comprender cuándo aplicar escalado y cuándo normalización. El escalado ajusta la distribución de los datos, mientras que la normalización ajusta las magnitudes de los vectores de características. La elección depende del algoritmo que se vaya a utilizar y de la naturaleza de los datos.

Para incorporar estos procesos en un flujo de trabajo reproducible, es recomendable utilizar un Pipeline de MLlib. Esto permite concatenar múltiples transformaciones y aplicarlas secuencialmente:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["características"], outputCol="features")
standardScaler = StandardScaler(inputCol="features", outputCol="features_escaladas", withMean=True, withStd=True)
pipeline = Pipeline(stages=[assembler, standardScaler])

model = pipeline.fit(df)
df_transformado = model.transform(df)
df_transformado.select("features_escaladas").show(truncate=False)

Además, al guardar el modelo entrenado, también se guardan los parámetros de escalado y normalización, lo que facilita la reproducción de resultados en nuevos datos.

Es crucial aplicar el mismo escalado y normalización tanto a los datos de entrenamiento como a los de prueba para asegurar la consistencia del modelo. Por lo tanto, siempre se debe ajustar el transformador en los datos de entrenamiento y luego aplicarlo a otros conjuntos de datos.

Selección y extracción de características

La selección y extracción de características es un paso fundamental en el aprendizaje automático, ya que permite reducir la dimensionalidad de los datos, mejorar el rendimiento de los modelos y facilitar su interpretación. En PySpark MLlib, existen diversas herramientas que ayudan a identificar y seleccionar las características más relevantes de un conjunto de datos.

Una técnica común de selección de características es el uso de pruebas estadísticas para evaluar la relación entre las características y la variable objetivo. El selector ChiSqSelector utiliza la prueba de chi-cuadrado para seleccionar las características más significativas en problemas de clasificación categórica.

Por ejemplo, supongamos que tenemos un DataFrame con características discretas y una etiqueta de clase:

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.appName("SeleccionCaracteristicas").getOrCreate()

data = [(Vectors.dense([0, 0, 1, 0, 2]), 0.0),
        (Vectors.dense([0, 1, 0, 1, 2]), 1.0),
        (Vectors.dense([1, 0, 1, 0, 2]), 1.0),
        (Vectors.dense([0, 1, 0, 1, 2]), 0.0)]

columns = ["features", "label"]
df = spark.createDataFrame(data, columns)
df.show()

Aplicamos ChiSqSelector para seleccionar las tres características más relevantes:

from pyspark.ml.feature import ChiSqSelector

selector = ChiSqSelector(numTopFeatures=3, featuresCol="features", outputCol="selectedFeatures", labelCol="label")
result = selector.fit(df).transform(df)
result.show()

El resultado es un nuevo conjunto de características en la columna selectedFeatures, que contiene solo las características seleccionadas.

Otra opción es utilizar VectorSlicer, que permite seleccionar características por sus índices. Esto es útil cuando sabemos de antemano cuáles son las características más importantes:

from pyspark.ml.feature import VectorSlicer

slicer = VectorSlicer(inputCol="features", outputCol="selectedFeatures", indices=[1, 4])
output = slicer.transform(df)
output.show()

En este caso, se han seleccionado las características en los índices 1 y 4.

Para problemas de regresión o cuando las características son continuas, el método UnivariateFeatureSelector ofrece una selección basada en pruebas univariadas, como el estadístico F o la correlación de Pearson:

from pyspark.ml.feature import UnivariateFeatureSelector

selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures", labelCol="label",
                                     selectionMode="numTopFeatures")

selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(2)
result = selector.fit(df).transform(df)
result.show()

Aquí, selectionMode="numTopFeatures" indica que se desean las 2 características más significativas.

En cuanto a la extracción de características, MLlib proporciona herramientas para transformar datos en nuevas representaciones más útiles. Un ejemplo es RFormula, que permite especificar relaciones entre variables utilizando una notación similar a la de lenguajes estadísticos.

Utilizando RFormula, se pueden generar automáticamente características basadas en interacciones y transformaciones:

from pyspark.ml.feature import RFormula

formula = RFormula(formula="label ~ . + features:features")
output = formula.fit(df).transform(df)
output.select("features", "label", "features:features").show()

El método RFormula crea nuevas características al combinar las existentes, lo cual es útil para capturar relaciones no lineales.

Además, para datos textuales, se puede utilizar Word2Vec para representar palabras como vectores densos en un espacio vectorial, capturando relaciones semánticas:

from pyspark.ml.feature import Word2Vec

documentDF = spark.createDataFrame([
    ("La selección de características es esencial".split(" "),),
    ("PySpark ofrece herramientas avanzadas".split(" "),)
], ["text"])

word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
result.show(truncate=False)

La columna result contiene los vectores que representan cada documento, obtenidos mediante la media de los vectores de las palabras.

Otra técnica relevante es el uso de modelos de árbol para extraer la importancia de las características. Los modelos de ensamble como RandomForestClassifier proporcionan métricas que indican cuánto contribuye cada característica al modelo:

from pyspark.ml.classification import RandomForestClassifier

# Suponiendo que ya tenemos un DataFrame 'df' con 'features' y 'label'
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=10)
model = rf.fit(df)
importancias = model.featureImportances
print("Importancia de las características:", importancias)

De esta forma, se pueden identificar las características con mayor peso en la predicción y considerar eliminar las menos relevantes para simplificar el modelo.

Es crucial tener en cuenta que la selección de características debe realizarse de manera cuidadosa, ya que eliminar variables relevantes puede degradar el rendimiento del modelo. Por ello, es recomendable combinar diferentes métodos y validar los resultados mediante técnicas como la validación cruzada.

Además, la extracción de características permite crear nuevas representaciones de los datos que pueden facilitar la detección de patrones. Por ejemplo, aplicar transformaciones estadísticas o emplear técnicas de embedding puede revelar relaciones ocultas en los datos.

Manejo de datos desbalanceados

El manejo de datos desbalanceados es un desafío común en el aprendizaje automático, especialmente en problemas de clasificación donde una o más clases están subrepresentadas. Un conjunto de datos desbalanceado puede conducir a modelos que tienen un buen rendimiento global pero que fallan en predecir correctamente las clases minoritarias, lo que es crítico en aplicaciones como la detección de fraudes o diagnósticos médicos.

En PySpark MLlib, disponemos de varias técnicas para abordar el desbalanceo de clases y mejorar el rendimiento de los modelos en las clases minoritarias.

Submuestreo y sobremuestreo

Una estrategia fundamental es ajustar la proporción de clases mediante submuestreo (undersampling) de la clase mayoritaria o sobremuestreo (oversampling) de la clase minoritaria.

Submuestreo de la clase mayoritaria

El submuestreo reduce el número de muestras de la clase mayoritaria para equilibrar el conjunto de datos. Aunque sencillo, puede conducir a la pérdida de información valiosa. En PySpark, podemos realizar submuestreo utilizando sampleBy:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Desbalanceo").getOrCreate()

# Supongamos un DataFrame con una columna 'label' que indica la clase
data = [(0, 'A'), (1, 'A'), (2, 'A'), (3, 'B'), (4, 'B')]
df = spark.createDataFrame(data, ['id', 'label'])

# Calcular las fracciones para el submuestreo
fractions = df.groupby('label').count().withColumnRenamed('count', 'total').collect()
total_counts = {row['label']: row['total'] for row in fractions}

# Suponiendo que la clase 'A' es mayoritaria
fraction_A = total_counts['B'] / total_counts['A']
df_submuestreado = df.sampleBy('label', {'A': fraction_A, 'B': 1.0}, seed=42)
df_submuestreado.groupBy('label').count().show()

En este ejemplo, hemos reducido el número de muestras de la clase 'A' para igualar el número de muestras de la clase 'B'.

Sobremuestreo de la clase minoritaria

El sobremuestreo incrementa el número de muestras de la clase minoritaria, repitiendo instancias o sintetizando nuevas muestras. Aunque puede generar overfitting, es útil para lograr un balance. Para replicar instancias:

from pyspark.sql.functions import col

# Filtrar las muestras de la clase minoritaria
df_minoritario = df.filter(col('label') == 'B')

# Determinar cuántas veces replicar
replicas = int(total_counts['A'] / total_counts['B']) - 1

# Crear múltiples copias y unirlas
df_sobremuestreado = df_minoritario
for _ in range(replicas):
    df_sobremuestreado = df_sobremuestreado.union(df_minoritario)

df_balanceado = df.union(df_sobremuestreado)
df_balanceado.groupBy('label').count().show()

Ahora, ambas clases tienen un número similar de muestras, lo que facilita el entrenamiento del modelo.

Técnicas avanzadas: SMOTE

El Synthetic Minority Over-sampling Technique (SMOTE) genera nuevas muestras sintéticas de la clase minoritaria creando combinaciones lineales de vecinos cercanos. Aunque PySpark MLlib no incluye una implementación nativa de SMOTE, podemos utilizar bibliotecas externas como spark_ml_ext o implementar una versión personalizada.

Ejemplo utilizando pyspark.ml y pyspark.sql:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import rand

# Suponiendo un DataFrame 'df_minoritario' con características en 'features'
assembler = VectorAssembler(inputCols=['feature1', 'feature2'], outputCol='features')
df_min_feat = assembler.transform(df_minoritario)

# Generar muestras sintéticas (simplificado)
def generar_sinteticas(df, numero_muestras):
    muestras = df.collect()
    sinteticas = []
    for _ in range(numero_muestras):
        idx1, idx2 = rand(seed=42).between(0, len(muestras)-1), rand(seed=42).between(0, len(muestras)-1)
        vec1, vec2 = muestras[idx1]['features'], muestras[idx2]['features']
        nueva_muestra = Vectors.dense([(a + b)/2 for a, b in zip(vec1, vec2)])
        sinteticas.append((nueva_muestra, muestras[0]['label']))
    return spark.createDataFrame(sinteticas, ['features', 'label'])

nuevas_muestras = generar_sinteticas(df_min_feat, total_counts['A'] - total_counts['B'])
df_sintetico = df_min_feat.union(nuevas_muestras)
df_balanceado = df_sintetico.union(assembler.transform(df.filter(col('label') == 'A')))

Esta implementación simplificada ilustra el concepto, aunque para entornos de producción se recomienda utilizar librerías especializadas.

Ajuste de pesos en el modelo

Otra estrategia es ajustar los pesos de las clases en el algoritmo de aprendizaje para penalizar más los errores en la clase minoritaria. En MLlib, algunos algoritmos permiten especificar pesos o modificar la función de pérdida.

Por ejemplo, en LogisticRegression podemos utilizar la opción weightCol:

from pyspark.sql.functions import when
from pyspark.ml.classification import LogisticRegression

# Añadir una columna de pesos
df_pesado = df.withColumn('peso', when(col('label') == 'A', 1.0).otherwise(2.0))

# Entrenar el modelo con la columna de pesos
lr = LogisticRegression(featuresCol='features', labelCol='label', weightCol='peso')
modelo = lr.fit(df_pesado)

Al especificar weightCol, el modelo considerará los pesos durante el entrenamiento, mejorando la sensibilidad a la clase minoritaria.

Uso de métricas apropiadas

Es importante utilizar métricas de evaluación que reflejen el rendimiento en clases desbalanceadas, como la precisión por clase, la puntuación F1 o el área bajo la curva ROC. Aunque la descripción detallada de estas métricas se aborda en secciones posteriores, es fundamental tenerlas en cuenta al manejar datos desbalanceados.

Técnicas de ensemble y umbrales

Los modelos de ensemble, como Random Forest o Gradient-Boosted Trees, pueden ofrecer mejoras en conjuntos desbalanceados.

from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol='features', labelCol='label', numTrees=100)
modelo_rf = rf.fit(df_balanceado)

Modificar el umbral permite equilibrar la tasa de verdaderos positivos y la tasa de falsos positivos según las necesidades del problema.

Pipeline de preprocesamiento

Integrar estas técnicas en un Pipeline de MLlib asegura un flujo reproducible y escalable:

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[assembler, lr])
modelo = pipeline.fit(df_pesado)

Al incluir el preprocesamiento, el balanceo y el modelo en el pipeline, garantizamos que las transformaciones se apliquen de manera consistente en nuevos datos.

Pipeline de preprocesamiento de datos

El uso de un pipeline de preprocesamiento de datos en PySpark MLlib permite encadenar múltiples transformaciones de manera secuencial y sistemática. Un pipeline facilita la reproducibilidad, la organización y el mantenimiento del proceso de preparación de los datos para Machine Learning.

En PySpark, un Pipeline es un objeto que consiste en una serie de etapas (stages), donde cada etapa es un transformador (Transformer) o un estimador (Estimator). Los transformadores toman un DataFrame y producen otro DataFrame, mientras que los estimadores aprenden de los datos para producir un modelo que luego se puede usar como transformador.

Por ejemplo, supongamos que tenemos un conjunto de datos que requiere varios pasos de preprocesamiento:

  1. Imputación de valores faltantes utilizando la media.
  2. Codificación de variables categóricas con StringIndexer y OneHotEncoder.
  3. Escalado de características con StandardScaler.
  4. Selección de características relevantes.

Para implementar este proceso en un pipeline, comenzamos importando las clases necesarias:

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, ChiSqSelector

Creamos una sesión de Spark:

spark = SparkSession.builder.appName("PipelinePreprocesamiento").getOrCreate()

Supongamos que nuestro DataFrame inicial es:

data = [
    (1, "A", 34.0, None),
    (2, "B", None, 45.0),
    (3, "A", 29.0, 54.0),
    (4, "C", 40.0, 65.0),
    (5, None, 36.0, 52.0)
]
columns = ["id", "categoria", "edad", "ingresos"]
df = spark.createDataFrame(data, columns)
df.show()

Paso 1: Imputación de valores faltantes

Utilizamos Imputer para rellenar los valores nulos en las columnas numéricas:

imputer = Imputer(
    inputCols=["edad", "ingresos"],
    outputCols=["edad_imputada", "ingresos_imputados"]
)

Paso 2: Codificación de variables categóricas

Primero, convertimos las cadenas categóricas en índices numéricos con StringIndexer:

indexer = StringIndexer(
    inputCol="categoria", 
    outputCol="categoria_indexada",
    handleInvalid="keep"
)

Luego, aplicamos OneHotEncoder para obtener vectores one-hot:

encoder = OneHotEncoder(
    inputCols=["categoria_indexada"],
    outputCols=["categoria_codificada"]
)

Paso 3: Ensamblaje de características

Utilizamos VectorAssembler para combinar las columnas en un único vector de características:

assembler = VectorAssembler(
    inputCols=["edad_imputada", "ingresos_imputados", "categoria_codificada"],
    outputCol="características"
)

Paso 4: Escalado de características

Aplicamos StandardScaler para estandarizar las características numéricas:

scaler = StandardScaler(
    inputCol="características",
    outputCol="características_escaladas",
    withMean=True,
    withStd=True
)

Construcción del Pipeline

Creamos el pipeline agregando las etapas en el orden adecuado:

pipeline = Pipeline(stages=[imputer, indexer, encoder, assembler, scaler])

Ejecutar el Pipeline

Ajustamos el pipeline a los datos y transformamos el DataFrame:

modelo = pipeline.fit(df)
df_preprocesado = modelo.transform(df)
df_preprocesado.select("id", "características_escaladas").show(truncate=False)

El resultado es un DataFrame que contiene las características preprocesadas y listas para ser utilizadas en un modelo de Machine Learning. El uso del pipeline garantiza que todos los pasos se apliquen de manera consistente y reproducible.

Ventajas de utilizar Pipelines

  • Reproducibilidad: Al encapsular todos los pasos en un pipeline, podemos aplicar el mismo procesamiento a los nuevos datos de manera consistente.
  • Organización: Estructurar el preprocesamiento en etapas claras mejora la legibilidad y el mantenimiento del código.
  • Integración con Modelos: Los pipelines pueden incluir estimadores como modelos de regresión o clasificación, permitiendo que el entrenamiento y el preprocesamiento se realicen en un solo flujo.

Incorporación de un modelo al Pipeline

Podemos extender el pipeline para incluir un modelo de aprendizaje. Por ejemplo, añadir un RandomForestClassifier:

from pyspark.ml.classification import RandomForestClassifier

clasificador = RandomForestClassifier(
    featuresCol="características_escaladas",
    labelCol="id",
    predictionCol="predicción"
)

pipeline_completo = Pipeline(stages=[imputer, indexer, encoder, assembler, scaler, clasificador])

Ajustamos el pipeline completo:

modelo_completo = pipeline_completo.fit(df)
resultados = modelo_completo.transform(df)
resultados.select("id", "predicción").show()

Ahora, hemos construido un pipeline que incluye tanto el preprocesamiento como el entrenamiento del modelo, simplificando el flujo de trabajo.

Guardar y cargar el Pipeline

Es posible guardar el pipeline entrenado para su uso futuro:

modelo_completo.save("ruta/a/guardar/modelo_pipeline")

Y cargarlo posteriormente:

from pyspark.ml.pipeline import PipelineModel

modelo_cargado = PipelineModel.load("ruta/a/guardar/modelo_pipeline")

Esto facilita la persistencia de los modelos y su despliegue en entornos productivos.

Manejo de datos de prueba

Al aplicar el pipeline a datos de prueba, es importante utilizar el mismo pipeline entrenado para asegurar la consistencia en el preprocesamiento:

df_prueba = spark.createDataFrame([
    (6, "B", 28.0, None),
    (7, "A", None, 58.0)
], columns)

predicciones = modelo_completo.transform(df_prueba)
predicciones.select("id", "predicción").show()

De esta manera, garantizamos que los datos de prueba se procesan con los mismos parámetros que los datos de entrenamiento.

Aprende PySpark GRATIS online

Todas las lecciones de PySpark

Accede a todas las lecciones de PySpark y aprende con ejemplos prácticos de código y ejercicios de programación con IDE web sin instalar nada.

Accede GRATIS a PySpark y certifícate

En esta lección

Objetivos de aprendizaje de esta lección

  • Comprender la importancia de tratar valores nulos en Machine Learning.
  • Implementar imputación de valores faltantes con MLlib.
  • Aplicar el método de eliminación de filas con dropna().
  • Utilizar fillna() para sustituir valores ausentes.