Bazaprogram.ru

Новости из мира ПК
3 просмотров
Рейтинг статьи
1 звезда2 звезды3 звезды4 звезды5 звезд
Загрузка...

Kafka java example

Apache Kafka – мой конспект

Это мой конспект, в котором коротко и по сути затрону такие понятия Kafka как:

— Тема (Topic)
— Подписчики (consumer)
— Издатель (producer)
— Группа (group), раздел (partition)
— Потоки (streams)

Kafka — основное

При изучении Kafka возникали вопросы, ответы на которые мне приходилось эксперементально получать на примерах, вот это и изложено в этом конспекте. Как стартовать и с чего начать я дам одну из ссылок ниже в материалах.

Apache Kafka – диспетчер сообщений на Java платформе. В Kafka есть тема сообщения в которую издатели пишут сообщения и есть подписчики в темах, которые читают эти сообщения, все сообщения в процессе диспетчеризации пишутся на диск и не зависит от потребителей.

В состав Kafka входят набор утилит по созданию тем, разделов, готовые издатели, подписчики для примеров и др. Для работы Kafka необходим координатор «ZooKeeper», поэтому вначале стартуем ZooKeeper (zkServer.cmd) затем сервер Kafka (kafka-server-start.bat), командные файлы находятся в соответствующих папках bin, там же и утилиты.

Создадим тему Kafka утилитой, ходящей в состав

kafka-topics.bat —create —zookeeper localhost:2181 —replication-factor 1 —partitions 1 —topic out-topic

здесь указываем сервер zookeeper, replication-factor это количество реплик журнала сообщений, partitions – количество разделов в теме (об этом ниже) и собственно сама тема – “out-topic”.

Для простого тестирования можно использовать входящие в состав готовые приложения «kafka-console-consumer» и «kafka-console-producer», но я сделаю свои. Подписчики на практике объединяют в группы, это позволит разным приложениям читать сообщения из темы параллельно.

Для каждого приложения будет организованна своя очередь, читая из которой оно выполняет перемещения указателя последнего прочитанного сообщения (offset), это называется фиксацией (commit) чтения. И так если издатель отправит сообщение в тему, то оно будет гарантированно прочитано получателем этой темы если он запущен или, как только он подключится. Причем если есть разные клиенты (client.id), которые читают из одной темы, но в разных группах, то сообщения они получат не зависимо друг от друга и в то время, когда будут готовы.

Так можно представить последователь сообщений и независимое чтение их потребителями из одной темы.

Но есть ситуация, когда сообщения в тему могут начать поступать быстрее чем уходить, т.е. потребители обрабатывают их дольше. Для этого в теме можно предусмотреть разделы (partitions) и запускать потребителей в одной группе для этой темы.

Тогда произойдет распределение нагрузки и не все сообщения в теме и группе пойдут через одного потребителя. И тогда уже будет выбрана стратегия, как распределять сообщения по разделам. Есть несколько стратегий: round-robin – это по кругу, по хэш значению ключа, или явное указание номера раздела куда писать. Подписчики в этом случае распределяются равномерно по разделам. Если, например, подписчиков будет в группе будет больше чем разделов, то кто-то не получит сообщения. Таким образом разделы делаются для улучшения масштабируемости.

Например после создания темы с одним разделом я изменил на два раздела.

kafka-topics.bat —zookeeper localhost:2181 —alter —topic out-topic —partitions 2

my_kafka_run.cmd com.home.SimpleProducer out-topic (издатель)
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client01 (первый подписчик)
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client02 (второй подписчик)

Начав вводить в издателе пары ключ: значение можно наблюдать кто их получает. Так, например, по стратегии распределения по хэшу ключа сообщение m:1 попало клиенту client01

а сообщение n:1 клиенту client02

Если начну вводить без указания пар ключ: значение (такую возможность сделал в издателе), будет выбрана стратегия по кругу. Первое сообщение «m» попало client01, а уже втрое client02.

И еще вариант с указанием раздела, например в таком формате key:value:partition

Ранее в стратегии по хэш, m:1 уходил другому клиенту (client01), теперь при явном указании раздела (№1, нумеруются с 0) — к client02.

Если запустить подписчика с другим именем группы testGroup02 и для той же темы, то сообщения будут уходить параллельно и независимо подписчикам, т.е. если первый прочитал, а второй не был активен, то он прочитает, как только станет активен.

