PySpark

Tutorial PySpark: Clasificación con MLlib

Aprende a usar la regresión logística y algoritmos de clasificación en PySpark MLlib para clasificar grandes volúmenes de datos con Python en Apache Spark.

Aprende PySpark GRATIS y certifícate

Regresión logística

La regresión logística es un algoritmo de clasificación ampliamente utilizado para predecir la probabilidad de pertenencia a clases categóricas. En PySpark MLlib, podemos implementar este algoritmo de manera eficiente para manejar grandes volúmenes de datos distribuidos.

Para utilizar la regresión logística en PySpark, primero debemos importar la clase correspondiente:

from pyspark.ml.classification import LogisticRegression

Imaginemos que tenemos un DataFrame llamado datos con una columna de características llamadas features y una columna de etiquetas denominada label. Podemos instanciar y entrenar el modelo de la siguiente manera:

# Crear instancia del modelo de regresión logística
reg_log = LogisticRegression(featuresCol='features', labelCol='label', maxIter=10)

# Entrenar el modelo
modelo = reg_log.fit(datos)

Es esencial que la columna features sea un vector de características. Para convertir múltiples columnas en un solo vector, usamos el VectorAssembler:

from pyspark.ml.feature import VectorAssembler

# Especificar las columnas de entrada
assembler = VectorAssembler(inputCols=['columna1', 'columna2', 'columna3'], outputCol='features')

# Transformar los datos
datos = assembler.transform(datos)

Una vez entrenado el modelo, podemos realizar predicciones sobre nuevos datos:

# Realizar predicciones
predicciones = modelo.transform(datos_prueba)

# Mostrar las primeras filas
predicciones.select('label', 'prediction', 'probability').show(5)

La salida incluirá la etiqueta real, la predicción del modelo y la probabilidad asociada a cada clase, lo que nos permite evaluar su rendimiento inicial.

La regresión logística en PySpark MLlib ofrece varios hiperparámetros que podemos ajustar:

  • maxIter: número máximo de iteraciones del algoritmo de optimización.
  • regParam: parámetro de regularización para controlar el sobreajuste.
  • elasticNetParam: mezcla entre regularización L1 (Lasso) y L2 (Ridge).

Por ejemplo, para agregar regularización L1:

# Configurar el modelo con regularización L1
reg_log = LogisticRegression(featuresCol='features', labelCol='label', elasticNetParam=1.0)

Podemos acceder a los coeficientes y al intercepto del modelo para interpretar los resultados:

# Obtener coeficientes e intercepto
coeficientes = modelo.coefficients
intercepto = modelo.intercept

print(f"Coeficientes: {coeficientes}")
print(f"Intercepto: {intercepto}")

La interpretación de los coeficientes nos permite entender la influencia de cada característica en la clasificación de las observaciones.

Para problemas de clasificación multiclase, PySpark MLlib utiliza una estrategia "one-vs-rest" por defecto. Sin embargo, podemos especificar el tipo de familia para el modelo:

# Configurar para clasificación multiclase
reg_log = LogisticRegression(featuresCol='features', labelCol='label', family='multinomial')

Es recomendable estandarizar o normalizar las características para mejorar el rendimiento del modelo. Podemos utilizar el StandardScaler para este propósito:

from pyspark.ml.feature import StandardScaler

# Instanciar el escalador
escalador = StandardScaler(inputCol='features', outputCol='features_escaladas')

# Ajustar y transformar los datos
escalador_modelo = escalador.fit(datos)
datos = escalador_modelo.transform(datos)

Luego, actualizamos el modelo para utilizar las características escaladas:

# Entrenar el modelo con las características escaladas
reg_log = LogisticRegression(featuresCol='features_escaladas', labelCol='label')
modelo = reg_log.fit(datos)

La selección de características también es crucial, especialmente cuando trabajamos con conjuntos de datos de alta dimensionalidad. Podemos emplear métodos como Chi-Squared Selector para seleccionar las características más relevantes:

from pyspark.ml.feature import ChiSqSelector

# Instanciar el selector
selector = ChiSqSelector(numTopFeatures=50, featuresCol='features', labelCol='label', outputCol='features_seleccionadas')

# Aplicar el selector
datos = selector.fit(datos).transform(datos)

Finalmente, es buena práctica dividir los datos en conjuntos de entrenamiento y prueba para evaluar el rendimiento real del modelo:

# Dividir los datos
entrenamiento, prueba = datos.randomSplit([0.8, 0.2], seed=12345)

# Entrenar el modelo con el conjunto de entrenamiento
modelo = reg_log.fit(entrenamiento)

# Evaluar el modelo con el conjunto de prueba
predicciones = modelo.transform(prueba)

De esta manera, podemos utilizar la regresión logística en PySpark MLlib para resolver problemas de clasificación de manera eficaz y escalable, aprovechando las capacidades de procesamiento distribuido de Spark.

KNN, Árboles de decisión, SVM, Naive Bayes

En esta sección exploraremos varios algoritmos de clasificación disponibles en PySpark MLlib: K-Nearest Neighbors (KNN), árboles de decisión, Máquinas de Vectores de Soporte (SVM) y Naive Bayes. Estos algoritmos son fundamentales en el aprendizaje automático supervisado y permiten resolver problemas de clasificación en grandes volúmenes de datos.

Árboles de decisión

Los árboles de decisión son modelos que realizan particiones recursivas de los datos basadas en el valor de las características para predecir la etiqueta de clase. En PySpark MLlib, utilizamos la clase DecisionTreeClassifier para implementar este algoritmo:

from pyspark.ml.classification import DecisionTreeClassifier

Suponiendo un DataFrame llamado datos con las columnas features y label, instanciamos y entrenamos el modelo:

# Crear instancia del clasificador de árbol de decisión
arbol = DecisionTreeClassifier(featuresCol='features', labelCol='label', maxDepth=5)

# Entrenar el modelo
modelo_arbol = arbol.fit(datos)

El parámetro maxDepth controla la profundidad máxima del árbol, lo que ayuda a prevenir el sobreajuste. Para realizar predicciones:

# Realizar predicciones
predicciones_arbol = modelo_arbol.transform(datos_prueba)

# Mostrar resultados
predicciones_arbol.select('label', 'prediction', 'probability').show(5)

Podemos visualizar la estructura del árbol utilizando modelo_arbol.toDebugString, lo que facilita la interpretación del modelo.

Máquinas de Vectores de Soporte (SVM)

Las SVM son algoritmos que buscan el hiperplano que mejor separa las clases en el espacio de características. En PySpark MLlib, utilizamos LinearSVC para implementarlas:

from pyspark.ml.classification import LinearSVC

Instanciamos y entrenamos el modelo:

# Crear instancia del clasificador SVM lineal
svm = LinearSVC(featuresCol='features', labelCol='label', maxIter=10, regParam=0.1)

# Entrenar el modelo
modelo_svm = svm.fit(datos)

El parámetro regParam es el parámetro de regularización que ayuda a evitar el sobreajuste. Para realizar predicciones:

# Realizar predicciones
predicciones_svm = modelo_svm.transform(datos_prueba)

# Mostrar resultados
predicciones_svm.select('label', 'prediction').show(5)

Es importante destacar que LinearSVC soporta solo clasificación binaria. Para problemas multiclase, podemos considerar otros algoritmos o estrategias como One-vs-Rest.

Naive Bayes

El algoritmo Naive Bayes se basa en el teorema de Bayes, asumiendo independencia entre las características. Es especialmente útil en problemas como la clasificación de textos. Importamos la clase correspondiente:

from pyspark.ml.classification import NaiveBayes

Instanciamos y entrenamos el modelo:

# Crear instancia del clasificador Naive Bayes
nb = NaiveBayes(featuresCol='features', labelCol='label', smoothing=1.0, modelType='multinomial')

# Entrenar el modelo
modelo_nb = nb.fit(datos)

