Pyspark Zip, ZipwithIndex y ZipwithuniqueID

Pyspark Zip, ZipwithIndex y ZipwithuniqueID
En este artículo presentaremos y demostraremos los métodos Zip (), ZipWithIndex () y ZipWithuniqueID () ZipWithuniqueID ().

Antes de comenzar con estos métodos, necesitamos importar RDD del Pyspark.módulo RDD. RDD significa conjuntos de datos distribuidos resilientes. Podemos llamar a RDD como una estructura de datos fundamental en Apache Spark. Entonces, en Pyspark para crear un RDD, podemos usar el método Parallelize ().

Sintaxis:

Spark_app.sparkcontext.Paralelizar (datos)

Donde: los datos pueden ser un data unidimensional (datos lineales) o bidimensionales (datos de columna de fila).

En este artículo de Pyspark, discutiremos Zip (), ZipWithIndex () y ZipwithuniqueID ().

Pyspark zip ()

La función Pyspark Zip () se usa para combinar valores en ambos pares de RDD como un nuevo RDD.

Sintaxis:

Datos RDD1.ZIP (rdd_data2)

Aquí:

  1. Rdd_data1 es el primer RDD
  2. Rdd_data2 es el segundo rdd

Nota que el número total de elementos en el RDDS debe ser el mismo. De lo contrario, devolverá un error.

Ejemplo 1:

En este ejemplo, devolveremos RDD con cremallera de student_marks1 y student_marks2 numérico rdds.

#Importa el módulo Pyspark
importar pyspark
#Import Sparksession para crear una sesión
de Pyspark.SQL Import Sparksession
# Importar RDD de Pyspark.RDD
de Pyspark.RDD import rdd
#cree una aplicación llamada Linuxhint
Spark_app = Sparksession.constructor.AppName ('Linuxhint').getorcreate ()
# crear datos de alumnos con 5 elementos
student_marks1 = spark_app.sparkcontext.Paralelice ([89,76,78,89,90])
# crear datos de alumnos con 5 elementos
student_marks2 = spark_app.sparkcontext.Paralelice ([1,2,3,4,5])
#Datos de desplazamiento en RDD
Imprimir ("Datos reales en Student_Marks1:", Student_Marks1.mapa (elemento lambda: elemento).recolectar())
Imprimir ("Datos reales en Student_Marks2:", Student_Marks2.mapa (elemento lambda: elemento).recolectar())
#zip Los dos RDD usando zip ()
Imprimir (Student_Marks1.ZIP (Student_Marks2).recolectar())

Producción:

Datos reales en Student_Marks1: [89, 76, 78, 89, 90]
Datos reales en Student_Marks2: ['1', 2, 3, 4, 5]
[(89, '1'), (76, 2), (78, 3), (89, 4), (90, 5)]

Podemos ver que cada valor en el primer RDD se combina con el segundo RDD.

Ejemplo 2:

En este ejemplo, devolveremos RDD con cremallera de Student_Marks1 y studing_marks2 string rdds.

#Importa el módulo Pyspark
importar pyspark
#Import Sparksession para crear una sesión
de Pyspark.SQL Import Sparksession
# Importar RDD de Pyspark.RDD
de Pyspark.RDD import rdd
#cree una aplicación llamada Linuxhint
Spark_app = Sparksession.constructor.AppName ('Linuxhint').getorcreate ()
# Crear datos de asignaturas de estudiantes con 2 elementos
Sujetos1 = Spark_App.sparkcontext.Paralelice (['Python', 'Java'])
# Crear datos de asignaturas de estudiantes con 2 elementos
Sujetos2 = Spark_App.sparkcontext.Paralelize (['html', 'java'])
#Datos de desplazamiento en RDD
Imprimir ("Datos reales en los sujetos1:", Sujetos1.mapa (elemento lambda: elemento).recolectar())
Imprimir ("Datos reales en los sujetos2:", Sujetos2.mapa (elemento lambda: elemento).recolectar())
#zip Los dos RDD usando zip ()
imprimir (sujetos1.Zip (sujetos2).recolectar())
Producción:
Datos reales en los sujetos1: ['Python', 'Java']
Datos reales en los sujetos2: ['html', 'java']
[('Python', 'html'), ('java', 'java')]]

Podemos ver que los valores de ambos RDD están enchufados.

Pyspark ZipwithIndex ()

La función pyspark zipwithindex () se usa para combinar valores en un solo RDD con valores. Aquí, los valores por defecto comienzan con 0.