Можно посмотреть описания групп, темы соответственно:

kafka-consumer-groups.bat —bootstrap-server localhost:9092 —describe —group testGroup01

kafka-topics.bat —describe —zookeeper localhost:2181 —topic out-topic

Для запуска своих программ я сделал командный файл — my_kafka_run.cmd

my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup02 client01

Kafka Streams

Итак, потоки в Kafka это последовательность событий, которые получают из темы, над которой можно выполнять определенные операции, трансформации и затем результат отдать далее, например, в другую тему или сохранить в БД, в общем куда угодно. Операции могут быть как например фильтрации (filter), преобразования (map), так и агрегации (count, sum, avg). Для этого есть соответствующие классы KStream, KTable, где KTable можно представить как таблицу с текущими агрегированными значениями которые постоянно обновляются по мере поступления новых сообщений в тему. Как это происходит?

Например, издатель пишет в тему события (сообщения), Kafka все сообщения сохраняет в журнале сообщений, который имеет политику хранения (Retention Policy), например 7 дней. Например события изменения котировки это поток, далее хотим узнать среднее значение, тогда создадим Stream который возьмет историю из журнала и посчитает среднее, где ключом будет акция, а значением – среднее (это уже таблица с состоянием). Тут есть особенность – операции агрегирования в отличии от операций, например, фильтрации, сохраняют состояние. Поэтому вновь поступающие сообщения (события) в тему, будут подвержены вычислению, а результат будет сохраняться (state store), далее вновь поступающие будут писаться в журнал, Stream их будет обрабатывать, добавлять изменения к уже сохраненному состоянию. Операции фильтрации не требуют сохранения состояния. И тут тоже stream будет делать это не зависимо от издателя. Например, издатель пишет сообщения, а программа — stream в это время не работает, ничего не пропадет, все сообщения будут сохранены в журнале и как только программа-stream станет активной, она сделает вычисления, сохранит состояние, выполнит смещение для прочитанных сообщений (пометит что они прочитаны) и в дальнейшем она уже к ним не вернется, более того эти сообщения уйдут из журнала (kafka-logs). Тут видимо главное, чтобы журнал (kafka-logs) и его политика хранения позволило это. По умолчанию состояние Kafka Stream хранит в RocksDB. Журнал сообщений и все с ним связанное (темы, смещения, потоки, клиенты и др.) располагается по пути указанном в параметре «log.dirs=kafka-logs» файла конфигурации «configserver.properties», там же указывается политика хранения журнала «log.retention.hours=48». Пример лога

Читать еще:  Класс scanner java

А путь к базе с состояниями stream указывается в параметре приложения

Состояния хранятся по ИД приложениям независимо (StreamsConfig.APPLICATION_ID_CONFIG). Пример

Проверим теперь как работает Stream. Подготовим приложение Stream из примера, который есть поставке (с некоторой доработкой для эксперимента), которое считает количество одинаковых слов и приложение издатель и подписчик. Писать будет в тему in-topic

my_kafka_run.cmd com.home.SimpleProducer in-topic

my_kafka_run.cmd com.home.KafkaCountStream in-topic app_01

my_kafka_run.cmd com.home.SimpleProducer in-topic
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client01

Начинаем вводить слова и видим их подсчет с указанием какой Stream App-ID их подсчитал

Работа будет идти независимо, можно остановить Stream и продолжать писать в тему, он потом при старте посчитает. А теперь подключим второй Stream c App- >

А теперь представим наш журнал устарел (Retention политика) или мы его удалили (что бывает надо делать) и подключаем третий stream с App- >

Если затем запустим поток app_02, то он догонит первый и они будут равны в значениях. Из примера стало понятно, как Kafka обрабатывает текущий журнал, добавляет к ранее сохраненному состоянию и так далее.

Тема Kafka очень обширна, я для себя сделал первое общее представление 🙂

Kafka Producer and Consumer Examples Using Java

In this article, a software engineer will show us how to produce and consume records/messages with Kafka brokers. Let’s get to it!

Join the DZone community and get the full member experience.

In my last article, we discussed how to setup Kafka using Zookeeper. In this article, we will see how to produce and consume records/messages with Kafka brokers.

Before starting with an example, let’s get familiar first with the common terms and some commands used in Kafka.

