PySpark
Tutorial PySpark: Reducción de la dimensionalidad
PySpark MLlib: Descubre cómo aplicar PCA para reducción de dimensionalidad y mejorar la eficiencia de modelos de aprendizaje automático, reduciendo la dimensionalidad de conjuntos de datos con Python en Apache spark.
Aprende PySpark GRATIS y certifícateAnálisis de Componentes Principales (PCA)
El Análisis de Componentes Principales (PCA) es una técnica esencial en la reducción de dimensionalidad que permite transformar un conjunto de variables posiblemente correlacionadas en un conjunto de variables no correlacionadas llamadas componentes principales. En PySpark MLlib, PCA se utiliza para simplificar modelos de aprendizaje automático, mejorar el rendimiento computacional y eliminar ruido en los datos.
Para aplicar PCA en PySpark, es fundamental trabajar con el módulo pyspark.ml.feature.PCA
. Este módulo proporciona herramientas para ajustar un modelo PCA y transformar los datos de entrada en el espacio reducido de componentes principales.
Primero, es necesario cargar y preparar los datos. Supongamos que disponemos de un DataFrame
con características numéricas que queremos reducir:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PCAExample").getOrCreate()
# Cargar los datos en un DataFrame
data = spark.read.format("libsvm").load("datos.libsvm")
Antes de aplicar PCA, es habitual utilizar un VectorAssembler para combinar múltiples columnas en un solo vector de características:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["col1", "col2", "col3"], outputCol="features")
assembled_data = assembler.transform(data)
Una vez preparados los datos, se puede configurar y ajustar el modelo PCA especificando el número de componentes principales que se desean extraer:
from pyspark.ml.feature import PCA
pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(assembled_data)
Aquí, k=2
indica que queremos reducir las dimensiones a dos componentes principales. El método fit
ajusta el modelo PCA a los datos proporcionados.
Tras ajustar el modelo, podemos transformar los datos originales para obtener las nuevas características reducidas:
result = model.transform(assembled_data)
El DataFrame
resultante contiene una nueva columna pcaFeatures
que almacena las componentes principales calculadas. Es útil visualizar los resultados para interpretar las componentes extraídas:
result.select("pcaFeatures").show(truncate=False)
La interpretación de los componentes es crucial. Los vectores propios asociados a las componentes principales indican cómo se combinan las variables originales. Además, la varianza explicada por cada componente es accesible a través del atributo explainedVariance
:
explained_variance = model.explainedVariance
print("Varianza explicada por cada componente principal: " + str(explained_variance))
Conocer la varianza explicada ayuda a determinar cuántos componentes son necesarios para representar adecuadamente los datos sin perder información significativa.
Es importante destacar que, en PySpark MLlib, todas las operaciones son distribuidas, lo que permite aplicar PCA a conjuntos de datos masivos de manera eficiente. Además, PCA puede integrarse en pipelines de MLlib, facilitando su uso en flujos de trabajo más complejos.
Para finalizar, cuando se trabaja con PCA es recomendable estandarizar los datos antes de aplicarlo, ya que las componentes principales son sensibles a las escalas de las variables originales:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withMean=True, withStd=True)
scaler_model = scaler.fit(assembled_data)
scaled_data = scaler_model.transform(assembled_data)
# Aplicar PCA a los datos escalados
pca = PCA(k=2, inputCol="scaledFeatures", outputCol="pcaFeatures")
model = pca.fit(scaled_data)
result = model.transform(scaled_data)
La estandarización asegura que todas las características contribuyan equitativamente al análisis, mejorando la calidad de los componentes principales obtenidos.
Descomposición en Valores Singulares (SVD)
La Descomposición en Valores Singulares (SVD) es una técnica fundamental en el análisis de datos y aprendizaje automático, utilizada para la reducción de dimensionalidad, filtrado de ruido y extracción de características latentes. En el contexto de PySpark MLlib, SVD permite descomponer grandes matrices en componentes más manejables, facilitando el procesamiento y análisis de conjuntos de datos distribuidos.
Matemáticamente, SVD descompone una matriz ( A ) en el producto de tres matrices:
$$ A = U \Sigma V^T $$
Donde:
- ( $U$ ) es una matriz ortogonal cuyas columnas son los vectores singulares izquierdos.
- ( $Sigma$ ) es una matriz diagonal con los valores singulares no negativos en orden decreciente.
- ( $V^T$ ) es la transpuesta de la matriz ortogonal ( V ), cuyas columnas son los vectores singulares derechos.
En PySpark MLlib, para aplicar SVD es necesario trabajar con una RowMatrix, que es una matriz de filas distribuidas. A continuación, se detalla cómo implementar SVD utilizando PySpark.
Primero, se importa y se configura el entorno necesario:
from pyspark.sql import SparkSession
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
spark = SparkSession.builder.appName("SVDExample").getOrCreate()
Se crea un DataFrame con datos de ejemplo:
# Datos de ejemplo
data = [
(Vectors.dense([1.0, 2.0, 3.0]),),
(Vectors.dense([4.0, 5.0, 6.0]),),
(Vectors.dense([7.0, 8.0, 9.0]),)
]
df = spark.createDataFrame(data, ["features"])
Luego, se convierte el DataFrame en un RDD de vectores y se crea una RowMatrix:
# Convertir DataFrame a RDD de vectores
rows_rdd = df.select("features").rdd.map(lambda row: row[0])
# Crear la RowMatrix
mat = RowMatrix(rows_rdd)
A continuación, se calcula la descomposición en valores singulares especificando el número de componentes ( k ):
# Calcular la SVD con k=2
svd = mat.computeSVD(k=2, computeU=True)
El objeto svd
contiene:
svd.U
: Matriz de vectores singulares izquierdos como RowMatrix.svd.s
: Vector de valores singulares.svd.V
: Matriz de vectores singulares derechos como DenseMatrix.
Para visualizar los valores singulares:
print("Valores singulares:", svd.s)
Los valores singulares reflejan la importancia de cada componente en la estructura de los datos. Seleccionando los mayores valores singulares y sus vectores asociados, se puede reducir la dimensionalidad conservando la mayor parte de la información.
Para proyectar los datos originales en el espacio reducido, se multiplica la matriz ( U ) por los valores singulares:
from pyspark.mllib.linalg import DenseVector
# Proyectar los datos en el espacio reducido
projected_data = svd.U.rows.map(lambda vector: DenseVector([x * svd.s[i] for i, x in enumerate(vector)]))
Este conjunto de datos proyectados representa los datos originales en un espacio de menor dimensionalidad, útil para tareas como visualización o clustering.
Es importante considerar que la SVD puede ser computacionalmente intensiva para matrices muy grandes. PySpark MLlib optimiza este proceso mediante procesamiento distribuido, pero es recomendable reducir el tamaño de los datos si es posible. Además, si los datos son dispersos, es eficiente utilizar estructuras como IndexedRowMatrix o CoordinateMatrix.
Un ejemplo completo que resume el uso de SVD en PySpark es el siguiente:
from pyspark.sql import SparkSession
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
# Inicializar SparkSession
spark = SparkSession.builder.appName("SVDExample").getOrCreate()
# Crear DataFrame de ejemplo
data = [
(Vectors.dense([1.0, 0.0, 0.0]),),
(Vectors.dense([0.0, 1.0, 0.0]),),
(Vectors.dense([0.0, 0.0, 1.0]),),
(Vectors.dense([1.0, 1.0, 1.0]),)
]
df = spark.createDataFrame(data, ["features"])
# Convertir DataFrame a RDD de vectores
rows_rdd = df.select("features").rdd.map(lambda row: row[0])
# Crear RowMatrix
mat = RowMatrix(rows_rdd)
# Calcular SVD con k=2
svd = mat.computeSVD(2, computeU=True)
# Mostrar valores singulares
print("Valores singulares:", svd.s)
# Extraer los valores singulares de svd
singular_values = svd.s
# Transmitir los Valores Singulares a los Workers de Manera Segura
broadcast_s = spark.sparkContext.broadcast(singular_values)
# Proyectar los datos en el espacio reducido
from pyspark.mllib.linalg import DenseVector
projected_data = svd.U.rows.map(lambda vector: DenseVector([x * broadcast_s.value[i] for i, x in enumerate(vector)]))
# Mostrar datos proyectados
for row in projected_data.collect():
print(row)
Este código demuestra cómo cargar los datos, calcular la SVD y proyectar los datos originales en un espacio de menor dimensionalidad utilizando PySpark MLlib.
Aplicaciones prácticas y visualización
La reducción de la dimensionalidad tiene numerosas aplicaciones prácticas en el ámbito del aprendizaje automático y el análisis de datos con PySpark MLlib. Al simplificar los datos manteniendo la mayor parte de la información relevante, se mejora la eficiencia computacional y se facilita la interpretación de los resultados.
Una aplicación común es en el preprocesamiento de datos antes de entrenar modelos de clasificación o regresión. Al reducir el número de características, se minimiza el riesgo de sobreajuste y se acelera el entrenamiento de los algoritmos de aprendizaje automático.
Por ejemplo, consideremos un conjunto de datos con un gran número de variables numéricas. Aplicando PCA o SVD podemos transformar estas variables en un conjunto más pequeño de componentes principales que capturan la mayor parte de la varianza. Esto es especialmente útil en dominios como el procesamiento de imágenes, donde las imágenes de alta resolución se pueden representar de manera más eficiente.
A continuación, se presenta un ejemplo práctico de cómo aplicar PCA para reducir la dimensionalidad y visualizar los resultados en PySpark:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, PCA
import matplotlib.pyplot as plt
# Inicializar SparkSession
spark = SparkSession.builder.appName("PCAVisualization").getOrCreate()
# Cargar datos de ejemplo
data = [
(1, 5.1, 3.5, 1.4, 0.2),
(2, 4.9, 3.0, 1.4, 0.2),
(3, 6.7, 3.1, 4.7, 1.5),
(4, 5.6, 2.9, 3.6, 1.3),
(5, 6.3, 3.3, 6.0, 2.5),
(6, 5.8, 2.7, 5.1, 1.9)
]
columns = ["id", "sepal_length", "sepal_width", "petal_length", "petal_width"]
df = spark.createDataFrame(data, columns)
# Ensamblar las características en un vector
assembler = VectorAssembler(
inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"],
outputCol="features"
)
assembled_data = assembler.transform(df)
# Aplicar PCA para reducir a 2 dimensiones
pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(assembled_data)
result = model.transform(assembled_data).select("id", "pcaFeatures")
# Convertir a Pandas DataFrame para visualización
pandas_df = result.toPandas()
pandas_df["pc1"] = pandas_df["pcaFeatures"].apply(lambda x: x[0])
pandas_df["pc2"] = pandas_df["pcaFeatures"].apply(lambda x: x[1])
# Visualizar los datos proyectados
plt.figure(figsize=(8,6))
plt.scatter(pandas_df["pc1"], pandas_df["pc2"])
plt.xlabel("Componente Principal 1")
plt.ylabel("Componente Principal 2")
plt.title("Visualización de PCA")
plt.show()
En este ejemplo, se utilizan datos de muestra para demostrar cómo reducir de cuatro a dos dimensiones y visualizar los resultados. Es importante notar que para la visualización se utiliza matplotlib, y se convierte el DataFrame de Spark a un DataFrame de pandas para facilitar el proceso.
Otra aplicación práctica es en el análisis exploratorio de datos. Al proyectar datos de alta dimensionalidad en dos o tres componentes principales, se pueden detectar patrones, agrupamientos o anomalías. Esto es esencial en campos como la detección de fraudes o el análisis de mercado.
Además, la reducción de dimensionalidad es útil para mejorar el rendimiento de algoritmos de clustering. Reducir las dimensiones puede ayudar a que técnicas como K-Means converjan más rápidamente y sean más efectivas al eliminar ruido y redundancia en los datos.
Para visualizar los datos después de aplicar SVD, aunque PySpark MLlib no proporciona una función directa para SVD en pipelines de DataFrames, es posible utilizar aproximaciones como el PCA Truncado para trabajar con matrices dispersas y grandes conjuntos de datos.
Es fundamental destacar que, al trabajar con PySpark, se puede aprovechar su capacidad para manejar grandes volúmenes de datos distribuidos. Esto permite aplicar técnicas de reducción de dimensionalidad a conjuntos de datos que no cabrían en la memoria de una sola máquina, manteniendo la eficiencia y escalabilidad.
A continuación, se muestra cómo integrar la reducción de dimensionalidad en un pipeline de aprendizaje automático:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
# Supongamos un DataFrame 'df' con características y etiquetas
# Ensamblador de características
assembler = VectorAssembler(
inputCols=["feature1", "feature2", "feature3", ...],
outputCol="features"
)
# Reducción de dimensionalidad con PCA
pca = PCA(k=10, inputCol="features", outputCol="pcaFeatures")
# Modelo de clasificación
lr = LogisticRegression(featuresCol="pcaFeatures", labelCol="label")
# Crear el pipeline
pipeline = Pipeline(stages=[assembler, pca, lr])
# Dividir los datos en entrenamiento y prueba
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
# Entrenar el modelo
model = pipeline.fit(train_data)
# Realizar predicciones
predictions = model.transform(test_data)
En este pipeline, se ensamblan las características, se reducen las dimensiones y luego se entrena un modelo de regresión logística. Al reducir las dimensiones antes del entrenamiento, se mejora la eficiencia y se puede lograr una mejor generalización del modelo.
Finalmente, es importante utilizar herramientas de visualización para interpretar y comunicar los resultados. Además de matplotlib, librerías como Seaborn o Plotly pueden integrarse con PySpark para generar gráficos interactivos y más elaborados.
Es recomendable siempre analizar la varianza explicada por los componentes principales para determinar el número óptimo de dimensiones a conservar. Esto asegura que la reducción de dimensionalidad no comprometa la calidad de los datos ni la precisión de los modelos de aprendizaje automático.
Todas las lecciones de PySpark
Accede a todas las lecciones de PySpark y aprende con ejemplos prácticos de código y ejercicios de programación con IDE web sin instalar nada.
Introducción A Pyspark
Introducción Y Entorno
Instalación De Pyspark
Introducción Y Entorno
Fundamentos De Pyspark
Introducción Y Entorno
Manipulación Y Análisis De Datos Con Pyspark
Transformación De Datos
Pyspark Sql
Transformación De Datos
Trabajo Con Datos Complejos
Transformación De Datos
Introducción A Mllib
Aprendizaje Automático
Preparación De Datos Para Mllib
Aprendizaje Automático
Regresión Con Mllib
Aprendizaje Automático
Clasificación Con Mllib
Aprendizaje Automático
Modelos De Clustering
Aprendizaje Automático
Reducción De La Dimensionalidad
Aprendizaje Automático
Recomendación
Aprendizaje Automático
Pipelines
Aprendizaje Automático
Mllib Con Scikit Learn
Integraciones
Mllib Con Tensorflow
Integraciones
En esta lección
Objetivos de aprendizaje de esta lección
- Comprender el concepto de Análisis de Componentes Principales (PCA).
- Aplicar PCA usando PySpark MLlib.
- Reducir la dimensionalidad de datos masivos.
- Mejorar la eficiencia de modelos de aprendizaje automático.
- Interpretar la varianza explicada por los componentes principales.
- Establecer un pipeline para el flujo de trabajo en PySpark.