Sintaxis:

Rdd_data.ZipWithIndex ()

Aquí, rdd_data es el rdd

Ejemplo 1:

En este ejemplo, creamos un RDD con 2 elementos de cadena y zip con valores usando zipwithindex ().

#Importa el módulo Pyspark
importar pyspark
#Import Sparksession para crear una sesión
de Pyspark.SQL Import Sparksession
# Importar RDD de Pyspark.RDD
de Pyspark.RDD import rdd
#cree una aplicación llamada Linuxhint
Spark_app = Sparksession.constructor.AppName ('Linuxhint').getorcreate ()
# Crear datos de asignaturas de estudiantes con 2 elementos
Sujetos1 = Spark_App.sparkcontext.Paralelice (['Python', 'Java'])
#Datos de desplazamiento en RDD
Imprimir ("Datos reales en los sujetos1:", Sujetos1.mapa (elemento lambda: elemento).recolectar())
#zip Los dos RDD usando ZipWithIndex ()
imprimir (sujetos1.ZipWithIndex ().recolectar())

Producción:

Datos reales en los sujetos1: ['Python', 'Java']
[('Python', 0), ('Java', 1)]

Podemos ver que el valor Python está rasgado con el valor 0 y Java está encendido con el valor 1.

Ejemplo 2:

En este ejemplo, creamos un RDD con 6 elementos de cadena y zip con valores usando ZipWithIndex ().

#Importa el módulo Pyspark
importar pyspark
#Import Sparksession para crear una sesión
de Pyspark.SQL Import Sparksession
# Importar RDD de Pyspark.RDD
de Pyspark.RDD import rdd
#cree una aplicación llamada Linuxhint
Spark_app = Sparksession.constructor.AppName ('Linuxhint').getorcreate ()
# Crear datos de temas de estudiantes con 6 elementos
Sujetos1 = Spark_App.sparkcontext.Paralelize (['Python', 'Java', 'Python', 'Java', 'Python', 'Java'])
#Datos de desplazamiento en RDD
Imprimir ("Datos reales en los sujetos1:", Sujetos1.mapa (elemento lambda: elemento).recolectar())
#zip Los dos RDD usando ZipWithIndex ()
imprimir (sujetos1.ZipWithIndex ().recolectar())

Producción:

Datos reales en los sujetos1: ['Python', 'java', 'python', 'java', 'python', 'java']
[('Python', 0), ('Java', 1), ('Python', 2), ('Java', 3), ('Python', 4), ('Java', 5)]]

Pyspark ZipwithuniqueID ()

La función Pyspark ZipwithuniqueID () es similar al del método anterior, pero los valores que forman un par están en el siguiente patrón:

K, 1*n+k, 2*n+k, 3*n+k .. .

n representa el número de particiones.

Sintaxis:

Rdd_data.ZipwithuniqueID ()

Aquí, rdd_data es el rdd

Puede haber muchas brechas entre los valores con zonas con zumbido.

Ejemplo:

#Importa el módulo Pyspark
importar pyspark
#Import Sparksession para crear una sesión
de Pyspark.SQL Import Sparksession
# Importar RDD de Pyspark.RDD
de Pyspark.RDD import rdd
#cree una aplicación llamada Linuxhint
Spark_app = Sparksession.constructor.AppName ('Linuxhint').getorcreate ()
# Crear datos de temas de estudiantes con 6 elementos
Sujetos1 = Spark_App.sparkcontext.Paralelize (['Python', 'Java', 'Python', 'Java', 'Python', 'Java'])
#Datos de desplazamiento en RDD
Imprimir ("Datos reales en los sujetos1:", Sujetos1.mapa (elemento lambda: elemento).recolectar())
#zip Los dos RDD usando ZipwithuniqueID ()
imprimir (sujetos1.ZipwithuniqueID ().recolectar())

Producción:

Datos reales en los sujetos1: ['Python', 'java', 'python', 'java', 'python', 'java']
[('Python', 0), ('Java', 2), ('Python', 4), ('Java', 1), ('Python', 3), ('Java', 5)]]

De la salida anterior, podemos ver que diferentes valores se amplían con valores reales.

Conclusión

En este tutorial, vimos cómo cerrar el RDD con algunos valores. Zip () se usa para cerrar dos pares de RDD. ZipwithIndex () se usa para crecer con valores y zipwithuniqueID () se usa para crecer con valores basados ​​en particiones.