PySpark

Tutorial PySpark: Regresión con MLlib

Aprende a implementar regresiones de machine learning con PySpark MLlib. Optimiza tu procesamiento de datos distribuido y mejora tus predicciones con Apache Spark en Python.

Aprende PySpark GRATIS y certifícate

Regresión lineal simple y múltiple

La regresión lineal es un método estadístico utilizado para modelar la relación entre una variable dependiente y una o más variables independientes. En PySpark MLlib, puedes implementar regresiones lineales tanto simples como múltiples para predecir valores continuos basados en tus datos.

Para comenzar, es necesario preparar los datos en un DataFrame estructurado adecuadamente. Asegúrate de que tus características estén en un vector denso utilizando la clase VectorAssembler. Este paso es crucial, ya que la ingeniería de características influencia directamente el rendimiento del modelo.

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['feature1', 'feature2'], outputCol='features')
training_data = assembler.transform(raw_data)

En el caso de una regresión lineal simple, solo utilizas una variable independiente para predecir la variable dependiente. Por ejemplo, si deseas predecir las ventas en función del gasto en publicidad:

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='features', labelCol='sales')
model = lr.fit(training_data)

La regresión lineal múltiple extiende este concepto al utilizar múltiples variables independientes. Esto permite capturar relaciones más complejas entre las variables y obtener predicciones más precisas.

Una vez entrenado el modelo, puedes realizar predicciones sobre un conjunto de datos de prueba. Es importante utilizar el mismo proceso de transformación de características que aplicaste al conjunto de entrenamiento.

test_data = assembler.transform(new_raw_data)
predictions = model.transform(test_data)

Para interpretar el modelo, puedes examinar los coeficientes y el intercepto. Estos parámetros proporcionan información sobre la influencia de cada variable independiente en la predicción.

coefficients = model.coefficients
intercept = model.intercept

print(f'Coeficientes: {coefficients}')
print(f'Intercepto: {intercept}')

La comprensión de estos parámetros te permite identificar qué variables tienen un mayor impacto y cómo afectan al resultado. Además, puedes utilizar esta información para refinar el modelo y mejorar su precisión.

Es recomendable validar el modelo utilizando técnicas como la validación cruzada y ajustar los hiperparámetros según sea necesario. PySpark MLlib ofrece herramientas para facilitar este proceso, lo que ayuda a optimizar el rendimiento del modelo en escenarios del mundo real.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol='sales'),
                          numFolds=2)

cvModel = crossval.fit(training_data)

Al aplicar regresión lineal con PySpark MLlib, aprovechas la capacidad de procesamiento distribuido de Spark, lo que es especialmente ventajoso cuando trabajas con grandes volúmenes de datos. Esto garantiza que los modelos sean escalables y eficientes en entornos de producción.

Además, PySpark MLlib integra herramientas para la evaluación de modelos, permitiendo medir el rendimiento y realizar ajustes necesarios. Aunque no profundizaremos en las métricas aquí, es fundamental familiarizarse con ellas para interpretar correctamente los resultados.

Finalmente, al implementar estos modelos, es importante seguir las mejores prácticas en el manejo de datos y modelado. Esto incluye la división adecuada de los datos, el manejo de valores atípicos y la consideración de las posibles correlaciones entre variables independientes.

KNN, Árboles de decisión, SVM

En esta sección exploraremos diferentes algoritmos de regresión disponibles en PySpark MLlib: K-Nearest Neighbors (KNN), Árboles de decisión y Máquinas de Vectores de Soporte (SVM). Estos métodos permiten modelar relaciones complejas entre las variables independientes y la variable dependiente, ofreciendo alternativas poderosas para la predicción en problemas de regresión.

Regresión con K-Nearest Neighbors (KNN)

El algoritmo K-Nearest Neighbors es un método basado en instancias que puede utilizarse para tareas de regresión. En KNN para regresión, el valor predictivo de una nueva observación se calcula como el promedio de los valores de sus k vecinos más cercanos en el espacio de características.

