
Java Concurrency Interview Question: Multi-threaded Message Queue like Kafka, SQS, RabbitMQ 

Udit Agarwal
21K subscribers
35K views

In this video, we build a multithreaded message queue. The queue acts as a broker between publishers and subscribers: publishers publish messages to a topic in the queue, and every subscriber subscribed to that topic receives them. Multithreading matters here because the requirement is that multiple subscribers receive a message in parallel.
This is a popular interview question asked by many companies.
Thanks for watching. I hope you find this video helpful. Let me know your feedback in the comments.
Do subscribe to the channel!
I have uploaded the full source code of the solution to my GitHub account. It can be found here: github.com/anomaly2104/low-le...
LLD System Design Playlist: • Low Level System Design
You can follow me on:
LinkedIn: / anomaly2104
Instagram: / anomaly2104
Facebook: / anomaly2104
Twitter: / anomaly2104
Blog: blog.uditagarwal.com
#system #design #interviews #interview #coding #programming #faang #tech #technology #developer #coder #code #java #systemdesign #architecture #software #sde #sdi #systemdesigninterview #software #development #program #lowleveldesign #lld #oops #designpatterns #designprinciples
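
The publish/subscribe flow in the description can be sketched in a few lines of Java. This is a deliberately simplified illustration, not the code from the video or its repository: the class name `MiniBroker`, its methods, and the use of `CompletableFuture` for delivery are all assumptions made for this example. It has no per-subscriber offsets or worker threads; it only shows subscribers of one topic being handed the same message asynchronously.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical minimal broker; names are illustrative only.
class MiniBroker {
    // topic name -> subscribers of that topic
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    public void subscribe(String topic, Consumer<String> subscriber) {
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(subscriber);
    }

    // Hands the message to every subscriber of the topic as a separate
    // asynchronous task, so one slow subscriber need not delay the others.
    // The returned future completes once all subscribers have finished.
    public CompletableFuture<Void> publish(String topic, String message) {
        List<Consumer<String>> targets = subscribers.getOrDefault(topic, List.of());
        CompletableFuture<?>[] deliveries = targets.stream()
                .map(s -> CompletableFuture.runAsync(() -> s.accept(message)))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(deliveries);
    }
}
```

A caller that needs to know when delivery is done can call `join()` on the returned future; a caller that does not care can ignore it.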

Published: 20 Jul 2024

Comments: 93
@anomaly2104 3 months ago
Become Zero to Hero in Multi-threading by registering here: enginebogie.com/u/anomaly2104/offerings/PATH/e9522ac1-4e4c-4217-92ba-f691f34c321b
Become Zero to Hero in LLD by registering here: enginebogie.com/u/anomaly2104/offerings/PATH/e6cce7f1-6a56-4fe3-bb82-48e1876e4596
Connect with me at: enginebogie.com/u/anomaly2104
@sagartyagi2450 3 years ago
I have an Uber SDE-2 interview in a few days and can't even imagine solving this in an interview.
@vyshnavramesh9305 2 years ago
Same here. Another popular Uber machine coding round problem is "In-memory Job Scheduler". I couldn't find any online resource for it, but I guess it should be similar to the internal implementation of ScheduledThreadPoolExecutor in Java.
@shubhamrane2918 2 years ago
@vyshnavramesh9305 What questions were you asked?
@vyshnavramesh9305 2 years ago
@shubhamrane2918 a) Word Ladder 1 or 2. b) Given a string, find the max length of a duplicate substring. (Both are LeetCode problems.)
@bruvhellnah 2 years ago
@vyshnavramesh9305 Were both these questions for SDE-2? Also, did you have a system design round? If yes, what were you asked?
@vyshnavramesh9305 2 years ago
@bruvhellnah SDE-2. I didn't have a system design round. I didn't get through after the two algorithm questions.
@vishnusingh4118 3 years ago
Amazing content! This is what differentiates you, man. Pressed like before even watching it, and not disappointed at all. Keep up the great work. More hands-on, work-related videos on industry best practices, please. More power to you!
@anomaly2104 3 years ago
Thanks a lot :)
@vaibhav4196 7 months ago
Fantastic breakdown of LLD! I just came across your LLD videos, and I must say, your explanations and implementations are top-notch; I haven't encountered such clarity and detail before. Looking forward to more LLD content from you! Cheers! :)
@anujpratapsinghyadav1914 3 years ago
Really great content for LLD! Subscribed before watching :)
@ayushupadhyaya7656 1 month ago
Overall, I found the code and explanation good, but what was missing was an overall explanation of the UML relationships between the classes: why a particular class has an "is-a" or a "has-a" relationship, and which low-level design decision led to a given class structure. If that explanation were added, I believe this would be a great video on a multithreaded message queue.
@priyankamishra5704 2 years ago
Once again, great quality content! Please continue this series on multithreading scenarios.
@yuganderkrishansingh3733 2 years ago
Thanks for the video. Learned a hell of a lot from your videos, man. Keep these coming. A million thanks.
@kanishkverma9776 2 months ago
Thanks for making such awesome and easy-to-follow videos. One suggestion: if you could also add a class diagram, it would help in understanding the flow between classes. Otherwise, thanks for the effort :)
@codetech802 2 years ago
Great, great content. Please keep posting machine coding questions.
@greenmug1058 2 years ago
God bless you for creating this channel.
@developerabhishek7099 3 years ago
Your content is very unique. Thanks for sharing the knowledge.
@alphaninja5106 2 years ago
Please add a class diagram here as well. It will be easier to follow the different mappings.
@dashrathyadav1476 1 year ago
Just amazing. Thanks a lot, Udit.
@amanpandey9181 3 years ago
Hi Udit! Awesome video man, this was long awaited. One question though: at 18:27, can you explain why the wakeUpIfNeeded() method in SubscriberWorker is made synchronized? In what case will this method be accessed by multiple threads? Follow-up question: why does it also have a synchronized block? I mean, why are both the synchronized block and method-level synchronization needed here?
@anomaly2104 3 years ago
Oops, yeah. The method-level one is not needed. I think I forgot to remove it later. Thanks for pointing it out.
@radhikachandak5246 2 years ago
Hi Udit, thanks for sharing the detailed design. I had a doubt: in the run method of SubscriberWorker, why did you call the wait method on the topicSubscriber? Can you explain how that works, since topicSubscriber is in no way linked to our thread? (Same doubt for the wakeUpIfNeeded method as well.)
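
On the question of calling wait() on an object that is "not linked" to the thread: in Java any object can serve as a monitor. `wait()` suspends the calling thread until another thread calls `notify()` or `notifyAll()` on the same object, and both calls require the caller to hold that object's lock, which is why they appear inside `synchronized` blocks. The sketch below only demonstrates the ownership rule; `MonitorDemo` and its method names are made up for this illustration.

```java
// Hypothetical demo class; not part of the video's code.
class MonitorDemo {
    // Calling notify() without owning the monitor fails at runtime.
    static boolean notifyWithoutLock(Object monitor) {
        try {
            monitor.notify();
            return true;
        } catch (IllegalMonitorStateException e) {
            return false;
        }
    }

    // Inside synchronized (monitor) the current thread owns the monitor,
    // so notify() is legal even if no thread is currently waiting.
    static boolean notifyWithLock(Object monitor) {
        synchronized (monitor) {
            monitor.notify();
            return true;
        }
    }
}
```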
@mujahidpasha4440 3 years ago
Amazing content, just what I wanted. One request: it would be great if you coded along while explaining, even if the video gets long; that would really help us understand the thought process as well. Thank you very much for the effort. Keep it up 🙏
@pkksingh5 1 year ago
Amazing video. Awesome content!
@snehasishroy39 3 years ago
Hey Udit, great video. A couple of questions. 1. Ideally, shouldn't you return a Future/CompletableFuture from the publish() method so that the client can choose to wait on the future until all the messages have been published? 2. In SubscriberWorker, why does the while loop have a >= condition? Shouldn't it be just curOffset == topic.getMessages().size(), because curOffset can never exceed the number of messages in a topic, as you currently don't support deleting messages from the topic? Also, I think you should have highlighted the importance of using a while loop for the wait() check; most people tend to forget to put the wait() check in a while loop. Nice explanation of compareAndSet though :)
@anomaly2104 3 years ago
Hey Snehasish, 1. Yeah, we can do that. 2. Yeah, ideally the condition should just be curOffset == topic.getMessages().size(), but I used >= to be on the safer side. I actually coded this during a real interview, and at that time I thought this condition would be safer. Calling wait() in a while loop is really required to save CPU cycles. I should cover this in future videos.
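
For readers following the wait()-in-a-loop point: the Javadoc for `Object.wait` notes that a thread can wake up without being notified (a spurious wakeup) and recommends always waiting inside a loop that re-tests the condition. A minimal sketch of that guarded-wait pattern follows; `GuardedLog` is a hypothetical stand-in for a topic's message list, not a class from the video.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a topic's message list.
class GuardedLog {
    private final List<String> messages = new ArrayList<>();

    // Publisher side: append, then wake every thread waiting on this monitor.
    public synchronized void append(String message) {
        messages.add(message);
        notifyAll();
    }

    // Subscriber side: block until a message exists at the given offset.
    public synchronized String awaitMessage(int offset) throws InterruptedException {
        // "while", not "if": a wakeup alone does not prove the condition holds,
        // so it is re-tested every time wait() returns.
        while (offset >= messages.size()) {
            wait();
        }
        return messages.get(offset);
    }
}
```

Because the size check and the call to `wait()` happen under the same lock that `append` takes, an append cannot slip in between the check and the wait.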
@yashgarg9580 2 years ago
Hello sir, thank you for such a great video. I had one small doubt: if the startSubscriberWorker method is accessed concurrently by two threads, it can create more than one SubscriberWorker instance for a TopicSubscriber during lazy creation. Can that be an issue? What should we do to handle it: make the method synchronized, or use a ConcurrentHashMap here?
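
One way to address the lazy-creation race raised above is `ConcurrentHashMap.computeIfAbsent`, which applies the mapping function atomically and at most once per key. The sketch below is only an illustration of that idea: `WorkerRegistry`, `workerFor`, and the use of `Runnable` as the worker type are assumptions for the example, not the video's classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical registry that lazily creates one worker per subscriber id.
class WorkerRegistry {
    private final Map<String, Runnable> workers = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    // computeIfAbsent runs the factory at most once per key, so two threads
    // racing on the same subscriber id get the same worker instance.
    public Runnable workerFor(String subscriberId) {
        return workers.computeIfAbsent(subscriberId, id -> {
            created.incrementAndGet();
            return () -> System.out.println("worker for " + id);
        });
    }

    // Number of workers actually constructed; useful for checking the race.
    public int createdCount() {
        return created.get();
    }
}
```

Making the whole method `synchronized` would also work; `computeIfAbsent` simply narrows the critical section to the single key being created.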
@renon3359 3 years ago
Hey Udit, great video again. I wanted to ask why you went for bare threads and not something like an ExecutorService. Was there any particular reason, or is it just a personal choice?
@anomaly2104 3 years ago
I just used it for the demo in this example. Since I knew I would not be creating too many threads, it was fine. But in production systems, you should use a thread pool through something like an ExecutorService so that you don't overload your service by creating lots of threads.
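
As a rough illustration of the thread-pool suggestion, the sketch below replaces "one new Thread per publish" with a fixed-size pool from `Executors.newFixedThreadPool`. The class `PooledDispatcher`, its methods, and the five-second shutdown timeout are assumptions for this example only.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical dispatcher that bounds the number of delivery threads.
class PooledDispatcher {
    private final ExecutorService pool;
    private final List<String> delivered = new CopyOnWriteArrayList<>();

    PooledDispatcher(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
    }

    // Queue the delivery; at most `threads` deliveries run at the same time,
    // the rest wait in the pool's queue instead of each getting a new thread.
    public void dispatch(String message) {
        pool.execute(() -> delivered.add(message));
    }

    // Stop accepting work and wait for the queued deliveries to finish.
    public List<String> shutdownAndGet() throws InterruptedException {
        pool.shutdown();
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("deliveries did not finish in time");
        }
        return delivered;
    }
}
```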
@renon3359 3 years ago
@anomaly2104 All right!
@krishnadwypayan601 3 years ago
Hi Udit, thanks for the great video! :) A couple of questions: 1. Messages are pushed to consumers whenever a message is published. How can we incorporate a pull-based consumer mechanism? I understand that the requirements state that messages are pushed, but in terms of scalability, for a topic with a huge number of consumers, can we implement a pull-based (or hybrid) mechanism? If so, how would this design change? 2. While publishing a message, spawning new threads might be costly if messages are published at high rates (~1M per minute). Would submitting tasks to an ExecutorService help?
@anomaly2104 3 years ago
1. For pull-based consumption, you would also keep an offset for each subscriber, and you would update the offset after getting an acknowledgement from the subscriber that it has consumed the message. If you want it to work like Kafka, where only one subscriber of a topic should get the message, then to prevent sending the same message to multiple consumers you would use something like a visibility timeout. 2. Yes, in a production scenario you would use something like an ExecutorService or some other thread-pooling mechanism.
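
The pull-based idea in the reply above (a per-subscriber offset that advances only on acknowledgement) could look roughly like this. It is a sketch under stated assumptions: `PullTopic`, `poll`, and `ack` are hypothetical names, and the visibility-timeout part of the reply is not implemented here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pull-based topic: the broker stores the messages and one
// committed offset per subscriber; subscribers poll and then acknowledge.
class PullTopic {
    private final List<String> messages = new ArrayList<>();
    private final Map<String, Integer> committed = new HashMap<>();

    public synchronized void publish(String message) {
        messages.add(message);
    }

    // Returns up to max messages after the committed offset.
    // Polling alone does not advance the offset.
    public synchronized List<String> poll(String subscriberId, int max) {
        int from = committed.getOrDefault(subscriberId, 0);
        int to = Math.min(messages.size(), from + max);
        return new ArrayList<>(messages.subList(from, to));
    }

    // Advances the offset only after the subscriber confirms processing,
    // so unacknowledged messages are returned again by the next poll.
    public synchronized void ack(String subscriberId, int count) {
        int from = committed.getOrDefault(subscriberId, 0);
        committed.put(subscriberId, Math.min(messages.size(), from + count));
    }
}
```

A subscriber that crashes before calling `ack` simply sees the same messages on its next `poll`, which gives at-least-once delivery in this simplified model.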
@garimadhanania1853 2 months ago
Great content as always!
@jineshdhruv6151 2 years ago
Thanks for the awesome video 🙌🏻
@PrateekMehtaABDFAN 2 years ago
Great content sir, thanks for the videos. Quick questions: How is this implementation of the producer-consumer pattern different from the Apache Kafka pub/sub model? How do message queues make a system more scalable?
@akkhilgupta7640 3 years ago
You are a GEM, man.
@lokeshratusaria6656 2 years ago
Hey Udit! Great video, man. One doubt: if I am not wrong, you are creating a new thread every time you publish a message to the topic: new Thread(() -> topicProcessors.get(topic.getTopicId()).publish()).start(); So multiple threads are publishing to the topic at the same time, but your subscriberWorkers map is not synchronized. That means multiple threads can see that a worker has not been created for a subscriber and end up corrupting the subscriberWorkers map.
@avinash2899 1 year ago
Thanks for this video! Can you please explain why you chose to have a dedicated thread per subscriber? How will this scale? Isn't a thread pool a better option?
@vulturebeast 3 years ago
Can you post more videos as well? Thanks for such LLD content.
@vyshnavramesh9305 2 years ago
How is the ordering between messages (i.e. FIFO) maintained?
@manishsakariya4595 2 years ago
Should we use ArrayBlockingQueue as the queue for a topic, which gives random access as well as concurrency?
@kritikasomani8670 3 years ago
Hi, thanks for the video. However, I have a doubt. The run() method in SubscriberWorker has a synchronized block, and all the work happens inside it. Synchronized means that only one thread can be in that block at any point. Then why do we need an AtomicInteger for the offsets? Wouldn't a normal Integer value be sufficient, since this code is wrapped in a synchronized block?
@timmy5362 3 years ago
Had the same question! I guess you're right!
@rujhanarora7892 2 years ago
The offset can be reset from another thread.
@vyshnavramesh9305 2 years ago
True, it seems redundant. But is it just a safeguard at the TopicSubscriber class level, as extra protection (scenario: TopicSubscriber is written by developer 1 and SubscriberWorker by developer 2)?
@goku6272 2 years ago
This is because if a reset happens, the current value changes. That's why we use compareAndSet: it first compares against the previous value.
@abhishekshahu1187 2 years ago
Hi, I think the reset happens from the main thread in the main class, and without acquiring any lock, so it's not safe to blindly update the offset to the next value.
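
The scenario discussed in this thread (a worker advancing the offset while another thread resets it without taking the lock) is the case `AtomicInteger.compareAndSet` guards against: the update is applied only if the offset still holds the value the worker read earlier. A small sketch, with `OffsetDemo` and `advance` as made-up names:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper showing the compare-and-set step in isolation.
class OffsetDemo {
    // Moves the offset from `observed` to `observed + 1` only if nobody has
    // changed it since it was read. Returns false when a reset intervened,
    // in which case the reset value is left untouched.
    static boolean advance(AtomicInteger offset, int observed) {
        return offset.compareAndSet(observed, observed + 1);
    }
}
```

With a plain `offset.set(observed + 1)` instead, a reset that happened while the message was being consumed would be silently overwritten.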
@murali1790able 1 year ago
Great job, thank you. Can you also explain what happens when the same consumer registers for two separate topics? I guess two separate threads will be created.
@nehasht2 2 years ago
Can we use DelayQueue in order to set retention for messages as well?
@ShyamKumar-ub5wv 3 years ago
Hi Udit, could you please share the resources so we can get a better hold on this? Thank you for such a good video.
@praveenjain183 3 years ago
Quite an informative video, Udit. Can you also extend it to include storage/message persistence, with both pull and push mechanisms? Thanks.
@anomaly2104 3 years ago
Sure, will try. Thanks :)
@Subudhdh 3 months ago
It seems very little synchronization was needed. How are messages synchronized between producer and consumer? Can you help me understand how many different threads are created here, and how the producer and consumer ensure that a message is used by only one of them at any given time?
@jayprakashdatani9405 2 years ago
God bless you ❤️
@abhishekshahu1187 2 years ago
I have a doubt; please, someone help me out. Isn't it possible that a read is happening at the subscriber's end and it is about to enter the wait state, but at that same moment a publish happens for the same topic, and then the subscriber moves to the wait state? Wouldn't the latest publish then be missed for consumption until the next publish?
@SuperSam4y0u 3 years ago
Hey Udit, thanks for the video! Just one thing: isn't returning the topic to the client an overhead for the client, since the client practically never does anything with it directly and just passes it to the queue?
@anomaly2104 3 years ago
Hey, yeah, while designing APIs it might look like the client does not need the return value. But we should not rely on a specific client. In this case, creating a topic is a CRUD operation, and after creating a resource it's always best to return the resource in the body so that the client can use the ID of the created resource if it needs it. Another way I have seen people do this is, instead of sending the resource data in the body, to send a header with a link from which the created resource can be fetched. In both approaches, the idea is to give the client some mechanism to fetch the details of the created resource if it wants to.
@jayprakashdatani9405 2 years ago
Hi Udit, how can we support message retries in this?
@monishchhadwa777 5 months ago
How do you extend it for a consumer group?
@shubhamkalla6489 7 days ago
In SubscriberWorker, why have we used synchronized on topicSubscriber?
@himanshugoyal9846 1 year ago
Isn't "synchronized" inside run() redundant?
@indiegypsy 1 year ago
@anomaly2104, thank you so much for the detailed video. I have a question: when you synchronize on topicSubscriber, does the offset need to be an AtomicInteger? Can't we keep it as a simple int, since topicSubscriber is already locked by the current thread inside the synchronized block?
@kanishkverma9776 2 months ago
I have the same doubt.
@indiegypsy 2 months ago
@kanishkverma9776 I think it must be atomic, because an atomic makes an update from any thread visible and provides the latest value to all threads.
@sudhiryadav3036 3 months ago
Hi Udit, first of all, thank you so much for creating such meaningful content. I have a doubt: you use the compareAndSet method to increase the offset value, but since you are using a synchronized block, only one thread at a time can change the offset, right? So do we still need compareAndSet, or can we simply increment the offset?
@anomaly2104 3 months ago
Yes, you are right.
@rydmerlin 17 days ago
I would have started with the interface definitions to show the API. putIfAbsent
@akashgoyal2567 2 months ago
Instead of taking a lock on the topic during writes, can we use a mutex so that our reads don't need to be synchronized, since we only support adding messages to the topic, not updating them?
@umangmalhotra1222 3 years ago
Why did you name it ISubscriber in public_interfaces? Is there any software practice behind it?
@anomaly2104 3 years ago
Hey Umang, there is no specific reason or standard practice. It just makes things easy to read: now we know the client-facing APIs are in this package. It helps at times.
@Gaurav569SinghOfficial 1 year ago
What's a reasonable time for completing this, with discussion, at, let's say, Uber?
@kaal_bhairav_23 10 months ago
You are great.
@jmitesh01 3 years ago
Hi Udit, thanks for the informative and to-the-point LLD discussion. I have one query about the use of the terms topic and queue. Semantically (or by the generally accepted notion), topics on a stream, or topics with subscriptions, make more sense. A topic on a queue, with subscribers of that topic, feels a bit unnatural. Please correct me if this is not the case.
@anomaly2104 3 years ago
Yeah! It might be better to call it a message broker instead of a queue.
@PushkarGupta97 3 months ago
Can someone explain the lambda expression here?
@HimankAgrawal 1 year ago
I think a pull mechanism would be better for the subscriber. If the subscriber is down and comes back up after a few hours, a pull mechanism serves well. Also, if we reset the offset once all the messages have been consumed, subs1 will not start, because consume is called on the subscriber only when a message is published. This is not correct.
@gunishjha4030 3 years ago
Can we call this a clone of the Kafka/Kinesis implementation?
@anomaly2104 3 years ago
Yeah, you can call it a mini Kafka kind of system. It's just that Kafka etc. do a lot of other things too.
@gunishjha4030 3 years ago
@anomaly2104 Hmm okay, got it. Thanks, and great content by the way.
@vyshnavramesh9305 2 years ago
For AWS users: pub/sub messaging = AWS SNS; message queue = AWS SQS.
@jainso 3 years ago
Amazing content. Just one question: how will the case of a consumer subscribing to multiple topics be handled?
@vyshnavramesh9305 2 years ago
Is that a valid scenario?
@nehasht2 2 years ago
@vyshnavramesh9305 Yes, it is.
@koustav6884 1 year ago
Good content, but a bottom-up approach is easier to understand than this.
@AdityaGupta-fz8mr 4 months ago
Good question, but so many bugs in the implementation.
@anomaly2104 4 months ago
Hi Aditya, thanks for your feedback. Can you explain in more detail? What bugs do you see exactly?
@theawarescorpio 2 years ago
Hey Udit, thanks for the effort in making this good content. However, I could see some potential to refactor the code to follow SOLID principles, and I have raised a PR against your branch. Please check it once you get the time. Overall, I made the code more extensible and modular without affecting correctness.
@AkshayKumar-xh2ob 1 year ago
Looks like I won't understand this without coding it myself...
@karun4663 2 years ago
There is a bug in the code. topicSubscriber.wait(); is a blocking call; when there is a notify on topicSubscriber, it resumes execution: while (curOffset >= topic.getMessages().size()) { topicSubscriber.wait(); } The above code does not re-check the updated curOffset, so it would stay in the while loop in the blocked state.
@yuganderkrishansingh3733 2 years ago
+1 on the same. Add a `curOffset = subscriber.getOffset().get();` after the wait so that when the thread resumes it sees the updated offset if there was a resetOffset call.
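
The fix proposed in the two comments above (re-reading the offset after every wakeup) can be shown in a self-contained sketch. Everything here is hypothetical scaffolding around that one line: `RereadingWorker`, its private monitor, and a `resetOffset` that takes the lock and notifies are assumptions for the example, not the video's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical worker that re-reads the offset inside the wait loop.
class RereadingWorker {
    private final Object monitor = new Object();
    private final List<String> messages = new ArrayList<>();
    private final AtomicInteger offset = new AtomicInteger();

    public void publish(String message) {
        synchronized (monitor) {
            messages.add(message);
            monitor.notifyAll();
        }
    }

    // Assumption: the reset also notifies, so a waiting reader re-evaluates.
    public void resetOffset(int newOffset) {
        synchronized (monitor) {
            offset.set(newOffset);
            monitor.notifyAll();
        }
    }

    // Blocks until a message is available at the current offset, then
    // returns it and advances the offset by one.
    public String next() throws InterruptedException {
        synchronized (monitor) {
            int cur = offset.get();
            while (cur >= messages.size()) {
                monitor.wait();
                cur = offset.get(); // re-read: a reset may have moved the offset
            }
            offset.compareAndSet(cur, cur + 1);
            return messages.get(cur);
        }
    }
}
```

Without the re-read, a reader blocked at offset 1 of a one-message topic would keep comparing the stale value 1 against the size and never notice a reset to 0.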
@itssud0464 2 years ago
There is a bug in your code: when a new subscriber is added to a topic, it doesn't start consuming messages immediately, only when a new message is published to that topic. This can be solved by adding "new Thread(() -> topicHandlerMap.get(topic.getTopicId()).startSubscriberHandler(topicSubscriber)).start();" at the end of your public void subscribe(final ISubscriber subscriber, final Topic topic) method.