Pyspark - Función de retraso

Pyspark - Función de retraso
La función Lag () en Pyspark está disponible en el módulo de ventana que se utiliza para devolver los valores de filas anteriores a las filas actuales. FirstL, la función lag () devuelve nulo para las filas superiores. Toma un parámetro de compensación que representa el número total de filas de modo que los valores de la fila anterior se devuelven a las siguientes filas. Para las primeras filas superiores, se colocan los nulos (desplazados).

Es posible particionar las filas en el marco de datos utilizando la función de la ventana. Está disponible en el pyspark.sql.ventana módulo.

Sintaxis:

dataframe_obj.WithColumn ("Lag_column", lag ("columna", offset).sobre (partición))

Se necesitan dos parámetros:

  1. La columna es el nombre de la columna en Pyspark DataFrame en el que los valores de la fila rezagados se colocan en función de los valores en esta columna.
  2. El desplazamiento especifica el entero para devolver ese número de filas anteriores a los valores de la fila actual.

Pasos:

  1. Cree un marco de datos de Pyspark que tenga algunos valores similares en al menos una columna.
  2. Partition los datos utilizando el método PartitionBy () disponible en la función de la ventana y pídalos en función de la columna utilizando la función OrderBy ().

Sintaxis:

partición = ventana.Partitionby ("columna").Orderby ("columna")

Podemos ordenar los datos particionados con la columna particionada o cualquier otra columna.

Ahora, puede usar la función Lag () en las filas particionadas utilizando el encima() función.

Agregamos una columna para almacenar el número de fila usando el WithColumn () función.

Sintaxis:

dataframe_obj.WithColumn ("Lag_column", lag ("columna", offset).sobre (partición))

Aquí, el nombre especifica el nombre de la fila y el DataFrame_obj es nuestro Pyspark DataFrame.

Implementemos el código.

Ejemplo 1:

Aquí, creamos un marco de datos Pyspark que tiene 5 columnas: ['ujem_id', 'nombre', 'edad', 'tecnología1', 'tecnología2'] con 10 filas y divididos en las filas basadas en las filas basadas Tecnología1 Usando la función de ventana. Después de eso, retrasamos 1 fila.

importar pyspark
de Pyspark.Importación SQL *
Spark_app = Sparksession.constructor.nombre de la aplicación('_').getorcreate ()
Estudiantes = [(4, 'Sravan', 23, 'Php', 'Pruebas'),
(4, 'Sravan', 23, 'Php', 'Pruebas'),
(46, 'Mounika', 22, '.Net ',' html '),
(4, 'Deepika', 21, 'Oracle', 'html'),
(46, 'Mounika', 22, 'Oracle', 'Prueba'),
(12, 'Chandrika', 22, 'Hadoop', 'C#'),
(12, 'Chandrika', 22, 'Oracle', 'Prueba'),
(4, 'Sravan', 23, 'Oracle', 'C#'),
(4, 'Deepika', 21, 'Php', 'C#'),
(46, 'Mounika', 22, '.Net ',' prueba ')
]
dataFrame_OBJ = Spark_App.creatataFrame (estudiantes, ['temas_id', 'nombre', 'edad', 'tecnología1', 'tecnología2'])
imprimir ("---------- DataFrame ----------")
dataframe_obj.espectáculo()
# Importar la función de la ventana
de Pyspark.sql.Ventana de importación de ventana
#Importa el retraso de Pyspark.sql.funciones
de Pyspark.sql.Funciones Importación LAG
#Partition El DataFrame basado en los valores en la columna Technology1 y
#Oder las filas en cada partición basada en la columna TIEMPLE_ID
partición = ventana.Partitionby ("Technology1").Orderby ('temas_id')
imprimir ("---------- DataFrame dividido ----------")
#Ahora Mencione el retraso con offset-1 basado en temas_id
dataframe_obj.WithColumn ("LAG", LAG ("TUJET_ID", 1).sobre (partición)).espectáculo()

Producción:

Explicación:

En la primera salida representa los datos reales presentes en el marco de datos. En la segunda salida, la partición se realiza en base a la Tecnología1 columna.

El número total de particiones es 4.

Partición 1:

El .La red ocurrió dos veces en la primera partición. Desde que especificamos la compensación de retraso como 1, el primero .El valor neto es nulo y el siguiente .El valor neto es el valor de la fila anterior, 46 - 46.

Partición 2:

Hadoop ocurrió una vez en la segunda partición. Entonces, el retraso es nulo.

Partición 3:

Oracle ocurrió cuatro veces en la tercera partición.

Para el primer oráculo, el retraso es nulo.

