This channel is a resource for learning about modern data technologies and practices from a Data Engineer's perspective, from kickstart tutorials to tips for taking your skills to the next level.
Dustin Vannoy is a consultant in data analytics and engineering. His specialties are modern data pipelines, data lakes, and data warehouses. He loves to share knowledge with the data engineering and science community.
Great video! Is there a way to override variables defined in databricks.yml in each of the job yml definitions so that the variable has a different value for that job only?
Thanks a lot, @DustinVannoy for this great presentation! I have a question: which is the better approach for project structure: one bundle yml config file for all my sub-projects, or should each sub-project have its own databricks.yml and bundle yml file? Thanks again :)
Once the code is deployed it gets uploaded to the shared folder. Can't we store that somewhere else, like an artifact repository or a storage account? There is a chance that someone may delete that bundle from the shared folder. It has always been like this with Databricks deployments, both before and after asset bundles.
You can set permissions on the workspace folder and I recommend also having it all checked into version control such as GitHub in case you ever need to recover an older version.
Loving bundles so far. The only issue I've had is that the Databricks VS Code extension seems to be modifying my bundle yml file behind the scenes. For example, when I attach to a cluster in the extension, it will override my job cluster to use that attached cluster when I deploy to the dev target in development mode.
Run the driver program using multiple threads with this as well:

from threading import Thread  # threading for parallel table copies
from time import sleep  # time module added for demonstration

workerCount = 3  # number of threads to start before waiting

def display(tablename):  # function to read & load tables from X schema to Y schema
    try:
        # spark.table(f'{tablename}').write.format('delta').mode('overwrite').saveAsTable(f'{tablename}_target')
        print(f'Data copy from {tablename} -----To----- {tablename}_target is completed.')
    except Exception:
        print('Data copy failed.')
    sleep(3)

tables = ['Table1', 'Table2', 'Table3', 'Table4', 'Table5', 'Table3', 'Table7', 'Table8']  # list of tables to process

counter = 0
for value in tables:
    worker = Thread(target=display, args=(value,), name=value)  # create a thread named after the table
    worker.start()  # start the thread
    counter += 1
    if counter % workerCount == 0:
        worker.join()  # hold until every 3rd thread completes before starting more
        counter = 0
Hey Dustin, Thanks for the tutorial! I've successfully integrated the init script and have been receiving logs. However, I'm finding it challenging to identify the most useful logs and create meaningful dashboards. Could you create a video tutorial focusing on identifying the most valuable logs and demonstrating how to build dashboards from them? I think this would be incredibly helpful for myself and others navigating through the data. Looking forward to your insights!
This is what I have, plus the related blog posts. ru-vid.com/video/%D0%B2%D0%B8%D0%B4%D0%B5%D0%BE-92oJ20XeQso.htmlsi=OS-WZ_QrL-_kkwWu We mostly used our custom logs for driving dashboards, but also evaluated some of the heap memory metrics regularly as well.
Hello Dustin, Thank you for posting this video. This was very helpful!!! Pardon my ignorance, but I have a question about initializing the Databricks bundle. When you initialize the Databricks bundle through the CLI as the first step, does it create the required files in the Databricks workspace folder? Additionally, do we push the files from the Databricks workspace to our Git feature branch so that we can clone them locally, make the configuration changes, and push them back to Git for deployment?
Hello, sir, Thank you for this tutorial. I successfully integrated with log analytics. Could you please show me what we can do with these logs and how to create dashboards? I am eagerly awaiting your response. Please guide me.
Hi Dustin, Thank you for sharing this approach; I am going to use it for training Spark ML models. I had a question on using the daemon option. My understanding is that these threads will never terminate until a script ends. When do they terminate in this example? At the end of the cell, or after .join(), i.e. when all items in the queue have completed? I really appreciate any explanation you provide.
Hi, thanks for the informative video! I have a question: instead of sending a list to the notebook, I send a single table to the notebook using a ForEach activity (Synapse can do a maximum of 50 concurrent iterations). What would the difference be? Which would be more efficient? And what is best practice in this case? Thanks in advance!
Is it possible to add approvers in asset bundle based code promotion? Say one does not want the same dev to promote to prod, as prod could be maintained by other teams; or if the dev has to do code promotion, it should go through an approval process. Also, is it possible to add code scanning using something like SonarQube?
Like @gardnmi, I also used the map method that ThreadPoolExecutor has. Didn't need a queue. I created a new cluster (tagged for the appropriate billing category) and set the max workers on both the cluster and the threadpool:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=137) as threadpool:
    s3_bucket_path = 's3://mybucket/'
    threadpool.map(lambda table_name: create_bronze_tables(s3_bucket_path, table_name), tables_list)
How long does it take to deploy the Python wheel for you? For me it takes about 15 minutes, which makes me consider making the wheel project separate from the rest of the solution.
I was having so many issues using the other ThreadPool library in a notebook. It cut my notebook runtime down by 70%, but I couldn't get it to run in a Databricks job. Your solution worked perfectly! Thank you so much!
Yes. If the transformations are different per source table, you may want to pass the correct transformation function as an argument as well, or have something like a dictionary that maps each source table to its transformation logic.
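Roughly, that pattern could look like the sketch below. The table names and transform functions are placeholders, and it assumes the notebook-provided spark session:

from concurrent.futures import ThreadPoolExecutor

# Placeholder transformations -- swap in your real logic.
def clean_orders(df):
    return df.dropDuplicates(["order_id"])

def clean_customers(df):
    return df.dropna(subset=["customer_id"])

# Map each source table to the transformation it needs.
transform_by_table = {
    "orders": clean_orders,
    "customers": clean_customers,
}

def process_table(table_name):
    transform = transform_by_table[table_name]
    df = spark.table(table_name)  # assumes the notebook spark session
    transform(df).write.mode("overwrite").saveAsTable(f"{table_name}_target")

with ThreadPoolExecutor(max_workers=3) as pool:
    # list() forces iteration so any exception raised in a worker surfaces here
    list(pool.map(process_table, transform_by_table.keys()))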
I'm not sure of a way to do this, but I haven't put too much time into it. I do not believe the library used in this video can do that, but if you figure out how to get it to write to log4j also then it will go to Azure Monitor / Log Analytics with the approach shown.
Hey Dustin, Thank you so much for the video. I still have one doubt: I've been running a streaming query in a notebook for over 10 hours, and the streaming query statistics only show specific time intervals. How can I view input rate, processing rate, and other stats for different time ranges, or for the entire 10 hours, to help with debugging?
Check out how to use Query Listener from this video and see if that covers what you are after. ru-vid.com/video/%D0%B2%D0%B8%D0%B4%D0%B5%D0%BE-iqIdmCvSwwU.html
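For reference, a minimal sketch of a PySpark streaming query listener (assumes Spark 3.4+ or a recent Databricks runtime and the notebook-provided spark session; the print calls are placeholders for whatever log sink you actually use):

from pyspark.sql.streaming import StreamingQueryListener

class RateLogger(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"Query started: {event.id}")

    def onQueryProgress(self, event):
        # Each micro-batch reports its own rates; persist these to keep the full history.
        p = event.progress
        print(f"{p.name} batch {p.batchId}: input {p.inputRowsPerSecond} rows/s, "
              f"processed {p.processedRowsPerSecond} rows/s")

    def onQueryTerminated(self, event):
        print(f"Query terminated: {event.id}")

spark.streams.addListener(RateLogger())  # register before starting the streaming query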
I tried this. However, I noticed an issue when I have a single notebook that creates multiple threads, where each thread calls a function that creates Spark local temp views: the views get overwritten by the second thread since it is essentially the same Spark session. How do I get around this?
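(Not something covered in the video, but one common workaround is to give each call its own view name so concurrent threads don't collide; a sketch with placeholder names, assuming the shared notebook spark session:)

import uuid

def summarize(table_name):
    df = spark.table(table_name)
    # A unique view name per call keeps threads from overwriting each other's views.
    view_name = f"tmp_{table_name}_{uuid.uuid4().hex}"
    df.createOrReplaceTempView(view_name)
    return spark.sql(f"SELECT COUNT(*) AS row_count FROM {view_name}")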
@DustinVannoy Yeah, I had that in mind. Unfortunately I can't, as the existing jobs are stable in production. However, this is definitely useful for new implementations.
Check out this video and the related blog for the latest tested versions. It may work with 14 as well, but it has only been tested with LTS runtimes. ru-vid.com/video/%D0%B2%D0%B8%D0%B4%D0%B5%D0%BE-CVzGWWSGWGg.html
I am close to finalizing a video on how to do this for newer runtimes, and I built it on Windows this time using WSL. For Databricks Runtimes 11.3 and above there is a branch named l4jv2 that works.
Hi Dustin, I want to take a DataFrame with streaming logs that I'm reading from an Event Hub and send them to Log Analytics, but I'm not receiving any data in the Log Analytics workspace or Azure Monitor. What might the problem be? Do I need to create a custom table beforehand? DCR or MMA? I don't know why I'm not getting any data or what I'm doing wrong...
Is this still an issue? If so, is it related to using the spark-monitoring library? I have a quick mention of how to troubleshoot that towards the end of this new video: ru-vid.com/video/%D0%B2%D0%B8%D0%B4%D0%B5%D0%BE-CVzGWWSGWGg.html
Only 2 min in and he already lost me. At 1:53 I can't see the referenced screen 😆! For future videos, it would be greatly appreciated if the necessary prerequisites could at least be listed in the description box. This -> ru-vid.com/video/%D0%B2%D0%B8%D0%B4%D0%B5%D0%BE-M7C-MyVHyrU.html
Hi Dustin, Thank you so much for sharing this demo with us. While trying to adapt it to my environment (I am using Synapse), I am facing an issue that I hope you could help me resolve: when the target delta table does not exist, I noticed that after I create it, CDF shows as enabled only at version 1 and not version 0. The initial version 0 is for the initial WRITE only, with no CDF enabled. Consequently, I cannot use your trick to load everything from version 0 if the table does not exist. I tried to use "SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;" but Synapse seems to ignore it completely. I also tried to include the option to enable CDF while saving the delta table as shown below, but again, CDF only gets enabled at version 1: df_records.write.format('delta').option("delta.enableChangeDataFeed", "true").save(target_path) Any clue? Thanks!
Well, I just discovered that when you create a delta table, adding option("delta.enableChangeDataFeed", "true") is not enough. When creating the temp view to switch to SQL, you also need to add delta.enableChangeDataFeed = true to the TBLPROPERTIES when issuing the CREATE OR REPLACE TABLE statement, and this works. Still, the question about enabling CDF by default in Synapse remains, if you ever have a clue. Thanks!
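For anyone following along, a sketch of what that looks like (the view, schema, and table names here are placeholders):

df_records.createOrReplaceTempView("records_vw")  # temp view used to switch to SQL

spark.sql("""
    CREATE OR REPLACE TABLE target_schema.target_table
    USING DELTA
    TBLPROPERTIES (delta.enableChangeDataFeed = true)
    AS SELECT * FROM records_vw
""")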
How does this work within a team with multiple projects? How do I deploy multiple projects in GitHub Actions? Am I creating a bundle folder per project? Or do I have a mono folder with everything Databricks in it?
You can have different subfolders in your repo, each with their own bundle yaml, or you could have one at the root level and import different resource yaml files. It should only deploy the assets that have changed, so I tend to suggest one bundle if everything can be deployed at the same time.
Great video! What should be the best approach to switch between dev and prod inside the code? Example: df_test.write.format('delta').mode('overwrite').saveAsTable("dev_catalog.schema.table") How can I parametrize this so it automatically changes to: df_test.write.format('delta').mode('overwrite').saveAsTable("prod_catalog.schema.table")
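One common pattern (not necessarily the approach from the video) is to pass the catalog name in as a parameter, for example a notebook widget that the job or bundle target sets; the widget name here is just a placeholder:

# The job or bundle target would pass "dev_catalog" or "prod_catalog" for this parameter.
dbutils.widgets.text("target_catalog", "dev_catalog")
catalog = dbutils.widgets.get("target_catalog")

df_test.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.schema.table")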