U savremenim računarskim sistemima, svakodnevno se generišu milioni zapisa podataka. To obuhvata vaše finansijske transakcije, porudžbine ili podatke sa senzora vašeg automobila. Za obradu ovih tokova podataka u realnom vremenu i za pouzdan prenos zapisa događaja između različitih sistema unutar preduzeća, neophodan je Apache Kafka.
Apache Kafka predstavlja rešenje otvorenog koda za strimovanje podataka, sposobno da obradi preko milion zapisa u sekundi. Pored ovog visokog propusnog opsega, Apache Kafka nudi izuzetnu skalabilnost i dostupnost, nisko kašnjenje i mogućnost trajnog skladištenja podataka.
Kompanije kao što su LinkedIn, Uber i Netflix se oslanjaju na Apache Kafku za obradu i striming podataka u realnom vremenu. Najjednostavniji način da započnete sa korišćenjem Apache Kafke je da je pokrenete na lokalnom računaru. Na ovaj način, ne samo da ćete moći da vidite Apache Kafka server u akciji, već ćete moći i da proizvodite i trošite poruke.
Sticanjem praktičnog iskustva u pokretanju servera, kreiranju tema i pisanju Java koda pomoću Kafka klijenta, bićete spremni da iskoristite Apache Kafku za zadovoljenje svih vaših potreba u vezi sa protokom podataka.
Kako preuzeti Apache Kafka na vaš lokalni računar
Najnoviju verziju Apache Kafke možete preuzeti sa zvanične stranice. Preuzeti sadržaj će biti komprimovan u .tgz formatu. Nakon preuzimanja, potrebno je da ga raspakujete.
Ako koristite Linux, otvorite terminal. Zatim idite do lokacije na kojoj ste sačuvali komprimovanu verziju Apache Kafke. Pokrenite sledeću komandu:
tar -xzvf kafka_2.13-3.5.0.tgz
Po završetku komande, pojaviće se novi direktorijum pod nazivom kafka_2.13-3.5.0. Uđite u tu fasciklu pomoću:
cd kafka_2.13-3.5.0
Sada možete prikazati sadržaj ovog direktorijuma koristeći komandu ls.
Za korisnike operativnog sistema Windows, možete pratiti iste korake. Ako nemate komandu tar, možete koristiti alat treće strane, kao što je WinZip, za otvaranje arhive.
Kako pokrenuti Apache Kafka na vašem lokalnom računaru
Kada ste preuzeli i raspakovali Apache Kafku, vreme je da je pokrenete. Nema posebnog instalacionog programa. Možete je direktno koristiti putem komandne linije ili terminala.
Pre nego što počnete sa Apache Kafkom, uverite se da imate instaliranu Javu 8+ na vašem sistemu. Apache Kafka zahteva aktivnu Java instalaciju.
#1. Pokrenite Apache ZooKeeper server
Prvi korak je pokretanje Apache ZooKeeper-a. On se isporučuje unapred preuzet kao deo arhive. To je servis zadužen za održavanje konfiguracija i omogućavanje sinhronizacije za druge servise.
Kada ste u direktorijumu gde ste raspakovali sadržaj arhive, pokrenite sledeću komandu:
Za korisnike Linux-a:
bin/zookeeper-server-start.sh config/zookeeper.properties
Za korisnike Windows-a:
bin/windows/zookeeper-server-start.bat config/zookeeper.properties
Datoteka zookeeper.properties pruža konfiguracije za pokretanje Apache ZooKeeper servera. Možete konfigurisati svojstva kao što su lokalni direktorijum za čuvanje podataka i port na kome će server raditi.
#2. Pokrenite Apache Kafka server
Sada kada je Apache ZooKeeper server pokrenut, vreme je da pokrenete Apache Kafka server.
Otvorite novi terminal ili komandnu liniju, i idite do direktorijuma u kom se nalaze raspakovane datoteke. Zatim, možete pokrenuti Apache Kafka server koristeći sledeću komandu:
Za korisnike Linux-a:
bin/kafka-server-start.sh config/server.properties
Za korisnike Windows-a:
bin/windows/kafka-server-start.bat config/server.properties
Sada imate pokrenut Apache Kafka server. Ako želite da izmenite podrazumevanu konfiguraciju, to možete uraditi editovanjem datoteke server.properties. Različite vrednosti su navedene u zvaničnoj dokumentaciji.
Kako koristiti Apache Kafku na vašem lokalnom računaru
Sada ste spremni da počnete da koristite Apache Kafku na svom lokalnom računaru za proizvodnju i potrošnju poruka. Pošto su Apache ZooKeeper i Apache Kafka serveri pokrenuti i aktivni, hajde da vidimo kako možete da kreirate svoju prvu temu, proizvedete svoju prvu poruku i koristite istu.
Koji su koraci za kreiranje teme u Apache Kafki?
Pre nego što kreirate prvu temu, hajde da razumemo šta je zapravo tema. U Apache Kafki, tema je logičko skladište podataka koje pomaže u strimingu podataka. Zamislite je kao kanal kroz koji se podaci prenose od jedne komponente do druge.
Tema podržava više proizvođača i više potrošača – više od jednog sistema može da piše u temu i čita iz nje. Za razliku od drugih sistema za razmenu poruka, svaka poruka iz teme može se koristiti više puta. Pored toga, možete definisati i period zadržavanja za vaše poruke.
Uzmimo za primer sistem (proizvođač) koji generiše podatke o bankarskim transakcijama. Drugi sistem (potrošač) koristi te podatke i šalje korisniku obaveštenje putem aplikacije. Da bi se ovo olakšalo, potrebna je tema.
Otvorite novi terminal ili komandnu liniju i idite do direktorijuma u kom ste raspakovali arhivu. Sledeća komanda će kreirati temu pod nazivom „transactions“:
Za korisnike Linux-a:
bin/kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092
Za korisnike Windows-a:
bin/windows/kafka-topics.bat --create --topic transactions --bootstrap-server localhost:9092
Sada ste napravili svoju prvu temu i spremni ste da počnete sa proizvodnjom i trošenjem poruka.
Kako poslati poruku Apache Kafki?
Kada je vaša Apache Kafka tema spremna, sada možete kreirati svoju prvu poruku. Otvorite novi terminal ili komandnu liniju, ili koristite onaj koji ste koristili za kreiranje teme. Zatim, uverite se da ste u odgovarajućem direktorijumu u koji ste raspakovali sadržaj arhive. Možete koristiti komandnu liniju da kreirate svoju poruku na temi koristeći sledeću komandu:
Za korisnike Linux-a:
bin/kafka-console-producer.sh --topic transactions --bootstrap-server localhost:9092
Za korisnike Windows-a:
bin/windows/kafka-console-producer.bat --topic transactions --bootstrap-server localhost:9092
Kada pokrenete komandu, videćete da vaš terminal ili komandna linija čekaju na unos. Napišite svoju prvu poruku i pritisnite Enter.
> This is a transactional record for $100
Kreirali ste svoju prvu poruku za Apache Kafku na vašem lokalnom računaru. Sada ste spremni da je potrošite.
Kako potrošiti poruku iz Apache Kafke?
Ako je vaša tema kreirana i ako ste poslali poruku na vašu Kafka temu, sada možete potrošiti tu poruku.
Apache Kafka vam omogućava da povežete više potrošača na istu temu. Svaki potrošač može biti deo potrošačke grupe – logičkog identifikatora. Na primer, ako imate dva servisa koja treba da koriste iste podatke, oni mogu imati različite grupe potrošača.
Međutim, ako imate dve instance istog servisa, onda želite da izbegnete korišćenje i obradu iste poruke dva puta. U tom slučaju, oba će imati istu grupu potrošača.
U terminalu ili komandnoj liniji, uverite se da ste u odgovarajućem direktorijumu. Koristite sledeću komandu da pokrenete potrošača:
Za korisnike Linux-a:
bin/kafka-console-consumer.sh --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer
Za korisnike Windows-a:
bin/windows/kafka-console-consumer.bat --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer
Videćete da se poruka koju ste ranije poslali pojavila u vašem terminalu. Sada ste koristili Apache Kafku da potrošite svoju prvu poruku.
Komanda kafka-console-consumer uzima mnoge argumente. Hajde da vidimo šta svaki od njih znači:
- –topic se odnosi na temu iz koje ćete trošiti podatke
- –from-beginning govori konzolnom potrošaču da odmah počne da čita poruke, od prve dostupne
- Vaš Apache Kafka server se pominje kroz opciju –bootstrap-server
- Pored toga, možete pomenuti grupu potrošača tako što ćete proslediti parametar –group
- Ako nema parametra za grupu potrošača, ona se automatski generiše
Kada je konzolni potrošač pokrenut, možete pokušati da proizvedete nove poruke. Videćete da su sve potrošene i da se pojavljuju u vašem terminalu.
Sada kada ste kreirali svoju temu i uspešno proizvodili i trošili poruke, hajde da to integrišemo sa Java aplikacijom.
Kako kreirati Apache Kafka proizvođača i potrošača koristeći Javu
Pre nego što počnete, uverite se da imate instaliranu Javu 8+ na vašem lokalnom računaru. Apache Kafka pruža sopstvenu klijentsku biblioteku koja vam omogućava besprekornu vezu. Ako koristite Maven za upravljanje zavisnostima, dodajte sledeću zavisnost u svoj pom.xml:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.5.0</version> </dependency>
Biblioteku možete preuzeti i sa Maven repozitorijuma i dodati je u svoju Java putanju klasa.
Kada je biblioteka na mestu, otvorite uređivač koda po izboru. Hajde da vidimo kako možete pokrenuti proizvođača i potrošača koristeći Javu.
Kreirajte Apache Kafka Java proizvođača
Sa bibliotekom kafka-clients na mestu, sada ste spremni da kreirate svog Kafka proizvođača.
Hajde da napravimo klasu pod nazivom SimpleProducer.java. Ona će biti zadužena za slanje poruka na temu koju ste ranije kreirali. Unutar ove klase, kreiraćete instancu org.apache.kafka.clients.producer.KafkaProducer. Nakon toga, koristićete ovog proizvođača za slanje poruka.
Za kreiranje Kafka proizvođača, potrebni su vam host i port vašeg Apache Kafka servera. Pošto ga pokrećete na svom lokalnom računaru, host će biti localhost. S obzirom na to da niste menjali podrazumevana svojstva pri pokretanju servera, port će biti 9092. Razmotrite sledeći kod, koji će vam pomoći da kreirate proizvođača:
package org.example.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class SimpleProducer { private final KafkaProducer<String, String> producer; public SimpleProducer(String host, String port) { String server = host + ":" + port; Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); this.producer = new KafkaProducer<>(properties); } }
Možete primetiti da se postavljaju tri svojstva. Hajde da brzo prođemo kroz svako od njih:
- BOOTSTRAP_SERVERS_CONFIG vam omogućava da definišete gde se pokreće Apache Kafka server
- KEY_SERIALIZER_CLASS_CONFIG govori proizvođaču koji format da koristi za slanje ključeva poruke.
- Format za slanje stvarne poruke se definiše korišćenjem svojstva VALUE_SERIALIZER_CLASS_CONFIG.
Pošto ćete slati tekstualne poruke, oba svojstva su podešena da koriste StringSerializer.class.
Da biste zaista poslali poruku na temu, morate koristiti metod producer.send() koji uzima ProducerRecord. Sledeći kod vam daje metod koji će poslati poruku na temu i odštampati odgovor zajedno sa pomakom poruke.
public void produce(String topic, String message) throws ExecutionException, InterruptedException { ProducerRecord<String, String> record = new ProducerRecord<>(topic, message); final Future<RecordMetadata> send = this.producer.send(record); final RecordMetadata recordMetadata = send.get(); System.out.println(recordMetadata); }
Sa kompletnim kodom, sada možete slati poruke na svoju temu. Možete koristiti glavni metod da biste ovo testirali, kao što je predstavljeno u kodu ispod:
package org.example.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class SimpleProducer { private final KafkaProducer<String, String> producer; public SimpleProducer(String host, String port) { String server = host + ":" + port; Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); this.producer = new KafkaProducer<>(properties); } public void produce(String topic, String message) throws ExecutionException, InterruptedException { ProducerRecord<String, String> record = new ProducerRecord<>(topic, message); final Future<RecordMetadata> send = this.producer.send(record); final RecordMetadata recordMetadata = send.get(); System.out.println(recordMetadata); } public static void main(String[] args) throws Exception{ SimpleProducer producer = new SimpleProducer("localhost", "9092"); producer.produce("transactions", "This is a transactional record of $200"); } }
U ovom kodu pravite SimpleProducer koji se povezuje sa vašim Apache Kafka serverom na lokalnom računaru. Interno koristi KafkaProducer za slanje tekstualnih poruka na vašu temu.
Kreirajte Apache Kafka Java potrošača
Vreme je da napravimo potrošača Apache Kafka koristeći Java klijent. Napravite klasu pod nazivom SimpleConsumer.java. Zatim, kreiraćete konstruktor za ovu klasu, koji inicijalizuje org.apache.kafka.clients.consumer.KafkaConsumer. Za kreiranje potrošača, potrebni su vam host i port na kom radi Apache Kafka server. Pored toga, potrebna vam je grupa potrošača, kao i tema iz koje želite da čitate. Koristite sledeći isečak koda:
package org.example.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; public class SimpleConsumer { private static final String OFFSET_RESET = "earliest"; private final KafkaConsumer<String, String> consumer; private boolean keepConsuming = true; public SimpleConsumer(String host, String port, String consumerGroupId, String topic) { String server = host + ":" + port; Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); this.consumer = new KafkaConsumer<>(properties); this.consumer.subscribe(List.of(topic)); } }
Slično Kafka proizvođaču, Kafka potrošač takođe uzima Properties objekat. Pogledajmo sva različita svojstva:
- BOOTSTRAP_SERVERS_CONFIG govori potrošaču gde se nalazi Apache Kafka server
- Grupa potrošača se definiše pomoću GROUP_ID_CONFIG
- Kada potrošač počne da troši podatke, AUTO_OFFSET_RESET_CONFIG vam omogućava da navedete koliko unazad želite da počnete da trošite poruke
- KEY_DESERIALIZER_CLASS_CONFIG govori potrošaču tip ključa poruke
- VALUE_DESERIALIZER_CLASS_CONFIG govori potrošaču tip stvarne poruke
Pošto ćete u vašem slučaju koristiti tekstualne poruke, svojstva deseralizacije su podešena na StringDeserializer.class.
Sada ćete trošiti poruke iz svoje teme. Da bi stvari bile jednostavne, kada se poruka potroši, odštampaćete poruku na konzoli. Hajde da vidimo kako to možete postići korišćenjem koda ispod:
private boolean keepConsuming = true; public void consume() { while (keepConsuming) { final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L)); if (consumerRecords != null && !consumerRecords.isEmpty()) { consumerRecords.iterator().forEachRemaining(consumerRecord -> { System.out.println(consumerRecord.value()); }); } } }
Ovaj kod će nastaviti da ispituje temu. Kada primi bilo koji zapis, poruka će biti odštampana. Testirajte svog potrošača u akciji koristeći glavni metod. Pokrenućete Java aplikaciju koja će nastaviti da troši podatke iz teme i štampa poruke. Zaustavite Java aplikaciju da biste prekinuli potrošača.
package org.example.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; public class SimpleConsumer { private static final String OFFSET_RESET = "earliest"; private final KafkaConsumer<String, String> consumer; private boolean keepConsuming = true; public SimpleConsumer(String host, String port, String consumerGroupId, String topic) { String server = host + ":" + port; Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); this.consumer = new KafkaConsumer<>(properties); this.consumer.subscribe(List.of(topic)); } public void consume() { while (keepConsuming) { final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L)); if (consumerRecords != null && !consumerRecords.isEmpty()) { consumerRecords.iterator().forEachRemaining(consumerRecord -> { System.out.println(consumerRecord.value()); }); } } } public static void main(String[] args) { SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", "9092", "transactions-consumer", "transactions"); simpleConsumer.consume(); } }
Kada pokrenete kod, primetićete da on ne koristi samo poruke koje je proizveo vaš Java proizvođač, već i one koje ste proizveli putem Console Producer-a. To je zato što je svojstvo AUTO_OFFSET_RESET_CONFIG postavljeno na earliest.
Kada je SimpleConsumer pokrenut, možete koristiti konzolnog proizvođača ili SimpleProducer Java aplikaciju da kreirate dodatne poruke za temu. Videćete kako se one troše i štampaju na konzoli.
Ispunite sve svoje potrebe za protokom podataka uz Apache Kafku
Apache Kafka vam omogućava da sa lakoćom obradite sve vaše zahteve u vezi sa protokom podataka. Sa Apache Kafka podešavanjem na lokalnom računaru, možete istražiti sve različite funkcionalnosti koje Kafka pruža. Pored toga, zvanični Java klijent vam omogućava da efikasno pišete, povezujete se i komunicirate sa vašim Apache Kafka serverom.
Pošto je svestran, skalabilan i veoma efikasan sistem za striming podataka, Apache Kafka može zaista da promeni stvari. Možete ga koristiti za svoj lokalni razvoj ili ga integrisati u produkcione sisteme. Kao što je lako podesiti ga lokalno, postavljanje Apache Kafke za veće aplikacije nije težak zadatak.
Ako tražite platforme za striming podataka, možete pogledati najbolje platforme za striming podataka za analizu i obradu u realnom vremenu.