Aunque PySpark MLlib no proporciona una implementación nativa de KNN, es posible implementarlo aprovechando las capacidades de procesamiento distribuido de Spark. A continuación, se muestra un ejemplo simplificado de cómo aproximar KNN para regresión utilizando PySpark.

from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, sqrt, udf
from pyspark.sql.types import FloatType

# Preparar los datos
assembler = VectorAssembler(inputCols=['feature1', 'feature2'], outputCol='features')
data = assembler.transform(raw_data).select('features', 'label')

# Definir una función UDF para calcular la distancia euclidiana
def euclidean_distance(v1, v2):
    return float(v1.squared_distance(v2)) ** 0.5

distance_udf = udf(euclidean_distance, FloatType())

# Punto de consulta
query_point = ...  # Vector de características de la nueva observación

# Calcular distancias y obtener los k vecinos más cercanos
k = 5
neighbors = data.withColumn('distance', distance_udf(col('features'), query_point)) \
                .orderBy(col('distance').asc()) \
                .limit(k)

# Calcular la predicción como el promedio de los vecinos
prediction = neighbors.groupBy().avg('label').collect()[0][0]

Esta aproximación permite implementar KNN para regresión, pero es importante tener en cuenta que puede no escalar eficientemente con conjuntos de datos muy grandes. La eficiencia del cálculo de distancias depende del número de características y del tamaño del conjunto de datos.

Regresión con árboles de decisión

Los árboles de decisión son modelos no paramétricos que realizan particiones recursivas del espacio de características para predecir valores continuos o discretos. Son especialmente útiles para capturar relaciones no lineales y gestionar interacciones entre variables.

Preparación de los datos

Es esencial preparar adecuadamente los datos antes de entrenar un modelo de árbol de decisión. Utilizamos VectorAssembler para combinar las características en un solo vector.

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['feature1', 'feature2', 'feature3'], outputCol='features')
prepared_data = assembler.transform(raw_data).select('features', 'label')

Entrenamiento del modelo

Con los datos preparados, entrenamos un modelo de regresión con árboles de decisión usando PySpark MLlib.

from pyspark.ml.regression import DecisionTreeRegressor

# Crear instancia del modelo
dt_regressor = DecisionTreeRegressor(featuresCol='features', labelCol='label', maxDepth=5)

# Dividir los datos en entrenamiento y prueba
train_data, test_data = prepared_data.randomSplit([0.8, 0.2], seed=42)

# Entrenar el modelo
dt_model = dt_regressor.fit(train_data)

El parámetro maxDepth controla la profundidad máxima del árbol, lo que ayuda a prevenir el sobreajuste al limitar la complejidad del modelo.

Evaluación y predicción

Después de entrenar el modelo, realizamos predicciones y evaluamos su rendimiento.

# Realizar predicciones
predictions = dt_model.transform(test_data)

# Evaluar el modelo
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)

print(f'Error cuadrático medio (RMSE): {rmse}')

El RMSE (Root Mean Squared Error) proporciona una medida de la desviación estándar de las predicciones residuales, indicando qué tan dispersos están los datos alrededor de la línea de regresión.

Interpretación del modelo

Los árboles de decisión son interpretables, lo que permite extraer información valiosa sobre las variables más relevantes.

# Extraer información del modelo
feature_importances = dt_model.featureImportances

print('Importancia de las características:')
for idx, importance in enumerate(feature_importances):
    print(f'Característica {idx}: {importance}')

Esta información ayuda a identificar cuáles características tienen mayor peso en el modelo, lo que es útil para la selección de características y para comprender mejor los datos.

Regresión con máquinas de vectores de soporte (SVM)

Las Máquinas de Vectores de Soporte son algoritmos supervisados que pueden utilizarse tanto para clasificación como para regresión. En la regresión SVM, el objetivo es encontrar una función que tenga al menos una desviación ε de los valores reales para todos los datos de entrenamiento y que sea lo más plana posible.

