Apache Kafka

Ako čítať údaje z Kafky pomocou Pythonu

Ako čítať údaje z Kafky pomocou Pythonu
Kafka je systém distribuovaných správ s otvoreným zdrojovým kódom na odosielanie správ v rozdelených témach a na rôzne témy. Streamovanie údajov v reálnom čase je možné implementovať pomocou Kafky na príjem údajov medzi aplikáciami. Má tri hlavné časti. Jedná sa o producenta, spotrebiteľa a témy. Producent sa používa na odoslanie správy na konkrétnu tému a ku každej správe sa pripája kľúč. Spotrebiteľ je zvyknutý čítať správu o konkrétnej téme z množiny oddielov. Údaje prijaté od výrobcu a uložené v oddieloch na základe konkrétnej témy. Veľa knižníc v pythone existuje na to, aby vytvorili výrobcu a spotrebiteľa na vybudovanie systému správ pomocou Kafky. Ako sa dajú čítať údaje z Kafky pomocou jazyka python, ukazuje tento návod.

Predpoklad

Musíte si nainštalovať potrebnú knižnicu pythonu na čítanie údajov z Kafky. V tomto výučbe sa program Python3 používa na napísanie skriptu spotrebiteľa a producenta. Ak balík pip nie je nainštalovaný vo vašom operačnom systéme Linux, musíte si pred inštaláciou knižnice Kafka pre python nainštalovať pip. python3-kafka sa v tomto návode používa na čítanie údajov z Kafky. Spustite nasledujúci príkaz na inštaláciu knižnice.

$ pip nainštalovať python3-kafka

Čítanie jednoduchých textových údajov od Kafku

Od výrobcu možno zaslať rôzne typy údajov o konkrétnej téme, ktoré si spotrebiteľ môže prečítať. Ako sa dajú odoslať a prijať jednoduché textové údaje od spoločnosti Kafka pomocou výrobcu a spotrebiteľa, sa dozviete v tejto časti tohto tutoriálu.

Vytvorte súbor s názvom producent1.py s nasledujúcim skriptom python. KafkaVýrobca modul sa importuje z knižnice Kafka. Zoznam sprostredkovateľov musí byť definovaný v čase inicializácie objektu producenta, aby sa mohol spojiť so serverom Kafka. Predvolený port Kafky je '9092„. argument bootstrap_servers sa používa na definovanie názvu hostiteľa s portom. „Prvá_Téma„je nastavený ako názov témy, pomocou ktorej bude textová správa odosielaná od producenta. Ďalej jednoduchá textová správa. “Ahojte od Kafky„sa odosiela pomocou poslať () metóda KafkaVýrobca k téme, “Prvá_Téma„.

producent1.py:

# Importujte produkt KafkaProducer z knižnice Kafka
z kafka import KafkaProducer
# Definujte server s portom
bootstrap_servers = ['localhost: 9092']
# Definujte názov témy, kde sa bude správa zverejňovať
topicName = 'Prvá_Téma'
# Inicializovať premennú výrobcu
producent = KafkaProducer (bootstrap_servers = bootstrap_servers)
# Publikovanie textu v definovanej téme
producent.send (topicName, b'Hello from kafka ... ')
# Tlačiť správu
print ("Správa odoslaná")

Vytvorte súbor s názvom spotrebiteľ1.py s nasledujúcim skriptom python. Kafkaspotrebiteľ modul sa importuje z knižnice Kafka na načítanie údajov z Kafky. sys modul sa tu používa na ukončenie skriptu. Rovnaké meno hostiteľa a číslo portu výrobcu sa používa v skripte spotrebiteľa na načítanie údajov z Kafky. Názov témy spotrebiteľa a výrobcu musí byť rovnaký, ako je „First_topic„.  Ďalej sa spotrebiteľský objekt inicializuje pomocou troch argumentov. Názov témy, ID skupiny a informácie o serveri. pre slučka sa tu používa na čítanie textu odoslaného od výrobcu Kafka.

spotrebiteľ1.py:

# Importujte spotrebiteľa Kafka z knižnice Kafka
z kafka import KafkaConsumer
# Importovať sys modul
import sys
# Definujte server s portom
bootstrap_servers = ['localhost: 9092']
# Definujte názov témy, odkiaľ bude správa prijímaná
topicName = 'Prvá_Téma'
# Inicializovať spotrebiteľskú premennú
consumer = KafkaConsumer (topicName, group_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Čítajte a tlačte správy od spotrebiteľov
pre správu u spotrebiteľa:
print ("Názov témy =% s, Správa =% s"% (msg.téma, správa.hodnota))
# Ukončite skript
sys.východ()

Výkon:

Spustením nasledujúceho príkazu z jedného terminálu vykonajte producentský skript.

$ python3 producent1.py

Po odoslaní správy sa objaví nasledujúci výstup.

Spustením nasledujúceho príkazu z iného terminálu vykonajte spotrebiteľský skript.

$ python3 spotrebiteľ1.py

Výstup zobrazuje názov témy a textovú správu odoslanú od výrobcu.

Čítanie údajov vo formáte JSON z Kafky

Údaje vo formáte JSON môže posielať producent Kafka a čítať ich spotrebiteľ pomocou produktu Kafka JSON modul pythonu. Ako je možné serializovať a de-serializovať údaje JSON pred odoslaním a prijatím údajov pomocou modulu python-kafka, je uvedené v tejto časti tohto tutoriálu.

Vytvorte skript v jazyku python s názvom producent2.py s nasledujúcim skriptom. Importuje sa ďalší modul s názvom JSON KafkaVýrobca modul tu. value_serializer argument sa používa s bootstrap_servers tu argument na inicializáciu objektu výrobcu Kafka. Tento argument naznačuje, že údaje JSON budú kódované pomocou znaku „utf-8'znaková sada v čase odoslania. Ďalej sa údaje vo formáte JSON odošlú na pomenovanú tému JSONtopic.

producent2.py:

# Importujte produkt KafkaProducer z knižnice Kafka
z kafka import KafkaProducer
# Importovať modul JSON na serializáciu údajov
import json
# Inicializuje premennú výrobcu a nastaví parameter pre kódovanie JSON
producent = KafkaProducer (bootstrap_servers =
['localhost: 9092'], value_serializer = lambda v: json.skládky (v).kódovať ('utf-8'))
# Pošlite údaje vo formáte JSON
producent.send ('JSONtopic', 'name': 'fahmida', 'email': '[chránený e-mailom]')
 
# Tlačiť správu
print ("Správa odoslaná do JSONtopic")

Vytvorte skript v jazyku python s názvom spotrebiteľ2.py s nasledujúcim skriptom. Kafkaspotrebiteľ, sys a moduly JSON sa importujú do tohto skriptu. Kafkaspotrebiteľ modul slúži na čítanie dát vo formáte JSON z Kafky. Modul JSON sa používa na dekódovanie kódovaných údajov JSON odosielaných od výrobcu Kafka. Sys modul slúži na ukončenie skriptu. value_deserializer argument sa používa s bootstrap_servers definovať, ako budú dekódované údaje JSON. Ďalšie, pre slučka slúži na tlač všetkých záznamov spotrebiteľov a údajov JSON získaných z Kafky.

spotrebiteľ2.py:

# Importujte spotrebiteľa Kafka z knižnice Kafka
z kafka import KafkaConsumer
# Importovať sys modul
import sys
# Importujte modul json na serializáciu údajov
import json
# Inicializuje spotrebiteľskú premennú a nastaví vlastnosť pre dekódovanie JSON
consumer = KafkaConsumer ('JSONtopic', bootstrap_servers = ['localhost: 9092'],
value_deserializer = lambda m: json.zaťaženia (m.dekódovať ('utf-8')))
# Čítať údaje z kafky
pre správu u spotrebiteľa:
print ("Záznamy o spotrebiteľovi: \ n")
tlač (správa)
print ("\ nČítanie z údajov JSON \ n")
print ("Meno:", správa [6] ['meno'])
print ("Email:", správa [6] ['email'])
# Ukončite skript
sys.východ()

Výkon:

Spustením nasledujúceho príkazu z jedného terminálu vykonajte producentský skript.

$ python3 producent2.py

Po odoslaní údajov JSON skript vytlačí nasledujúcu správu.

Spustením nasledujúceho príkazu z iného terminálu vykonajte spotrebiteľský skript.

$ python3 spotrebiteľ2.py

Po spustení skriptu sa zobrazí nasledujúci výstup.

Záver:

Dáta môžu byť odosielané a prijímané v rôznych formátoch od Kafky pomocou pythonu. Dáta môžu byť tiež uložené do databázy a načítané z databázy pomocou Kafky a pythonu. Doma, tento návod pomôže používateľovi pythonu začať pracovať s Kafkou.

Hry Ako nainštalovať League Of Legends na Ubuntu 14.04
Ako nainštalovať League Of Legends na Ubuntu 14.04
Ak ste fanúšikom League of Legends, potom je to pre vás príležitosť otestovať spustenie League of Legends. Upozorňujeme, že program PlayOnLinux podpor...
Hry Nainštalujte si najnovšiu strategickú hru OpenRA na Ubuntu Linux
Nainštalujte si najnovšiu strategickú hru OpenRA na Ubuntu Linux
OpenRA je herný engine Libre / Free Real Time Strategy, ktorý obnovuje rané hry z Westwoodu, ako napríklad klasické Command & Conquer: Red Alert. Dist...
Hry Nainštalujte si najnovší Dolphin Emulator pre Gamecube a Wii na Linuxe
Nainštalujte si najnovší Dolphin Emulator pre Gamecube a Wii na Linuxe
Emulátor Dolphin vám umožní hrať vybrané hry Gamecube a Wii na osobných počítačoch Linux (PC). Emulátor Dolphin, ktorý je voľne dostupným emulátorom ...