Pyspark Rdd - Transformaciones

Pyspark Rdd - Transformaciones
En Python, Pyspark es un módulo de chispa utilizado para proporcionar un tipo de procesamiento similar como Spark.

RDD significa conjuntos de datos distribuidos resilientes. Podemos llamar a RDD una estructura de datos fundamental en Apache Spark.

Necesitamos importar RDD del Pyspark.módulo RDD.

Entonces, en Pyspark para crear un RDD, podemos usar el método Parallelize ().

Sintaxis:

Spark_app.sparkcontext.Paralelizar (datos)

Dónde,

Los datos pueden ser un dimensional (datos lineales) o dos dimensiones (datos de columna de fila).

Transformaciones RDD:

Una transformación RDD es una operación que se aplica a un RDD para crear nuevos datos del RDD existente. Usando transformaciones, podemos filtrar el RDD aplicando algunas transformaciones.

Veamos las transformaciones que se realizan en el RDD dado.

Los discutiremos uno por uno.

1. mapa()

La transformación map () se usa para asignar un valor a los elementos presentes en el RDD. Se necesita una función anónima como un parámetro, como la lambda y transforma los elementos en un RDD.

Sintaxis:

Rdd_data.mapa (anónimo_function)

Parámetros:

Anónimo_function parece:

Elemento Lambda: operación

Por ejemplo, la operación es sumar/restar todos los elementos con algún elemento nuevo.

Veamos los ejemplos para comprender mejor esta transformación.

Ejemplo 1:

En este ejemplo, creamos un RDD llamado Student_Marks con 20 elementos y aplicamos la transformación MAP () agregando cada elemento con 20 y mostrándolos usando Action Collect ().