Aunque PySpark MLlib incluye una implementación de SVM para clasificación, no proporciona una implementación directa para regresión SVM. Sin embargo, es posible integrar PySpark con otras bibliotecas especializadas para utilizar SVM en tareas de regresión.

Comparación y selección de modelos

La elección entre KNN, árboles de decisión y SVM depende de la naturaleza de los datos y los requisitos específicos del problema. Es recomendable probar múltiples algoritmos y evaluar su desempeño utilizando métricas adecuadas.

Uso de pipelines y validación cruzada

PySpark MLlib facilita la creación de pipelines para estructurar el flujo de trabajo y realizar validación cruzada para optimizar los hiperparámetros.

from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Definir el pipeline
pipeline = Pipeline(stages=[assembler, dt_regressor])

# Establecer la rejilla de hiperparámetros
paramGrid = ParamGridBuilder() \
    .addGrid(dt_regressor.maxDepth, [3, 5, 7]) \
    .addGrid(dt_regressor.minInstancesPerNode, [1, 2, 4]) \
    .build()

# Configurar la validación cruzada
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol='label'),
                          numFolds=5)

# Entrenar el modelo con validación cruzada
cv_model = crossval.fit(train_data)

Esta metodología ayuda a encontrar la combinación óptima de hiperparámetros, mejorando la generalización del modelo a nuevos datos.

Métricas de regresión: RMSE, MAE, R2

La evaluación de modelos de regresión es un paso crítico para asegurar que el modelo prediga con precisión los valores continuos. PySpark MLlib proporciona varias métricas para medir el rendimiento de los modelos, entre las cuales destacan el Root Mean Squared Error (RMSE), el Mean Absolute Error (MAE) y el Coeficiente de Determinación (R²). Comprender y utilizar correctamente estas métricas es esencial para interpretar la eficacia de los modelos y compararlos entre sí.

RMSE

El Error Cuadrático Medio (RMSE) calcula la raíz cuadrada de la media de los errores al cuadrado entre los valores predichos y los valores reales. Es útil para penalizar errores grandes y es sensible a valores atípicos. La fórmula del RMSE es:

$$
RMSE = \sqrt{\frac{1}{n} \sum_{i=1}^{n} (y_i - \hat{y}_i)^2}
$$

Donde ( y_i ) es el valor real, ( $hat{y}_i $) es el valor predicho, y ( n ) es el número total de observaciones. Para calcular el RMSE en PySpark MLlib, se utiliza la clase RegressionEvaluator.

from pyspark.ml.evaluation import RegressionEvaluator

evaluator_rmse = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='rmse')
rmse = evaluator_rmse.evaluate(predictions)

print(f'El RMSE del modelo es: {rmse}')

MAE

El Error Absoluto Medio (MAE) mide la media de los valores absolutos de las diferencias entre los valores predichos y los reales. Es menos sensible a valores atípicos que el RMSE y proporciona una medida directa del error promedio. La fórmula del MAE es:

$$
MAE = \frac{1}{n} \sum_{i=1}^{n} | y_i - \hat{y}_i |
$$

Para calcular el MAE en PySpark MLlib:

evaluator_mae = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='mae')
mae = evaluator_mae.evaluate(predictions)

print(f'El MAE del modelo es: {mae}')

R2

El Coeficiente de Determinación (R²) indica la proporción de la varianza en la variable dependiente que es predecible a partir de las variables independientes. El valor de R² oscila entre 0 y 1, donde valores cercanos a 1 indican que el modelo explica la mayor parte de la variabilidad de los datos. La fórmula del R² es:

$$R^2 = 1 - \frac{\sum_{i=1}^{n} (y_i - \hat{y}_i)^2}{\sum_{i=1}^{n} (y_i - \bar{y})^2}$$

Donde ( $bar{y}$ ) es el valor medio de los valores reales. Para calcular R² en PySpark MLlib:

evaluator_r2 = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='r2')
r2 = evaluator_r2.evaluate(predictions)

