PySpark
Tutorial PySpark: Pipelines
Aprende a crear y optimizar pipelines en PySpark MLlib para mejorar los flujos de trabajo de aprendizaje automático con técnicas avanzadas como grid search en Apache Spark con Python.
Aprende PySpark GRATIS y certifícateUso de pipelines en PySpark MLlib
En PySpark MLlib, los pipelines son una herramienta esencial para estructurar y simplificar los flujos de trabajo de aprendizaje automático. Permiten encadenar múltiples etapas de transformación y modelado en una secuencia ordenada, facilitando la construcción, evaluación y mantenimiento de modelos complejos.
Un pipeline se compone de una serie de etapas (stages), donde cada etapa puede ser un transformador o un estimador. Los transformadores, como StringIndexer
o VectorAssembler
, aplican transformaciones a los datos y generan una nueva columna en el DataFrame. Los estimadores, como LogisticRegression
o DecisionTreeClassifier
, entrenan modelos basados en los datos y producen un transformador como resultado del ajuste.
La creación de un pipeline permite encapsular todo el proceso de preprocesamiento y entrenamiento en un solo objeto. Esto no solo mejora la legibilidad del código, sino que también asegura que las mismas transformaciones se apliquen consistentemente durante el entrenamiento y la inferencia.
A continuación, se muestra un ejemplo práctico de cómo utilizar un pipeline en PySpark MLlib:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
# Paso 1: Definir los transformadores y estimadores
indexador = StringIndexer(inputCol="categoria", outputCol="categoria_indexada")
ensamblador = VectorAssembler(inputCols=["categoria_indexada", "feature1", "feature2"], outputCol="features")
clasificador = RandomForestClassifier(featuresCol="features", labelCol="etiqueta")
# Paso 2: Crear el pipeline con las etapas definidas
pipeline = Pipeline(stages=[indexador, ensamblador, clasificador])
# Paso 3: Ajustar el pipeline a los datos de entrenamiento
modelo = pipeline.fit(datos_entrenamiento)
# Paso 4: Realizar predicciones en datos de prueba
predicciones = modelo.transform(datos_prueba)
En este ejemplo, se siguen estos pasos:
- Indexar variables categóricas:
StringIndexer
convierte la columna categórica "categoria" en una columna numérica "categoria_indexada". - Ensamblar características:
VectorAssembler
combina las columnas relevantes en una sola columna de características. - Entrenar el modelo:
RandomForestClassifier
utiliza las características ensambladas para entrenar un modelo de clasificación. - Crear el pipeline: Se definen las etapas en orden secuencial dentro del pipeline.
- Ajustar y predecir: Se entrena el pipeline completo y se aplican las transformaciones y predicciones sobre los datos de prueba.
El uso de pipelines ofrece varias ventajas:
- Modularidad: Cada etapa es independiente y puede ser modificada o reemplazada sin afectar al resto del pipeline.
- Reproducibilidad: Al mantener todas las etapas en un objeto, se asegura que las mismas transformaciones se aplican en cada ejecución.
- Mantenibilidad: Facilita la lectura y mantenimiento del código, especialmente en proyectos con múltiples etapas de preprocesamiento y modelado.
- Integración con herramientas avanzadas: Los pipelines son compatibles con técnicas como la validación cruzada y el ajuste de hiperparámetros.
Es posible incorporar etapas más avanzadas en un pipeline, como escalado de características o reducción de dimensionalidad:
from pyspark.ml.feature import StandardScaler, PCA
escalador = StandardScaler(inputCol="features", outputCol="features_escaladas")
pca = PCA(k=3, inputCol="features_escaladas", outputCol="features_pca")
pipeline = Pipeline(stages=[indexador, ensamblador, escalador, pca, clasificador])
En este caso, se añaden dos transformadores adicionales:
- Escalado de características:
StandardScaler
normaliza las características para tener media cero y desviación estándar uno. - Reducción de dimensionalidad:
PCA
reduce el número de características a las componentes principales más significativas.
Al entrenar el pipeline, todas estas transformaciones se aplican secuencialmente, y el modelo final se entrena sobre las características reducidas.
Para visualizar el flujo de datos a través del pipeline, se puede inspeccionar el atributo stages
:
for stage in pipeline.getStages():
print(stage)
Esto permite entender mejor cada componente y su configuración dentro del pipeline.
Transformadores y estimadores personalizados
En PySpark MLlib, los transformadores y estimadores personalizados permiten extender la funcionalidad estándar de la librería para adaptarse a necesidades específicas de los proyectos. Mediante la creación de estos componentes, es posible implementar transformaciones y modelos que no están contemplados en las clases predefinidas, ofreciendo soluciones a problemas particulares de procesamiento y modelado de datos.
Un transformador personalizado es una clase que hereda de Transformer
y redefine el método _transform
, aplicando una transformación específica a un DataFrame. Por ejemplo, si se requiere una transformación que calcule el cuadrado de una columna numérica, se puede crear el siguiente transformador:
from pyspark.ml import Transformer
from pyspark.sql.functions import pow
from pyspark.sql.types import DoubleType
class PowerTransformer(Transformer):
def __init__(self, inputCol, outputCol, exponent):
super(PowerTransformer, self).__init__()
self.inputCol = inputCol
self.outputCol = outputCol
self.exponent = exponent
def _transform(self, dataset):
return dataset.withColumn(self.outputCol, pow(dataset[self.inputCol], self.exponent))
En este caso, PowerTransformer
aplica la función potencia a la columna indicada, elevándola al exponente especificado. Este transformador puede ser utilizado en un pipeline como cualquier otro componente estándar.
Por otra parte, un estimador personalizado es una clase que hereda de Estimator
y redefine el método _fit
, produciendo un modelo entrenado que hereda de Model
. Este modelo, a su vez, debe implementar el método _transform
para realizar predicciones. Un ejemplo sencillo es un estimador que implementa una regresión polinómica:
from pyspark.ml import Estimator, Model
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
class PolynomialRegressionModel(Model):
def __init__(self, coefficients):
super(PolynomialRegressionModel, self).__init__()
self.coefficients = coefficients
def _transform(self, dataset):
def predict(features):
return sum(c * (f ** i) for i, (c, f) in enumerate(zip(self.coefficients, features)))
predict_udf = udf(predict, DoubleType())
return dataset.withColumn("prediction", predict_udf(dataset.features))
class PolynomialRegression(Estimator):
def __init__(self, degree):
super(PolynomialRegression, self).__init__()
self.degree = degree
def _fit(self, dataset):
# Implementación simplificada para el ejemplo
# Aquí se realizaría el ajuste del modelo y cálculo de coeficientes
coefficients = [1.0 for _ in range(self.degree + 1)]
return PolynomialRegressionModel(coefficients)
Este estimador ajusta un modelo de regresión polinomial de grado especificado. Aunque la implementación está simplificada, ilustra cómo estructurar un estimador personalizado que produce un modelo capaz de realizar predicciones en nuevos datos.
Al crear transformadores y estimadores personalizados, es fundamental manejar los parámetros de manera adecuada. PySpark MLlib proporciona la clase Params
y el módulo param
para definir y gestionar parámetros, lo que facilita la integración de los componentes personalizados con el resto de la librería.
Un ejemplo de implementación con parámetros gestionados es el siguiente transformador que normaliza una columna numérica:
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params
from pyspark.sql.functions import col
class MinMaxScaler(Transformer, HasInputCol, HasOutputCol):
def __init__(self, inputCol=None, outputCol=None):
super(MinMaxScaler, self).__init__()
kwargs = self._input_kwargs
self.setParams(**kwargs)
def setParams(self, inputCol=None, outputCol=None):
return self._set(inputCol=inputCol, outputCol=outputCol)
def _transform(self, dataset):
min_value = dataset.agg({self.getInputCol(): "min"}).collect()[0][0]
max_value = dataset.agg({self.getInputCol(): "max"}).collect()[0][0]
scaled_col = (col(self.getInputCol()) - min_value) / (max_value - min_value)
return dataset.withColumn(self.getOutputCol(), scaled_col)
Este MinMaxScaler
calcula el mínimo y máximo de la columna de entrada y genera una nueva columna normalizada entre 0 y 1. La utilización de HasInputCol
y HasOutputCol
simplifica la gestión de los nombres de columnas y mejora la interoperabilidad con otros componentes de MLlib.
Integrar los transformadores y estimadores personalizados en un pipeline es tan sencillo como hacerlo con los componentes integrados:
from pyspark.ml import Pipeline
scaler = MinMaxScaler(inputCol="feature", outputCol="feature_scaled")
poly_reg = PolynomialRegression(degree=2)
pipeline = Pipeline(stages=[scaler, poly_reg])
model = pipeline.fit(training_data)
predictions = model.transform(test_data)
Al ejecutar este pipeline, los datos pasan primero por el transformador personalizado MinMaxScaler
y luego por el estimador PolynomialRegression
, que ajusta el modelo y realiza las predicciones correspondientes.
Es crucial asegurarse de que los componentes personalizados sean serializables si se desea guardar y cargar los modelos entrenados. Para ello, se deben proporcionar implementaciones para los métodos write
y read
si es necesario, aunque muchas veces la herencia de las clases base de MLlib es suficiente.
Finalmente, al desarrollar transformadores y estimadores personalizados, es importante considerar aspectos como:
- Eficiencia computacional: Optimizar las transformaciones para manejar grandes volúmenes de datos sin perder rendimiento.
- Compatibilidad: Asegurar que los componentes funcionan correctamente con otros transformadores y estimadores en pipelines complejos.
- Validación de entradas: Implementar comprobaciones y manejar excepciones para garantizar la robustez del código.
La creación de transformadores y estimadores personalizados en PySpark MLlib ofrece una gran flexibilidad para abordar desafíos específicos en proyectos de análisis de datos y aprendizaje automático. Permite incorporar lógica y algoritmos propios, aprovechando al mismo tiempo la potencia y escalabilidad del framework de Spark.
Grid Search y Cross Validation
La validación cruzada y el grid search son técnicas esenciales en el aprendizaje automático para evaluar y mejorar el rendimiento de los modelos. En PySpark MLlib, estas herramientas se integran de forma nativa en los pipelines, permitiendo un ajuste sistemático y eficiente de los modelos en entornos de datos a gran escala.
La validación cruzada es un método que consiste en dividir el conjunto de datos en varias particiones o folds. En cada iteración, se utiliza una partición distinta como conjunto de validación y el resto como conjunto de entrenamiento. Esto proporciona una estimación más fiable del rendimiento del modelo al reducir el sesgo asociado a una única división de datos. PySpark MLlib ofrece la clase CrossValidator
para implementar esta técnica de manera sencilla.
El grid search, o búsqueda en cuadrícula, es un proceso exhaustivo que evalúa todas las combinaciones posibles de un conjunto definido de hiperparámetros. Mediante la prueba sistemática de estas combinaciones, se identifica la configuración que optimiza el rendimiento del modelo según una métrica específica. La clase ParamGridBuilder
en PySpark facilita la construcción de esta cuadrícula de hiperparámetros.
A continuación, se muestra cómo integrar estas técnicas en un pipeline de PySpark MLlib:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Ensamblador de características
vector_assembler = VectorAssembler(inputCols=["edad", "ingresos", "puntaje_crediticio"], outputCol="features")
# Modelo de regresión logística
regresion_logistica = LogisticRegression(featuresCol="features", labelCol="etiqueta")
# Definición del pipeline
pipeline = Pipeline(stages=[vector_assembler, regresion_logistica])
# Evaluador basado en el área bajo la curva ROC
evaluador = BinaryClassificationEvaluator(labelCol="etiqueta", metricName="areaUnderROC")
# Construcción de la cuadrícula de hiperparámetros
parametros = ParamGridBuilder() \
.addGrid(regresion_logistica.regParam, [0.01, 0.1, 1.0]) \
.addGrid(regresion_logistica.elasticNetParam, [0.0, 0.5, 1.0]) \
.build()
# Configuración del validador cruzado
validador_cruzado = CrossValidator(estimator=pipeline,
estimatorParamMaps=parametros,
evaluator=evaluador,
numFolds=5)
# Ajuste del modelo con validación cruzada y grid search
modelo_cv = validador_cruzado.fit(datos_entrenamiento)
# Aplicación del modelo al conjunto de prueba
predicciones = modelo_cv.transform(datos_prueba)
# Evaluación del modelo
auc = evaluador.evaluate(predicciones)
print(f"Área bajo la curva ROC: {auc}")
En este ejemplo, se realiza lo siguiente:
- Creación del ensamblador:
VectorAssembler
combina las columnas de entrada en un vector de características. - Definición del modelo: Se utiliza
LogisticRegression
para tareas de clasificación binaria. - Construcción del pipeline: Se encapsulan las etapas de preprocesamiento y modelado.
- Configuración del evaluador:
BinaryClassificationEvaluator
calcula el área bajo la curva ROC para medir el rendimiento. - Definición del grid de hiperparámetros:
ParamGridBuilder
especifica los valores a probar pararegParam
yelasticNetParam
. - Configuración del validador cruzado:
CrossValidator
implementa la validación cruzada con el grid de hiperparámetros y el evaluador. - Entrenamiento del modelo: Se ajusta el modelo utilizando todas las combinaciones de hiperparámetros y seleccionando el que ofrece el mejor rendimiento promedio.
- Generación de predicciones: Se aplica el modelo entrenado al conjunto de datos de prueba.
- Evaluación final: Se calcula el AUC para las predicciones obtenidas.
Es importante tener en cuenta que la utilización de grid search junto con validación cruzada puede ser computacionalmente intensiva, especialmente cuando el grid de parámetros es amplio y el conjunto de datos es grande. Por ello, es recomendable seleccionar cuidadosamente los valores de los hiperparámetros y considerar el equilibrio entre costo computacional y beneficio en la mejora del modelo.
Además de CrossValidator
, PySpark MLlib proporciona TrainValidationSplit
, que es una alternativa cuando se busca reducir el tiempo de cómputo. En lugar de realizar múltiples particiones como en la validación cruzada, esta técnica divide los datos en un conjunto de entrenamiento y otro de validación.
Ejemplo de uso de TrainValidationSplit
:
from pyspark.ml.tuning import TrainValidationSplit
# Configuración del divisor de entrenamiento y validación
divisor_entrenamiento = TrainValidationSplit(estimator=pipeline,
estimatorParamMaps=parametros,
evaluator=evaluador,
trainRatio=0.8) # 80% para entrenamiento y 20% para validación
# Ajuste del modelo con grid search sin validación cruzada
modelo_tvs = divisor_entrenamiento.fit(datos_entrenamiento)
# Predicciones y evaluación
predicciones = modelo_tvs.transform(datos_prueba)
auc = evaluador.evaluate(predicciones)
print(f"Área bajo la curva ROC: {auc}")
Aunque TrainValidationSplit
puede ser menos preciso en la estimación del rendimiento del modelo, es una opción válida cuando los recursos computacionales son limitados y se requiere una solución más rápida.
La integración de grid search y validación cruzada en los pipelines de PySpark MLlib permite automatizar el proceso de búsqueda de la mejor configuración del modelo. Esto es especialmente útil en entornos de producción, donde es crucial mantener la consistencia y reproducibilidad de los experimentos.
Al trabajar con estas técnicas, es recomendable:
- Limitar el número de combinaciones: Reducir el grid de hiperparámetros a valores significativos para evitar un crecimiento exponencial en el número de modelos a entrenar.
- Utilizar métricas adecuadas: Seleccionar el evaluador que mejor refleje el objetivo del modelo, como precisión, recall o F1-score en casos específicos.
- Monitorizar el rendimiento: Estar atento al tiempo de ejecución y al uso de recursos para optimizar el proceso.
Finalmente, es esencial recordar que la validación cruzada y el grid search son herramientas para encontrar la mejor configuración de un modelo dado, pero no sustituyen el entendimiento profundo del problema y la naturaleza de los datos. Una exploración cuidadosa y una ingeniería de características adecuada siguen siendo fundamentales para el éxito en proyectos de aprendizaje automático.
Tuning de hiperparámetros
En el aprendizaje automático con PySpark MLlib, el tuning de hiperparámetros es un paso fundamental para optimizar el rendimiento de los modelos. Los hiperparámetros son parámetros que controlan el funcionamiento del algoritmo y no se obtienen directamente del entrenamiento con datos. Ajustar estos valores de manera apropiada puede marcar una diferencia significativa en la precisión y generalización del modelo.
Aunque el grid search y la validación cruzada son métodos ampliamente utilizados para el tuning, pueden resultar computacionalmente costosos, especialmente con grandes volúmenes de datos o modelos con múltiples hiperparámetros. Una alternativa eficiente es la búsqueda aleatoria, donde se exploran combinaciones aleatorias de hiperparámetros dentro de rangos especificados. Esto permite cubrir un espacio más amplio de posibles configuraciones con menos recursos computacionales.
Para implementar la búsqueda aleatoria en PySpark MLlib, se puede generar una cuadrícula de hiperparámetros utilizando funciones aleatorias:
import numpy as np
from pyspark.ml.tuning import ParamGridBuilder
# Generar valores aleatorios para los hiperparámetros
reg_param_values = np.random.uniform(0.0, 1.0, 10)
elastic_net_values = np.random.uniform(0.0, 1.0, 10)
# Construir la cuadrícula aleatoria de hiperparámetros
param_grid = ParamGridBuilder() \
.addGrid(regresion_logistica.regParam, reg_param_values) \
.addGrid(regresion_logistica.elasticNetParam, elastic_net_values) \
.build()
Además, es esencial aprovechar el paralelismo de PySpark para acelerar el proceso de tuning. Al especificar el parámetro parallelism
en CrossValidator
o TrainValidationSplit
, se pueden ejecutar múltiples evaluaciones de modelos en paralelo:
validador_cruzado = CrossValidator(estimator=pipeline,
estimatorParamMaps=param_grid,
evaluator=evaluador,
numFolds=5,
parallelism=4) # Ejecuta 4 tareas en paralelo
De esta manera, se reduce el tiempo de cómputo total al distribuir las tareas a través de los recursos del clúster.
En ciertas situaciones, las métricas estándar pueden no alinearse con los objetivos específicos del proyecto. En estos casos, crear un evaluador personalizado puede ser muy útil. Por ejemplo, si se desea optimizar la precisión en lugar del área bajo la curva ROC, se puede definir un evaluador propio:
from pyspark.ml.evaluation import Evaluator
class PrecisionEvaluator(Evaluator):
def __init__(self, labelCol="etiqueta", predictionCol="prediction"):
super(PrecisionEvaluator, self).__init__()
self.labelCol = labelCol
self.predictionCol = predictionCol
def _evaluate(self, dataset):
tp = dataset.filter((dataset[self.labelCol] == 1) & (dataset[self.predictionCol] == 1)).count()
fp = dataset.filter((dataset[self.labelCol] == 0) & (dataset[self.predictionCol] == 1)).count()
precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
return precision
def isLargerBetter(self):
return True
Al utilizar este evaluador en el proceso de tuning, se dirige la optimización hacia la métrica de interés específica.
Cuando se trabaja con modelos más complejos como Random Forests o Gradient-Boosted Trees, es importante focalizar el tuning en los hiperparámetros que mayor impacto tienen en el rendimiento. Por ejemplo, en un Random Forest, los hiperparámetros clave suelen ser numTrees
, maxDepth
y maxBins
. Se puede crear una cuadrícula enfocada:
from pyspark.ml.classification import RandomForestClassifier
clasificador_rf = RandomForestClassifier(featuresCol="features", labelCol="etiqueta")
param_grid_rf = ParamGridBuilder() \
.addGrid(clasificador_rf.numTrees, [50, 100, 150]) \
.addGrid(clasificador_rf.maxDepth, [5, 10, 15]) \
.addGrid(clasificador_rf.maxBins, [32, 64]) \
.build()
Otra práctica recomendable es integrar herramientas como MLflow para el seguimiento de experimentos. MLflow permite registrar los hiperparámetros utilizados, las métricas obtenidas y los modelos entrenados, facilitando la comparación y reproducibilidad de los resultados:
import mlflow
import mlflow.spark
with mlflow.start_run():
modelo_cv = validador_cruzado.fit(datos_entrenamiento)
mlflow.spark.log_model(modelo_cv.bestModel, "modelo_mejor")
best_params = modelo_cv.bestModel.stages[-1].extractParamMap()
for param, valor in best_params.items():
mlflow.log_param(param.name, valor)
metric = evaluador.evaluate(modelo_cv.bestModel.transform(datos_prueba))
mlflow.log_metric("metric", metric)
Además, para llevar a cabo un tuning más avanzado, es posible integrar bibliotecas externas como Hyperopt o Optuna que implementan técnicas de optimización como la búsqueda bayesiana. Aunque PySpark MLlib no incluye estas técnicas de forma nativa, su uso combinado puede mejorar la eficiencia del proceso.
Por ejemplo, utilizando Hyperopt:
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
def objetivo(params):
lr = LogisticRegression(regParam=params['regParam'],
elasticNetParam=params['elasticNetParam'],
featuresCol="features",
labelCol="etiqueta")
pipeline = Pipeline(stages=[vector_assembler, lr])
modelo = pipeline.fit(datos_entrenamiento)
predicciones = modelo.transform(datos_prueba)
metric = evaluador.evaluate(predicciones)
return {'loss': -metric, 'status': STATUS_OK}
space = {
'regParam': hp.uniform('regParam', 0.0, 1.0),
'elasticNetParam': hp.uniform('elasticNetParam', 0.0, 1.0)
}
trials = Trials()
mejores_params = fmin(fn=objetivo,
space=space,
algo=tpe.suggest,
max_evals=20,
trials=trials)
Finalmente, algunas mejores prácticas a considerar durante el tuning de hiperparámetros en PySpark MLlib son:
- Reducir el espacio de búsqueda: Focalizarse en los hiperparámetros más influyentes y limitar el número de valores para evitar combinaciones excesivas.
- Utilizar muestras representativas: Trabajar con un subconjunto de los datos que mantenga las características esenciales puede acelerar el proceso sin sacrificar la calidad.
- Monitorizar el uso de recursos: Supervisar el consumo de memoria y tiempo de cómputo para optimizar la eficiencia y evitar sobrecargas.
- Automatizar los flujos de trabajo: Implementar pipelines automatizados facilita repetir el proceso y mantener la coherencia en las evaluaciones.
Aplicando estas estrategias, el proceso de tuning de hiperparámetros se vuelve más eficiente y efectivo, permitiendo obtener modelos de alto rendimiento en entornos de big data con PySpark MLlib.
Guardar y cargar modelos entrenados
En proyectos de aprendizaje automático con PySpark MLlib, es fundamental guardar los modelos entrenados para su posterior reutilización, despliegue o análisis. PySpark proporciona métodos integrados para almacenar tanto modelos individuales como pipelines completos, facilitando su persistencia en disco y su carga en futuros procesos sin necesidad de reentrenamiento.
Para guardar un modelo entrenado, se utiliza el método save
o write().save()
, especificando la ruta donde se almacenará el modelo. Es importante destacar que los modelos se guardan en formato Spark ML nativo, lo que permite mantener toda la información necesaria para su reconstrucción.
A continuación, se muestra un ejemplo de cómo guardar un modelo o pipeline entrenado:
# Supongamos que 'modelo_entrenado' es un pipeline o modelo ya ajustado
ruta_modelo = "hdfs://ruta/a/destino/modelo_entrenado"
# Guardar el modelo en la ruta especificada
modelo_entrenado.write().overwrite().save(ruta_modelo)
En este ejemplo, se utiliza el método write()
seguido de save()
para almacenar el modelo. El método overwrite()
asegura que si ya existe un modelo en esa ruta, será reemplazado por el nuevo. La ruta puede ser local o en un sistema de archivos distribuido como HDFS.
Para cargar un modelo previamente guardado, se emplea el método load
del mismo tipo de modelo o pipeline. Esto permite restaurar el modelo y utilizarlo para realizar predicciones o continuar con el procesamiento.
Ejemplo de carga de un modelo guardado:
from pyspark.ml import PipelineModel
# Ruta donde se encuentra el modelo guardado
ruta_modelo = "hdfs://ruta/a/destino/modelo_entrenado"
# Cargar el modelo desde la ruta especificada
modelo_cargado = PipelineModel.load(ruta_modelo)
En este caso, se ha utilizado PipelineModel.load()
ya que se está cargando un pipeline completo. Si se tratara de un modelo específico, como RandomForestClassificationModel
, se utilizaría la clase correspondiente:
from pyspark.ml.classification import RandomForestClassificationModel
# Cargar un modelo de Random Forest previamente guardado
modelo_rf = RandomForestClassificationModel.load("hdfs://ruta/a/destino/modelo_rf")
Al cargar el modelo, éste conserva toda la configuración y los parámetros ajustados durante el entrenamiento, permitiendo realizar predicciones de forma inmediata:
# Realizar predicciones utilizando el modelo cargado
predicciones = modelo_cargado.transform(datos_nuevos)
Es recomendable guardar el pipeline completo que incluye todas las etapas de preprocesamiento y modelado. De esta manera, se garantiza que las mismas transformaciones aplicadas durante el entrenamiento se realizan en los datos nuevos, manteniendo la consistencia y evitando errores.
Consideraciones al guardar y cargar modelos
Compatibilidad de versiones: Es fundamental asegurar que la versión de PySpark utilizada al guardar el modelo sea la misma al momento de cargarlo. Diferencias en versiones pueden causar incompatibilidades y errores.
Ubicación de almacenamiento: Se puede almacenar el modelo en el sistema de archivos local, HDFS, S3 u otros sistemas compatibles. Es importante que la ruta sea accesible desde el entorno donde se cargará el modelo.
Gestión de dependencias: Si el modelo incluye transformadores o estimadores personalizados, es necesario que el código de estos componentes esté disponible en el entorno donde se cargue el modelo, ya que PySpark solo guarda referencias a las clases utilizadas.
Ejemplo completo
A continuación, se presenta un ejemplo completo que muestra cómo entrenar un pipeline, guardarlo y posteriormente cargarlo para realizar predicciones:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
# Paso 1: Definir las etapas del pipeline
indexador = StringIndexer(inputCol="categoria", outputCol="categoria_indexada")
ensamblador = VectorAssembler(inputCols=["categoria_indexada", "feature1", "feature2"], outputCol="features")
clasificador = LogisticRegression(featuresCol="features", labelCol="etiqueta")
# Paso 2: Crear el pipeline
pipeline = Pipeline(stages=[indexador, ensamblador, clasificador])
# Paso 3: Entrenar el pipeline con los datos de entrenamiento
modelo_entrenado = pipeline.fit(datos_entrenamiento)
# Paso 4: Guardar el modelo entrenado
ruta_modelo = "hdfs://ruta/a/destino/pipeline_entrenado"
modelo_entrenado.write().overwrite().save(ruta_modelo)
# -- En otro momento o entorno --
# Paso 5: Cargar el modelo entrenado
modelo_cargado = PipelineModel.load(ruta_modelo)
# Paso 6: Utilizar el modelo cargado para hacer predicciones
predicciones = modelo_cargado.transform(datos_prueba)
Este ejemplo ilustra el flujo completo desde el entrenamiento hasta la aplicación de un modelo guardado. Al cargar el PipelineModel
, se recuperan todas las etapas con sus respectivos parámetros y ajustes.
Guardado y carga de modelos en formato PMML y MLeap
Si se requiere interoperabilidad con otras herramientas o lenguajes, es posible convertir y guardar modelos en formatos estándar como PMML (Predictive Model Markup Language) o utilizar librerías como MLeap para exportar los modelos. Sin embargo, es importante considerar que estas opciones pueden tener limitaciones y no soportar todos los tipos de modelos o transformaciones disponibles en PySpark MLlib. Siempre se debe verificar la compatibilidad y realizar pruebas exhaustivas al utilizar formatos de intercambio.
Uso de modelos en producción
Al desplegar modelos entrenados en un entorno de producción, es crucial asegurar que:
Se mantienen las mismas dependencias y configuraciones que en el entorno de entrenamiento.
Se implementan mecanismos de versionado y control de cambios para los modelos, facilitando la trazabilidad y revisiones.
Se monitoriza el rendimiento y las predicciones del modelo para detectar posibles desviaciones o degradación en su desempeño.
Todas las lecciones de PySpark
Accede a todas las lecciones de PySpark y aprende con ejemplos prácticos de código y ejercicios de programación con IDE web sin instalar nada.
Introducción A Pyspark
Introducción Y Entorno
Instalación De Pyspark
Introducción Y Entorno
Fundamentos De Pyspark
Introducción Y Entorno
Manipulación Y Análisis De Datos Con Pyspark
Transformación De Datos
Pyspark Sql
Transformación De Datos
Trabajo Con Datos Complejos
Transformación De Datos
Introducción A Mllib
Aprendizaje Automático
Preparación De Datos Para Mllib
Aprendizaje Automático
Regresión Con Mllib
Aprendizaje Automático
Clasificación Con Mllib
Aprendizaje Automático
Modelos De Clustering
Aprendizaje Automático
Reducción De La Dimensionalidad
Aprendizaje Automático
Recomendación
Aprendizaje Automático
Pipelines
Aprendizaje Automático
Mllib Con Scikit Learn
Integraciones
Mllib Con Tensorflow
Integraciones
En esta lección
Objetivos de aprendizaje de esta lección
- Comprender la estructura y funcionalidad de los pipelines en PySpark MLlib.
- Crear y configurar transformadores y estimadores dentro de un pipeline.
- Implementar transformadores y estimadores personalizados para tareas específicas.
- Integrar técnicas avanzadas como escalado y PCA en los pipelines.
- Aplicar grid search y validación cruzada para optimizar modelos.
- Guardar y cargar modelos entrenados para su reutilización.