Record: Producer sends messages to Kafka in the form of records. A record is a key-value pair. It contains the topic name and partition number to be sent. Kafka broker keeps records inside topic partitions. Records sequence is maintained at the partition level. You can define the logic on which basis partition will be determined.

Topic: Producer writes a record on a topic and the consumer listens to it. A topic can have many partitions but must have at least one.

Partition: A topic partition is a unit of parallelism in Kafka, i.e. two consumers cannot consume messages from the same partition at the same time. A consumer can consume from multiple partitions at the same time.

Offset: A record in a partition has an offset associated with it. Think of it like this: partition is like an array; offsets are like indexs.

Producer: Creates a record and publishes it to the broker.

Consumer: Consumes records from the broker.

Commands: In Kafka, a setup directory inside the bin folder is a script (kafka-topics.sh), using which, we can create and delete topics and check the list of topics. Go to the Kafka home directory.

Execute this command to see the list of all topics.

./bin/kafka-topics.sh —list —zookeeper localhost:2181 .

localhost:2181 is the Zookeeper address that we defined in the server.properties file in the previous article.

Execute this command to create a topic.

./bin/kafka-topics.sh —create —zookeeper localhost:2181 —replication-factor 1 —partitions 100 —topic demo .

replication-factor : if Kafka is running in a cluster, this determines on how many brokers a partition will be replicated. The partitions argument defines how many partitions are in a topic.

After a topic is created you can increase the partition count but it cannot be decreased. demo , here, is the topic name.

Execute this command to delete a topic.

./bin/kafka-topics.sh —zookeeper localhost:2181 —delete —topic demo .

This command will have no effect if in the Kafka server.properties file, if delete.topic.enable is not set to be true.

Execute this command to see the information about a topic.

./bin/kafka-topics.sh —describe —topic demo —zookeeper localhost:2181 .

Now that we know the common terms used in Kafka and the basic commands to see information about a topic ,let’s start with a working example.

The above snippet contains some constants that we will be using further.

The above snippet creates a Kafka producer with some properties.

BOOTSTRAP_SERVERS_CONFIG: The Kafka broker’s address. If Kafka is running in a cluster then you can provide comma (,) seperated addresses. For example: localhost:9091,localhost:9092

CLIENT_ID_CONFIG: Id of the producer so that the broker can determine the source of the request.

KEY_SERIALIZER_CLASS_CONFIG: The class that will be used to serialize the key object. In our example, our key is Long , so we can use the LongSerializer class to serialize the key. If in your use case you are using some other object as the key then you can create your custom serializer class by implementing the Serializer interface of Kafka and overriding the serialize method.

VALUE_SERIALIZER_CLASS_CONFIG: The class that will be used to serialize the value object. In our example, our value is String , so we can use the StringSerializer class to serialize the key. If your value is some other object then you create your custom serializer class. For example:

Читать еще:  Java lang runtimeexception

PARTITIONER_CLASS_CONFIG: The class that will be used to determine the partition in which the record will go. In the demo topic, there is only one partition, so I have commented this property. You can create your custom partitioner by implementing the CustomPartitioner interface. For example:

In above the CustomPartitioner class, I have overridden the method partition which returns the partition number in which the record will go.

The above snippet creates a Kafka consumer with some properties.

BOOTSTRAP_SERVERS_CONFIG: The Kafka broker’s address. If Kafka is running in a cluster then you can provide comma (,) seperated addresses. For example: localhost:9091,localhost:9092 .

GROUP_ID_CONFIG: The consumer group id used to identify to which group this consumer belongs.

KEY_DESERIALIZER_CLASS_CONFIG: The class name to deserialize the key object. We have used Long as the key so we will be using LongDeserializer as the deserializer class. You can create your custom deserializer by implementing the Deserializer interface provided by Kafka.

VALUE_DESERIALIZER_CLASS_CONFIG: The class name to deserialize the value object. We have used String as the value so we will be using StringDeserializer as the deserializer class. You can create your custom deserializer. For example:

MAX_POLL_RECORDS_CONFIG: The max count of records that the consumer will fetch in one iteration.

ENABLE_AUTO_COMMIT_CONFIG: When the consumer from a group receives a message it must commit the offset of that record. If this configuration is set to be true then, periodically, offsets will be committed, but, for the production level, this should be false and an offset should be committed manually.

