Водич корак по корак за подешавање и покретање

Милиони записа података се генеришу сваког дана у данашњим рачунарским системима. То укључује ваше финансијске трансакције, поруџбину или податке са сензора вашег аутомобила. За обраду ових догађаја стримовања података у реалном времену и за поуздано премештање записа догађаја између различитих система предузећа, потребно је Апацхе Кафка.

Апацхе Кафка је решење за стримовање података отвореног кода које обрађује преко милион записа у секунди. Поред ове велике пропусности, Апацхе Кафка пружа високу скалабилност и доступност, ниско кашњење и трајно складиштење.

Компаније као што су ЛинкедИн, Убер и Нетфлик ослањају се на Апацхе Кафку за обраду у реалном времену и стриминг података. Најлакши начин да почнете да користите Апацхе Кафка је да га покренете и покренете на вашој локалној машини. Ово вам омогућава не само да видите Апацхе Кафка сервер у акцији, већ вам такође омогућава да производите и конзумирате поруке.

Са практичним искуством у покретању сервера, креирању тема и писању Јава кода помоћу Кафка клијента, бићете спремни да користите Апацхе Кафку да испуните све своје потребе за цевоводом података.

Како преузети Апацхе Кафка на вашој локалној машини

Најновију верзију Апацхе Кафке можете преузети са званична веза. Преузети садржај ће бити компримован у .тгз формату. Када преузмете, мораћете да извадите исто.

Ако сте Линук, отворите свој терминал. Затим идите до локације на којој сте преузели Апацхе Кафка компресовану верзију. Покрените следећу команду:

tar -xzvf kafka_2.13-3.5.0.tgz

Након што се команда заврши, видећете да је нови директоријум под називом кафка_2.13-3.5.0. Крећите се унутар фасцикле користећи:

cd kafka_2.13-3.5.0

Сада можете да наведете садржај овог директоријума користећи команду лс.

За кориснике оперативног система Виндовс, можете пратити исте кораке. Ако не можете да пронађете команду тар, можете користити алат треће стране као што је ВинЗип да отворите архиву.

Како покренути Апацхе Кафку на вашој локалној машини

Након што сте преузели и распаковали Апацхе Кафка, време је да почнете да га покрећете. Нема инсталатера. Можете директно почети да га користите преко командне линије или прозора терминала.

Пре него што почнете са Апацхе Кафком, уверите се да имате инсталирану Јава 8+ на вашем систему. Апацхе Кафка захтева покренуту Јава инсталацију.

#1. Покрените Апацхе Зоокеепер сервер

Први корак је покретање Апацхе Зоокеепер-а. Добијате га унапред преузети као део архиве. То је услуга која је одговорна за одржавање конфигурација и обезбеђивање синхронизације за друге услуге.

Када сте у директоријуму у који сте издвојили садржај архиве, покрените следећу команду:

За кориснике Линук-а:

bin/zookeeper-server-start.sh config/zookeeper.properties

За кориснике Виндовс-а:

bin/windows/zookeeper-server-start.bat config/zookeeper.properties

Датотека зоокеепер.пропертиес пружа конфигурације за покретање Апацхе Зоокеепер сервера. Можете да конфигуришете својства као што су локални директоријум где ће подаци бити ускладиштени и порт на коме ће сервер радити.

#2. Покрените Апацхе Кафка сервер

Сада када је Апацхе Зоокеепер сервер покренут, време је да покренете Апацхе Кафка сервер.

Отворите нови прозор терминала или командне линије и идите до директоријума у ​​којем се налазе екстраховане датотеке. Затим можете покренути Апацхе Кафка сервер користећи наредбу испод:

За кориснике Линук-а:

bin/kafka-server-start.sh config/server.properties

За кориснике Виндовс-а:

bin/windows/kafka-server-start.bat config/server.properties

Имате покренут Апацхе Кафка сервер. У случају да желите да промените подразумевану конфигурацију, то можете учинити изменом датотеке сервер.пропертиес. Различите вредности су присутне у званична документација.

Како користити Апацхе Кафка на вашој локалној машини

