PySpark
Tutorial PySpark: PySpark SQL
Explora PySpark SQL y aprende a trabajar con grandes conjuntos de datos estructurados usando SQL y DataFrames de manera eficiente en Apache Spark SQL.
Aprende PySpark GRATIS y certifícateIntroducción a PySpark SQL
PySpark SQL es un módulo de Apache Spark que permite trabajar con datos estructurados mediante la interfaz de SQL y DataFrames. Proporciona una forma eficiente y expresiva de realizar consultas y operaciones sobre grandes conjuntos de datos distribuidos. A través de PySpark SQL, es posible integrar la potencia de procesamiento de Spark con la familiaridad y versatilidad del lenguaje SQL.
El DataFrame es la abstracción central en PySpark SQL. Similar a una tabla en una base de datos relacional, un DataFrame es una colección distribuida de datos organizados en columnas con tipos conocidos. Los DataFrames ofrecen una API de alto nivel para la manipulación de datos estructurados, permitiendo realizar operaciones como selección, filtrado, agregación y ordenamiento de manera eficiente.
PySpark SQL simplifica la carga de datos desde diversas fuentes, incluyendo archivos Parquet, JSON, CSV y bases de datos JDBC, entre otros. Además, permite registrar DataFrames como vistas temporales o tablas permanentes, lo que habilita la ejecución de consultas SQL estándar sobre ellos. Esta integración facilita combinar la semántica de SQL con las capacidades de procesamiento distribuido de Spark.
Una característica clave de PySpark SQL es el Catalyst Optimizer, un optimizador de consultas que aplica diversas reglas y estrategias para mejorar el rendimiento de las operaciones. Al escribir consultas SQL o utilizar las API de DataFrame, el Catalyst Optimizer analiza y transforma las operaciones para ejecutarlas de manera más eficiente en el clúster, aprovechando las optimizaciones disponibles.
Por ejemplo, para leer un archivo CSV y crear un DataFrame:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("EjemploPySparkSQL").getOrCreate()
df = spark.read.csv("datos.csv", header=True, inferSchema=True)
En este ejemplo, se crea una instancia de SparkSession
, que es el punto de entrada para usar PySpark SQL. Luego, se lee un archivo CSV y se crea un DataFrame con los datos, especificando que la primera fila contiene los nombres de las columnas (header=True
) y que los tipos de datos deben inferirse automáticamente (inferSchema=True
).
Los DataFrames en PySpark SQL soportan una amplia gama de funciones integradas para transformar y analizar los datos. Estas funciones incluyen operaciones matemáticas, de cadena, de fecha y agregaciones, entre otras. Además, es posible definir UDFs (User Defined Functions) para extender la funcionalidad con lógica personalizada en Python. Las UDFs permiten aplicar transformaciones complejas a los datos que no están cubiertas por las funciones integradas.
El uso de PySpark SQL habilita la combinación de operaciones SQL con código Python, ofreciendo flexibilidad y eficiencia en el procesamiento de datos. Al aprovechar el modelado relacional y las optimizaciones del Catalyst Optimizer, es posible manejar grandes volúmenes de datos con alto rendimiento y escalabilidad.
Creación de vistas temporales y tablas permanentes
En PySpark SQL, es común interactuar con los datos utilizando vistas temporales y tablas permanentes. Estas estructuras permiten acceder y manipular los datos con sintaxis SQL estándar, facilitando la integración entre código PySpark y consultas SQL.
Para crear una vista temporal, se utiliza el método createOrReplaceTempView()
sobre un DataFrame existente. Esta vista existe únicamente durante la sesión actual de Spark y no persiste en el sistema de archivos. Por ejemplo:
# Creamos una vista temporal a partir de un DataFrame llamado df
df.createOrReplaceTempView("vista_temporal")
Con la vista temporal vista_temporal
creada, es posible realizar consultas SQL directamente:
# Ejecutamos una consulta SQL sobre la vista temporal
resultado = spark.sql("SELECT columna1, columna2 FROM vista_temporal WHERE columna3 > 100")
La vista temporal es útil para operaciones intermedias y análisis que no requieren almacenamiento a largo plazo. Además, mejora la legibilidad del código al permitir referenciar conjuntos de datos con nombres intuitivos.
Para crear una tabla permanente, que persista más allá de la sesión actual y esté disponible para otras aplicaciones Spark, se utiliza el método saveAsTable()
en conjunto con write
. Por ejemplo:
# Guardamos el DataFrame como una tabla permanente en el catálogo de Hive
df.write.mode("overwrite").saveAsTable("nombre_tabla")
Esta acción almacena la tabla en el metastore de Spark, permitiendo que sea consultada en futuras sesiones:
# Accedemos a la tabla permanente en una nueva sesión
tabla_permanente = spark.table("nombre_tabla")
Es importante considerar el modo de escritura al guardar tablas permanentes. El modo "overwrite"
reemplaza cualquier tabla existente con el mismo nombre, mientras que "append"
añade datos a una tabla existente:
# Añadimos datos a una tabla permanente existente
df.write.mode("append").saveAsTable("nombre_tabla")
Las tablas permanentes son especialmente útiles cuando se necesita compartir datos entre diferentes usuarios o aplicaciones, o cuando se requiere un almacenamiento persistente de resultados intermedios.
Además de las vistas temporales estándar, PySpark SQL soporta vistas globales temporales mediante el método createGlobalTempView()
. Estas vistas están disponibles en todas las sesiones dentro de la aplicación Spark actual:
# Creamos una vista global temporal
df.createGlobalTempView("vista_global_temporal")
Para acceder a una vista global temporal, se utiliza el esquema global_temp
en las consultas SQL:
# Consultamos la vista global temporal
resultado = spark.sql("SELECT * FROM global_temp.vista_global_temporal")
Las vistas globales temporales son útiles cuando se trabaja con aplicaciones multi-sesión y se requiere compartir datos entre ellas.
Es posible especificar el formato de almacenamiento al crear tablas permanentes. Por defecto, Spark utiliza el formato Parquet, pero se pueden utilizar otros formatos como JSON, CSV u ORC:
# Guardamos la tabla en formato JSON
df.write.mode("overwrite").format("json").saveAsTable("tabla_json")
También es posible administrar las tablas dentro de diferentes bases de datos en el catálogo. Para crear una tabla en una base de datos específica:
# Seleccionamos o creamos una base de datos
spark.sql("CREATE DATABASE IF NOT EXISTS mi_base_de_datos")
spark.catalog.setCurrentDatabase("mi_base_de_datos")
# Guardamos la tabla en la base de datos seleccionada
df.write.mode("overwrite").saveAsTable("mi_base_de_datos.tabla_en_base_datos")
Para listar las tablas disponibles en el catálogo actual, se puede utilizar:
# Listamos todas las tablas en la base de datos actual
tablas = spark.catalog.listTables()
for tabla in tablas:
print(f"Tabla: {tabla.name}, Tipo: {tabla.tableType}")
Al eliminar tablas o vistas, es esencial distinguir entre ellas. Para eliminar una vista temporal:
# Eliminamos una vista temporal
spark.catalog.dropTempView("vista_temporal")
Para eliminar una tabla permanente del catálogo:
# Eliminamos una tabla permanente
spark.sql("DROP TABLE nombre_tabla")
Es recomendable manejar con cuidado la eliminación de tablas permanentes, ya que esta acción puede resultar en la pérdida de datos almacenados.
Ejecución de consultas SQL en PySpark
Una vez creadas las vistas temporales o tablas permanentes en PySpark, es posible ejecutar consultas SQL para manipular y analizar los datos de manera eficaz. PySpark permite utilizar el lenguaje SQL a través del método spark.sql()
, lo que facilita el trabajo a quienes están familiarizados con SQL y desean aprovechar las capacidades de procesamiento distribuido de Spark.
Por ejemplo, para realizar una consulta que calcule el promedio de ventas por región:
# Ejecución de una consulta SQL que calcula el promedio de ventas por región
resultado = spark.sql("""
SELECT region, AVG(ventas) AS promedio_ventas
FROM tabla_ventas
GROUP BY region
ORDER BY promedio_ventas DESC
""")
En este ejemplo, se obtiene un DataFrame resultado
que contiene el promedio de ventas por región, ordenado de mayor a menor. Esta consulta aprovecha las funciones agregadas de SQL y la capacidad de PySpark para manejar grandes volúmenes de datos.
Las consultas SQL en PySpark pueden incluir operaciones complejas como joins, subconsultas y funciones de ventana. Por ejemplo, para unir dos tablas y filtrar los resultados:
# Consulta SQL que une las tablas de clientes y pedidos
resultado_join = spark.sql("""
SELECT c.nombre_cliente, p.fecha_pedido, p.monto_total
FROM clientes c
INNER JOIN pedidos p ON c.id_cliente = p.id_cliente
WHERE p.fecha_pedido >= '2023-01-01'
""")
Aquí, se realiza un join interno entre las tablas clientes
y pedidos
, obteniendo información combinada de ambas y filtrando los pedidos a partir de una fecha específica. Este tipo de consultas es esencial para relacionar datos de diferentes fuentes.
Es posible combinar consultas SQL con operaciones adicionales en PySpark. Por ejemplo, tras ejecutar una consulta SQL, se pueden aplicar transformaciones adicionales utilizando la API de DataFrames:
# Ejecutamos una consulta SQL y aplicamos una transformación con la API de DataFrames
ventas_por_producto = spark.sql("""
SELECT producto, SUM(cantidad) AS total_vendido
FROM ventas
GROUP BY producto
""")
# Añadimos una columna calculada al resultado
ventas_con_descuento = ventas_por_producto.withColumn("total_con_descuento", ventas_por_producto.total_vendido * 0.9)
En este caso, se calcula el total vendido por producto y luego se aplica un descuento del 10%, demostrando la integración fluida entre SQL y las transformaciones de PySpark.
Para ejecutar consultas SQL dinámicas, se pueden utilizar variables de Python en las cadenas de consulta. Esto permite crear consultas flexibles basadas en parámetros externos:
# Uso de variables en una consulta SQL
fecha_inicio = '2023-01-01'
fecha_fin = '2023-06-30'
consulta_dinamica = f"""
SELECT categoria, COUNT(*) AS num_ventas
FROM ventas
WHERE fecha >= '{fecha_inicio}' AND fecha <= '{fecha_fin}'
GROUP BY categoria
"""
resultado_consulta = spark.sql(consulta_dinamica)
Al construir consultas dinámicamente, es importante tener en cuenta la seguridad y evitar posibles problemas de inyección SQL. Es recomendable validar y sanitizar las entradas cuando se incorporan variables en las consultas.
Las consultas SQL en PySpark también soportan funciones de ventana para cálculos avanzados como rankings, acumulados y promedios móviles. Por ejemplo:
# Cálculo del ranking de ventas por vendedor utilizando una función de ventana
ranking_vendedores = spark.sql("""
SELECT vendedor, ventas_mensuales,
RANK() OVER (ORDER BY ventas_mensuales DESC) AS ranking
FROM (
SELECT vendedor, SUM(monto) AS ventas_mensuales
FROM ventas
GROUP BY vendedor
) subconsulta
""")
En este ejemplo, se determina el ranking de vendedores según sus ventas mensuales, utilizando una subconsulta y una función de ventana RANK()
.
Para organizar consultas complejas, es útil emplear Common Table Expressions (CTEs), que mejoran la legibilidad y el mantenimiento del código:
# Uso de CTEs para estructurar una consulta SQL
resultado_cte = spark.sql("""
WITH ventas_ultimos_meses AS (
SELECT *
FROM ventas
WHERE fecha >= DATE_SUB(CURRENT_DATE(), 90)
)
SELECT categoria, SUM(monto) AS total_ventas
FROM ventas_ultimos_meses
GROUP BY categoria
""")
Aquí, se define una CTE ventas_ultimos_meses
que selecciona las ventas de los últimos 90 días, y luego se calcula el total vendido por categoría.
Las funciones integradas de PySpark SQL incluyen operaciones matemáticas, de cadena, de fecha y más. Por ejemplo, para extraer el año y mes de una fecha y agrupar las ventas mensuales:
# Agrupación de ventas por año y mes
ventas_mensuales = spark.sql("""
SELECT YEAR(fecha) AS anio, MONTH(fecha) AS mes, SUM(monto) AS total_mes
FROM ventas
GROUP BY anio, mes
ORDER BY anio, mes
""")
Este enfoque permite analizar tendencias temporales en los datos de ventas de manera eficiente.
Los resultados de las consultas SQL en PySpark son DataFrames, lo que significa que se pueden utilizar todas las operaciones y métodos disponibles en la API de DataFrames. Por ejemplo, para guardar el resultado en formato Parquet:
# Guardamos el resultado de una consulta SQL en formato Parquet
resultado.write.mode("overwrite").parquet("/ruta/al/directorio/ventas_por_mes")
Además, se pueden realizar acciones como show()
para visualizar los datos en pantalla o collect()
para traer los datos al controlador, teniendo cuidado con el tamaño de los datos para no agotar los recursos.
Es posible registrar una consulta SQL como una vista temporal adicional para reutilizarla en otras consultas:
# Registramos el resultado de una consulta como vista temporal
ventas_mensuales.createOrReplaceTempView("vista_ventas_mensuales")
# Utilizamos la vista en una nueva consulta SQL
analisis_anual = spark.sql("""
SELECT anio, SUM(total_mes) AS ventas_anuales
FROM vista_ventas_mensuales
GROUP BY anio
ORDER BY anio
""")
Esta técnica permite encadenar consultas y organizar el análisis de manera modular.
Al ejecutar consultas SQL en PySpark, es crucial considerar el rendimiento y la optimización. Algunas prácticas recomendadas incluyen:
- Filtrar los datos lo antes posible para reducir el volumen procesado.
- Evitar el uso excesivo de subconsultas anidadas que puedan complicar el plan de ejecución.
- Utilizar funciones integradas en lugar de UDFs cuando sea posible, ya que están optimizadas y son más eficientes.
- Particionar los datos adecuadamente para mejorar la paralelización y el acceso.
Para inspeccionar y depurar las consultas SQL, es posible utilizar el método explain()
que muestra el plan de ejecución físico:
# Mostrar el plan de ejecución de una consulta SQL
plan_ejecucion = spark.sql("""
SELECT categoria, SUM(monto) AS total_categoria
FROM ventas
GROUP BY categoria
""")
plan_ejecucion.explain()
El plan de ejecución proporciona información valiosa sobre cómo PySpark optimiza y ejecuta la consulta, lo que ayuda a identificar posibles cuellos de botella.
Uso de funciones integradas y UDFs (User Defined Functions)
En PySpark SQL, las funciones integradas proporcionan una variedad de operaciones para manipular y analizar datos de forma eficiente y expresiva. Estas funciones abarcan desde manipulaciones de cadenas, operaciones matemáticas, hasta funciones de fecha y agregaciones complejas. Al utilizar estas funciones, es posible realizar transformaciones sofisticadas sin necesidad de implementar lógica adicional en Python.
Por ejemplo, para convertir los nombres de una columna a mayúsculas utilizando la función integrada upper
:
from pyspark.sql.functions import upper
df_nombres_mayusculas = df.withColumn("nombre_mayusculas", upper(df["nombre"]))
En este caso, se aplica la función upper
a la columna nombre
para crear una nueva columna nombre_mayusculas
con los nombres en mayúsculas.
Las funciones integradas también pueden emplearse en consultas SQL directamente. Por ejemplo:
df.createOrReplaceTempView("tabla_clientes")
resultado_sql = spark.sql("""
SELECT nombre, UPPER(nombre) AS nombre_mayusculas
FROM tabla_clientes
""")
PySpark SQL incluye una amplia gama de funciones integradas agrupadas en categorías como:
- Funciones de cadenas:
concat
,substring
,trim
,length
, utilizadas para manipular textos. - Funciones matemáticas:
abs
,sin
,log
,round
, para operaciones numéricas. - Funciones de fechas y tiempos:
current_date
,datediff
,add_months
, esenciales para gestionar fechas. - Funciones de agregación:
sum
,avg
,count
,max
,min
, empleadas en operaciones de resumen y análisis.
Para importar estas funciones, se utiliza el módulo pyspark.sql.functions
. Por ejemplo, para calcular el año de una fecha:
from pyspark.sql.functions import year
df_con_anio = df.withColumn("anio", year(df["fecha"]))
Aquí, se extrae el año de la columna fecha
y se añade como una nueva columna anio
.
Cuando las funciones integradas no cubren una necesidad específica, es posible definir UDFs (User Defined Functions) personalizadas. Las UDFs permiten incorporar lógica definida en Python y aplicarla sobre columnas de un DataFrame.
Para crear una UDF, se utiliza el decorador @udf
o la función udf
junto con el tipo de dato de retorno. Por ejemplo, para clasificar edades:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
def clasificar_edad(edad):
if edad < 18:
return "Menor"
elif edad <= 65:
return "Adulto"
else:
return "Mayor"
clasificar_edad_udf = udf(clasificar_edad, StringType())
df_clasificado = df.withColumn("categoria_edad", clasificar_edad_udf(df["edad"]))
En este ejemplo, se define una función clasificar_edad
que asigna una categoría según la edad, y luego se registra como UDF para aplicarla a la columna edad
.
Es posible registrar la UDF para su uso en consultas SQL:
spark.udf.register("clasificar_edad_sql", clasificar_edad, StringType())
resultado_categoria = spark.sql("""
SELECT nombre, clasificar_edad_sql(edad) AS categoria_edad
FROM tabla_clientes
""")
Sin embargo, las UDFs tradicionales pueden tener limitaciones de rendimiento debido a la serialización de datos entre la JVM y Python. Para abordar esto, PySpark introduce las pandas UDFs, también conocidas como vectorized UDFs, que utilizan Apache Arrow para mejorar la eficiencia.
Las pandas UDFs trabajan con pandas Series y procesan datos en batches. Por ejemplo:
from pyspark.sql.functions import pandas_udf
@pandas_udf("double")
def calcular_impuesto(precio):
return precio * 0.21
df_con_impuesto = df.withColumn("precio_con_impuesto", calcular_impuesto(df["precio"]))
Aquí, se calcula el impuesto sobre la columna precio
utilizando una pandas UDF, lo que mejora el rendimiento al procesar los datos de forma vectorizada.
Al utilizar UDFs, es importante considerar las buenas prácticas:
- Preferir funciones integradas siempre que sea posible, ya que están optimizadas para el rendimiento.
- Utilizar pandas UDFs en lugar de UDFs tradicionales para aprovechar el procesamiento vectorizado.
- Especificar claramente los tipos de datos de entrada y retorno para evitar errores de ejecución.
- Mantener las UDFs simples y eficientes, evitando operaciones costosas dentro de ellas.
Además, PySpark SQL permite utilizar funciones de orden superior, como transform
, filter
y aggregate
, para manipular estructuras de datos complejas sin necesidad de UDFs. Por ejemplo:
from pyspark.sql.functions import transform
df_listas = df.withColumn("lista_doble", transform(df["lista"], lambda x: x * 2))
En este caso, se duplica cada elemento en la columna lista
utilizando la función de orden superior transform
.
También es posible utilizar expresiones SQL dentro de la API de DataFrame con la función expr
:
from pyspark.sql.functions import expr
df_calculado = df.withColumn("indice", expr("columna_a / columna_b"))
Aquí, se realiza un cálculo directo entre dos columnas utilizando una expresión SQL.
Optimización de consultas con el Catalyst Optimizer
El Catalyst Optimizer es el componente central del motor de Apache Spark SQL que se encarga de optimizar las consultas y operaciones realizadas sobre los DataFrames y SQL. Su objetivo es mejorar el rendimiento de las aplicaciones mediante la generación de planes de ejecución eficientes, transformando las consultas en operaciones optimizadas que aprovechan al máximo los recursos del clúster.
El proceso de optimización del Catalyst Optimizer se divide en varias fases:
- Análisis: Se analiza la consulta para verificar su validez sintáctica y semántica. Durante esta fase, el optimizador resuelve referencias a columnas y tablas, y aplica reglas para convertir las expresiones a una representación interna.
- Optimización lógica: Se transforma el plan lógico inicial de la consulta aplicando una serie de reglas de optimización. Estas reglas incluyen la simplificación de expresiones, la eliminación de proyecciones redundantes, el reordenamiento de operaciones y la aplicación de filtros lo antes posible (predicado pushdown).
- Optimización física: Se selecciona la estrategia de ejecución más eficiente para cada operación, eligiendo entre diferentes métodos de unión (join) o agregación. El optimizador considera factores como el tamaño de los datos y su distribución para decidir, por ejemplo, si emplear un sort-merge join o un broadcast hash join.
- Generación del código: Se genera código bytecode optimizado utilizando técnicas de Whole-Stage Code Generation, que compila múltiples operadores en una única etapa para reducir la sobrecarga y mejorar el rendimiento.
Para aprovechar al máximo las capacidades del Catalyst Optimizer, es importante seguir ciertas prácticas al escribir consultas y operaciones en PySpark:
- Utilizar funciones integradas: Las funciones nativas de PySpark están diseñadas para ser reconocidas y optimizadas por el Catalyst Optimizer. Evitar el uso de UDFs (User Defined Functions) cuando sea posible, ya que estas pueden impedir algunas optimizaciones.
- Aplicar filtros anticipadamente: Incorporar los filtros o condiciones de selección lo más pronto posible en la consulta permite que el optimizador reduzca el conjunto de datos a procesar. Esto se conoce como predicado pushdown y ayuda a disminuir el volumen de datos en las etapas iniciales.
- Seleccionar únicamente las columnas necesarias: Limitar las columnas seleccionadas evita transferir y procesar datos innecesarios. El Catalyst Optimizer puede eliminar columnas no utilizadas, pero especificar explícitamente las columnas requeridas mejora la eficiencia.
- Explotar las particiones: Conocer cómo están particionados los datos y utilizar operaciones que respeten esta partición puede reducir la necesidad de movimientos de datos costosos. Funciones como
repartition()
ycoalesce()
permiten controlar el particionamiento de los DataFrames.
Es posible inspeccionar el plan de ejecución generado por el Catalyst Optimizer utilizando el método explain()
. Este método muestra el plan lógico y físico que se ejecutará, lo que ayuda a identificar posibles mejoras. Por ejemplo:
consulta = df.filter(df["edad"] > 30).select("nombre", "edad")
consulta.explain()
El resultado proporcionará información detallada sobre las operaciones que realizará Spark, incluyendo las optimizaciones aplicadas. Para obtener un plan más detallado, se puede utilizar:
consulta.explain(mode="extended")
o para ver el plan físico optimizado:
consulta.explain(mode="cost")
Al analizar el plan de ejecución, es importante prestar atención a aspectos como:
- Operadores Exchange: Indican movimientos de datos entre nodos, que pueden ser costosos. Reducir la cantidad de intercambios puede mejorar el rendimiento.
- Broadcast Joins: En situaciones donde una de las tablas es pequeña, el optimizador puede decidir realizar un broadcast join, enviando la tabla pequeña a todos los nodos para evitar costosos shuffle operations. Esto se puede controlar mediante la configuración
spark.sql.autoBroadcastJoinThreshold
. - Eliminación de redundancias: El optimizador es capaz de eliminar operaciones innecesarias, como filtros o proyecciones duplicadas. Asegurarse de no introducir redundancias que puedan complicar el plan de ejecución.
La configuración de Spark influye en el comportamiento del Catalyst Optimizer. Algunos parámetros clave que pueden ajustarse incluyen:
spark.sql.shuffle.partitions
: Define el número de particiones utilizadas en operaciones de shuffle. Ajustar este valor según el tamaño de los datos y los recursos disponibles puede mejorar el rendimiento.spark.sql.autoBroadcastJoinThreshold
: Establece el tamaño máximo de una tabla para que sea considerada en un broadcast join. Incrementar este valor puede permitir más broadcast joins, pero consume más memoria.spark.sql.cbo.enabled
: Activa el Cost-Based Optimizer, que utiliza estadísticas de los datos para mejorar el plan de ejecución. Para que funcione eficientemente, es necesario generar estadísticas medianteANALYZE TABLE
.
Para generar estadísticas de una tabla y habilitar el Cost-Based Optimizer:
spark.sql("ANALYZE TABLE mi_tabla COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE mi_tabla COMPUTE STATISTICS FOR COLUMNS columna1, columna2")
Con las estadísticas disponibles, el optimizador puede tomar decisiones más informadas sobre la estrategia de ejecución.
Otro aspecto a considerar es el uso de caching. Al almacenar en caché un DataFrame o una tabla, se evita recomputar operaciones costosas en operaciones subsiguientes:
df.cache()
df.count() # Acción para materializar el cache
Es importante utilizar el caching de manera selectiva, ya que un uso excesivo puede saturar la memoria del clúster.
Además, estructurar las consultas de manera que el Catalyst Optimizer pueda aplicar optimizaciones simplificadoras es beneficioso. Por ejemplo, al combinar múltiples filtros:
# Evitar esto
df.filter(df["edad"] > 30).filter(df["edad"] < 50)
# Preferir esto
df.filter((df["edad"] > 30) & (df["edad"] < 50))
La segunda forma permite que el optimizador combine las condiciones y aplique una sola operación de filtrado más eficiente.
También es recomendable ordenar las operaciones para minimizar el volumen de datos procesados en cada etapa. Por ejemplo, filtrar y seleccionar columnas antes de realizar joins o agregaciones.
El conocimiento de los detalles internos del Catalyst Optimizer puede ayudar a escribir código más eficiente y predecible. Sin embargo, es importante mantener el código claro y legible, permitiendo que el optimizador haga su trabajo sin sobrecomplicar las consultas.
Todas las lecciones de PySpark
Accede a todas las lecciones de PySpark y aprende con ejemplos prácticos de código y ejercicios de programación con IDE web sin instalar nada.
Introducción A Pyspark
Introducción Y Entorno
Instalación De Pyspark
Introducción Y Entorno
Fundamentos De Pyspark
Introducción Y Entorno
Manipulación Y Análisis De Datos Con Pyspark
Transformación De Datos
Pyspark Sql
Transformación De Datos
Trabajo Con Datos Complejos
Transformación De Datos
Introducción A Mllib
Aprendizaje Automático
Preparación De Datos Para Mllib
Aprendizaje Automático
Regresión Con Mllib
Aprendizaje Automático
Clasificación Con Mllib
Aprendizaje Automático
Modelos De Clustering
Aprendizaje Automático
Reducción De La Dimensionalidad
Aprendizaje Automático
Recomendación
Aprendizaje Automático
Pipelines
Aprendizaje Automático
Mllib Con Scikit Learn
Integraciones
Mllib Con Tensorflow
Integraciones
En esta lección
Objetivos de aprendizaje de esta lección
- Comprender la estructura y funcionalidad de PySpark SQL.
- Crear y gestionar DataFrames.
- Integrar y manipular datos usando SQL estándar.
- Aplicar optimizaciones con Catalyst Optimizer.
- Utilizar funciones integradas y UDFs en PySpark.