
Shuffle Partition Spark Optimization: 10x Faster! 

Afaque Ahmad
5K subscribers
7K views

Published: 22 Aug 2024

Comments: 64
@narutomaverick 7 days ago
Your channel is so underrated. Please don't stop.
@user-pq9tx6ui2t 7 days ago
I like your videos very much; they're insightful. Can you please make a series of videos on Spark interview-oriented questions? Thanks in advance.
@akshayshinde3703 7 months ago
Really good explanation, Afaque. Thank you for making such in-depth videos. 😊
@dileepkumar-nd1fo 4 months ago
Have watched all your videos. Seriously gold content. Requesting you not to stop making videos.
@afaqueahmad7117 4 months ago
Thank you @dileepkumar-nd1fo for the kind words, it means a lot to me :)
@tahiliani22 7 months ago
Commenting so that you continue making such informative videos. Great work!
@rgv5966 1 month ago
I don't think videos of this kind on Spark are available anywhere else. Great work, Afaque!
@afaqueahmad7117 1 month ago
Appreciate it @rgv5966, thank you!
@anandchandrashekhar2933 2 months ago
Wow! Thank you Afaque, this is incredible content and very helpful!
@afaqueahmad7117 2 months ago
Appreciate it @anandchandrashekhar2933, thank you!
@Momofrayyudoodle 3 months ago
Thank you so much. Keep up the good work. Looking forward to more such videos to learn Spark.
@afaqueahmad7117 3 months ago
Thank you @Momofrayyu-pw9ki, really appreciate it :)
@purnimasharma9734 2 months ago
Very nice explanation! Thank you for making this video.
@afaqueahmad7117 2 months ago
Thanks @purnimasharma9734, appreciate it :)
@sureshpatchigolla3438 4 months ago
Great work, brother... Thank you for explaining concepts in detail ❤❤
@afaqueahmad7117 4 months ago
Appreciate it, @sureshpatchigolla3438 :)
@2412_Sujoy_Das 7 months ago
Great share, sir, the optimal shuffle size... Please bring more scenario-based questions as well as production best practices!
@mahendranarayana1744 14 days ago
Great explanation, thank you. But how would we know the exact (or at least the best) value of "spark.sql.shuffle.partitions" to configure at run time? The data volume changes with each run/day. So how do we determine the data volume at run time to set the shuffle partition count?
@puneetgupta003 3 months ago
Nice content, Afaque!
@afaqueahmad7117 3 months ago
Appreciate it @puneetgupta003, thank you!
@asokanramasamy2087 4 months ago
Clearly explained!!
@afaqueahmad7117 4 months ago
Appreciate it :)
@tanushreenagar3116 1 month ago
Perfect video.
@fitness_thakur 24 days ago
Could you please make a video on stack overflow errors: the scenarios in which they can occur and how to fix them?
@afaqueahmad7117 9 days ago
Are you referring to OOM (out-of-memory) errors on the driver and executor?
@fitness_thakur 8 days ago
@afaqueahmad7117 No. Basically, when we run multiple layers under a single session, the stack memory fills up, so to avoid it we have to use one session per layer. E.g., suppose we have 3 layers (internal, external, combined): if you run these in a single session, it throws a stack overflow error wherever the stack overflows. We tried increasing the stack size as well, but that didn't work. In the end we settled on running one layer, closing the session, and repeating for the next layer.
@Rafian1924 5 months ago
Learning from masters ❤❤❤
@ashokreddyavutala8684 7 months ago
Thanks a bunch for the great content again...
@vinothvk2711 7 months ago
Great explanation!
@showbhik9700 2 months ago
Lovely!
@Wonderscope1 7 months ago
Thanks for the video, very informative.
@leonardopetraglia6040 17 days ago
Thanks for the video! I also have a question: when I execute a complex query, there will be multiple stages with different shuffle write sizes. Which one should I take into consideration when computing the optimal number of shuffle partitions?
@dasaratimadanagopalan-rf9ow 26 days ago
Thanks for the content, really appreciate it. My understanding is that AQE takes care of shuffle partition optimization and that, starting with Spark 3, we don't need to intervene manually to optimize shuffle partitions. Could you clarify this, please?
@iamkiri_ 6 months ago
Nice explanation :-)
@abdulwahiddalvi7119 2 months ago
@Afaque, thank you for making these videos. Very helpful. I have a question: how do we estimate the data size? We run our batches/jobs on Spark, and each batch could be processing a different amount of data. Some batches could be dealing with 300 GB and some with 300 MB. How do we calculate the optimal number of shuffle partitions?
@ravikrish006 7 months ago
Thank you, it is very useful.
@tandaibhanukiran4828 5 months ago
Thank you for explaining in detail. You are the best guy around. Can you also please explain whether there is a way to dynamically update the shuffle partitions by calculating them from the data size and the number of cores in the cluster (in case the cluster is altered in future)? Thanks in advance.
@afaqueahmad7117 5 months ago
@tandaibhanukiran4828 I believe it's challenging to dynamically configure shuffle partitions knowing only the size of your data and the cluster configuration. The most important input is the "Shuffle Write". Estimating shuffle write is not clear-cut, as it depends on several factors (skew, transformation complexity such as joins and aggregations, dynamic execution plans, etc.). If you have historical data, or similar jobs (using the same or similar datasets with similar operations, i.e. joins and aggregations), you could use those "Shuffle Partition" numbers and apply the logic (as demonstrated in the scenarios) to get the number of shuffle partitions dynamically. However, I would stress using this approach with caution.
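One way to read the suggestion in the reply above is sketched below. This is a minimal illustration, not the author's code: the helper name `estimate_shuffle_partitions`, the use of the median of past runs, and the rounding up to a multiple of the core count are all assumptions. The 200 MB target comes from the partition size discussed elsewhere in this thread, and the historical shuffle write sizes are assumed to have been read off the Spark UI of earlier runs.

```python
import math
from statistics import median


def estimate_shuffle_partitions(historical_shuffle_write_mb, total_cores,
                                target_partition_mb=200):
    """Hypothetical helper: suggest a value for spark.sql.shuffle.partitions.

    historical_shuffle_write_mb: shuffle write sizes (in MB) observed in
    earlier, similar runs. All parameter names are illustrative.
    """
    if not historical_shuffle_write_mb:
        raise ValueError("need at least one historical shuffle write size")
    if total_cores < 1:
        raise ValueError("total_cores must be positive")

    # Use the median so that one unusually large or small run does not dominate.
    typical_mb = median(historical_shuffle_write_mb)

    # Enough partitions that each one is at most roughly the target size.
    partitions = max(1, math.ceil(typical_mb / target_partition_mb))

    # Assumption: round up to a multiple of the core count so that the last
    # scheduling round does not leave cores idle.
    return math.ceil(partitions / total_cores) * total_cores
```

With an existing SparkSession named `spark` (an assumption), the result could be applied with `spark.conf.set("spark.sql.shuffle.partitions", str(n))` before the shuffle-heavy part of the job runs. As the reply stresses, an estimate like this is only as good as the similarity between past and current runs.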
@tandaibhanukiran4828 5 months ago
Thank you very much, @afaqueahmad7117.
@nikhillingam4630 1 month ago
Consider a scenario where my first shuffle is 100 GB, so giving more shuffle partitions makes sense. By the last shuffle, the data size has dropped drastically to 10 GB. Going by the calculations, 1500 shuffle partitions would benefit the first shuffle but not the last. How does one approach this scenario?
@arunkindra832 6 months ago
Hi Afaque, in the 1st case you configured 1500 shuffle partitions, but initially you said 1000 cores are available in the cluster, and you also mentioned one partition per core. Where do the remaining 500 partitions come from? Another doubt: do we need to configure the number of cores consumed by a job according to the shuffle partitions we provide? Also, please explain a case where we don't have enough cores available in the cluster.
@afaqueahmad7117 6 months ago
Hey @arunkindra832, in scenario 1, referring to the diagram, there are 20 cores in the cluster (5 executors * 4 cores each). The 1500 shuffle partitions are going to be processed by a 20-core cluster. Assuming each core takes the same amount of time to process a shuffle partition and the distribution of shuffle partitions is uniform, there are going to be approximately 1500/20 = 75 rounds. In each round, 20 shuffle partitions are processed.
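The arithmetic in the reply above can be written out as a short sketch. The function name is illustrative, and the calculation assumes, as the reply does, that every partition takes the same time to process and that one core handles one partition at a time.

```python
import math


def scheduling_rounds(shuffle_partitions, executors, cores_per_executor):
    """Number of rounds needed when each core processes one partition at a time."""
    total_cores = executors * cores_per_executor
    # A partially filled last round still counts as a round, hence the ceiling.
    return math.ceil(shuffle_partitions / total_cores)


# Scenario 1 from the reply: 1500 partitions on 5 executors * 4 cores each.
print(scheduling_rounds(1500, executors=5, cores_per_executor=4))  # 75
```

So the number of partitions does not have to match the number of cores: extra partitions simply queue up and are picked up as cores become free.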
@user-er4qt3pc5s 6 months ago
@afaque A shuffle partition will consist of both shuffled data (keys that were not originally present on the executor and were shuffled to the partition) and non-shuffled data (keys that were already present on the executor and were not shuffled). So the size of a shuffle partition cannot be calculated directly from the shuffle write data alone, as it also depends on the distribution of the data across the partitions?
@Kiedi7 6 months ago
Hey man, awesome series so far. I noticed in your videos that you share your Mac screen but use an Apple Pencil on your iPad to annotate. Can you describe that setup: how are you able to annotate your Jupyter Notebook (presented on the Mac) from an iPad? Thanks in advance, appreciate it.
@afaqueahmad7117 6 months ago
Thank you! I use Ecamm Live, so adding both the iPad and the Mac as screens helps me navigate easily between the two. For the videos where I've annotated on notebooks, I've used Zoom to share my screen and annotate in Zoom.
@akshaybaura 7 months ago
In scenario 1, how exactly is reducing the size of partitions beneficial? Correct me if I'm wrong: if we let a core process 1.5 GB, most of the data will spill to disk during computation, which will increase I/O and hence the time taken for completion. However, if we reduce the partition size, we increase the number of partitions as a result, which again would increase the time taken for job completion.
@kartikthakur6461 7 months ago
I am assuming disk computation will take much longer to complete.
@afaqueahmad7117 7 months ago
Hey @akshaybaura, @kartikthakur6461, you're on the right track. Reducing each partition from 1.5 GB to 200 MB does increase the number of partitions, but it is beneficial for the following reasons:
1. Reduced memory pressure: when a core processes a large partition (1.5 GB), it's more likely to run out of memory and start spilling to disk. This spill causes additional I/O, which in turn slows processing down significantly. However, I would still emphasize that whether spill happens depends on the memory per core. If memory per core is > 1.5 GB, spills won't happen, but processing is still going to be significantly slower.
2. Better resource utilisation: increasing the number of partitions allows better distribution of the workload across the cores.
3. Balanced workload: smaller partitions help in achieving a more balanced workload across cores.
The important thing is to ensure that each core is given a "manageable" amount of data to process. However, the goal is to strike a balance: partition sizes should be small enough to avoid excessive memory usage and I/O spill, but large enough that the overhead of managing many partitions doesn't outweigh the benefits.
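The "memory per core" point in the reply above can be turned into a rough check, sketched here under stated assumptions. The 300 MB reserved memory and the 0.6 default for `spark.memory.fraction` are Spark's documented unified-memory defaults; the 8 GB executor heap, the function names, and the even split of memory across cores are illustrative simplifications. Shuffle sizes reported by Spark are serialized and usually compressed, so the data tends to be larger in memory and this check is optimistic.

```python
RESERVED_MB = 300        # memory Spark sets aside before applying the fraction
MEMORY_FRACTION = 0.6    # default value of spark.memory.fraction


def unified_memory_per_core_mb(executor_heap_mb, cores_per_executor):
    """Rough share of execution + storage memory available to each core."""
    usable_mb = (executor_heap_mb - RESERVED_MB) * MEMORY_FRACTION
    return usable_mb / cores_per_executor


def likely_to_spill(partition_mb, executor_heap_mb, cores_per_executor):
    """True if a partition is larger than a core's rough memory share."""
    return partition_mb > unified_memory_per_core_mb(executor_heap_mb,
                                                     cores_per_executor)


# Assumed executor: 8 GB heap, 4 cores, so roughly 1.18 GB per core.
print(likely_to_spill(1500, executor_heap_mb=8192, cores_per_executor=4))  # True
print(likely_to_spill(200, executor_heap_mb=8192, cores_per_executor=4))   # False
```

On an executor like this, a 1.5 GB partition would not fit in a core's share while a 200 MB partition fits comfortably, which matches the first reason given in the reply.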
@Revnge7Fold 2 months ago
I think it's a bit dumb for Spark to keep this value static... why not have a "target shuffle size (MB/GB)" config in Spark instead? I wish the Spark planner was a bit more sophisticated.
@afaqueahmad7117 1 month ago
You could get a similar effect by turning on AQE and setting "spark.sql.adaptive.advisoryPartitionSizeInBytes" to your desired size. Documentation here: spark.apache.org/docs/latest/sql-performance-tuning.html
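A minimal sketch of the settings mentioned in the reply above, assuming Spark 3.x and an existing SparkSession passed in as `spark`. The three keys are standard Spark SQL settings; the 200 MB value, the dictionary name, and the helper `apply_settings` are illustrative choices.

```python
# Adaptive Query Execution settings; the advisory size is a target, not a hard limit.
AQE_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.advisoryPartitionSizeInBytes": "200m",
}


def apply_settings(spark, settings):
    """Apply each key/value pair to the session's runtime configuration."""
    for key, value in settings.items():
        spark.conf.set(key, value)
```

Calling `apply_settings(spark, AQE_SETTINGS)` before running the query is enough. Note that AQE's coalescing merges small shuffle partitions toward the advisory size, starting from the initial partition count, so `spark.sql.shuffle.partitions` still needs to be high enough for the largest shuffle in the job.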
@Revnge7Fold 1 month ago
@afaqueahmad7117 Awesome! Thanks for the advice! Your videos have been really helpful!
@kaushikghosh-po1ew 3 months ago
I have a question about this. Let's say the data volume I am processing varies on a daily basis, i.e. some days it can be 50 GB and some days 10 GB. Keeping in mind the 200 MB per shuffle partition limit, the optimum number of partitions should change on each run. But it's not practically possible to change the code every time to get a proper shuffle partition count. How should this scenario be handled? I read about a parameter, sql.files.maxPartitionBytes, which defaults to 128 MB. Should I change this to 200 and let the number of shuffle partitions be calculated automatically? In that case, will the value under sql.shuffle.partitions be ignored?
@tsaha100 7 months ago
@Afaque: Very good video. So in real life, for varying workload sizes (shuffle write size 50 MB to 300 GB), you have to change the shuffle partitions programmatically? How do you figure out, in code, the shuffle write that you find in the Spark UI? Is there any solution?
@afaqueahmad7117 6 months ago
Thanks for the kind words @tsaha100. I don't think there's a clear way to statically estimate, in code, the shuffle write shown on the Spark UI, because of the dynamic nature of Spark's execution. If you would like to log the shuffle write metrics when your tasks complete, you could try attaching a SparkListener to your SparkContext and overriding the onTaskEnd method to capture shuffle write metrics, but I believe it's easier to just run the job and refer to the Spark UI. You can refer to: books.japila.pl/apache-spark-internals/SparkListenerInterface/#ontaskgettingresult
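SparkListener is a JVM-side API, so the sketch below swaps in a different technique: it reads per-stage shuffle write from Spark's monitoring REST API, which serves the same numbers the Spark UI displays. This is an illustration under assumptions rather than the approach described in the reply. The helper names are hypothetical, and the field names `stageId`, `attemptId` and `shuffleWriteBytes` reflect the stage payload as I understand it, so they are worth verifying against your Spark version.

```python
import json
from urllib.request import urlopen


def fetch_stages(ui_base_url, app_id):
    """Fetch stage metrics for a running application from the Spark REST API."""
    url = f"{ui_base_url}/api/v1/applications/{app_id}/stages"
    with urlopen(url) as response:
        return json.load(response)


def shuffle_write_mb_by_stage(stages):
    """Map (stageId, attemptId) to the stage's shuffle write in MB."""
    return {
        (stage["stageId"], stage.get("attemptId", 0)):
            stage.get("shuffleWriteBytes", 0) / (1024 * 1024)
        for stage in stages
    }
```

For a local driver, the base URL would typically be `http://localhost:4040` (the default UI port), and the application id is available in PySpark as `spark.sparkContext.applicationId`. Logging the output of `shuffle_write_mb_by_stage(fetch_stages(...))` at the end of each run is one way to build up the historical numbers for later tuning.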
@vikastangudu712 3 months ago
Thanks for the explanation. But isn't the parameter (spark.sql.shuffle.partitions) in any way dependent on the cardinality of the group by/join column?
@crepantherx 7 months ago
Can you please cover bucketing hands-on in ADB (hands-on with file view)? In your last video it works in your IDE but not in Databricks (Delta bucketing not allowed).
@atifiu 7 months ago
@afaque How can I calculate the total shuffle data size without executing the code beforehand? Also, size on disk is not the same as size in memory, as in-memory data is uncompressed.
@afaqueahmad7117 6 months ago
Hey @atifiu, calculating the total shuffle data size without executing the code can be challenging due to the dynamic nature of Spark's execution. Several things come into the picture, for example data distribution, skew, and the nature of the transformations (wide vs. narrow), depending on which you may or may not get an accurate shuffle data size.
@ramvel814 6 months ago
Can you tell how to resolve "Python worker exited unexpectedly (crashed)"?
@user-dx9qw3cl8w 7 months ago
Thanks again for the topic. I worried that you had stopped... because knowledge this deep can't even be had from colleagues.
@afaqueahmad7117 7 months ago
@user-dx9qw3cl8w More to come! :)
@omairaparveen2001 7 months ago
Waiting eagerly :)
@vinothvk2711 7 months ago
In Scenario 2, we are finally distributing 4.2 MB to each core. In this case, what is spark.sql.shuffle.partitions? Is it 12?
@afaqueahmad7117 7 months ago
Yes @vinothvk2711, it's 12. As explained, this is going to ensure 100% utilisation of the cluster.
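The Scenario 2 numbers in this exchange can be reproduced with a small sketch. The 12 cores and roughly 4.2 MB per core come from the comments above; the 50 MB total shuffle write is an assumption, back-computed from those two figures, and the function name is illustrative.

```python
def partitions_for_small_shuffle(shuffle_write_mb, total_cores):
    """For a small shuffle, use one partition per core.

    Returns the partition count and the resulting size of each partition in MB.
    """
    partition_mb = shuffle_write_mb / total_cores
    return total_cores, partition_mb


# Assumed 50 MB shuffle write on a 12-core cluster.
partitions, size_mb = partitions_for_small_shuffle(50, total_cores=12)
print(partitions, round(size_mb, 1))  # 12 4.2
```

With one partition per core, all 12 cores work in a single round, instead of most cores sitting idle or a larger partition count producing many tiny tasks.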