Сада сте спремни да почнете да користите Апацхе Кафка на вашој локалној машини за производњу и употребу порука. Пошто су сервери Апацхе Зоокеепер и Апацхе Кафка покренути и раде, хајде да видимо како можете да креирате своју прву тему, произведете своју прву поруку и искористите исту.

Који су кораци за креирање теме у Апацхе Кафка?

Пре него што креирате своју прву тему, хајде да разумемо шта је тема заправо. У Апацхе Кафки, тема је логичко складиште података које помаже у стримингу података. Замислите то као канал кроз који се подаци преносе од једне компоненте до друге.

Тема подржава више произвођача и више потрошача – више од једног система може да пише и чита из теме. За разлику од других система за размену порука, било која порука из теме може се користити више пута. Поред тога, можете навести и период задржавања ваших порука.

Узмимо пример система (произвођача) који производи податке за банковне трансакције. А други систем (потрошач) користи ове податке и шаље кориснику обавештење о апликацији. Да би се ово олакшало, потребна је тема.

Отворите нови прозор терминала или командне линије и идите до директоријума у ​​који сте распаковали архиву. Следећа команда ће креирати тему под називом трансакције:

За кориснике Линук-а:

bin/kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092

За кориснике Виндовс-а:

bin/windows/kafka-topics.bat --create --topic transactions --bootstrap-server localhost:9092

Сада сте направили своју прву тему и спремни сте да почнете да производите и конзумирате поруке.

Како направити поруку Апацхе Кафки?

Када је ваша тема Апацхе Кафка спремна, сада можете да креирате своју прву поруку. Отворите нови прозор терминала или командне линије или користите исти који сте користили за креирање теме. Затим се уверите да сте у одговарајућем директоријуму у који сте издвојили садржај архиве. Можете користити командну линију да креирате своју поруку о теми користећи следећу команду:

За кориснике Линук-а:

bin/kafka-console-producer.sh --topic transactions --bootstrap-server localhost:9092

За кориснике Виндовс-а:

bin/windows/kafka-console-producer.bat --topic transactions --bootstrap-server localhost:9092

Када покренете команду, видећете да ваш терминал или прозор командне линије чека на унос. Напишите своју прву поруку и притисните Ентер.

> This is a transactional record for $100

Направили сте своју прву поруку Апацхе Кафки на вашој локалној машини. Након тога, сада сте спремни да искористите ову поруку.

Како искористити поруку од Апацхе Кафке?

Под условом да је ваша тема направљена и да сте произвели поруку својој Кафка теми, сада можете користити ту поруку.

Апацхе Кафка вам омогућава да повежете више корисника са истом темом. Сваки потрошач може бити део потрошачке групе – логички идентификатор. На пример, ако имате две услуге које треба да конзумирају исте податке, онда могу имати различите групе потрошача.

Међутим, ако имате две инстанце исте услуге, онда бисте желели да избегнете коришћење и обраду исте поруке двапут. У том случају, обојица ће имати исту групу потрошача.

У прозору терминала или командне линије, уверите се да сте у одговарајућем директоријуму. Користите следећу команду да покренете потрошача:

За кориснике Линук-а:

bin/kafka-console-consumer.sh --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer

За кориснике Виндовс-а:

bin/windows/kafka-console-consumer.bat --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer

Видећете да се порука коју сте претходно произвели појавила на вашем терминалу. Сада сте користили Апацхе Кафку да искористите своју прву поруку.

Команда кафка-цонсоле-цонсумер узима много аргумената који су прослеђени. Хајде да видимо шта сваки од њих значи:

  • – тема помиње тему из које ћете конзумирати
  • –од почетка говори потрошачу конзоле да почне да чита поруке одмах од прве присутне поруке
  • Ваш Апацхе Кафка сервер се помиње преко опције –боотстрап-сервер
  • Поред тога, можете поменути групу потрошача тако што ћете проследити параметар –гроуп
  • У недостатку параметра групе потрошача, он се аутоматски генерише

Када је потрошач конзоле покренут, можете покушати да произведете нове поруке. Видећете да су сви потрошени и да се појављују у вашем терминалу.

Сада када сте креирали своју тему и успешно произвели и конзумирали поруке, хајде да ово интегришемо са Јава апликацијом.