El parámetro smoothing aplica suavizado de Laplace para manejar mejor las probabilidades. Realizamos predicciones:

# Realizar predicciones
predicciones_nb = modelo_nb.transform(datos_prueba)

# Mostrar resultados
predicciones_nb.select('label', 'prediction', 'probability').show(5)

El clasificador Naive Bayes es eficiente para clasificación multiclase y escalable para grandes conjuntos de datos.

K-Nearest Neighbors (KNN)

Aunque PySpark MLlib no incluye una implementación nativa de KNN, podemos aproximar este algoritmo utilizando Locality Sensitive Hashing (LSH) para encontrar vecinos cercanos de forma eficiente. Importamos la clase BucketedRandomProjectionLSH:

from pyspark.ml.feature import BucketedRandomProjectionLSH

Configuramos y ajustamos el modelo LSH:

# Crear instancia del modelo LSH
lsh = BucketedRandomProjectionLSH(inputCol='features', outputCol='hashes', bucketLength=2.0)

# Ajustar el modelo
modelo_lsh = lsh.fit(datos)

Para encontrar los vecinos más cercanos de un punto de consulta:

# Punto de consulta
punto_consulta = datos_prueba.limit(1)

# Encontrar vecinos más cercanos
vecinos = modelo_lsh.approxNearestNeighbors(datos, punto_consulta.collect()[0].features, k=5)

# Mostrar vecinos
vecinos.select('label', 'features').show()

Este enfoque nos permite implementar una versión aproximada de KNN en un entorno distribuido.

Consideraciones adicionales

Preprocesamiento de datos: Asegúrese de que las características estén escaladas o normalizadas si el algoritmo lo requiere. Por ejemplo, las SVM y KNN son sensibles a la escala de las características.

División de datos: Es fundamental dividir los datos en conjuntos de entrenamiento y prueba para evaluar el rendimiento real del modelo:

entrenamiento, prueba = datos.randomSplit([0.8, 0.2], seed=42)
  • Ajuste de hiperparámetros: Podemos utilizar herramientas como ParamGridBuilder y CrossValidator para optimizar los hiperparámetros:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
  
# Configurar la cuadrícula de hiperparámetros
paramGrid = ParamGridBuilder() \
    .addGrid(arbol.maxDepth, [3, 5, 7]) \
    .build()
  
# Configurar la validación cruzada
evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
cv = CrossValidator(estimator=arbol, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)
  
# Ajustar el modelo
cv_modelo = cv.fit(entrenamiento)
  • Interpretación del modelo: Para los árboles de decisión, podemos extraer características importantes y entender cómo el modelo toma decisiones, lo que es valioso para interpretabilidad.

Estos algoritmos amplían las capacidades de clasificación en PySpark MLlib, permitiendo manejar diversos tipos de problemas y conjuntos de datos, aprovechando la potencia del procesamiento distribuido.

Métricas de clasificación: Matriz de confusión, Accuracy, Precision, Recall, F1-score, ROC y AUC

En la evaluación de modelos de clasificación, es fundamental utilizar métricas adecuadas para medir su rendimiento y capacidad predictiva. Estas métricas permiten cuantificar qué tan bien está funcionando el modelo y ayudan a identificar áreas de mejora.

La matriz de confusión es una tabla que resume las predicciones del modelo comparándolas con las etiquetas reales. En PySpark MLlib, podemos construirla utilizando operaciones de agrupación y conteo:

# Crear la matriz de confusión
matriz_confusion = predicciones.groupBy('label', 'prediction').count().orderBy('label', 'prediction')

# Mostrar la matriz de confusión
matriz_confusion.show()

Esta matriz muestra cómo se distribuyen las predicciones en relación con las etiquetas verdaderas, permitiendo identificar errores específicos en la clasificación.

La accuracy o exactitud mide la proporción de predicciones correctas sobre el total de observaciones. Se calcula utilizando MulticlassClassificationEvaluator:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Calcular la accuracy
evaluador_accuracy = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluador_accuracy.evaluate(predicciones)

