Отличная лекция, спасибо. Но мне кажется лучше просто в записи кидать. И было бы не плохо, если б немного больше теории. В частности здесь в целом что такое ThreadLocal, для чего он, а затем уже код.
Одно непонятно что делать если у нас две разные базы данных для финансов и для бд с сохранением состояния Кафки тогда завернуть в транзакцию не получится и нужно будет табличку з записями про обработку данных из портиции тащить в базу з платежами.
А как лучше работать с кешем? Использовать абстракцию в виде аннотаций типа @Cacheble и др? Или работать с помощью методов напрямую, как у вас в примере?
Спасибо за лекцию. А как Вы запускали docker-compose.yml ? Правильно я понимаю, что мне надо установить docker desktop (под windows) и там уже есть docker compose ? P.S. в указанном чате отсутсnвует данная лекция.
Так а в чем прикол? Томкат чтобы обработать 200 реквестов запустит 200 трэдов. А нэтти если прилетят 200 риквестов одновременно - что сделает? Эвент лупом примет эти 200 реквествов, а затем запустит 200 трэдов, которые будут обрабатывать их... я не прав?
Netty все эти 200 запросов будет обрабатывать ограниченным числом потоков, в соответствии с конфигурацией. Скорее всего, их будет столько, сколько доступно ядер. Т.е. для обработки 200 запросов, надо будет не 200 потоков, а порядка 5-10. Но эти запросы должны быть неблокирующие.
@@petrelevich спасибо за быстрый ответ. Но немного не ясно. Представим вэб приложение, которое обращается к базе, выполняет сиквел, отдает ответ клиенту. И вот это приложение получило 200 риквестов, и соответственно должно сделать 200 запросов в базу. Т.е. если моё понимание верно, то вот эти обращения к базе будут выполнены не эвент луп трэдом, а другими трэдами, назовём их воркер трэдами. Если предположить что,запросы к базе долго играющие и их одновременно 200, не значит ли это, что система помимо 5-10 эвент луп трэдов запустит ещё 200 трэдов для коммуникации с базой данных? Заранее спасибо
@@rainqirimli5457 работа с базой - дело особое. 200 сессий может и не выдержать. Да и connection pool обычно настроен на более скромные лимиты. Далее вопрос: база данных на блокирующем драйвере или реактивном. Если на блокирующем, как пример, jdbc, то все 200 запросов уйдут в очередь, у которой кол-во поток будет равно кол-ву connection-ов в пуле. Если база данных реактивная, то все равно, скорее всего образуется очередь, т.к. все равно будет какой-то connection pool и на нем будет ограничение кол-ва подключений.
Вопрос по коду. Создание KafkaReceiver (класс ReactiveReceiver) KafkaReceiver.create(options) .receiveAutoAck() .concatMap(consumerRecordFlux -> { log.info("consumerRecordFlux done, commit"); return consumerRecordFlux; }) .retryWhen(Retry.backoff(3, Duration.of(10L, ChronoUnit.SECONDS))); 1. Почему выбран метод concatMap? Какая его логика? Мы могли бы использовать например doOnNext для логирования очередного элемента потока. 2. Почему используется backoff в retryWhen, а не fixedDelay например?
1. concatMap преобразует Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck() в Flux<ConsumerRecord<K, V>>, doOnNext не подойдет 2. backoff реализует exponential backoff strategy, т.е. каждая следующая попытка будет выполняться с большим интервалом, а fixedDelay задает интервал фиксированный.
Сори, не смог до конца досмотреть, такое монотонное объяснение, тяжело очень для восприятия, несколько раз перематывал, так как терял логическую цепочку, что происходит. Ужасное объяснение, хотя структура прекрасная абсолютно
@@konstantinchvilyov9602 Например, exectly-once точно не надо в логгировании, телеметрии, социальных сетях и т.е., когда что-то иногда можно потерять. И exectly-once почти всегда обязателен в платежах.
Классный проект (особенно порадовал production-ready подход к разработке). Очень интересно будет следить за его развитием в полноценную систему. Такие видеосериалы с постепенным построением готовой неучебной платформы, на мой взгляд, как раз лучшие для обучения начинающего: позволяют понять для чего каждый элемент и чем отличаются подходы P.S. кстати, не понял, почему в качестве мапперов для таких объемных дто не использовался mapstruct, он легковесный и одной строкой заменяет все эти полотна кода?
>кстати, не понял, почему в качестве мапперов для таких объемных дто не использовался mapstruct, он легковесный и одной строкой заменяет все эти полотна кода? да, можно его использовать. И это еще один вопрос - а нужно ли тут вообще dto.
@@petrelevich ну сами по себе иммутабельные ДТО часто бывают полезны (помогают при формализации необходимого контракта для клиента, и полезны для его документирования, помогают при валидации данных, если модель анемична, могут часто помочь при дальнейших доработках, уменьшая связанность между моделью и интерфейсом клиента и т.п.), и при этом они "есть не просят" особо, как говорится, поэтому лично я полагаю, что лучше с ними, чем без них
Хорошее видео, спасибо :) Нет ли в планах создания полноценного курса по какой-либо из тем? Или если есть может кто-то сказать название и где можно приобрести?
Отличное видео, пересмотрел второй раз спустя время и всё понял. Замечательный пример про бармена. Пожалуйста, продолжайте использовать такие бытовые примеры для объяснения - очень хорошо запоминается.
Из разряда "как нарисовать сову": 1. рисуем три кружка; 2. дорисовываем до совы. Отус такой ценник за свои курсы лупит, а лайф-кодинг поленился сделать даже на вводном уроке. Смотрите - это парсер, ну парсер и парсер, ничего особенного 😅 Спринг так же объясняете?) Это бин, ну бин и бин, ничего особенного, поехали дальше))
откуда при раскладе 3 сек на источник и 2 сек на обработчик может возникнуть очередь? источник генерит медленнее чем обработчик обрабатывает. 1,2,3 источник 4,5 обработчик 4,5,6 источник 7,8 обработчик 7,8,9 источник
@@Евгений-1 Пересмотрел, начиная с этого момента ru-vid.com/video/%D0%B2%D0%B8%D0%B4%D0%B5%D0%BE-UDGZV0tzPQ8.htmlsi=WwyOF8Sv5okuUib_&t=4743 Ничего не услышал про "очередь".
@@petrelevich там в комментах задан вопрос и вот текстовая расшифровка: 1:20:57 у вас есть некий некая задержка а на 1:21:15 звучит "и в итоге тут должно больше наплодиться.... чем больше работаем тем больше накапливается разница" это где будет накопление? и откуда возьмется накопление учитывая что генерация медленнее чем обработка?
@@Евгений-1 Теперь понятно, в чем вопрос. В классе DataProducerStringReactor данные генерируются без ограничения скорости, но потом добавляется задержка delayElements(Duration.ofSeconds(3)). Чтобы согласовать быстрый источник и медленный обработчик включается механизм backpressure. Это приводит к тому, что данные создаются в генераторе и буферизируются (накапливаются) пока потребитель не сможет их обработать. После обработки создается новая порция данных и они буферизируются, пока не обработаются. Понятно пояснил? Надо, наверное, видос на эту тему записать?
спасибо очень информативно, только вот вопрос, как скажем из CI задать выбор энва? Определить две configMap и как-то выбирать между ними или же как-то по другому ?
Спасибо огромное) шикарное видео! Честно говоря, не помню чтобы встречал разорванное текстовое сообщение. Насколько знаю, текстовые сообщения передаются по протоколу TCP, а этот протокол должен ГАРАНТИРОВАТЬ целостность отправленных пакетов.
Пакеты придут полностью, но длинные сообщения могут прийти по частям или наоборот вместе. Посмотрите предыдущее видео про нагрузочное тестирование. Там всплывали подобные моменты. И еще момент - в этом видео речь идет уже о "бизнес - логике", тут получение по частям максимально вероятно из-за буферизации на стороне "tcp-сервера".
Спасибо! Довольно познавательный цикл получился. Не так давно, нечто подобное у себя на проекте реализовывали с командой. За идею с применением концепции конечных автоматов в кейсе с парсингом, прям большой респект!
Вопрос к Сергею, не первый раз уже слышу его мысль о важности groupID для консьюмера, т.е. если скопировать код из другого проекта и не изменить имя группы, то он (консьюмер) будет "красть" не свои сообщения. И вот тут у меня непонятки: ведь мы при настройке консюмера указываем еще и имя топика, на который он подписан. Я так понимал, для того чтобы он забирал по ошибке сообщения из другого проекта, нужно чтобы и имя группы, и имя топика совпало, а это еще более малый шанс
Да, имя топика должно быть то же. И конечно, тот же инстанс kafka. Вероятность "кражи" зависит от характера приложения. Представьте, что это очередь заказов в магазине и куча систем работают с этой очередью: кто-то аналитику делает, кто-то отправку организует, кто-то уведомления рассылает. Для kafaka типовая ситуация, когда с одним топиком работает много систем.
Смущает фраза из доки "that max.poll.records does not impact the underlying fetching behavior". Т.е. звучит так, что сетевые запросы - это одно, а poll() - другое, и на сетевые запросы к кафке настройка не влияет. Т.е. запрошенный объем данных где-то еще кэшируется, а that max.poll.records - всего лишь размер массива. max.poll.records The maximum number of records returned in a single call to poll(). Note, that max.poll.records does not impact the underlying fetching behavior. The consumer will cache the records from each fetch request and returns them incrementally from each poll. А для fetch другие настройки есть - максимальный размер в байтах и т.д. Но тогда непонятно, как по poll кафка определяет, что консьюмер упал. Правда, есть еще другая настройка heartbeat.interval.ms для пинга от консьюмера.
Параметры запроса к брокеру определяются конфигом: package org.apache.kafka.clients.consumer.internals; public class FetchConfig<K, V> { final int fetchSize; final int maxPollRecords; где this.fetchSize = config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG); this.maxPollRecords = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG); сам запрос формируется тут: package org.apache.kafka.clients.consumer.internals; public abstract class AbstractFetch<K, V> один из его параметров .setMaxBytes(fetchConfig.maxBytes) а fetchSize используется в вычитывании из внутреннего буфера. Вот с этим буфером еще надо поразбираться.