#Importa el módulo Pyspark
importar pyspark
#Import Sparksession para crear una sesión
de Pyspark.SQL Import Sparksession
# Importar RDD de Pyspark.RDD
de Pyspark.RDD import rdd
#cree una aplicación llamada Linuxhint
Spark_app = Sparksession.constructor.AppName ('Linuxhint').getorcreate ()
# crear datos de alumnos con 20 elementos
student_marks = spark_app.sparkcontext.Paralelice ([[89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])
#Datos de desplazamiento en RDD
Imprimir ("Datos reales en RDD:", Student_Marks.mapa (elemento lambda: elemento).recolectar())
#Apply Map () Transformación agregando 20 a cada elemento en RDD
Imprimir ("Después de agregar 20 a cada elemento en RDD:", Student_Marks.mapa (elemento lambda: elemento+ 20).recolectar())

Producción:

Datos reales en RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Después de agregar 20 a cada elemento en RDD: [109, 96, 98, 109, 110, 120, 54, 76, 74, 42, 65, 63, 43, 76, 98, 41, 54, 54, 76, 54]

De la salida anterior, podemos ver que el elemento 20 se agrega a todos y cada uno de los elementos en RDD a través de la función Lambda usando la transformación MAP ().

Ejemplo 2:

En este ejemplo, creamos un RDD llamado Student_Marks con 20 elementos y aplicamos la transformación MAP () restando cada elemento por 15 y mostrándolos usando Action Collect ().

#Importa el módulo Pyspark
importar pyspark
#Import Sparksession para crear una sesión
de Pyspark.SQL Import Sparksession
# Importar RDD de Pyspark.RDD
de Pyspark.RDD import rdd
#cree una aplicación llamada Linuxhint
Spark_app = Sparksession.constructor.AppName ('Linuxhint').getorcreate ()
# crear datos de alumnos con 20 elementos
student_marks = spark_app.sparkcontext.Paralelice ([[89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])
#Datos de desplazamiento en RDD
Imprimir ("Datos reales en RDD:", Student_Marks.mapa (elemento lambda: elemento).recolectar())
#Apply Map () Transformación restando 15 de cada elemento en RDD
Imprimir ("Después de restar 15 de cada elemento en RDD:", Student_Marks.Mapa (Elemento Lambda: Elemento-15).recolectar())

Producción:

Datos reales en RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Después de restar 15 de cada elemento en RDD: [74, 61, 63, 74, 75, 85, 19, 41, 39, 7, 30, 28, 8, 41, 63, 6, 19, 19, 41, 19]

De la salida anterior, podemos ver que el elemento 15 se resta a todos y cada uno de los elementos de RDD a través de la función Lambda usando la transformación MAP ().

2. filtrar()

La transformación de filtro () se usa para filtrar los valores del RDD. Se necesita una función anónima como Lambda y devuelve los elementos filtrando elementos de un RDD.

Sintaxis:

Rdd_data.Filtro (anónimo_function)

Parámetros:

Anónimo_function parece:

Elemento Lambda: condición/expresión

Por ejemplo, la condición se usa para especificar las declaraciones expresivas para filtrar el RDD.

Veamos ejemplos para comprender mejor esta transformación.

Ejemplo 1:

En este ejemplo, creamos un RDD llamado Student_Marks con 20 elementos y aplicamos la transformación Filter () filtrando solo múltiplos de 5 y mostrándolos usando Action Collect ().

#Importa el módulo Pyspark
importar pyspark
#Import Sparksession para crear una sesión
de Pyspark.SQL Import Sparksession
# Importar RDD de Pyspark.RDD
de Pyspark.RDD import rdd
#cree una aplicación llamada Linuxhint
Spark_app = Sparksession.constructor.AppName ('Linuxhint').getorcreate ()
# crear datos de alumnos con 20 elementos
student_marks = spark_app.sparkcontext.Paralelice ([[89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])
#Datos de desplazamiento en RDD
Imprimir ("Datos reales en RDD:", Student_Marks.mapa (elemento lambda: elemento).recolectar())
#Apply Filter () Transformación devolviendo múltiplos de 5.
Imprimir ("Múltiples de 5 de un RDD:", Student_Marks.Filtro (elemento lambda: elemento%5 == 0).recolectar())
)

Producción:

Datos reales en RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Múltiples de 5 de un RDD: [90, 100, 45]

De la salida anterior, podemos ver que los múltiplos de 5 elementos se filtran desde el RDD.

Ejemplo 2:

En este ejemplo, creamos un RDD llamado Student_Marks con 20 elementos y aplicamos la transformación Filter () filtrando elementos que son mayores de 45 y mostrándolos usando Action Collect () Action.

#Importa el módulo Pyspark
importar pyspark
#Import Sparksession para crear una sesión
de Pyspark.SQL Import Sparksession
# Importar RDD de Pyspark.RDD
de Pyspark.RDD import rdd
#cree una aplicación llamada Linuxhint
Spark_app = Sparksession.constructor.AppName ('Linuxhint').getorcreate ()
# crear datos de alumnos con 20 elementos
student_marks = spark_app.sparkcontext.Paralelice ([[89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])
#Datos de desplazamiento en RDD
Imprimir ("Datos reales en RDD:", Student_Marks.mapa (elemento lambda: elemento).recolectar())
#Apply Filter () Transformación mediante el filtrado de valores superiores a 45
Imprimir ("Valores mayores que 45:", Student_Marks.Filtro (elemento lambda: elemento> 45).recolectar())

Producción:

Datos reales en RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Valores superiores a 45: [89, 76, 78, 89, 90, 100, 56, 54, 56, 78, 56]

De la salida anterior, podemos ver que esos elementos superiores a 45 se filtran desde el RDD.

3. Unión()

La transformación de unión () se usa para combinar dos RDDS. Podemos realizar esta transformación en dos RDD ..

Sintaxis:

Rdd_data1.Unión (RDD_DATA2)

Veamos ejemplos para comprender mejor esta transformación.

Ejemplo 1:

En este ejemplo, crearemos un solo RDD con datos de marcas de alumnos y generaremos dos RDD del único RDD filtrando algunos valores usando Filter () Transformación. Después de eso, podemos realizar la transformación union () en los dos RDD filtrados.

#Importa el módulo Pyspark
importar pyspark
#Import Sparksession para crear una sesión
de Pyspark.SQL Import Sparksession
# Importar RDD de Pyspark.RDD
de Pyspark.RDD import rdd
#cree una aplicación llamada Linuxhint
Spark_app = Sparksession.constructor.AppName ('Linuxhint').getorcreate ()
# crear datos de alumnos con 20 elementos
student_marks = spark_app.sparkcontext.Paralelice ([[89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])
#Datos de desplazamiento en RDD
Imprimir ("Datos reales en RDD:", Student_Marks.mapa (elemento lambda: elemento).recolectar())
First_Filter = Student_Marks.Filtro (elemento lambda: elemento> 90)
Second_Filter = Student_Marks.Filtro (elemento lambda: elemento <40)
#Dispray primero la transformación filtrada
Imprimir ("Elementos en RDD más de 90", First_Filter.recolectar())
#Dispray Segunda transformación filtrada
Imprimir ("Elementos en RDD menos de 40", Second_Filter.recolectar())
#Apply Union () Transformación realizando unión en los 2 filtros anteriores
Imprimir ("Transformación sindical en dos datos filtrados", First_Filter.Unión (Second_Filter).recolectar())

Producción:

Datos reales en RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Elementos en RDD superior a 90 [100]
Elementos en RDD menos de 40 [34, 22, 23, 21, 34, 34, 34]
Transformación de la unión en dos datos filtrados [100, 34, 22, 23, 21, 34, 34, 34]

De la salida anterior, puede ver que realizamos unión en First_Filter y Second_Filter.

First_Filter se obtiene obteniendo elementos de Students Mermanes RDD más de 90 y Second_Filter se obtiene obteniendo elementos de Students Marks RDD menos de 40 usando Filter () Transformación.

Ejemplo 2:

En este ejemplo, crearemos dos RDD de tal manera que el primer RDD tiene 20 elementos y el segundo RDD tiene 10 elementos. Después de eso, podemos aplicar una transformación de unión () a estos dos RDDS.

#Importa el módulo Pyspark
importar pyspark
#Import Sparksession para crear una sesión
de Pyspark.SQL Import Sparksession
# Importar RDD de Pyspark.RDD
de Pyspark.RDD import rdd
#cree una aplicación llamada Linuxhint
Spark_app = Sparksession.constructor.AppName ('Linuxhint').getorcreate ()
# crear datos de alumnos con 20 elementos
student_marks1 = spark_app.sparkcontext.Paralelice ([[89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])
# Crear datos de alumnos con 10 elementos
student_marks2 = spark_app.sparkcontext.Paralelice ([45,43,23,56,78,21,34,34,56,34]))
#Datos de desplazamiento en RDD
Imprimir ("Datos reales en las marcas de estudiante 1 RDD:", Student_Marks1.mapa (elemento lambda: elemento).recolectar())
#Datos de desplazamiento en RDD
Imprimir ("Datos reales en las marcas de estudiante 2 RDD:", Student_Marks2.mapa (elemento lambda: elemento).recolectar())
#Apply Union () Transformación realizando unión en los 2 RDD anteriores
Imprimir ("Transformación de la Unión en Two RDD", Student_Marks1.Unión (Student_Marks2).recolectar())

Producción:

Datos reales en las marcas de los estudiantes 1 RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Datos reales en las marcas de los estudiantes 2 RDD: [45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Transformación de la Unión en dos RDD [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]

Podemos ver que dos RDD se combinan usando la transformación union ().

Conclusión

De este tutorial de Pyspark, vemos tres transformaciones aplicadas a RDD. La transformación MAP () se usa para mapear transformando elementos en un RDD, Filter () se usa para realizar operaciones de filtro y crear un nuevo RDD filtrado del RDD existente. Finalmente, discutimos Union () RDD que se usa para combinar dos RDDS.