print(f"Accuracy: {accuracy}")

Una alta accuracy indica que el modelo predice correctamente la mayoría de las etiquetas.

La precisión (precision) evalúa la proporción de verdaderos positivos sobre el total de predicciones positivas. Se calcula de la siguiente manera:

# Calcular la precisión
evaluador_precision = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='weightedPrecision')
precision = evaluador_precision.evaluate(predicciones)

print(f"Precisión: {precision}")

Una alta precisión significa que el modelo tiene pocos falsos positivos.

El recall (sensibilidad) mide la proporción de verdaderos positivos sobre el total de verdaderos positivos y falsos negativos:

# Calcular el recall
evaluador_recall = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='weightedRecall')
recall = evaluador_recall.evaluate(predicciones)

print(f"Recall: {recall}")

Un alto recall indica que el modelo captura la mayoría de los casos positivos.

El F1-score es la media armónica entre la precisión y el recall, proporcionando un equilibrio entre ambas métricas:

# Calcular el F1-score
evaluador_f1 = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='f1')
f1_score = evaluador_f1.evaluate(predicciones)

print(f"F1-Score: {f1_score}")

El F1-score es especialmente útil cuando hay un desequilibrio en las clases y se requiere un balance entre precisión y recall.

La curva ROC (Receiver Operating Characteristic) es una representación gráfica que muestra la relación entre la tasa de verdaderos positivos y la tasa de falsos positivos a diferentes umbrales de clasificación. El AUC (Area Under the Curve) mide el área bajo esta curva y proporciona una métrica agregada del rendimiento del modelo.

Para problemas de clasificación binaria, podemos calcular el AUC utilizando BinaryClassificationEvaluator:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Calcular el AUC
evaluador_auc = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='rawPrediction', metricName='areaUnderROC')
auc = evaluador_auc.evaluate(predicciones)

print(f"AUC: {auc}")

Un AUC cercano a 1 indica que el modelo tiene una excelente capacidad para distinguir entre las clases positivas y negativas.

Es importante destacar que para modelos multiclas, el AUC y la curva ROC requieren adaptaciones o métricas alternativas, ya que están diseñados originalmente para clasificación binaria.

Para visualizar la curva ROC en un modelo de regresión logística:

# Obtener los puntos de la curva ROC
roc = modelo.summary.roc
roc.show()

# Graficar la curva ROC (requiere importar matplotlib)
import matplotlib.pyplot as plt

roc_pd = roc.toPandas()
plt.plot(roc_pd['FPR'], roc_pd['TPR'])
plt.xlabel('Tasa de Falsos Positivos')
plt.ylabel('Tasa de Verdaderos Positivos')
plt.title('Curva ROC')
plt.show()

La curva ROC proporciona una visión detallada del rendimiento del modelo a través de diferentes umbrales, siendo una herramienta útil para seleccionar el punto óptimo de operación.

En resumen, el uso de estas métricas de clasificación es esencial para evaluar y comparar modelos, así como para tomar decisiones informadas en el proceso de mejora y ajuste de los algoritmos de aprendizaje automático.

Aprende PySpark GRATIS online

Todas las lecciones de PySpark

Accede a todas las lecciones de PySpark y aprende con ejemplos prácticos de código y ejercicios de programación con IDE web sin instalar nada.

Accede GRATIS a PySpark y certifícate

En esta lección

Objetivos de aprendizaje de esta lección

  • Importar e instanciar el modelo de regresión logística usando PySpark MLlib.
  • Convertir columnas en vectores de características usando VectorAssembler.
  • Entrenar el modelo con datos distribuidos y realizar predicciones.
  • Ajustar hiperparámetros como maxIter y regParam para optimizar el rendimiento.
  • Interpretar coeficientes e interceptos para inferir la influencia de las características.
  • Aplicar estrategias para clasificación multiclase.
  • Estandarizar características con StandardScaler y seleccionar características con Chi-Squared Selector.