print(f'El coeficiente R² del modelo es: {r2}')

Es esencial interpretar estas métricas en el contexto del problema específico. Un RMSE bajo indica que las predicciones están, en promedio, cerca de los valores reales. El MAE ofrece una medida directa del error promedio sin penalizar excesivamente los errores grandes. El proporciona una medida relativa del ajuste del modelo, independiente de las unidades de la variable de salida.

A continuación, se muestra un ejemplo completo de cómo entrenar un modelo de regresión y evaluar su rendimiento utilizando las métricas mencionadas:

from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

# Preparar los datos
assembler = VectorAssembler(inputCols=['feature1', 'feature2', 'feature3'], outputCol='features')
data = assembler.transform(raw_data).select('features', 'label')

# Dividir los datos
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Entrenar el modelo
lr_model = lr.fit(train_data)

# Realizar predicciones
predictions = lr_model.transform(test_data)

# Evaluar el modelo
evaluator_rmse = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='rmse')
evaluator_mae = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='mae')
evaluator_r2 = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='r2')

rmse = evaluator_rmse.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

print(f'RMSE: {rmse}')
print(f'MAE: {mae}')
print(f'R2: {r2}')

Utilizar y comprender adecuadamente estas métricas permite mejorar los modelos, realizar comparaciones objetivas y tomar decisiones informadas durante el desarrollo de proyectos de aprendizaje automático.

En algunos casos, es posible que desees utilizar métricas personalizadas o adicionales. PySpark MLlib permite definir evaluadores personalizados mediante la implementación de la clase RegressionEvaluator. Por ejemplo, si se requiere calcular el Error Cuadrático Medio Logarítmico (RMSLE):

import pyspark.sql.functions as F
from pyspark.sql import DataFrame

def calculate_rmsle(df: DataFrame, prediction_col: str, label_col: str):
    df = df.withColumn('log_prediction', F.log1p(F.col(prediction_col)))
    df = df.withColumn('log_label', F.log1p(F.col(label_col)))
    df = df.withColumn('squared_error', F.pow(F.col('log_prediction') - F.col('log_label'), 2))
    mean_sle = df.select(F.mean('squared_error')).collect()[0][0]
    rmsle = mean_sle ** 0.5
    return rmsle

rmsle = calculate_rmsle(predictions, 'prediction', 'label')
print(f'RMSLE: {rmsle}')

Definir métricas personalizadas permite ajustar la evaluación del modelo a las necesidades específicas del problema. Al realizar la búsqueda de hiperparámetros, es común utilizar estas métricas para guiar el proceso de optimización:

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator_rmse,
                          numFolds=5)

cvModel = crossval.fit(train_data)

bestModel = cvModel.bestModel

En este ejemplo, se utiliza el RMSE como métrica para seleccionar los mejores hiperparámetros durante la validación cruzada. Las métricas de evaluación de regresión son herramientas fundamentales en el aprendizaje automático para cuantificar el rendimiento de los modelos y asegurar que cumplen con los objetivos planteados.

Aprende PySpark GRATIS online

Todas las lecciones de PySpark

Accede a todas las lecciones de PySpark y aprende con ejemplos prácticos de código y ejercicios de programación con IDE web sin instalar nada.

Accede GRATIS a PySpark y certifícate

En esta lección

Objetivos de aprendizaje de esta lección

  1. Entender el concepto y aplicación de la regresión lineal simple y múltiple.
  2. Preparar adecuadamente datos para modelos de regresión en PySpark MLlib.
  3. Aplicar técnicas de ingeniería de características utilizando VectorAssembler.
  4. Implementar modelos de regresión lineal en PySpark MLlib para predecir variables continuas.
  5. Evaluar el rendimiento del modelo mediante indicadores como Coeficientes e Intercepto.
  6. Optimizar los modelos con validación cruzada y ajuste de hiperparámetros.
  7. Aprovechar el procesamiento distribuido de PySpark para manejar grandes volúmenes de datos.