Para el segundo oráculo, el valor de retraso es 4 (ya que el valor de la fila anterior de la fila es 4).

Para el tercer oráculo, el valor de retraso es 4 (ya que el valor de la fila anterior de la fila es 4).

Para el cuarto oráculo, el valor de retraso es 12 (ya que el valor de la fila anterior de la fila es 12).

Partición 4:

PHP ocurrió tres veces en la cuarta partición.

El valor de retraso para el primer PHP es nulo.

El valor de retraso para el segundo PHP es 4 (ya que el valor de la fila anterior de la fila es 4).

El valor de retraso para el 3er PHP es 4 (ya que el valor de la fila anterior de la fila es 4).

Ejemplo 2:

Retrasar las filas por 2. Asegúrese de crear el Pyspark DataFrame como se ve en el Ejemplo 1.

# Importar la función de la ventana
de Pyspark.sql.Ventana de importación de ventana
#Importa el retraso de Pyspark.sql.funciones
de Pyspark.sql.Funciones Importación LAG
#Partition El DataFrame basado en los valores en la columna Technology1 y
#Oder las filas en cada partición basada en la columna TIEMPLE_ID
partición = ventana.Partitionby ("Technology1").Orderby ('temas_id')
imprimir ("---------- DataFrame dividido ----------")
#Ahora mención el retraso con offset-2 basado en temas_id
dataframe_obj.WithColumn ("LAG", LAG ("TEUNT_ID", 2).sobre (partición)).espectáculo()

Producción:

Explicación:

La partición se realiza en base a la Tecnología1 columna. El número total de particiones es 4.

Partición 1:

El .La red ocurrió dos veces en la primera partición. Dado que especificamos la compensación de retraso como 2, el desplazamiento es nulo para ambos valores.

Partición 2:

Hadoop ocurrió una vez en la segunda partición. Entonces, el retraso es nulo.

Partición 3:

Oracle ocurrió cuatro veces en la tercera partición.

Para el primer y segundo oráculo, el retraso es nulo.

Para el tercer oráculo, el valor de retraso es 4 (ya que las 2 filas anteriores, el valor de la asignia es 4).

Para el cuarto oráculo, el valor de retraso es 4 (ya que las 2 filas anteriores, el valor de la asignia es 4).

Partición 4:

PHP ocurrió tres veces en la cuarta partición.

El valor de retraso para el primer y segundo PHP es nulo.

El valor de retraso para el 3er PHP es 4 (ya que las 2 filas anteriores, el valor de la asignia es 4).

Ejemplo 3:

Retrase las filas por 2 basadas en la columna de edad. Asegúrese de crear el Pyspark DataFrame como se ve en el Ejemplo 1.

# Importar la función de la ventana
de Pyspark.sql.Ventana de importación de ventana
#Importa el retraso de Pyspark.sql.funciones
de Pyspark.sql.Funciones Importación LAG
#Partition El DataFrame basado en los valores en la columna Technology1 y
#ordenar las filas en cada partición en función de la columna de edad
partición = ventana.Partitionby ("Technology1").Orderby ('edad')
imprimir ("---------- DataFrame dividido ----------")
#Ahora menciona el retraso con offset-2 basado en la edad
dataframe_obj.WithColumn ("LAG", LAG ("Age", 2).sobre (partición)).espectáculo()

Producción:

Explicación:

La partición se realiza en base a la Tecnología1 columna y retraso se define en función de la columna de edad. El número total de particiones es 4.

Partición 1:

El .La red ocurrió dos veces en la primera partición. Dado que especificamos la compensación de retraso como 2, el desplazamiento es nulo para ambos valores.

Partición 2:

Hadoop ocurrió una vez en la segunda partición. Entonces, el retraso es nulo.

Partición 3:

Oracle ocurrió cuatro veces en la tercera partición.

Para el primer y segundo oráculo, el retraso es nulo.

Para el tercer oráculo, el valor de retraso es 21 (el valor de edad de las dos filas anteriores es 21).

Para el cuarto oráculo, el valor de retraso es 22 (el valor de edad de las dos filas anteriores es 22).

Partición 4:

PHP ocurrió tres veces en la cuarta partición.

El valor de retraso para el primer y segundo PHP es nulo.

El valor de retraso para el 3er HP es 21 (el valor de edad de las dos filas anteriores es 21).

Conclusión

Aprendimos a obtener los valores de retraso en el marco de datos de Pyspark en filas divididas. La función Lag () en Pyspark está disponible en el módulo de ventana que se utiliza para devolver los valores de filas anteriores a las filas actuales. Aprendimos los diferentes ejemplos estableciendo las diferentes compensaciones.