AUTO_OFFSET_RESET_CONFIG: For each consumer group, the last committed offset value is stored. This configuration comes handy if no offset is committed for that group, i.e. it is the new group created.

Setting this value to earliest will cause the consumer to fetch records from the beginning of offset i.e from zero.

Setting this value to latest will cause the consumer to fetch records from the new records. By new records mean those created after the consumer group became active.

The above snippet explains how to produce and consume messages from a Kafka broker. If you want to run a producer then call the runProducer function from the main function. If you want to run a consumeer, then call the runConsumer function from the main function.

The offset of records can be committed to the broker in both asynchronous and synchronous ways. Using the synchronous way, the thread will be blocked until an offset has not been written to the broker.

Conclusion

We have seen how Kafka producers and consumers work. You can check out the whole project on my GitHub page. If you are facing any issues with Kafka, please ask in the comments. In next article, I will be discussing how to set up monitoring tools for Kafka using Burrow.

Kafka java example

GitHub is home to over 40 million developers working together to host and review code, manage projects, and build software together.

java-kafka-example / src / main / java / KafkaExample.java

Users who have contributed to this file

import org.apache.kafka.clients.consumer.ConsumerRecord ;
import org.apache.kafka.clients.consumer.ConsumerRecords ;
import org.apache.kafka.clients.consumer.KafkaConsumer ;
import org.apache.kafka.clients.producer.KafkaProducer ;
import org.apache.kafka.clients.producer.Producer ;
import org.apache.kafka.clients.producer.ProducerRecord ;
import org.apache.kafka.common.serialization.StringSerializer ;
import org.apache.kafka.common.serialization.StringDeserializer ;
import java.util.Arrays ;
import java.util.Date ;
import java.util.Properties ;
public class KafkaExample <
private final String topic;
private final Properties props;
public KafkaExample ( String brokers , String username , String password ) <
this . topic = username + » -default » ;
String jaasTemplate = » org.apache.kafka.common.security.scram.ScramLoginModule required username= » %s » password= » %s » ; » ;
String jaasCfg = String . format(jaasTemplate, username, password);
String serializer = StringSerializer . >. getName();
String deserializer = StringDeserializer . >. getName();
props = new Properties ();
props . put( » bootstrap.servers » , brokers);
props . put( » group. >» , username + » -consumer » );
props . put( » enable.auto.commit » , » true » );
props . put( » auto.commit.interval.ms » , » 1000 » );
props . put( » auto.offset.reset » , » earliest » );
props . put( » session.timeout.ms » , » 30000 » );
props . put( » key.deserializer » , deserializer);
props . put( » value.deserializer » , deserializer);
props . put( » key.serializer » , serializer);
props . put( » value.serializer » , serializer);
props . put( » security.protocol » , » SASL_SSL » );
props . put( » sasl.mechanism » , » SCRAM-SHA-256 » );
props . put( » sasl.jaas.config » , jaasCfg);
>
public void consume () <
KafkaConsumer String , String > consumer = new KafkaConsumer<> (props);
consumer . subscribe( Arrays . asList(topic));
while ( true ) <
ConsumerRecords String , String > records = consumer . poll( 1000 );
for ( ConsumerRecord String , String > record : records) <
System . out . printf( » %s [%d] offset=%d, key=%s, value= » %s «n » ,
record . topic(), record . partition(),
record . offset(), record . key(), record . value());
>
>
>
public void produce () <
Thread one = new Thread () <
public void run () <
try <
Producer String , String > producer = new KafkaProducer<> (props);
int i = 0 ;
while ( true ) <
Date d = new Date ();
producer . send( new ProducerRecord<> (topic, Integer . toString(i), d . toString()));
Thread . sleep( 1000 );
i ++ ;
>
> catch ( InterruptedException v) <
System . out . println(v);
>
>
>;
one . start();
>
public static void main ( String [] args ) <
String brokers = System . getenv( » CLOUDKARAFKA_BROKERS » );
String username = System . getenv( » CLOUDKARAFKA_USERNAME » );
String password = System . getenv( » CLOUDKARAFKA_PASSWORD » );
KafkaExample c = new KafkaExample (brokers, username, password);
c . produce();
c . consume();
>
>
  • © 2020 GitHub, Inc.
  • Terms
  • Privacy
  • Security
  • Status
  • Help

