Comenzando con Apache Kafka y Python

Comenzando con Apache Kafka y Python
En esta lección, veremos cómo podemos usar Apache Kafka con Python y hacer una aplicación de muestra usando el cliente Python para Apache Kafka.

Para completar esta lección, debe tener una instalación activa para Kafka en su máquina. Leer Instalar Apache Kafka en Ubuntu para saber cómo hacer esto.

Instalación del cliente Python para Apache Kafka

Antes de que podamos comenzar a trabajar con Apache Kafka en el programa Python, necesitamos instalar el cliente Python para Apache Kafka. Esto se puede hacer usando pepita (Índice de paquetes de Python). Aquí hay un comando para lograr esto:

PIP3 instalar kafka-python

Esta será una instalación rápida en la terminal:

Instalación del cliente de Python Kafka con PIP

Ahora que tenemos una instalación activa para Apache Kafka y también hemos instalado el cliente Python Kafka, estamos listos para comenzar a codificar.

Hacer un productor

Lo primero que debe publicar mensajes en Kafka es una aplicación de productor que puede enviar mensajes a temas en Kafka.

Tenga en cuenta que los productores de Kafka son productores de mensajes asíncronos. Esto significa que las operaciones realizadas mientras se publican un mensaje en Kafka Topic Partition no son bloqueados. Para mantener las cosas simples, escribiremos un editor JSON simple para esta lección.

Para comenzar, haga una instancia para el productor de Kafka:

de Kafka import kafkaproducer
importar json
Importar PPRint
productor = kafkaproducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.vertederos (v).encode ('utf-8'))

El atributo Bootstrap_Servers informa sobre el host y el puerto para el servidor Kafka. El atributo value_serializer es solo para el propósito de serialización de JSON de los valores JSON encontrados.

Para jugar con el productor de Kafka, intentemos imprimir las métricas relacionadas con el productor y el clúster Kafka:

Métricas = productor.métrica()
piprint.PPRINT (Métricas)

Veremos lo siguiente ahora:

Kafka mterics

Ahora, finalmente intentemos enviar algún mensaje a la cola de kafka. Un simple objeto JSON será un buen ejemplo:

productor.Enviar ('Linuxhint', 'Topic': 'Kafka')

El Linuxhint es la partición del tema en la que se enviará el objeto JSON. Cuando ejecuta el script, no obtendrá ninguna salida, ya que el mensaje se envía a la partición del tema. Es hora de escribir un consumidor para que podamos probar nuestra aplicación.

Hacer un consumidor

Ahora, estamos listos para hacer una nueva conexión como aplicación de consumidor y recibir los mensajes del tema de Kafka. Comience con hacer una nueva instancia para el consumidor:

de kafka import kafkaconsumer
de Kafka Import TopicPartition
Imprimir ('Hacer conexión.')
Consumer = kafkaconsumer (bootstrap_servers = 'localhost: 9092')

Ahora, asigne un tema a esta conexión y también un posible valor de compensación.

Imprimir ('Asignación del tema.')
consumidor.Asignar ([TopicPartition ('Linuxhint', 2)])

Finalmente, estamos listos para imprimir el MSSAGE:

Imprimir ('Recibir mensaje.')
Para el mensaje en el consumidor:
print ("offset:" + str (mensaje [0]) + "\ t msg:" + str (mensaje)))

A través de esto, obtendremos una lista de todos los mensajes publicados en Kafka Consumer Topic Partition. La salida de este programa será:

Consumidor de kafka

Solo para una referencia rápida, aquí está el script del productor completo:

de Kafka import kafkaproducer
importar json
Importar PPRint
productor = kafkaproducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.vertederos (v).encode ('utf-8'))
productor.Enviar ('Linuxhint', 'Topic': 'Kafka')
# Métricos = Productor.métrica()
# pprint.PPRINT (Métricas)

Y aquí está el programa completo de consumo que utilizamos:

de kafka import kafkaconsumer
de Kafka Import TopicPartition
Imprimir ('Hacer conexión.')
Consumer = kafkaconsumer (bootstrap_servers = 'localhost: 9092')
Imprimir ('Asignación del tema.')
consumidor.Asignar ([TopicPartition ('Linuxhint', 2)])
Imprimir ('Recibir mensaje.')
Para el mensaje en el consumidor:
print ("offset:" + str (mensaje [0]) + "\ t msg:" + str (mensaje)))

Conclusión

En esta lección, observamos cómo podemos instalar y comenzar a usar Apache Kafka en nuestros programas de Python. Mostramos lo fácil que es realizar tareas simples relacionadas con Kafka en Python con el cliente de Kafka demostrado para Python.