Cómo leer datos de Kafka con Python

Cómo leer datos de Kafka con Python
Kafka es un sistema de mensajería distribuido de código abierto para enviar el mensaje en temas divididos y diferentes. La transmisión de datos en tiempo real se puede implementar utilizando Kafka para recibir datos entre las aplicaciones. Tiene tres partes principales. Estos son productores, consumidores y temas. El productor se usa para enviar un mensaje a un tema en particular y cada mensaje se adjunta con una clave. El consumidor se usa para leer un mensaje sobre un tema en particular del conjunto de particiones. Los datos recibidos del productor y almacenados en las particiones basadas en un tema en particular. Muchas bibliotecas existen en Python para crear productores y consumidores para construir un sistema de mensajería utilizando kafka. Cómo se pueden leer los datos de Kafka usando Python se muestran en este tutorial.

Requisito previo

Debe instalar la biblioteca de Python necesaria para leer datos de Kafka. Python3 se usa en este tutorial para escribir el guión del consumidor y el productor. Si el paquete PIP no está instalado antes en su sistema operativo Linux, entonces debe instalar PIP antes de instalar la biblioteca Kafka para Python. python3-kafka se utiliza en este tutorial para leer datos de Kafka. Ejecute el siguiente comando para instalar la biblioteca.

$ Pip Instale Python3-Kafka

Leer datos de texto simples de Kafka

Se pueden enviar diferentes tipos de datos del productor sobre un tema particular que el consumidor puede leer. En esta parte de este tutorial se pueden enviar y recibir datos de texto simples utilizando el productor y el consumidor.

Crea un archivo llamado productor1.py con el siguiente guión de Python. Kafkaproducer El módulo se importa de la biblioteca Kafka. La lista de corredores debe definir en el momento de la inicialización del objeto del productor para conectarse con el servidor Kafka. El puerto predeterminado de Kafka es '9092'. El argumento bootstrap_servers se utiliza para definir el nombre de host con el puerto. 'Primero'se establece como un nombre de tema por el cual se enviará el mensaje de texto del productor. A continuación, un simple mensaje de texto, 'Hola de Kafka'se envía usando enviar() método de Kafkaproducer al tema, 'Primero'.

productor1.PY:

# Importar kafkaproducer de la biblioteca Kafka
de Kafka import kafkaproducer
# Definir servidor con puerto
bootstrap_servers = ['localhost: 9092']
# Definir el nombre del tema donde publicará el mensaje
TopicName = 'First_topic'
# Inicializar la variable del productor
productor = kafkaproducer (bootstrap_servers = bootstrap_servers)
# Publicar texto en tema definido
productor.Enviar (TopicName, B'hello de Kafka ... ')
# Mensaje de impresión
Imprimir ("Mensaje enviado")

Crea un archivo llamado consumidor1.py con el siguiente guión de Python. Kafkaconsumer El módulo se importa de la biblioteca Kafka para leer datos de Kafka. sys El módulo se usa aquí para terminar el script. El mismo nombre de host y el número de puerto del productor se utilizan en el script del consumidor para leer datos de Kafka. El nombre del tema del consumidor y el productor debe ser el mismo que es 'Primero'. A continuación, el objeto de consumo se inicializa con los tres argumentos. Nombre del tema, ID de grupo e información del servidor. para El bucle se usa aquí para leer el envío de texto del productor de Kafka.

consumidor1.PY:

# Importar kafkaconsumer de la biblioteca Kafka
de kafka import kafkaconsumer
# Módulo de importación SYS
Sys de importación
# Definir servidor con puerto
bootstrap_servers = ['localhost: 9092']
# Definir el nombre del tema de donde recibirá el mensaje
TopicName = 'First_topic'
# Inicializar la variable del consumidor
Consumer = kafkaconsumer (topicname, group_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Leer e imprimir mensaje del consumidor
Para MSG en el consumidor:
imprimir ("Nombre del tema =%s, mensaje =%s"%(MSG.tema, msg.valor))
# Terminar el script
sys.salida()

Producción:

Ejecute el siguiente comando de un terminal para ejecutar el script del productor.

$ python3 productor1.py

La siguiente salida aparecerá después de enviar el mensaje.

Ejecute el siguiente comando desde otro terminal para ejecutar el script del consumidor.

$ Python3 Consumer1.py

La salida muestra el nombre del tema y el mensaje de texto enviado desde el productor.

Lectura de datos formateados de JSON de Kafka

Los datos formateados por JSON pueden ser enviados por el productor de Kafka y leer por el consumidor de Kafka utilizando el json Módulo de Python. La forma en que los datos JSON se pueden serializar y des-serializar antes de enviar y recibir los datos utilizando el módulo Python-Kafka se muestra en esta parte de este tutorial.

Crea un script de Python llamado productor2.py Con el siguiente script. Otro módulo llamado JSON se importa con Kafkaproducer módulo aquí. value_serializer El argumento se usa con bootstrap_servers argumento aquí para inicializar el objeto del productor de Kafka. Este argumento indica que los datos JSON se codificarán usando 'UTF-8'personaje establecido en el momento de enviar. A continuación, los datos formateados JSON se envían al tema nombrado Jsontópico.

productor2.PY:

# Importar kafkaproducer de la biblioteca Kafka
de Kafka import kafkaproducer
# Importar módulo JSON para serializar datos
importar json
# Inicializar la variable del productor y establecer el parámetro para la codificación JSON
productor = kafkaproducer (bootstrap_servers =
['localhost: 9092'], value_serializer = lambda v: json.vertederos (v).encode ('utf-8'))
# Enviar datos en formato JSON
productor.Enviar ('JSontópico', 'Nombre': 'Fahmida', 'Correo electrónico': '[email protected] ')
# Mensaje de impresión
Imprimir ("Mensaje enviado a JSontópico")

Crea un script de Python llamado consumidor2.py Con el siguiente script. Kafkaconsumer, sys y los módulos JSON se importan en este script. Kafkaconsumer El módulo se usa para leer datos formateados de JSON de Kafka. El módulo JSON se usa para decodificar el envío de datos JSON codificados desde el productor de Kafka. Sys El módulo se usa para terminar el script. valor_deserializador El argumento se usa con bootstrap_servers Para definir cómo se decodificarán los datos JSON. Próximo, para El bucle se usa para imprimir todos los registros de consumo y los datos JSON recuperados de Kafka.

consumidor2.PY:

# Importar kafkaconsumer de la biblioteca Kafka
de kafka import kafkaconsumer
# Módulo de importación SYS
Sys de importación
# Importar módulo JSON para serializar datos
importar json
# Inicializar la variable del consumidor y establecer la propiedad para JSON Decode
Consumer = kafkaconsumer ('jsontópico', bootstrap_servers = ['localhost: 9092'],
value_deserializer = lambda m: json.Cargas (M.decode ('utf-8'))))
# Leer datos de Kafka
Para el mensaje en el consumidor:
Imprimir ("Registros de consumo: \ n")
Imprimir (mensaje)
Imprimir ("\ Neading desde JSON Data \ n")
Imprimir ("Nombre:", Mensaje [6] ['Nombre'])
imprimir ("correo electrónico:", mensaje [6] ['correo electrónico'])
# Terminar el script
sys.salida()

Producción:

Ejecute el siguiente comando de un terminal para ejecutar el script del productor.

$ Python3 Producer2.py

El script imprimirá el siguiente mensaje después de enviar los datos JSON.

Ejecute el siguiente comando desde otro terminal para ejecutar el script del consumidor.

$ Python3 Consumer2.py

La siguiente salida aparecerá después de ejecutar el script.

Conclusión:

Los datos se pueden enviar y recibir en diferentes formatos de Kafka usando Python. Los datos también se pueden almacenar en la base de datos y recuperarse de la base de datos utilizando Kafka y Python. I Home, este tutorial ayudará al usuario de Python a comenzar a trabajar con Kafka.