Hadoop is not mandatory for a Spark setup. You can run a standalone Spark setup using Docker. But if you still want Hadoop as well, you can clone and follow the steps from this GitHub repo: github.com/Marcel-Jan/docker-hadoop-spark
The standalone cluster used in this tutorial runs on Docker, and you can set it up yourself. For the notebook: hub.docker.com/r/jupyter/pyspark-notebook. You can use the Docker files below to set up the cluster: github.com/subhamkharwal/docker-images/tree/master/spark-cluster-new
Can accumulator variables be used to calculate an average as well? When we calculate a sum, each executor can contribute its part, but an average won't work the same way.
Hello Sushant, To calculate the average, the simplest approach is to use two accumulators, one for the sum and another for the count. Afterwards you can divide the sum by the count on the driver to get the average. If you like the content, please make sure to share it with your network 🛜
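A minimal sketch of that idea, assuming a hypothetical RDD of numbers (names and values here are only for illustration):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("accumulator-avg").getOrCreate()
sc = spark.sparkContext

# Hypothetical RDD of numeric values used only for illustration
numbers_rdd = sc.parallelize([10, 20, 30, 40, 50])

# Two accumulators: one for the running sum, one for the count
sum_acc = sc.accumulator(0)
count_acc = sc.accumulator(0)

def track(value):
    # Each executor adds to both accumulators; Spark merges them on the driver
    sum_acc.add(value)
    count_acc.add(1)

numbers_rdd.foreach(track)

# The division happens on the driver, after the action has completed
avg = sum_acc.value / count_acc.value
print(avg)  # 30.0
```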
Why did you set 32 shuffle partitions if you have 8 cores? If one partition is processed on a single core, where will the remaining 24 partitions get cores from?
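For context, shuffle partitions beyond the core count are not a problem: the extra tasks simply queue and run in waves (8 at a time on 8 cores). A tiny config sketch, with the value 32 taken only from the question above:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("shuffle-partitions").getOrCreate()

# With 8 cores and 32 shuffle partitions, each shuffle stage runs its
# 32 tasks in 4 waves of 8; extra partitions wait rather than needing extra cores.
spark.conf.set("spark.sql.shuffle.partitions", "32")
print(spark.conf.get("spark.sql.shuffle.partitions"))
```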
When I refresh my Spark UI it gives an error. The master is "spark://6b16b66805db:7077" and "localhost:4040" is not working either. I set .master("spark://6b16b66805db:7077"). How can this be solved, please?
From the video: select, where, group by etc. are transformations. We have narrow transformations and wide transformations. Wide transformations are those where data has to move to, or interact with, data in other partitions in the next stages.
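A small sketch of the contrast, using a hypothetical DataFrame made up only to illustrate the two transformation types:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("narrow-vs-wide").getOrCreate()

# Hypothetical data, only to show the difference in the physical plan
df = spark.createDataFrame(
    [("Delhi", 100), ("Mumbai", 200), ("Delhi", 300)],
    ["city", "amount"],
)

# Narrow: each partition can be filtered independently, no shuffle needed
narrow_df = df.where(F.col("amount") > 150)

# Wide: rows with the same city must meet in one partition, so Spark shuffles
wide_df = df.groupBy("city").agg(F.sum("amount").alias("total"))

# explain() shows an Exchange (shuffle) only in the wide plan
narrow_df.explain()
wide_df.explain()
```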
@easewithdata Thanks! I've been following you for more than a month and it has been a great learning experience. We would like you to make an end-to-end project in PySpark.
One of the best in-depth explanations, thanks :) Could you please make a video on an end-to-end data engineering project, from requirement gathering to deployment?
Hello, and thanks for sharing these useful videos. How do you handle writing to Delta tables? The best practice is for each Parquet file to be between 128 MB and 1 GB, but each micro-batch is far smaller than that. How do you accumulate enough batches to reach the recommended size before finally writing to the Delta lake?
Micro-batch execution in Spark usually writes many small files. This requires a later stage that reads all those files and writes a compacted file (say, one per day) of bigger size to avoid the small-file issue. You can then use this compacted output to read data in your downstream systems.
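A rough sketch of such a compaction step, assuming the streaming job lands small Parquet files under a hypothetical date-partitioned path; all paths, column names and the file count are placeholders to tune for your data:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("daily-compaction").getOrCreate()

# Hypothetical paths and partition column; adjust to your layout
source_path = "/data/events"              # many small files from the streaming job
compacted_path = "/data/events_compacted"
run_date = "2024-01-15"

# Read one day's worth of small files
daily_df = spark.read.parquet(source_path).where(f"event_date = '{run_date}'")

# Rewrite as a handful of larger files; pick the count so each file lands
# roughly in the 128 MB - 1 GB range mentioned above
daily_df.coalesce(4).write.mode("overwrite").parquet(
    f"{compacted_path}/event_date={run_date}"
)
```

If the sink is a Delta table, Delta Lake's built-in OPTIMIZE command performs the same kind of compaction for you.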
Hi, and thanks. I have a question: what is the best practice in distributed mode for Spark? Is it good for the number of Kafka partitions and Spark executors to be equal?
For me, clearSource and sourceArchive are not working: files are not getting archived and the archive folder is not getting created. What could be the issue?
Please check this link to verify you are setting all parameters as per the requirements: spark.apache.org/docs/latest/structured-streaming-programming-guide.html
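In particular, if the option names in the question were typed as-is, note that the file-source options in that guide are spelled cleanSource and sourceArchiveDir, and the archive directory must sit outside the source path. A minimal sketch with placeholder schema and paths:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

spark = SparkSession.builder.appName("file-source-archive").getOrCreate()

# Placeholder schema and paths, only for illustration
schema = StructType().add("id", StringType()).add("payload", StringType())

stream_df = (
    spark.readStream
    .format("json")
    .schema(schema)
    # Option is spelled cleanSource; valid values are "archive", "delete", "off"
    .option("cleanSource", "archive")
    # Archive dir must be outside the source directory
    .option("sourceArchiveDir", "/data/archive/input_json")
    .load("/data/input_json")
)

query = (
    stream_df.writeStream
    .format("console")
    .option("checkpointLocation", "/data/checkpoints/file_archive")
    .start()
)
```

Also keep in mind that archiving is done lazily after micro-batches complete, so the archive folder may only appear once a few batches have finished.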
How does PySpark handle Kafka consumer groups and consumers? Generally, by changing the consumer group we can start consuming data from the beginning of the topic. How do we start consuming from the beginning here? Will startingOffsets set to earliest always read from the start?
Spark Structured Streaming uses the checkpoint directory to track offsets, so that it doesn't re-consume offsets that have already been processed. You can find more details on setting the starting offset with Kafka at this link - spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
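A minimal sketch, assuming the Kafka connector package is on the classpath; the broker, topic and checkpoint path are placeholders. The key point is that startingOffsets is only honoured when the query starts with a fresh (empty) checkpoint directory; after that, the offsets stored in the checkpoint win:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-earliest").getOrCreate()

kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder broker
    .option("subscribe", "orders")                         # placeholder topic
    # Only applies when no checkpoint exists yet; otherwise Spark resumes
    # from the offsets recorded in the checkpoint directory
    .option("startingOffsets", "earliest")
    .load()
)

query = (
    kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("console")
    .option("checkpointLocation", "/data/checkpoints/kafka_earliest")
    .start()
)
```

So to re-read a topic from the beginning, point the query at a new, empty checkpoint location (or use a new query) rather than relying on the consumer group.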
I have implemented watermarking with a 5-minute tumbling window to aggregate values, a 5-minute processing-time trigger, and update output mode. The problem is that I am still getting late data. How should this be fixed?

from pyspark.sql.functions import window, expr

df = df.withWatermark("open_time", "5 minutes")

# Group data into 5-minute candlesticks by symbol
candlestick_df = df \
    .groupBy("symbol", window("open_time", "5 minutes", startTime=0)) \
    .agg(
        expr("first(open_price) as open_price"),
        expr("max(high_price) as high_price"),
        expr("min(low_price) as low_price"),
        expr("last(close_price) as close_price"),
        expr("sum(volume) as volume")
    )

query_kafka = candlestick_df \
    .selectExpr("CAST(symbol AS STRING) AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", output_topic) \
    .option("checkpointLocation", checkpoint_dir) \
    .outputMode("update") \
    .option("truncate", "false") \
    .trigger(processingTime='5 minutes') \
    .start()
Shuffle is only involved when the next step is a wide operation. And for data in the KBs, it depends on the next stage: if you have a count, Spark first makes a local count on each partition before shuffling the (already reduced) data.
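You can see this local pre-aggregation in the physical plan. A small sketch with a made-up DataFrame; the plan printed by explain() shows a partial HashAggregate before the Exchange (shuffle) and a final one after it:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partial-agg").getOrCreate()

# Hypothetical data, only to inspect the plan
df = spark.createDataFrame(
    [("a", 1), ("b", 2), ("a", 3)],
    ["key", "value"],
)

# Plan: HashAggregate (partial count) -> Exchange -> HashAggregate (final),
# i.e. each partition pre-aggregates locally before the shuffle
df.groupBy("key").count().explain()
```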
Yes, definitely it is. But it's impossible to cover every function on YouTube. You can definitely check out the Spark documentation for more information. This series was meant to enable people with zero or basic knowledge to get started.
Great explanation and effort. I encountered a Py4JJavaError while executing the df_agg.writeStream.format("console").outputMode("complete").start().awaitTermination command. Could you explain why?