Aby ste mohli absolvovať túto lekciu, musíte mať na svojom počítači aktívnu inštaláciu programu Kafka. Prečítajte si článok Inštalácia Apache Kafka na Ubuntu a dozviete sa, ako na to.
Inštalácia klienta Python pre Apache Kafka
Predtým, ako začneme pracovať s programom Apache Kafka v programe Python, je potrebné nainštalovať klienta Python pre program Apache Kafka. To je možné vykonať pomocou pip (Index balíka Python). Tu je príkaz, ako to dosiahnuť:
pip3 nainštalujte kafka-pythonBude to rýchla inštalácia na terminál:
Inštalácia klienta Python Kafka pomocou PIP
Teraz, keď máme aktívnu inštaláciu pre Apache Kafka a nainštalovali sme aj klienta Python Kafka, sme pripravení začať programovať.
Tvorba producenta
Prvá vec, ktorú musíte zverejniť na Kafke, je aplikácia producenta, ktorá dokáže posielať správy na témy v Kafke.
Upozorňujeme, že producenti Kafky sú producentmi asynchrónnych správ. To znamená, že operácie vykonané počas zverejnenia správy v témovej oblasti Kafka neblokujú. Pre zjednodušenie napíšeme pre túto lekciu jednoduchého vydavateľa JSON.
Ak chcete začať, vytvorte inštanciu pre producenta Kafka:
z kafka import KafkaProducerimport json
importovať tlač
producent = KafkaProducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.skládky (v).kódovať ('utf-8'))
Atribút bootstrap_servers informuje o hostiteľovi a porte pre server Kafka. Atribút value_serializer slúži iba na účely serializácie JSON zistených hodnôt JSON.
Ak si chcete zahrať s producentom Kafka, skúsme vytlačiť metriky týkajúce sa klastra Producent a Kafka:
metriky = producent.metriky ()pprint.pprint (metriky)
Teraz uvidíme nasledujúce:
Kafka Mterics
Teraz už konečne skúsme poslať nejakú správu do frontu Kafka. Dobrým príkladom bude jednoduchý objekt JSON:
producent.send ('linuxhint', 'topic': 'kafka')The linuxhint je tematický oddiel, na ktorý sa bude posielať objekt JSON. Keď spustíte skript, nebudete mať žiadny výstup, pretože správa sa práve pošle do tematického oddielu. Je čas napísať spotrebiteľovi, aby sme mohli otestovať našu aplikáciu.
Výroba spotrebiteľa
Teraz sme pripravení vytvoriť nové pripojenie ako spotrebiteľskú aplikáciu a dostávať správy z témy Kafka. Začnite vytvorením novej inštancie pre spotrebiteľa:
z kafka import KafkaConsumerz kafka import TopicPartition
print ('Vytvára sa spojenie.")
consumer = KafkaConsumer (bootstrap_servers = 'localhost: 9092')
Teraz priraďte k tomuto spojeniu tému a tiež možnú hodnotu posunu.
print ('Priradenie témy.")spotrebiteľ.assign ([TopicPartition ('linuxhint', 2)]))
Na záver sme pripravení tlačiť stránku:
print ('Získava sa správa.")pre správu u spotrebiteľa:
print ("OFFSET:" + str (správa [0]) + "\ t MSG:" + str (správa))
Prostredníctvom toho získame zoznam všetkých zverejnených správ v spotrebiteľskej témovej oblasti Kafka. Výstupom pre tento program bude:
Spotrebiteľ Kafka
Len pre rýchlu informáciu, tu je kompletný producentský skript:
z kafka import KafkaProducerimport json
importovať tlač
producent = KafkaProducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.skládky (v).kódovať ('utf-8'))
producent.send ('linuxhint', 'topic': 'kafka')
# metrics = producent.metriky ()
# pprint.pprint (metriky)
A tu je kompletný program pre spotrebiteľov, ktorý sme použili:
z kafka import KafkaConsumerz importu kafka TopicPartition
print ('Vytvára sa spojenie.")
consumer = KafkaConsumer (bootstrap_servers = 'localhost: 9092')
print ('Priradenie témy.")
spotrebiteľ.assign ([TopicPartition ('linuxhint', 2)]))
print ('Získava sa správa.")
pre správu u spotrebiteľa:
print ("OFFSET:" + str (správa [0]) + "\ t MSG:" + str (správa))
Záver
V tejto lekcii sme sa pozreli na to, ako môžeme nainštalovať a začať používať Apache Kafka v našich programoch Python. Ukázali sme, aké ľahké je vykonávať jednoduché úlohy spojené s Kafkou v Pythone, s demonštrovaným klientom Kafka pre Python.