Logotipo de Zephyrnet

Trabajar con funciones de ventana en PySpark

Fecha:

Introducción

Aprender sobre las funciones de ventana en PySpark puede ser un desafío, pero vale la pena el esfuerzo. Las funciones de ventana son una herramienta poderosa para analizar datos y pueden ayudarlo a obtener información que de otra manera no habría visto. Al comprender cómo utilizar las funciones de ventana en Spark; puedes tomar tu análisis de los datos habilidades al siguiente nivel y tomar decisiones más informadas. Ya sea que esté trabajando con grandes o pequeños conjuntos de datos, aprender funciones de ventana en Spark le permitirá manipular y analizar datos de formas nuevas y emocionantes.

Funciones de ventana en PySpark

En este blog, primero comprenderemos el concepto de funciones de ventana y luego discutiremos cómo usarlas con Spark SQL y PySpark DataFrame API. De modo que, al final de este artículo, comprenderá cómo utilizar las funciones de ventana con conjuntos de datos reales y obtendrá información esencial para los negocios.

OBJETIVOS DE APRENDIZAJE

  • Comprender el concepto de funciones de ventana.
  • Trabajar con funciones de ventana utilizando conjuntos de datos.
  • Descubra la información utilizando las funciones de la ventana.
  • Utilice Spark SQL y DataFrame API para trabajar con funciones de ventana.

Este artículo fue publicado como parte del Blogatón de ciencia de datos.

Tabla de contenidos.

¿Qué son las funciones de ventana?

Las funciones de ventana ayudan a analizar datos dentro de un grupo de filas relacionadas entre sí. Permiten a los usuarios realizar transformaciones complejas en las filas de un marco de datos o conjunto de datos asociados entre sí en función de algunos criterios de partición y ordenamiento.

Las funciones de ventana operan en una partición específica de un marco de datos o conjunto de datos definido por un conjunto de columnas de partición. El ORDEN POR La cláusula divide los datos en una función de ventana para organizarlos en un orden específico. Luego, las funciones de ventana realizan cálculos en una ventana deslizante de filas que incluye la fila actual y un subconjunto de las filas anteriores 'y'/'o' siguientes, como se especifica en el marco de la ventana.

Trabajar con funciones de ventana en PySpark

Algunos ejemplos comunes de funciones de ventana incluyen calcular promedios móviles, clasificar u ordenar filas en función de una columna o grupo específico de columnas, calcular totales acumulados y encontrar el primer o último valor en un grupo de filas. Con las potentes funciones de ventana de Spark, los usuarios pueden realizar análisis y agregaciones complejos sobre grandes conjuntos de datos con relativa facilidad, lo que la convierte en una herramienta popular para grandes proceso de datos y analítica.

"

Funciones de ventana en SQL

Spark SQL admite tres tipos de funciones de ventana:

  • Funciones de clasificación: - Estas funciones asignan una clasificación a cada fila dentro de una partición del conjunto de resultados. Por ejemplo, la función ROW_NUMBER() proporciona un número secuencial único a cada fila dentro de la partición.
  • Funciones de análisis: - Estas funciones calculan valores agregados en una ventana de filas. Por ejemplo, la función SUMA() calcula la suma de una columna en una ventana de filas.
  • Funciones de valor: - Estas funciones calculan un valor analítico para cada fila de una partición, en función de los valores de otras filas de la misma partición. Por ejemplo, la función LAG() devuelve el valor de una columna de la fila anterior de la partición.

Creación de marcos de datos

Crearemos un marco de datos de muestra para que podamos trabajar prácticamente con diferentes funciones de ventana. También intentaremos responder algunas preguntas con la ayuda de estos datos y funciones de ventana.

El marco de datos tiene detalles de los empleados como su nombre, designación, número de empleado, fecha de contratación, salario, etc. En total tenemos 8 columnas que son las siguientes:

  • 'empno': Esta columna contiene el número del empleado.
  • 'ename': esta columna tiene nombres de empleados.
  • 'trabajo': esta columna contiene información sobre los puestos de trabajo de los empleados.
  • 'hiredate': esta columna muestra la fecha de contratación del empleado.
  • 'sal': los detalles del salario se encuentran en esta columna.
  • 'comm': esta columna tiene detalles de la comisión de los empleados, si corresponde.
  • 'deptno': En esta columna se encuentra el número de departamento al que pertenece el empleado.
# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)    
]
# create dataframe
emp_df = spark.createDataFrame(employees, 
           ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()

# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename|      job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH|    CLERK|17-Dec-80| 800|  20|    10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300|    30|
| 7521|  WARD| SALESMAN|22-Feb-81|1250| 500|    30|
| 7566| JONES|  MANAGER| 2-Apr-81|2975|   0|    20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400|    30|
| 7698| BLAKE|  MANAGER| 1-May-81|2850|   0|    30|
| 7782| CLARK|  MANAGER| 9-Jun-81|2450|   0|    10|
| 7788| SCOTT|  ANALYST|19-Apr-87|3000|   0|    20|
| 7629|  ALEX| SALESMAN|28-Sep-79|1150|1400|    30|
| 7839|  KING|PRESIDENT|17-Nov-81|5000|   0|    10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500|   0|    30|
| 7876| ADAMS|    CLERK|23-May-87|1100|   0|    20|
+-----+------+---------+---------+----+----+------+

Ahora comprobaremos el esquema:

# Checking the schema

emp_df.printSchema()

# Output:-
root
 |-- empno: long (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- comm: long (nullable = true)
 |-- deptno: long (nullable = true)

Cree una vista temporal del DataFrame 'emp_df' con el nombre "emp". Nos permite consultar el DataFrame usando la sintaxis SQL en Spark SQL como si fuera una tabla. La vista temporal solo es válida mientras dure la sesión de Spark.

emp_df.createOrReplaceTempView("emp")

Resolver planteamientos de problemas utilizando funciones de ventana

Aquí resolveremos varios planteamientos de problemas utilizando funciones de Windows:

P1. Clasifique el salario dentro de cada departamento.

# Using spark sql

rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()

# Using PySpark

windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
            ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
            F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()

# Output:-
+-----+------+---------+------+----+----+
|empno| ename|      job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839|  KING|PRESIDENT|    10|5000|   1|
| 7782| CLARK|  MANAGER|    10|2450|   2|
| 7369| SMITH|    CLERK|    10| 800|   3|
| 7788| SCOTT|  ANALYST|    20|3000|   1|
| 7566| JONES|  MANAGER|    20|2975|   2|
| 7876| ADAMS|    CLERK|    20|1100|   3|
| 7698| BLAKE|  MANAGER|    30|2850|   1|
| 7499| ALLEN| SALESMAN|    30|1600|   2|
| 7844|TURNER| SALESMAN|    30|1500|   3|
| 7521|  WARD| SALESMAN|    30|1250|   4|
| 7654|MARTIN| SALESMAN|    30|1250|   4|
| 7629|  ALEX| SALESMAN|    30|1150|   6|
+-----+------+---------+------+----+----+

Enfoque para el código PySpark

  • La función de ventana divide los datos por número de departamento usando particiónBy(col('deptno')) y luego ordena los datos dentro de cada partición por salario en orden descendente usando orderBy(col('sal').desc()). La variable windowSpec contiene la especificación final de la ventana.
  • 'emp_df' es el marco de datos que contiene datos de los empleados, incluidas las columnas para empno, ename, job, deptno y sal.
  • La función de clasificación se aplica a la columna de salario usando 'F.rank().over(windowSpec)' dentro de la declaración de selección. La columna resultante tiene un nombre de alias como "rango".
  • Creará un marco de datos, 'ranking_result_df', que incluye empno, ename, trabajo, deptno y salario. También tiene una nueva columna, "rango", que representa el rango del salario del empleado dentro de su departamento.

Salida:

El resultado tiene rango salarial en cada departamento.

P2. Rango denso el salario dentro de cada departamento.

# Using Spark SQL
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                      F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()

# Output:-
+-----+------+---------+------+----+----------+
|empno| ename|      job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839|  KING|PRESIDENT|    10|5000|         1|
| 7782| CLARK|  MANAGER|    10|2450|         2|
| 7369| SMITH|    CLERK|    10| 800|         3|
| 7788| SCOTT|  ANALYST|    20|3000|         1|
| 7566| JONES|  MANAGER|    20|2975|         2|
| 7876| ADAMS|    CLERK|    20|1100|         3|
| 7698| BLAKE|  MANAGER|    30|2850|         1|
| 7499| ALLEN| SALESMAN|    30|1600|         2|
| 7844|TURNER| SALESMAN|    30|1500|         3|
| 7521|  WARD| SALESMAN|    30|1250|         4|
| 7654|MARTIN| SALESMAN|    30|1250|         4|
| 7629|  ALEX| SALESMAN|    30|1150|         5|
+-----+------+---------+------+----+----------+

Enfoque para el código PySpark

  • Primero, cree una especificación de ventana usando la función Ventana, que divide el DataFrame 'emp_df' por deptno y lo ordena descendentemente por la columna 'sal'.
  • Luego, la función densa_rank() se aplica sobre la especificación de la ventana, que asigna una clasificación densa a cada fila dentro de cada partición según su orden de clasificación.
  • Finalmente, se crea un nuevo DataFrame llamado 'dense_ranking_df' seleccionando columnas específicas de emp_df (es decir, 'empno', 'ename', 'job', 'deptno' y 'sal') y agregando una nueva columna 'dense_rank' que contiene los valores de clasificación densos calculados por la función de ventana.
  • Por último, muestre el DataFrame resultante en formato tabular.

Salida:

El resultado tiene un rango denso en términos salariales.

P3. Numere la fila dentro de cada departamento.

# Using Spark SQL 
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()

# Output:-
+-----+------+---------+------+----+-------+
|empno| ename|      job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839|  KING|PRESIDENT|    10|5000|      1|
| 7782| CLARK|  MANAGER|    10|2450|      2|
| 7369| SMITH|    CLERK|    10| 800|      3|
| 7788| SCOTT|  ANALYST|    20|3000|      1|
| 7566| JONES|  MANAGER|    20|2975|      2|
| 7876| ADAMS|    CLERK|    20|1100|      3|
| 7698| BLAKE|  MANAGER|    30|2850|      1|
| 7499| ALLEN| SALESMAN|    30|1600|      2|
| 7844|TURNER| SALESMAN|    30|1500|      3|
| 7521|  WARD| SALESMAN|    30|1250|      4|
| 7654|MARTIN| SALESMAN|    30|1250|      5|
| 7629|  ALEX| SALESMAN|    30|1150|      6|
+-----+------+---------+------+----+-------+

Enfoque para el código PySpark

  • La primera línea define una especificación de ventana para el cálculo utilizando las funciones Window.partitionBy() y Window.orderBy(). Esta ventana está dividida por la columna deptno y ordenada por la columna sal en orden descendente.
  • La segunda línea crea un nuevo DataFrame llamado 'row_num_df', una proyección de 'emp_df' con una columna adicional llamada 'row_num' y contiene los detalles de los números de fila.
  • La función show() muestra el DataFrame resultante, que muestra las columnas empno, ename, job, deptno, sal y row_num de cada empleado.

Salida:

El resultado tendrá el número de fila de cada empleado dentro de su departamento según su salario.

P4. Suma total corriente del salario dentro de cada departamento.

# Using Spark SQL
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

# Using PySpar
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                         F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()

# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename|      job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839|  KING|PRESIDENT|    10|5000|         5000|
| 7782| CLARK|  MANAGER|    10|2450|         7450|
| 7369| SMITH|    CLERK|    10| 800|         8250|
| 7788| SCOTT|  ANALYST|    20|3000|         3000|
| 7566| JONES|  MANAGER|    20|2975|         5975|
| 7876| ADAMS|    CLERK|    20|1100|         7075|
| 7698| BLAKE|  MANAGER|    30|2850|         2850|
| 7499| ALLEN| SALESMAN|    30|1600|         4450|
| 7844|TURNER| SALESMAN|    30|1500|         5950|
| 7521|  WARD| SALESMAN|    30|1250|         8450|
| 7654|MARTIN| SALESMAN|    30|1250|         8450|
| 7629|  ALEX| SALESMAN|    30|1150|         9600|
+-----+------+---------+------+----+-------------+

Enfoque para código PySpark

  • Primero, se define una especificación de ventana utilizando los métodos “Window.partitionBy()” y “Window.orderBy()”. El método “partitionBy()” divide los datos por la columna “deptno”, mientras que el método “orderBy()” ordena los datos por la columna “sal” en orden descendente.
  • A continuación, la función "suma()" se aplica a la columna "sal" utilizando el método "over()" para calcular el total acumulado de salarios dentro de cada departamento. El resultado estará en un nuevo DataFrame llamado "running_sum_sal_df", que contiene las columnas 'empno', 'ename', 'job', 'deptno', 'sal' y 'running_total'.
  • Finalmente, se llama al método "show()" en el DataFrame "running_sum_sal_df" para mostrar el resultado de la consulta. El DataFrame resultante muestra el total acumulado de salarios de cada empleado y otros detalles como nombre, número de departamento y trabajo.

Salida:

El resultado tendrá un total acumulado de los datos salariales de cada departamento.

P5: El próximo salario dentro de cada departamento.

Para encontrar el próximo salario dentro de cada departamento utilizamos la función LEAD. 

La función de ventana lead() ayuda a obtener el valor de la expresión en la siguiente fila de la partición de la ventana. Devuelve una columna para cada columna de entrada, donde cada columna contendrá el valor de la columna de entrada para la fila de desplazamiento encima de la fila actual dentro de la partición de la ventana. La sintaxis de la función principal es: principal (col, desplazamiento = 1, predeterminado = Ninguno).

# Using Spark SQL
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|    null|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|    null|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|    null|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
               F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    2450|
| 7782| CLARK|  MANAGER|    10|2450|     800|
| 7369| SMITH|    CLERK|    10| 800|       0|
| 7788| SCOTT|  ANALYST|    20|3000|    2975|
| 7566| JONES|  MANAGER|    20|2975|    1100|
| 7876| ADAMS|    CLERK|    20|1100|       0|
| 7698| BLAKE|  MANAGER|    30|2850|    1600|
| 7499| ALLEN| SALESMAN|    30|1600|    1500|
| 7844|TURNER| SALESMAN|    30|1500|    1250|
| 7521|  WARD| SALESMAN|    30|1250|    1250|
| 7654|MARTIN| SALESMAN|    30|1250|    1150|
| 7629|  ALEX| SALESMAN|    30|1150|       0|
+-----+------+---------+------+----+--------+

Enfoque para el código PySpark

  • Primero, la función de ventana ayuda a dividir las filas del DataFrame por número de departamento (deptno) y ordenar los salarios en orden descendente dentro de cada partición.
  • Luego, la función lead() se aplica a la columna 'sal' ordenada dentro de cada partición para devolver el salario del siguiente empleado (con un desplazamiento de 1), y el valor predeterminado es 0 en caso de que no haya un siguiente empleado.
  • El DataFrame resultante 'next_salary_df' contiene columnas para el número de empleado (empno), nombre (ename), puesto de trabajo (job), número de departamento (deptno), salario actual (sal) y próximo salario (next_val).

Salida:

El resultado contiene el salario del siguiente empleado del departamento según el orden de salario descendente. 

P6. Salario anterior dentro de cada departamento.

Para calcular el salario anterior utilizamos la función LAG.

La función de retraso devuelve el valor de una expresión en un desplazamiento determinado antes de la fila actual dentro de la partición de la ventana. La sintaxis de la función de retraso es: - retraso (expr, desplazamiento = 1, predeterminado = Ninguno). over (ventanaSpec).

# Using Spark SQL
preious_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LAG(sal, 1) 
           OVER (PARTITION BY deptno ORDER BY sal DESC) 
           AS prev_val FROM emp
         """)
preious_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|    null|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|    null|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|    null|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()

# Output:-
+-----+------+---------+------+----+--------+
|empno| ename|      job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839|  KING|PRESIDENT|    10|5000|       0|
| 7782| CLARK|  MANAGER|    10|2450|    5000|
| 7369| SMITH|    CLERK|    10| 800|    2450|
| 7788| SCOTT|  ANALYST|    20|3000|       0|
| 7566| JONES|  MANAGER|    20|2975|    3000|
| 7876| ADAMS|    CLERK|    20|1100|    2975|
| 7698| BLAKE|  MANAGER|    30|2850|       0|
| 7499| ALLEN| SALESMAN|    30|1600|    2850|
| 7844|TURNER| SALESMAN|    30|1500|    1600|
| 7521|  WARD| SALESMAN|    30|1250|    1500|
| 7654|MARTIN| SALESMAN|    30|1250|    1250|
| 7629|  ALEX| SALESMAN|    30|1150|    1250|
+-----+------+---------+------+----+--------+

Enfoque para el código PySpark

  • El window.partitionBy(col('deptno')) especifica la partición de la ventana. Significa que la función de ventana funciona por separado para cada departamento.
  • Luego orderBy(col('sal').desc()) especifica el orden del salario y ordenará los salarios dentro de cada departamento en orden descendente.
  • F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val') crea una nueva columna llamada prev_val en el DataFrame 'prev_sal_df'.
  • Para cada fila, esta columna contiene el valor de la columna 'sal' de la fila anterior dentro de la ventana definida por windowSpec.
  • El parámetro offset=1 indica que la fila anterior debe estar una fila antes de la fila actual, y default=0 especifica el valor predeterminado para la primera fila en cada partición (ya que no hay una fila anterior para la primera fila).
  • Finalmente, prev_sal_df.show() muestra el DataFrame resultante.

Salida:

El resultado representa el salario anterior de cada empleado dentro de cada departamento, según el orden de los salarios en orden descendente.

P7. Primer salario dentro de cada departamento y comparándolo con cada miembro dentro de cada departamento.

# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal, 
                   FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
                   AS first_val FROM emp """)
first_val_df.show()

# Using PySpark 
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', 
                   F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()

# Output:-
+-----+------+---------+------+----+---------+
|empno| ename|      job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839|  KING|PRESIDENT|    10|5000|     5000|
| 7782| CLARK|  MANAGER|    10|2450|     5000|
| 7369| SMITH|    CLERK|    10| 800|     5000|
| 7788| SCOTT|  ANALYST|    20|3000|     3000|
| 7566| JONES|  MANAGER|    20|2975|     3000|
| 7876| ADAMS|    CLERK|    20|1100|     3000|
| 7698| BLAKE|  MANAGER|    30|2850|     2850|
| 7499| ALLEN| SALESMAN|    30|1600|     2850|
| 7844|TURNER| SALESMAN|    30|1500|     2850|
| 7521|  WARD| SALESMAN|    30|1250|     2850|
| 7654|MARTIN| SALESMAN|    30|1250|     2850|
| 7629|  ALEX| SALESMAN|    30|1150|     2850|
+-----+------+---------+------+----+---------+

Enfoque para el código PySpark

  • Primero, cree un objeto WindowSpec que divida los datos por número de departamento (deptno) y los ordene por salario (sal) en orden descendente.
  • Luego aplica la función analítica first() a la columna 'sal' sobre la ventana definida por windowSpec. Esta función devuelve el primer valor de la columna 'sal' dentro de cada partición (es decir, cada grupo de departamento) ordenado por 'sal' descendente. La columna resultante tiene un nuevo nombre, 'first_val'.
  • Ahora asigna el DataFrame resultante, que contiene las columnas seleccionadas y una nueva columna, 'first_val', que muestra el primer salario más alto para cada departamento según el orden descendente de los valores salariales, a una nueva variable llamada 'first_value_df'.

Salida:

El resultado muestra el primer salario más alto para cada departamento en un DataFrame de empleado.

Conclusión

En este artículo, aprendemos sobre las funciones de ventana. Spark SQL tiene tres tipos de funciones de ventana: funciones de clasificación, funciones agregadas y funciones de valor. Utilizando esta función, trabajamos en un conjunto de datos para encontrar información importante y valiosa. Spark Window Functions ofrece potentes herramientas de análisis de datos como clasificación, análisis y cálculos de valor. Ya sea analizando información salarial por departamento o empleando ejemplos prácticos con PySpark y SQL, estas funciones proporcionan herramientas esenciales para el procesamiento y análisis de datos efectivos en Spark.

Puntos clave

  • Aprendimos sobre las funciones de ventana y trabajamos con ellas usando Spark SQL y PySpark DataFrame API.
  • Usamos funciones como rango, rango_denso, número_fila, retraso, avance, grupoBy, particiónBy y otras funciones para proporcionar un análisis adecuado.
  • También vimos las soluciones detalladas paso a paso del problema y analizamos el resultado al final de cada planteamiento del problema.

Este estudio de caso le ayuda a comprender mejor las funciones de PySpark. Si tiene alguna opinión o pregunta, comente a continuación. Conéctate conmigo en Etiqueta LinkedIn para mayor discusión. ¡¡¡Seguir aprendiendo!!!

Los medios que se muestran en este artículo no son propiedad de Analytics Vidhya y se utilizan a discreción del autor.

punto_img

Información más reciente

punto_img