
Java kafka consumer

Apache Kafka: my notes

These are my notes, in which I briefly and to the point cover the following Kafka concepts:

- Topics
- Consumers
- Producers
- Groups and partitions
- Streams

Kafka basics

While studying Kafka I kept running into questions whose answers I had to work out experimentally on small examples, and that is what these notes lay out. For how to install it and where to begin, I give one of the links in the reference materials below.

Apache Kafka is a message broker on the Java platform. Kafka has topics into which producers write messages, and consumers of those topics read the messages; during dispatching all messages are written to disk and do not depend on the consumers.

The Kafka distribution includes a set of utilities for creating topics and partitions, ready-made example producers and consumers, and so on. Kafka needs the ZooKeeper coordinator to run, so first we start ZooKeeper (zkServer.cmd) and then the Kafka server (kafka-server-start.bat); the command files live in the corresponding bin folders, together with the utilities.

Let's create a Kafka topic with one of the bundled utilities:

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic out-topic

Here we point at the ZooKeeper server; replication-factor is the number of replicas of the message log, partitions is the number of partitions in the topic (more on that below), and finally the topic itself, "out-topic".
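The same topic can also be created programmatically; the following is only a sketch using the Java AdminClient API, assuming a broker on localhost:9092 and the kafka-clients library on the classpath (the post itself uses the CLI utility).

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // one partition, replication factor 1, same as the command above
            NewTopic topic = new NewTopic("out-topic", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}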

For quick testing you can use the ready-made applications "kafka-console-consumer" and "kafka-console-producer" that ship with Kafka, but I will make my own. In practice consumers are combined into groups, which lets different applications read messages from a topic in parallel.

For each application its own queue is maintained; while reading from it, the application moves the pointer to the last message it has read (the offset), which is called committing the read. So if a producer sends a message to a topic, it is guaranteed to be read by a consumer of that topic, either while the consumer is running or as soon as it connects. Moreover, if there are different clients (client.id) that read from the same topic but in different groups, they receive the messages independently of one another, each whenever it is ready.
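As a minimal illustration of this poll-and-commit cycle, a plain Java consumer looks roughly like the sketch below; the topic, group and client names are the ones used later in the post, everything else is an assumption.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MinimalConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup01");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client01");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("out-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s:%s (partition %d, offset %d)%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
                consumer.commitSync(); // commit the position of the last messages read
            }
        }
    }
}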

That is how one can picture a sequence of messages and consumers reading them independently from a single topic.

But there is a situation where messages may start arriving in a topic faster than they leave, i.e. consumers take longer to process them. For that case you can provision partitions in the topic and start several consumers in one group for this topic.

Then the load is spread out, and not all messages of the topic and group pass through a single consumer. A strategy is then chosen for distributing messages across partitions. There are several strategies: round-robin, by the hash of the key, or an explicitly specified partition number to write to. The consumers in the group are in turn spread evenly across the partitions. If, for example, there are more consumers in the group than partitions, some of them will not receive messages. Partitions therefore exist to improve scalability.
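On the producer side these strategies correspond to the different ProducerRecord constructors; a small sketch follows (the topic and broker address are the ones used in this post, everything else is an assumption).

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PartitioningExamples {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // no key: the default partitioner spreads records across partitions
            // (round-robin in older clients, sticky batching in newer ones)
            producer.send(new ProducerRecord<>("out-topic", "m"));
            // with a key: the partition is chosen by the hash of the key
            producer.send(new ProducerRecord<>("out-topic", "m", "1"));
            // explicit partition number (partitions are numbered from 0)
            producer.send(new ProducerRecord<>("out-topic", 1, "m", "1"));
        }
    }
}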

For example, after creating the topic with one partition I changed it to two partitions:

kafka-topics.bat --zookeeper localhost:2181 --alter --topic out-topic --partitions 2

my_kafka_run.cmd com.home.SimpleProducer out-topic (producer)
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client01 (first consumer)
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client02 (second consumer)

Typing key:value pairs into the producer, you can watch who receives them. For example, with the hash-of-key distribution strategy the message m:1 went to client01,

while the message n:1 went to client02.

If I start typing without key:value pairs (I added that option to the producer), the round-robin strategy is chosen. The first message "m" went to client01, and the second to client02.

There is also a variant with an explicitly given partition, for example in the format key:value:partition.

Earlier, under the hash strategy, m:1 went to the other client (client01); now, with the partition given explicitly (partition 1, numbering starts at 0), it goes to client02.

If you start a consumer with a different group name, testGroup02, on the same topic, the messages go to the consumers in parallel and independently: if the first one has read them while the second one was not active, the second will read them as soon as it becomes active.

You can look at the descriptions of a group and of a topic, respectively:

kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group testGroup01

kafka-topics.bat --describe --zookeeper localhost:2181 --topic out-topic

To launch my own programs I made a command file, my_kafka_run.cmd, used like this:

my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup02 client01

Kafka Streams

So, streams in Kafka are a sequence of events obtained from a topic, over which you can perform certain operations and transformations and then hand the result onward, for example to another topic or to a database, essentially anywhere. The operations can be filtering (filter), transformation (map), or aggregation (count, sum, avg). For this there are the classes KStream and KTable, where a KTable can be pictured as a table of current aggregated values that is constantly updated as new messages arrive in the topic. How does this happen?

For example, a producer writes events (messages) to a topic; Kafka keeps all messages in the message log, which has a retention policy, say 7 days. Say quote-change events form a stream and we want the running average: we create a Stream that takes the history from the log and computes the average, where the key is the stock and the value is the average (that is already a table with state). There is a subtlety here: aggregation operations, unlike filtering operations, keep state. So new messages (events) arriving in the topic are fed into the computation and the result is persisted (the state store); messages that arrive later are written to the log, the Stream processes them and adds the changes to the state already saved. Filtering operations do not need to keep state. Here, too, the stream works independently of the producer: the producer writes messages while the stream program is not running, nothing is lost, all messages are kept in the log, and as soon as the stream program becomes active it does the computation, saves the state, advances the offset of the messages it has read (marks them as read) and never returns to them; moreover those messages eventually leave the log (kafka-logs). The main thing, apparently, is that the log (kafka-logs) and its retention policy allow this. By default Kafka Streams keeps its state in RocksDB. The message log and everything related to it (topics, offsets, streams, clients, etc.) lives at the path set by the "log.dirs=kafka-logs" parameter of the config/server.properties file, where the log retention policy "log.retention.hours=48" is also set.

The path to the database with stream state is set in an application parameter (state.dir, StreamsConfig.STATE_DIR_CONFIG).

States are stored independently per application ID (StreamsConfig.APPLICATION_ID_CONFIG).

Now let's check how a Stream works. We prepare a Stream application based on the example that ships with Kafka (with a small modification for the experiment), which counts occurrences of identical words, plus a producer and a consumer application. The producer will write to the topic in-topic; a sketch of such an application follows below.
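This is only a rough idea of what a word-count stream application such as com.home.KafkaCountStream might look like; it is modelled on the standard Kafka Streams word-count example, not on the author's actual code, and the argument handling is an assumption.

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class KafkaCountStream {
    public static void main(String[] args) {
        String topic = args.length > 0 ? args[0] : "in-topic";
        String appId = args.length > 1 ? args[1] : "app_01";

        Properties props = new Properties();
        // the state store is kept per application id, as described above
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream(topic);
        KTable<String, Long> counts = source
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
                .groupBy((key, word) -> word)
                .count(); // stateful aggregation: the running counts go to the state store (RocksDB)
        // publish the updated counts to the topic read by the consumer
        counts.toStream().to("out-topic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}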

my_kafka_run.cmd com.home.SimpleProducer in-topic

my_kafka_run.cmd com.home.KafkaCountStream in-topic app_01

my_kafka_run.cmd com.home.SimpleProducer in-topic
my_kafka_run.cmd com.home.SimpleConsumer out-topic testGroup01 client01

We start typing words and see them counted, with an indication of which Stream App-ID did the counting.

The work proceeds independently: you can stop the Stream and keep writing to the topic, and it will catch up on the counting when it starts again. Now let's attach a second Stream with App-ID app_02.

Now imagine that our log has expired (the retention policy kicked in) or that we deleted it (which sometimes has to be done), and we attach a third Stream with yet another App-ID.

If we then start the app_02 stream again, it catches up with the first one and their values become equal. The example makes it clear how Kafka works through the current log, adds it to the previously saved state, and so on.

Kafka is a very broad subject; for myself I have now formed a first overall picture of it 🙂

Kafka Producer and Consumer Examples Using Java

In this article, a software engineer will show us how to produce and consume records/messages with Kafka brokers. Let’s get to it!


In my last article, we discussed how to set up Kafka using ZooKeeper. In this article, we will see how to produce and consume records/messages with Kafka brokers.


Before starting with an example, let’s get familiar first with the common terms and some commands used in Kafka.

Record: The producer sends messages to Kafka in the form of records. A record is a key-value pair. It contains the topic name and the partition number it is to be sent to. The Kafka broker keeps records inside topic partitions. Record order is maintained at the partition level. You can define the logic by which the partition for a record is determined.

Topic: Producer writes a record on a topic and the consumer listens to it. A topic can have many partitions but must have at least one.

Partition: A topic partition is the unit of parallelism in Kafka, i.e. two consumers in the same group cannot consume messages from the same partition at the same time. A consumer can, however, consume from multiple partitions at the same time.

Offset: A record in a partition has an offset associated with it. Think of it like this: a partition is like an array; offsets are like indexes.

Producer: Creates a record and publishes it to the broker.

Consumer: Consumes records from the broker.

Commands: Kafka ships with a script (kafka-topics.sh) inside the bin folder, with which we can create and delete topics and check the list of topics. Go to the Kafka home directory.

Execute this command to see the list of all topics.

./bin/kafka-topics.sh --list --zookeeper localhost:2181

localhost:2181 is the Zookeeper address that we defined in the server.properties file in the previous article.

Execute this command to create a topic.

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 100 --topic demo

replication-factor: if Kafka is running in a cluster, this determines on how many brokers a partition will be replicated. The partitions argument defines how many partitions are in a topic.

After a topic is created you can increase the partition count, but it cannot be decreased. demo, here, is the topic name.

Execute this command to delete a topic.

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic demo

This command will have no effect if delete.topic.enable is not set to true in the Kafka server.properties file.

Execute this command to see the information about a topic.

./bin/kafka-topics.sh --describe --topic demo --zookeeper localhost:2181

Now that we know the common terms used in Kafka and the basic commands to see information about a topic, let's start with a working example.
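The article's code snippets are not reproduced here, so the blocks below are hedged reconstructions based on the descriptions around them; in particular the IKafkaConstants class name, its fields and its values are assumptions.

public class IKafkaConstants {
    public static final String KAFKA_BROKERS = "localhost:9092";
    public static final String CLIENT_ID = "client1";
    public static final String TOPIC_NAME = "demo";
    public static final String GROUP_ID_CONFIG = "consumerGroup1";
    public static final Integer MESSAGE_COUNT = 1000;
    public static final Integer MAX_POLL_RECORDS = 1;
    public static final String OFFSET_RESET_EARLIEST = "earliest";
}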

The above snippet contains some constants that we will be using further.
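A producer matching the properties described below could be created roughly like this (a sketch; the ProducerCreator class name and the commented-out partitioner line are assumptions):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerCreator {
    public static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
        return new KafkaProducer<>(props);
    }
}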

The above snippet creates a Kafka producer with some properties.

BOOTSTRAP_SERVERS_CONFIG: The Kafka broker's address. If Kafka is running in a cluster, you can provide a comma-separated list of addresses, for example: localhost:9091,localhost:9092.

CLIENT_ID_CONFIG: Id of the producer so that the broker can determine the source of the request.

KEY_SERIALIZER_CLASS_CONFIG: The class that will be used to serialize the key object. In our example, our key is Long , so we can use the LongSerializer class to serialize the key. If in your use case you are using some other object as the key then you can create your custom serializer class by implementing the Serializer interface of Kafka and overriding the serialize method.

VALUE_SERIALIZER_CLASS_CONFIG: The class that will be used to serialize the value object. In our example, our value is String, so we can use the StringSerializer class to serialize the value. If your value is some other object, then you create your custom serializer class. For example:
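For illustration only, a custom serializer for a hypothetical Custom value type might look like this (the Custom class, its fields and the naive encoding are assumptions):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

// hypothetical value type used only in this example
class Custom {
    final long id;
    final String name;
    Custom(long id, String name) { this.id = id; this.name = name; }
}

public class CustomSerializer implements Serializer<Custom> {
    @Override
    public byte[] serialize(String topic, Custom data) {
        if (data == null) {
            return null;
        }
        // naive "id|name" encoding; a real implementation might use JSON instead
        return (data.id + "|" + data.name).getBytes(StandardCharsets.UTF_8);
    }
}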

PARTITIONER_CLASS_CONFIG: The class that will be used to determine the partition in which the record will go. In the demo topic, there is only one partition, so I have commented this property out. You can create your custom partitioner by implementing Kafka's Partitioner interface. For example:
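A sketch of such a partitioner (the modulo routing and the partition count are examples, not the original logic):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CustomPartitioner implements Partitioner {
    private static final int PARTITION_COUNT = 2; // assumed partition count for this example

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // route Long keys to a partition by simple modulo arithmetic
        return (int) (((Long) key) % PARTITION_COUNT);
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this example
    }
}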

In the CustomPartitioner class above, I have overridden the method partition, which returns the partition number in which the record will go.
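The consumer, again reconstructed as a sketch from the property list described below (the ConsumerCreator name is an assumption):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerCreator {
    public static Consumer<Long, String> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, IKafkaConstants.GROUP_ID_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, IKafkaConstants.MAX_POLL_RECORDS);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, IKafkaConstants.OFFSET_RESET_EARLIEST);
        Consumer<Long, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(IKafkaConstants.TOPIC_NAME));
        return consumer;
    }
}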

The above snippet creates a Kafka consumer with some properties.

BOOTSTRAP_SERVERS_CONFIG: The Kafka broker's address. If Kafka is running in a cluster, you can provide a comma-separated list of addresses, for example: localhost:9091,localhost:9092.

GROUP_ID_CONFIG: The consumer group id used to identify to which group this consumer belongs.

KEY_DESERIALIZER_CLASS_CONFIG: The class name to deserialize the key object. We have used Long as the key so we will be using LongDeserializer as the deserializer class. You can create your custom deserializer by implementing the Deserializer interface provided by Kafka.

VALUE_DESERIALIZER_CLASS_CONFIG: The class name to deserialize the value object. We have used String as the value so we will be using StringDeserializer as the deserializer class. You can create your custom deserializer. For example:
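A matching sketch of a custom deserializer for the hypothetical Custom type shown earlier:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;

public class CustomDeserializer implements Deserializer<Custom> {
    @Override
    public Custom deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        // reverse of the naive "id|name" encoding used by CustomSerializer
        String[] parts = new String(data, StandardCharsets.UTF_8).split("\\|", 2);
        return new Custom(Long.parseLong(parts[0]), parts[1]);
    }
}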

MAX_POLL_RECORDS_CONFIG: The max count of records that the consumer will fetch in one iteration.

ENABLE_AUTO_COMMIT_CONFIG: When a consumer in a group receives a message, it must commit the offset of that record. If this configuration is set to true, offsets are committed periodically; in production, however, this should be false and offsets should be committed manually.

AUTO_OFFSET_RESET_CONFIG: For each consumer group, the last committed offset value is stored. This configuration comes in handy when no offset has been committed for that group yet, i.e. it is a newly created group.

Setting this value to earliest will cause the consumer to fetch records from the beginning of the partition, i.e. from offset zero.

Setting this value to latest will cause the consumer to fetch only new records, meaning records produced after the consumer group became active.
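The produce/consume logic itself, once more a hedged reconstruction of the missing snippet based on the description below (method names, the message text and the loop details are assumptions):

import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class App {
    public static void main(String[] args) {
        runProducer();
        // runConsumer();
    }

    static void runProducer() {
        Producer<Long, String> producer = ProducerCreator.createProducer();
        for (int index = 0; index < IKafkaConstants.MESSAGE_COUNT; index++) {
            ProducerRecord<Long, String> record =
                    new ProducerRecord<>(IKafkaConstants.TOPIC_NAME, (long) index, "This is record " + index);
            try {
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("Record sent to partition " + metadata.partition()
                        + " with offset " + metadata.offset());
            } catch (Exception e) {
                System.err.println("Error while sending record: " + e.getMessage());
            }
        }
        producer.close();
    }

    static void runConsumer() {
        Consumer<Long, String> consumer = ConsumerCreator.createConsumer();
        while (true) {
            ConsumerRecords<Long, String> records = consumer.poll(Duration.ofMillis(1000));
            if (records.count() == 0) {
                continue;
            }
            records.forEach(record -> System.out.println("Record key " + record.key()
                    + ", value " + record.value() + ", offset " + record.offset()));
            // commit the offsets of the records processed in this poll
            consumer.commitSync();
        }
    }
}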

The above snippet explains how to produce and consume messages from a Kafka broker. If you want to run a producer, call the runProducer function from the main function. If you want to run a consumer, call the runConsumer function from the main function.

The offset of records can be committed to the broker in both asynchronous and synchronous ways. With the synchronous way, the thread is blocked until the offset has been written to the broker.

Conclusion

We have seen how Kafka producers and consumers work. You can check out the whole project on my GitHub page. If you are facing any issues with Kafka, please ask in the comments. In the next article, I will be discussing how to set up monitoring tools for Kafka using Burrow.

Kafka Tutorial: Writing a Kafka Consumer in Java

In this tutorial, you are going to create a simple Kafka Consumer. This consumer consumes messages from the Kafka Producer you wrote in the last tutorial. This tutorial demonstrates how to process records from a Kafka topic with a Kafka Consumer.

This tutorial describes how Kafka Consumers in the same group divide up and share partitions while each consumer group appears to get its own copy of the same data.

Before you start

The prerequisites to this tutorial are the Kafka setup and the producer from the previous tutorials.

This tutorial picks up right where Kafka Tutorial: Creating a Kafka Producer in Java left off. In the last tutorial, we created a simple Java example that creates a Kafka producer. We also created a replicated Kafka topic called my-example-topic, and you used the Kafka producer to send records (synchronously and asynchronously). Now, the consumer you create will consume those messages.

Construct a Kafka Consumer

Just like we did with the producer, you need to specify bootstrap servers. You also need to define a group.id that identifies which consumer group this consumer belongs to. Then you need to designate a Kafka record key deserializer and a record value deserializer. Finally, you need to subscribe the consumer to the topic you created in the producer tutorial.


Kafka Consumer imports and constants

Next, you import the Kafka packages and define a constant for the topic and a constant to set the list of bootstrap servers that the consumer will connect to.

KafkaConsumerExample.java — imports and constants
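The original code block is not reproduced here, so the following is a sketch along the lines the next paragraph describes; the exact class layout is an assumption.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerExample {

    private final static String TOPIC = "my-example-topic";
    private final static String BOOTSTRAP_SERVERS =
            "localhost:9092,localhost:9093,localhost:9094";

    // createConsumer(), runConsumer() and main() follow in the next snippets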

Notice that KafkaConsumerExample imports LongDeserializer, which gets configured as the Kafka record key deserializer, and StringDeserializer, which gets set up as the record value deserializer. The constant BOOTSTRAP_SERVERS gets set to localhost:9092,localhost:9093,localhost:9094, which are the three Kafka servers that we started up in the last lesson. Go ahead and make sure all three Kafka servers are running. The constant TOPIC gets set to the replicated Kafka topic that you created in the last tutorial.

Create Kafka Consumer using Topic to Receive Records

Now, that you imported the Kafka classes and defined some constants, let’s create the Kafka consumer.

KafkaConsumerExample.java — Create Consumer to process Records
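Continuing the sketch of KafkaConsumerExample (the group id value is an assumption):

    private static Consumer<Long, String> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // create the consumer using props
        final Consumer<Long, String> consumer = new KafkaConsumer<>(props);

        // subscribe to the topic
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }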

To create a Kafka consumer, you use java.util.Properties and define certain properties that we pass to the constructor of a KafkaConsumer .

Above, KafkaConsumerExample.createConsumer sets the BOOTSTRAP_SERVERS_CONFIG ("bootstrap.servers") property to the list of broker addresses we defined earlier. The BOOTSTRAP_SERVERS_CONFIG value is a comma-separated list of host/port pairs that the consumer uses to establish an initial connection to the Kafka cluster. Just like the producer, the consumer makes use of all servers in the cluster no matter which ones we list here.

The GROUP_ID_CONFIG identifies the consumer group of this consumer.

The KEY_DESERIALIZER_CLASS_CONFIG (“key.deserializer”) is a Kafka Deserializer class for Kafka record keys that implements the Kafka Deserializer interface. Notice that we set this to LongDeserializer as the message ids in our example are longs.

The VALUE_DESERIALIZER_CLASS_CONFIG (“value.deserializer”) is a Kafka Deserializer class for Kafka record values that implements the Kafka Deserializer interface. Notice that we set this to StringDeserializer, as the message bodies in our example are strings.

It is important to notice that you need to subscribe the consumer to the topic: consumer.subscribe(Collections.singletonList(TOPIC));. The subscribe method takes a list of topics to subscribe to, and this list will replace the current subscriptions, if any.

Process messages from Kafka with Consumer

Now, let's process some records with our Kafka Consumer.

KafkaConsumerExample.java — Process records from Consumer
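Again a sketch, following the description below; the giveUp counter and the output format are assumptions.

    static void runConsumer() {
        final Consumer<Long, String> consumer = createConsumer();
        final int giveUp = 100;
        int noRecordsCount = 0;

        while (true) {
            final ConsumerRecords<Long, String> consumerRecords =
                    consumer.poll(Duration.ofMillis(1000));
            if (consumerRecords.count() == 0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }
            consumerRecords.forEach(record ->
                    System.out.printf("Consumer Record:(%d, %s, %d, %d)%n",
                            record.key(), record.value(),
                            record.partition(), record.offset()));
            // commit the offsets returned by the last poll for all subscribed partitions
            consumer.commitAsync();
        }
        consumer.close();
        System.out.println("DONE");
    }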

Notice that you use ConsumerRecords, which is a group of records from a Kafka topic partition. The ConsumerRecords class is a container that holds a list of ConsumerRecord(s) per partition for a particular topic. There is one ConsumerRecord list for every topic partition returned by the consumer.poll().

Notice that if you receive records ( consumerRecords.count()!=0 ), the runConsumer method calls consumer.commitAsync(), which commits the offsets returned on the last call to consumer.poll(…) for all the subscribed topic partitions.

Kafka Consumer Poll method

The poll method returns fetched records based on the current partition offset. The poll method is a blocking call that waits up to the specified timeout. If no records are available after the timeout, the poll method returns an empty ConsumerRecords.

When new records become available, the poll method returns straight away.

You can control the maximum number of records returned by poll() with props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);. The poll method is not thread safe and is not meant to be called from multiple threads.

Running the Kafka Consumer

Next you define the main method.

KafkaConsumerExample.java — Running the Consumer
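The corresponding sketch of the entry point, closing the class started in the earlier snippet:

    public static void main(String... args) {
        runConsumer();
    }
}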

The main method just calls runConsumer .

Try running the consumer and producer

Run the consumer from your IDE. Then run the producer from the last tutorial from your IDE. You should see the consumer get the records that the producer sent.

Logging set up for Kafka

If you don’t set up logging well, it might be hard to see the consumer get the messages.

Kafka, like most Java libs these days, uses SLF4J. You can use Kafka with Log4j, Logback, or JDK logging. We used Logback in our Gradle build ( compile 'ch.qos.logback:logback-classic:1.2.2' ).

Notice that we set org.apache.kafka to INFO; otherwise we would get a lot of log messages. You should also run it set to DEBUG once and read through the log messages; it gives you a flavor of what Kafka is doing under the covers. Leave org.apache.kafka.common.metrics at a higher level, or what Kafka is doing under the covers gets drowned out by metrics logging.

Try this: Three Consumers in same group and one Producer sending 25 messages

Run the consumer example three times from your IDE. Then change Producer to send 25 records instead of 5. Then run the producer once from your IDE. What happens? The consumers should share the messages.

Producer Output

Notice the producer sends 25 messages.

Consumer 0 in same group

Consumer 1 in same group

Consumer 2 in same group

Can you answer these questions?

Which consumer owns partition 10?

How many ConsumerRecords objects did Consumer 0 get?

What is the next offset from Partition 11 that Consumer 2 should get?

Why does each consumer get unique messages?

Which consumer owns partition 10?

Consumer 2 owns partition 10.

How many ConsumerRecords objects did Consumer 0 get?

What is the next offset from Partition 11 that Consumer 2 should get?

Why does each consumer get unique messages?

Each gets its share of partitions for the topic.

Try this: Three Consumers in different Consumer group and one Producer sending 5 messages

Modify the consumer so that each consumer process has a unique group id.

Stop all consumers and producers processes from the last run.

Then execute the consumer example three times from your IDE. Then change producer to send five records instead of 25. Then run the producer once from your IDE. What happens? The consumers should each get a copy of the messages.

First, let’s modify the Consumer to make their group id unique as follows:

KafkaConsumerExample — Make the Consumer group id unique
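A sketch of the change (the base group id string is an assumption):

        // make the group id unique so that every run gets its own consumer group
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                "KafkaExampleConsumer" + System.currentTimeMillis());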

Notice, to make the group id unique you just add System.currentTimeMillis() to it.

Producer Output

Notice the producer sends five messages.

Consumer 0 in own group

Consumer 1 in unique consumer group

Consumer 2 in its own consumer group

Can you answer these questions?

Which consumer owns partition 10?

How many ConsumerRecords objects did Consumer 0 get?

What is the next offset from Partition 11 that Consumer 2 should get?

Why does each consumer get the same messages?

Which consumer owns partition 10?

They all do! Since they are all in a unique consumer group, and there is only one consumer in each group, then each consumer we ran owns all of the partitions.

How many ConsumerRecords objects did Consumer 0 get?

What is the next offset from Partition 11 that Consumer 2 should get?

Why does each consumer get the same messages?

They do because they are each in their own consumer group, and each consumer group is a subscription to the topic.

Conclusion Kafka Consumer example

You created a simple example that creates a Kafka consumer to consume messages from the Kafka Producer you created in the last tutorial. We used the replicated Kafka topic from producer lab. You created a Kafka Consumer that uses the topic to receive messages. The Kafka consumer uses the poll method to get N number of records.


Consumers in the same group divide up and share partitions, as we demonstrated by running three consumers in the same group and one producer. Each consumer group gets a copy of the same data. More precisely, each consumer group really has its own unique set of offset/partition pairs.

Review Kafka Consumer

How did we demonstrate Consumers in a Consumer Group dividing up topic partitions and sharing them?

We ran three consumers in the same consumer group, and then sent 25 messages from the producer. We saw that each consumer owned a set of partitions.

How did we demonstrate Consumers in different Consumer Groups each getting their own offsets?

We ran three consumers each in its own unique consumer group, and then sent 5 messages from the producer. We saw that each consumer owned every partition.

How many records does poll get?

However many you set with props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); in the properties that you pass to KafkaConsumer.

Does a call to poll ever get records from two different partitions?


Kafka Java Consumer

Kafka provides a Java client to communicate with it. It is a great lib, very versatile and flexible, but many things can go wrong if you use it without care or a good understanding of Kafka internals. We will talk about some of the common pitfalls on the consumer side that are easy to run into, and this lib is here to help you overcome them peacefully.

Usually, after we have subscribed the consumer to some topics, we need a loop to do these things:

  • Fetch records from Kafka broker by using poll method on KafkaConsumer ;
  • Process the fetched records;
  • Commit the offset of these fetched records, so they will not be consumed again;

We need to call poll constantly and ensure that the interval between calls is not too long; otherwise, after a session timeout or a poll timeout, the broker may think our consumer is no longer alive and revoke every partition assigned to it. If we need to do a lot of work on the records we fetched, we may need to set the Kafka consumer configuration max.poll.interval.ms to a comparatively large value, to give us enough time to process all those records. But it is not trivial to set max.poll.interval.ms to a large value: the larger it is, the longer it takes a broker to realize that a consumer is dead when something goes wrong with the consumer. As an alternative to tuning max.poll.interval.ms, we can dedicate the polling thread solely to polling records from the broker and submit all the fetched records to a thread pool that takes charge of processing them. But to do it this way, we need to pause the partitions of all the fetched records before processing them, to prevent the polling thread from fetching more records while the previous ones are still being processed. Of course, we should remember to resume a paused partition after we have processed all records from that partition. Furthermore, after a partition reassignment we should remember which partitions we had paused before the reassignment and pause them again. A bare-bones sketch of this pause-and-submit pattern follows below.
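This sketch uses only the plain Kafka client API and leaves out the resume and rebalance bookkeeping described above; the topic name and pool size are assumptions.

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class PausingPollLoop {
    static void run(KafkaConsumer<String, String> consumer) {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        consumer.subscribe(Collections.singletonList("some-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (!records.isEmpty()) {
                // stop fetching from these partitions while the workers are busy;
                // poll() keeps being called, so the consumer still heartbeats and rebalances
                consumer.pause(records.partitions());
                workers.submit(() -> records.forEach(PausingPollLoop::process));
            }
            // a complete loop would resume partitions whose records are all processed,
            // and re-pause previously paused partitions after a rebalance
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        // long-running processing happens here, off the polling thread
    }
}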

Kafka Client provides both a synchronous and an asynchronous way to commit the offsets of records. In addition, it also provides a way to commit a specific partition and offset, and a way to commit all the records fetched at once. We should remember to commit all the processed records from a partition before that partition is revoked, and to commit all the processed records before the consumer shuts down. If we commit the offset for a specific record, we should remember to add one to the offset of that record: assuming the record to commit has partition 0 and offset 100, we should commit partition 0 at 101 instead of 100, otherwise that already processed record will be fetched again. If a consumer is assigned a partition which has had no records for a long time, we should still remember to commit the committed offset of that partition periodically; otherwise, once the commit log of that partition is removed from the broker because of the retention timeout, the broker will no longer remember where the committed offset of that partition was for our consumer. If the consumer sets the Kafka configuration auto.offset.reset to earliest, then after a reboot the consumer will poll all the records from the partition for which the broker forgot where we committed, and process all of them over again.
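The plus-one rule, for example, looks like this with the plain Kafka client (a sketch, not this library's API):

import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class CommitExample {
    static void commitProcessed(KafkaConsumer<String, String> consumer,
                                ConsumerRecord<String, String> record) {
        TopicPartition partition = new TopicPartition(record.topic(), record.partition());
        // the committed offset is the position of the NEXT record to read, so commit offset + 1;
        // committing 100 for a record at offset 100 would make it be fetched again
        consumer.commitSync(Collections.singletonMap(partition,
                new OffsetAndMetadata(record.offset() + 1)));
    }
}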

All in all, Kafka Client is not a tool that can be used directly without care and some research. But with the help of this lib, you can consume records from a subscribed topic and process them, with or without a dedicated thread pool, more safely and easily. It encapsulates loads of best practices to achieve that goal.

Firstly, we need configurations for the Kafka consumer. For example:
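The exact shape this library expects is not shown here, so the following is only a sketch of typical Kafka consumer settings passed as a map; all values are assumptions.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

class ConsumerConfigs {
    static Map<String, Object> buildConfigs() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        return configs;
    }
}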

Then, define how you need to handle a record consumed from Kafka. Here we just log the consumed record:

Next, we need to choose the type of consumer to use. We have five kinds of consumers, and each of them has a different committing policy. Here is a simple specification for them:

automatic commit: commits the offsets of records fetched from the broker automatically at a fixed interval.
sync commit: commits offsets synchronously, and only when all the fetched records have been processed.
async commit: commits offsets asynchronously, and only when all the fetched records have been processed. If there are too many pending async commit requests, or the last async commit request failed, it switches to synchronous mode and commits synchronously, switching back once the next synchronous commit succeeds.
partial sync commit: whenever a consumer record has been processed, commits synchronously only those records that have already been processed, leaving the ones not yet processed to be committed later.
partial async commit: whenever a consumer record has been processed, commits asynchronously only those records that have already been processed, leaving the ones not yet processed to be committed later. If there are too many pending async commit requests, or the last async commit request failed, it switches to synchronous mode and commits synchronously, switching back once the next synchronous commit succeeds.

Taking the sync-committing consumer as an example, you can create a consumer with a thread pool and subscribe it to a topic like this:

Please note that we passed an ExecutorService to build the LcKafkaConsumer; all the records consumed from the subscribed topic will be handled by this ExecutorService using the input ConsumerRecordHandler.

When we are done with this consumer, we need to close it:

For all the APIs and descriptions of all the kinds of consumers, please refer to the Java Doc.

Copyright 2020 LeanCloud. Released under the MIT License.
