Тёмный

Kafka consumer. Метод poll 

Сергей Петрелевич
Подписаться 2,4 тыс.
Просмотров 4,5 тыс.
50% 1

Начинаем изучать детали работы Kafka.
Сегодня поговорим про некоторые особенности метода poll.
Код примеров:
github.com/pet...
Чат в телеге для вопросов и обсуждений:
t.me/jvm_home

Опубликовано:

 

4 окт 2024

Поделиться:

Ссылка:

Скачать:

Готовим ссылку...

Добавить в:

Мой плейлист
Посмотреть позже
Комментарии : 11   
@user-eg5uc6xu8u
@user-eg5uc6xu8u 2 года назад
Огромное вам спасибо, благодаря вашему уроку мне удалось побить задачу, которую мне поставили на работе☺☺☺☺☺
@ekaterinagalkina7303
@ekaterinagalkina7303 10 месяцев назад
Смущает фраза из доки "that max.poll.records does not impact the underlying fetching behavior". Т.е. звучит так, что сетевые запросы - это одно, а poll() - другое, и на сетевые запросы к кафке настройка не влияет. Т.е. запрошенный объем данных где-то еще кэшируется, а that max.poll.records - всего лишь размер массива. max.poll.records The maximum number of records returned in a single call to poll(). Note, that max.poll.records does not impact the underlying fetching behavior. The consumer will cache the records from each fetch request and returns them incrementally from each poll. А для fetch другие настройки есть - максимальный размер в байтах и т.д. Но тогда непонятно, как по poll кафка определяет, что консьюмер упал. Правда, есть еще другая настройка heartbeat.interval.ms для пинга от консьюмера.
@petrelevich
@petrelevich 10 месяцев назад
Параметры запроса к брокеру определяются конфигом: package org.apache.kafka.clients.consumer.internals; public class FetchConfig { final int fetchSize; final int maxPollRecords; где this.fetchSize = config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG); this.maxPollRecords = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG); сам запрос формируется тут: package org.apache.kafka.clients.consumer.internals; public abstract class AbstractFetch один из его параметров .setMaxBytes(fetchConfig.maxBytes) а fetchSize используется в вычитывании из внутреннего буфера. Вот с этим буфером еще надо поразбираться.
@ВладиславЛебаков
А есть ли способ одной командой вычитать весь топик и таким образом его очистить?
@petrelevich
@petrelevich Год назад
Т.е. нужен метод poll, который не сдвинет offset, а именно удалит? Причем метод consumer-а?
@valera3146
@valera3146 Год назад
Здравствуйте. Если метод poll отдает постоянно 0. Это означает, что топик не содержит сообщения? Или например нет соединения или ошиблись с названием топика?
@petrelevich
@petrelevich Год назад
Это значит, что нечего отдавать, т.е. в топике нет сообщений. А вот сообщений может не быть, например, если топик не верно указан. Если нет соединения, то будет ошибка.
@valera3146
@valera3146 Год назад
@@petrelevich спасибо
@user-eg5uc6xu8u
@user-eg5uc6xu8u 2 года назад
Добрый день, как вытащить определенное количество сообщений с конца или с начала с помощью консьюмера?
@petrelevich
@petrelevich 2 года назад
чтобы вытащить определенное кол-во сообщений с конца, используйте параметр MAX_POLL_RECORDS_CONFIG. Как в примере в видео. А сначала читать нельзя, это очередь, но можно offset сместить на начало и все перечитать.
@user-eg5uc6xu8u
@user-eg5uc6xu8u 2 года назад
@@petrelevich большое спасибо)
Далее
Serializer и Deserializer в Kafka
6:32
Просмотров 1,2 тыс.
БАГ ЕЩЕ РАБОТАЕТ?
00:26
Просмотров 236 тыс.
kafka в приложениях на SpringBoot
1:12:39
Просмотров 3,5 тыс.
Про Kafka (основы)
49:23
Просмотров 398 тыс.