You can’t perform that action at this time.

You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.

Introduction to KafkaStreams in Java

Last modified: September 15, 2018

Читать еще:  Java lang number

I just announced the new Learn Spring course, focused on the fundamentals of Spring 5 and Spring Boot 2:

In the 9 years of running Baeldung, I’ve never, ever done a «sale».
But. we’ve also not been through anything like this pandemic either.
And, if making my courses more affordable for a while is going to help a company stay in business, or a developer land a new job, make rent or be able to provide for their family — then it’s well worth doing.
Effective immediately, all Baeldung courses are 33% off their normal prices!
You’ll find all three courses in the menu, above, or here.

1. Overview

In this article, we’ll be looking at the KafkaStreams library.

KafkaStreams is engineered by the creators of Apache Kafka. The primary goal of this piece of software is to allow programmers to create efficient, real-time, streaming applications that could work as Microservices.

KafkaStreams enables us to consume from Kafka topics, analyze or transform data, and potentially, send it to another Kafka topic.

To demonstrate KafkaStreams, we’ll create a simple application that reads sentences from a topic, counts occurrences of words and prints the count per word.

Important to note is that the KafkaStreams library isn’t reactive and has no support for async operations and backpressure handling.

2. Maven Dependency

To start writing Stream processing logic using KafkaStreams, we need to add a dependency to kafka-streams and kafka-clients:

We also need to have Apache Kafka installed and started because we’ll be using a Kafka topic. This topic will be the data source for our streaming job.

We can download Kafka and other required dependencies from the official website.

3. Configuring KafkaStreams Input

The first thing we’ll do is the definition of the input Kafka topic.

We can use the Confluent tool that we downloaded – it contains a Kafka Server. It also contains the kafka-console-producer that we can use to publish messages to Kafka.

To get started let’s run our Kafka cluster:

Once Kafka starts, we can define our data source and name of our application using APPLICATION_ID_CONFIG:

A crucial configuration parameter is the BOOTSTRAP_SERVER_CONFIG. This is the URL to our local Kafka instance that we just started:

Next, we need to pass the type of the key and value of messages that will be consumed from inputTopic:

Stream processing is often stateful. When we want to save intermediate results, we need to specify the STATE_DIR_CONFIG parameter.

In our test, we’re using a local file system:

4. Building a Streaming Topology

Once we defined our input topic, we can create a Streaming Topology – that is a definition of how events should be handled and transformed.

In our example, we’d like to implement a word counter. For every sentence sent to inputTopic, we want to split it into words and calculate the occurrence of every word.

We can use an instance of the KStreamsBuilder class to start constructing our topology:

To implement word count, firstly, we need to split the values using the regular expression.

The split method is returning an array. We’re using the flatMapValues() to flatten it. Otherwise, we’d end up with a list of arrays, and it’d be inconvenient to write code using such structure.

Finally, we’re aggregating the values for every word and calling the count() that will calculate occurrences of a specific word.

5. Handling Results

We already calculated the word count of our input messages. Now let’s print the results on the standard output using the foreach() method:

On production, often such streaming job might publish the output to another Kafka topic.

We could do this using the to() method:

The Serde class gives us preconfigured serializers for Java types that will be used to serialize objects to an array of bytes. The array of bytes will then be sent to the Kafka topic.

We’re using String as a key to our topic and Long as a value for the actual count. The to() method will save the resulting data to outputTopic.

6. Starting KafkaStream Job

Up to this point, we built a topology that can be executed. However, the job hasn’t started yet.

We need to start our job explicitly by calling the start() method on the KafkaStreams instance:

Note that we are waiting 30 seconds for the job to finish. In a real-world scenario, that job would be running all the time, processing events from Kafka as they arrive.

We can test our job by publishing some events to our Kafka topic.

Let’s start a kafka-console-producer and manually send some events to our inputTopic:

This way, we published two events to Kafka. Our application will consume those events and will print the following output:

We can see that when the first message arrived, the word pony occurred only once. But when we sent the second message, the word pony happened for the second time printing: “word: pony -> 2″.

6. Conclusion

This article discusses how to create a primary stream processing application using Apache Kafka as a data source and the KafkaStreams library as the stream processing library.

All these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.

Ссылка на основную публикацию
Adblock
detector