For my next trick I'll show you all how to build a job scheduler.
00:18 Introduction
01:22 Functional Requirements
02:33 Capacity Estimates
03:21 API Design
04:09 Database Schema
05:18 Architectural Overview
04:43 DB Schema: [jobId, s3Url, status, retryTimestamp], where status is an ENUM (NOT_STARTED / CLAIMED / STARTED / DONE).
07:00 Querying the DB; ACID compliance. Indexing should be done on the timestamp. Query: select the tasks that are NOT_STARTED where timestamp < current_time.
08:50 Failures during a job run: MQ failure, node failure.
09:44 New query: select the tasks that are NOT_STARTED where timestamp < current_time, AND the tasks that are STARTED where timestamp + enqueuing time + heartbeat interval < current_time.
10:46 Messaging queue choice.
12:14 Claim service / DB + ZooKeeper. ZooKeeper is there to check whether a node is down or not; then we can write in the metadata DB that it's a retryable error.
14:54 A node dies, comes back up, and tries the job again = 2 nodes trying the same job. Distributed lock.
Ending note: how to schedule jobs at a fixed rate (WEEKLY / MONTHLY). The task-runner service itself writes to the DB the next time the task should run; e.g. for a BI-WEEKLY schedule, it adds the next time the job has to run.
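The schema at 04:43 and the query at 09:44 could be sketched roughly like this. Column names, status values, and the two timeout constants are my assumptions for illustration, not exactly what appears in the video:

```python
import sqlite3

# In-memory stand-in for the metadata DB described at 04:43.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE jobs (
        job_id   TEXT PRIMARY KEY,
        s3_url   TEXT,
        status   TEXT CHECK (status IN ('NOT_STARTED','CLAIMED','STARTED','DONE')),
        retry_ts REAL  -- epoch seconds when the job should (re)run
    )
""")
# Index on the timestamp so the poll is a range scan, not a full table scan.
conn.execute("CREATE INDEX idx_jobs_retry_ts ON jobs (retry_ts)")

ENQUEUE_GRACE = 60      # assumed time budget to get through the queue
HEARTBEAT_TIMEOUT = 30  # assumed max gap between worker heartbeats

def due_jobs(now):
    # The 09:44 query: NOT_STARTED jobs that are due, plus STARTED jobs
    # whose (start time + enqueue grace + heartbeat timeout) has elapsed,
    # i.e. jobs whose worker we presume dead.
    return conn.execute(
        """SELECT job_id FROM jobs
           WHERE (status = 'NOT_STARTED' AND retry_ts < ?)
              OR (status = 'STARTED' AND retry_ts + ? + ? < ?)""",
        (now, ENQUEUE_GRACE, HEARTBEAT_TIMEOUT, now),
    ).fetchall()
```

The second branch of the OR is what lets the scheduler pick up jobs whose node died mid-run.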
18:25 The whole flow:
1. The client uploads a job -> it goes to S3 and gets stored in the DB with its schedule.
2. The enqueue service (1 machine) polls the DB every minute for all due jobs, using the query mentioned at 9:44.
3. It batches the jobs and sends them to the MQ.
4. The MQ delivers them to multiple workers, which send heartbeats to ZooKeeper. (ZooKeeper was used for distributed locking of running jobs.)
5. The worker updates the STATUS depending on whether the job completed or not.
I have one question that's not addressed though, @Jordan has no life: what if the worker completes the job but fails right before updating the job's STATUS to COMPLETED in the DB?
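Steps 2-4 of that flow could be sketched like this. The in-memory dict stands in for the metadata DB and `queue.Queue` for the MQ; all names here are my assumptions for illustration:

```python
import queue

# Stand-ins for the metadata DB and the message queue.
mq = queue.Queue()
db = {}  # job_id -> {"status": ..., "run_at": ...}

def enqueue_due_jobs(now):
    # Step 2: poll for due jobs.
    due = [j for j, row in db.items()
           if row["status"] == "NOT_STARTED" and row["run_at"] < now]
    # Mark them CLAIMED first so a second poll won't re-send them.
    for job_id in due:
        db[job_id]["status"] = "CLAIMED"
    # Step 3: send the whole batch to the MQ.
    for job_id in due:
        mq.put(job_id)
    return len(due)
```

Note that flipping the status before enqueuing is exactly what makes the one-machine poll loop safe against double-sending between ticks.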
Why does ZooKeeper have no outward arrow? Shouldn't it be notifying the DB of task status changes, i.e. updating the task status to complete / not complete, etc.?
This video on the job scheduler is by far the best I've come across on YouTube. Thank you for creating it! I have a question though: it seems a lot of effort goes into ensuring "exactly-once" semantics here, by doing retries and having ZooKeeper as well as the claim service. Would that work be eased a bit if we used Kafka? My understanding is that Kafka has better support for "exactly-once" and also uses ZooKeeper internally.
How does the job claim service communicate with ZooKeeper? Does it poll ZK once in a while, get all the running jobs' statuses, and then update our JobStatusTable?
Hi Jordan, how often will job schedules be polled from the DB: every second, every minute? Do we also need to define an SLA for picking a job up from the table?
Hey Jordan! I didn't get why we need a lock here. If we enqueue a task into SQS, only one consumer will pick it up anyway (I think SQS takes care of concurrency here), and for the duration of the execution we can hide the task in the queue. Also, what happens to a task in the queue? Does the worker remove it from the queue, or make it invisible for the duration of execution?
Locks are important because tasks may be put in the queue again if the system thinks they failed to execute (e.g. a timeout is exceeded). Yes, once a task is removed from the queue it won't be delivered again; however, as I mentioned, it could be re-enqueued if we mistakenly think that it has failed.
@@jordanhasnolife5163 Oh, that makes sense! And what about a task in the queue? A task can take very long to execute, so I assume making it invisible in the queue is not really an option? Does the executor remove it from the queue? In which case, what if it dies, and who re-queues the task?
Hey @jordan, thanks for the great video on scheduler design. I have a small query: what will happen if we run multiple consumers for the service that polls data from the DB and pushes it to the queue? For scalability we may need to run multiple consumers, and there is a probability that jobs will get duplicated in the queue.
If our database uses transactions we wouldn't have to worry about this: each consumer could just mark a row as "being uploaded to the queue" before attempting to upload it, and other consumers won't touch it once that happens.
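That claim-before-enqueue idea can be done with a single conditional UPDATE, where the affected row count tells a consumer whether it won the claim. A minimal sketch (table and status names are my assumptions):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (job_id TEXT PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO jobs VALUES ('j1', 'NOT_STARTED')")

def try_claim(job_id):
    # The WHERE clause makes this a compare-and-set: only one concurrent
    # consumer can flip NOT_STARTED -> ENQUEUING for a given row.
    cur = conn.execute(
        "UPDATE jobs SET status = 'ENQUEUING' "
        "WHERE job_id = ? AND status = 'NOT_STARTED'",
        (job_id,),
    )
    conn.commit()
    return cur.rowcount == 1
```

The first consumer to run this gets `True` and enqueues the job; every other consumer gets `False` and skips it, so nothing is duplicated in the queue.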
I think the data schema can be better. We can have a jobs table which contains jobId, name, cron expression, etc. There would also be another table, job_executions, which maintains a record of every execution of a job.
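The two-table split being suggested might look something like this; the column names are illustrative assumptions, not a spec:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- One row per job definition.
    CREATE TABLE jobs (
        job_id    TEXT PRIMARY KEY,
        name      TEXT,
        cron_expr TEXT            -- e.g. '0 0 * * MON' for weekly
    );
    -- One row per run of a job; this is where status lives.
    CREATE TABLE job_executions (
        execution_id TEXT PRIMARY KEY,
        job_id       TEXT REFERENCES jobs(job_id),
        scheduled_at REAL,
        status       TEXT         -- NOT_STARTED / STARTED / DONE / FAILED
    );
""")
```

The nice property is that the recurring job keeps one stable ID while every run gets its own row, which also answers "how do clients get the status of previous runs" later in this thread.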
Great video! Just a small question: when a consumer node dies (stops sending heartbeats), how do we mark the job status as failed? Is ZooKeeper holding info about which consumer node is running which job ID?
How would we differentiate whether the job timed out or is just taking long to execute? How can we prevent it from running twice, or even indefinitely? Would it make more sense to use a log-based queue and let it take care of retries?
To be honest, the challenging part of distributed computing is that you can never truly know. Networks aren't perfect, so nothing is certain; in theory, jobs can complete years later. But as long as you set a reasonable timeout and make your jobs idempotent, it's OK! Using a log-based queue is totally fine too, but it would still have to use timeouts somewhere.
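The "make your jobs idempotent" advice above can be sketched as deduplication on a per-run ID: record each completed run and skip re-execution when a retry delivers the same run twice. The run-ID scheme and in-memory set are assumptions for illustration (in practice you'd use a unique key in the DB):

```python
# Completed run IDs; in a real system this would be a unique-keyed
# table in the database, not process memory.
completed = set()

def run_once(run_id, task):
    if run_id in completed:
        return "skipped"   # duplicate delivery after a timeout/retry
    task()                 # do the actual work
    completed.add(run_id)
    return "ran"
```

With this in place, a job accidentally re-enqueued after a false timeout executes its side effects only once, which is why the timeout only needs to be "reasonable" rather than perfect.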
If we maintain some state in the database, like submitted, queued, running, success, and failed, we don't need any distributed lock on a job. Your enqueuing service would only poll for jobs whose state is submitted, running for too long, or failed, and all of it can be done at the serializable isolation level in MySQL, since we opted for it in the first place.
While I agree that this ought to work the majority of the time, ACID properties aren't enough because our SQL database could go down, and unless it has strong consistency (not recommended for performance/network-partition-tolerance reasons), a claimed job may not appear claimed in the database replicas. Ultimately, we will need some sort of consensus here.
@@jordanhasnolife5163 Agreed on the database-could-go-down part, but this is where many master-slave systems (HBase, for example) use consensus to elect the right master, and hence we get strong consistency. Theoretically, both of our solutions have to use consensus anyway; it's just that you have a separate distributed lock service. Got it. By the way, your videos are great. Way to go!
What if these jobs had different priorities and we had to change the priority of a job at any point? (I'm mainly concerned about the priority changing while the job is in the queue.) For longer-running jobs, staying in the old priority queue might not be an option.
I'm a bit confused when you say "the queue" here. We could index our SQL table by priority, or we could shard into multiple tables by priority. Once a job is in the queue, it's more or less going to be run; perhaps you could do some weird type of in-memory heap, but that seems a bit extra.
Thanks for the excellent video. I have a couple of questions. How are job IDs created? Are they globally unique? When a recurring job gets another entry in the metadata DB, does it get a different ID? How does a client get the status of recurring jobs? Should there be a different DB to store the statuses of previous runs?
Yeah I think just creating a particular job run with a UUID is fine. Somebody else in the comments here suggested using a "JobExecutions" table which tracks the status of completed jobs as opposed to scheduled ones, I think that would work nicely here.
6:10 What is it about the message queue that doesn't allow us to get any information about a job other than 'run' or 'not run'? Admittedly my knowledge of message queues is kind of shaky, but couldn't we configure a log-based message broker to give us info other than 'run' or 'not run'? Also, if you want another video idea, a system design of a DoorDash/Grubhub-type app would be pretty cool!
I'm a bit confused about what you mean here; we're just placing the jobs themselves in the message queues. We keep track of the status of each job in a database so that we can request the status from a variety of other components. Sure, a message broker knows which jobs were sent to consumers, but that doesn't mean they were run successfully, and the message broker has no way of knowing this. As for the DoorDash point, I'd just check out my design of Uber; they're basically the same :)
Hey Jordan, can you please make a video on collaborative editing tools like CoderPad, Google Docs, or Google Sheets? Actually, I guess CoderPad would be a superset of Google Docs, so you could choose CoderPad over Google Docs when designing. Thanks!
Thank you for the content. One question: what if we want to schedule jobs based on a job's resource consumption requirements and the availability of resources on workers? How would you change your design?
I think that the message broker itself could maintain some internal state (or have consumers go through a proxy) which keeps track of how many jobs each consumer has run and perhaps their hardware capabilities (maybe stored in ZooKeeper). Essentially a load balancer lol.
Thanks for the great video! If ZooKeeper stops receiving heartbeats, "we can go ahead and update the metadata DB". I'm curious: who would update the metadata DB? Is it a) ZooKeeper that goes ahead and updates the metadata DB? If so, is it feasible, given ZooKeeper's capabilities, for us to add such custom logic? Or b) ZooKeeper performs failover, where it creates another worker node and has it restart the job? Also, since ZooKeeper helps the claim service acquire distributed locks using fencing tokens, why do we still need the ACID properties of a SQL DB? Why should we not use NoSQL for the metadata DB?
@@jordanhasnolife5163 Any example design or literature that shows this design (polling ZooKeeper for outages and implementing custom logic with failover)? I believe this is very critical, and if left unaddressed it leaves fault tolerance unsolved.
Looks like Apache Curator has some "recipes" that can be used when persistent nodes fail, which could be used here. Also, Curator can be used as a client with ZooKeeper to acquire distributed locks.
@@niranjhankantharaj6161 I'll do a better job addressing this in the remake. You have many options, though; for example, a cron job on the status table that eventually sets a job's status back to "not started" after a certain amount of time in which the job has yet to be completed. It's certainly not trivial, but it's not overly complex either.
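That cron-job idea could be sketched as a periodic sweep that resets stale STARTED rows. The table layout and the staleness timeout are assumptions for illustration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE jobs (job_id TEXT PRIMARY KEY, status TEXT, started_at REAL)")

STALE_AFTER = 300  # seconds a job may stay STARTED before we presume failure

def sweep(now):
    # Reset jobs that have been STARTED too long back to NOT_STARTED,
    # so the enqueuing service's normal poll picks them up again.
    cur = conn.execute(
        "UPDATE jobs SET status = 'NOT_STARTED', started_at = NULL "
        "WHERE status = 'STARTED' AND started_at < ?",
        (now - STALE_AFTER,),
    )
    conn.commit()
    return cur.rowcount
```

Run on a timer, this closes the failure case where a worker dies without ever updating the job's status; the only cost is that the timeout must exceed the longest legitimate run time (or the jobs must be idempotent).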
Hey Jordan, great video as always. I have a couple of questions: - Instead of using an enqueuing service which polls jobs every minute, could we instead just add an event stream on the DB and do filtering within the stream, where we only look at the jobs that need to be run? - I'm not sure I got the argument for using an in-memory queue; could you add more context on why we decided to do that instead of a log-based queue?
1) We could, but that's effectively just polling and I think it defeats the purpose of using the stream. 2) We don't care about the order in which jobs are run and want to maximize throughput, so an in-memory queue with many consumers is more useful to us than a log-based queue with a single consumer per partition.
"We don't care about the order of the jobs and we want an in-memory broker, so let's pick Kafka." What? What a strange statement in an otherwise interesting video.
How crass is this man? Do such people pass the "Googleyness" round and get into Google? Do people really like to work with people of such questionable character?
@@utkarshgupta2909 I can't speak to exact TPS, but I think a good rule of thumb for using a queue is when something that is being uploaded needs to be sent to multiple places, or there is a lot of processing that eventually has to be done on it.