Како креирати Апацхе Кафка произвођача и потрошача користећи Јаву

Пре него што почнете, уверите се да имате инсталирану Јава 8+ на вашој локалној машини. Апацхе Кафка обезбеђује сопствену клијентску библиотеку која вам омогућава да се повежете неприметно. Ако користите Мавен за управљање својим зависностима, додајте следећу зависност у свој пом.кмл

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.0</version>
</dependency>

Библиотеку можете преузети и са Мавен репозиториј и додајте га у своју путању Јава класа.

Када ваша библиотека буде на месту, отворите уређивач кода по вашем избору. Хајде да видимо како можете покренути свог произвођача и потрошача користећи Јаву.

Креирајте Апацхе Кафка Јава произвођача

Са библиотеком кафка-цлиентс на месту, сада сте спремни да почнете да креирате свог Кафка продуцента.

Хајде да направимо класу под називом СимплеПродуцер.јава. Ово ће бити одговорно за стварање порука о теми коју сте раније креирали. Унутар ове класе, креираћете инстанцу орг.апацхе.кафка.цлиентс.продуцер.КафкаПродуцер. Након тога ћете користити овог произвођача за слање порука.

Да бисте креирали Кафка произвођача, потребни су вам хост и порт вашег Апацхе Кафка сервера. Пошто га покрећете на својој локалној машини, хост ће бити лоцалхост. С обзиром на то да нисте променили подразумевана својства приликом покретања сервера, порт ће бити 9092. Размотрите следећи код у наставку који ће вам помоћи да креирате свог произвођача:

package org.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SimpleProducer {

    private final KafkaProducer<String, String> producer;

    public SimpleProducer(String host, String port) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(properties);
    }
}

Приметићете да се постављају три својства. Хајде да брзо прођемо кроз сваки од њих:

  • БООТСТРАП_СЕРВЕРС_ЦОНФИГ вам омогућава да дефинишете где се Апацхе Кафка сервер покреће
  • КЕИ_СЕРИАЛИЗЕР_ЦЛАСС_ЦОНФИГ говори произвођачу који формат да користи за слање кључева поруке.
  • Формат за слање стварне поруке је дефинисан коришћењем својства ВАЛУЕ_СЕРИАЛИЗЕР_ЦЛАСС_ЦОНФИГ.

Пошто ћете слати текстуалне поруке, оба својства су подешена да користе СтрингСериализер.цласс.

Да бисте заправо послали поруку вашој теми, морате да користите метод продуцер.сенд() који узима ПродуцерРецорд. Следећи код вам даје метод који ће послати поруку теми и одштампати одговор заједно са померањем поруке.

public void produce(String topic, String message) throws ExecutionException, InterruptedException {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
    final Future<RecordMetadata> send = this.producer.send(record);
    final RecordMetadata recordMetadata = send.get();
    System.out.println(recordMetadata);
}

Са комплетним кодом на месту, сада можете слати поруке на своју тему. Можете користити главни метод да бисте ово тестирали, као што је представљено у коду испод:

package org.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SimpleProducer {

    private final KafkaProducer<String, String> producer;

    public SimpleProducer(String host, String port) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(properties);
    }

    public void produce(String topic, String message) throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        final Future<RecordMetadata> send = this.producer.send(record);
        final RecordMetadata recordMetadata = send.get();
        System.out.println(recordMetadata);
    }

    public static void main(String[] args) throws Exception{
       SimpleProducer producer = new SimpleProducer("localhost", "9092");
       producer.produce("transactions", "This is a transactional record of $200");
    }
}

У овом коду правите СимплеПродуцер који се повезује са вашим Апацхе Кафка сервером на вашој локалној машини. Интерно користи КафкаПродуцер за производњу текстуалних порука на вашу тему.

Креирајте Апацхе Кафка Јава потрошача

Време је да направимо потрошача Апацхе Кафка користећи Јава клијент. Направите класу под називом СимплеЦонсумер.јава. Затим ћете креирати конструктор за ову класу, који иницијализује орг.апацхе.кафка.цлиентс.цонсумер.КафкаЦонсумер. Да бисте креирали потрошача, потребни су вам хост и порт на коме ради Апацхе Кафка сервер. Поред тога, потребна вам је група потрошача, као и тема из које желите да читате. Користите исечак кода дат у наставку:

package org.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class SimpleConsumer {

    private static final String OFFSET_RESET = "earliest";

    private final KafkaConsumer<String, String> consumer;
    private boolean keepConsuming = true;

    public SimpleConsumer(String host, String port, String consumerGroupId, String topic) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(List.of(topic));
    }
}

Слично Кафка произвођачу, Кафка потрошач такође узима објекат Пропертиес. Погледајмо сва различита својства:

  • БООТСТРАП_СЕРВЕРС_ЦОНФИГ говори потрошачу где Апацхе Кафка сервер ради
  • Група потрошача се помиње помоћу ГРОУП_ИД_ЦОНФИГ
  • Када потрошач почне да конзумира, АУТО_ОФФСЕТ_РЕСЕТ_ЦОНФИГ вам омогућава да наведете колико уназад желите да почнете да конзумирате поруке од
  • КЕИ_ДЕСЕРИАЛИЗЕР_ЦЛАСС_ЦОНФИГ говори потрошачу тип кључа поруке
  • ВАЛУЕ_ДЕСЕРИАЛИЗЕР_ЦЛАСС_ЦОНФИГ говори о типу потрошача стварне поруке

Пошто ћете у вашем случају користити текстуалне поруке, својства десеријализатора су подешена на СтрингДесериализер.цласс.

Сада ћете конзумирати поруке из своје теме. Да би ствари биле једноставне, када се порука потроши, одштампаћете поруку на конзоли. Хајде да видимо како то можете постићи користећи код у наставку:

private boolean keepConsuming = true;

public void consume() {
    while (keepConsuming) {
        final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L));
        if (consumerRecords != null && !consumerRecords.isEmpty()) {
            consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                System.out.println(consumerRecord.value());
            });
        }
    }
}

Овај код ће наставити да испитује тему. Када примите било коју евиденцију потрошача, порука ће бити одштампана. Тестирајте свог потрошача у акцији користећи главни метод. Покренућете Јава апликацију која ће наставити да користи тему и штампа поруке. Зауставите Јава апликацију да бисте прекинули корисника.

package org.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class SimpleConsumer {

    private static final String OFFSET_RESET = "earliest";

    private final KafkaConsumer<String, String> consumer;
    private boolean keepConsuming = true;

    public SimpleConsumer(String host, String port, String consumerGroupId, String topic) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(List.of(topic));
    }

    public void consume() {
        while (keepConsuming) {
            final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L));
            if (consumerRecords != null && !consumerRecords.isEmpty()) {
                consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                    System.out.println(consumerRecord.value());
                });
            }
        }
    }

    public static void main(String[] args) {
        SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", "9092", "transactions-consumer", "transactions");
        simpleConsumer.consume();
    }
}

Када покренете код, приметићете да он не користи само поруке које је произвео ваш Јава произвођач, већ и оне које сте произвели преко Цонсоле Продуцер. То је зато што је својство АУТО_ОФФСЕТ_РЕСЕТ_ЦОНФИГ постављено на најраније.

Када је СимплеЦонсумер покренут, можете користити произвођача конзоле или СимплеПродуцер Јава апликацију да креирате даље поруке за тему. Видећете како се конзумирају и штампају на конзоли.

Испуните све своје потребе за цевоводом података уз Апацхе Кафка

Апацхе Кафка вам омогућава да са лакоћом рукујете свим вашим захтевима за цевоводе података. Са Апацхе Кафка подешавањем на вашој локалној машини, можете истражити све различите функције које Кафка пружа. Поред тога, званични Јава клијент вам омогућава да ефикасно пишете, повезујете се и комуницирате са вашим Апацхе Кафка сервером.

Будући да је свестран, скалабилан и веома ефикасан систем за стриминг података, Апацхе Кафка може заиста да промени игру за вас. Можете га користити за свој локални развој или га чак интегрисати у своје производне системе. Баш као што је лако поставити локално, постављање Апацхе Кафке за веће апликације није велики задатак.

Ако тражите платформе за стриминг података, можете погледати најбоље платформе за стриминг података за анализу и обраду у реалном времену.