pysparkpulse
Welcome to PySparkPulse, your go-to channel for mastering PySpark from beginner to advanced levels! Join me as we explore comprehensive tutorials, breaking down key concepts with practical examples. Dive into real-world applications, and gear up for the toughest interviews with our in-depth coverage of PySpark and data engineering interview questions. Whether you're a novice or seasoned pro, PySparkPulse is your gateway to mastering the art of big data processing. Subscribe now to stay ahead in the dynamic world of PySpark and elevate your data engineering skills. Let's spark innovation together! #PySpark #DataEngineering #PySparkPulse
Comments
@rahuldave6699 2 days ago
query = spark.sql("""
    with cte as (
        select p.prescription_id, p.doctor_id, p.medication_id, m.medication_name
        from medications as m
        inner join prescriptions as p
            on m.medication_id = p.medication_id
    )
    select medication_name, count(distinct doctor_id) as distinct_doctor_count
    from cte
    group by medication_name
    having count(distinct doctor_id) >= 3
""")
@KaunteyaShaw 14 days ago
Data & Code:

from pyspark.sql.functions import col, when, lit, expr, sum

orders_data = [
    (1, 101, '2024-01-01', 'Complete'),
    (2, 102, '2024-01-05', 'Pending'),
    (3, 103, '2024-01-07', 'Complete'),
    (4, 104, '2024-01-10', 'Complete'),
    (5, 105, '2024-01-12', 'Pending'),
    (6, 106, '2024-01-14', 'Complete'),
    (7, 107, '2024-01-15', 'Complete'),
    (8, 108, '2024-01-18', 'Pending'),
    (9, 109, '2024-01-20', 'Pending'),
    (10, 110, '2024-01-22', 'Complete')
]
orders_data_schema = ["order_id", "customer_id", "order_date", "order_status"]
orders_df = spark.createDataFrame(orders_data, orders_data_schema)

order_items_data = [
    (1, 1, 501, 2, 25.00),
    (2, 1, 502, 1, 15.00),
    (3, 2, 503, 3, 10.00),
    (4, 3, 504, 5, 20.00),
    (5, 4, 505, 2, 30.00),
    (6, 5, 506, 1, 50.00),
    (7, 5, 507, 4, 12.50),
    (8, 6, 508, 2, 40.00),
    (9, 7, 509, 1, 25.00),
    (10, 8, 510, 3, 35.00)
]
order_items_data_schema = ["order_item_id", "order_id", "product_id", "quantity", "unit_price"]
orders_items_df = spark.createDataFrame(order_items_data, schema=order_items_data_schema)

orders_df.show()
orders_items_df.show()

joined_df = orders_df.join(orders_items_df, on='order_id', how='left')
joined_df.show()

# Use a numeric zero in otherwise() so the sum stays numeric
res_df = joined_df.groupBy('customer_id').agg(
    sum(when(col('order_status') == 'Complete', col('quantity') * col('unit_price'))
        .otherwise(lit(0.0))).alias('TotalSales')
)
res_df.show()

res_df1 = joined_df.groupBy('customer_id').agg(
    sum(expr("IF(order_status == 'Complete', quantity * unit_price, 0)")).alias("TotalRevenue")
)
res_df1.show()
@KaunteyaShaw 14 days ago
Could we not use orderBy(col("Salary").desc())? Here is my code:

data = [
    (1, "John", 50000.00, "HR", "2022-01-15"),
    (2, "Alice", 80000.00, "Sales", "2021-11-25"),
    (3, "Bob", 60000.00, "Marketing", "2020-05-12"),
    (4, "Diana", 75000.00, "Sales", "2019-06-18"),
    (5, "Eve", 85000.00, "Marketing", "2023-03-22"),
    (6, "Frank", 45000.00, "IT", "2022-07-14"),
    (7, "Grace", 90000.00, "Sales", "2021-12-01"),
    (8, "Hank", 30000.00, None, "2020-08-09"),
    (9, "Ivy", 120000.00, "Sales", "2022-09-10"),
    (10, "Jake", 110000.00, "Marketing", None)
]
schema = ["EmployeeID", "Name", "Salary", "Dept", "Date"]

# Creating DataFrame
emp_df = spark.createDataFrame(data, schema)
emp_df.show()

from pyspark.sql.functions import col, when

emp_df = emp_df.withColumn(
    "BonusAmount",
    when((col('Dept') == 'Sales') & (col("Salary") > 50000), col("Salary") * 0.10)
    .when((col('Dept') == 'Marketing') & (col('Salary') > 60000), col("Salary") * 0.50)
    .otherwise(col("Salary") * 0.05)
)
emp_df.show()

high_salary_employees = emp_df.filter(col("Salary") + col("BonusAmount") > 70000)
high_salary_employees.show()

high_salary_employees_sorted = high_salary_employees.orderBy(col("Salary").desc())
high_salary_employees_sorted.limit(5).show()
@KaunteyaShaw 14 days ago
Sample Data:

data = [
    (1, "John", 5000.00, "HR", "2022-01-15"),
    (2, "Alice", 5500.00, None, "2023-03-22"),
    (3, "Bob", 6000.00, "IT", None),
    (4, "Diana", 5200.00, "Finance", "2024-07-19"),
    (5, "Eve", 4800.00, None, "2024-09-01"),
    (6, "Frank", 4900.00, "Marketing", None)
]
schema = ["empid", "name", "salary", "dept", "hire_date"]
emp_df = spark.createDataFrame(data, schema=schema)
emp_df.show()

Code:

from pyspark.sql.functions import col, count, when, mean

# Count nulls per column
null_counts = emp_df.agg(*(count(when(col(c).isNull(), c)).alias(c) for c in emp_df.columns))
# null_counts.show()
display(null_counts)

# Fill missing salaries with the mean salary
mean_salary = emp_df.select(mean("salary")).collect()[0][0]
display(mean_salary)
emp_df = emp_df.withColumn("salary", when(col('salary').isNull(), mean_salary).otherwise(col("salary")))
emp_df.show()

# Replace missing departments with "Others"
emp_df = emp_df.withColumn("dept", when(col('dept').isNull(), "Others").otherwise(col("dept")))
emp_df.show()
@KaunteyaShaw 15 days ago
data = [
    (1, ("John", 25, ("New York", "USA"))),
    (2, ("Alice", 30, ("London", "UK"))),
    (3, ("Bob", 28, ("Sydney", "Australia")))
]

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

nested_Schema = StructType([
    StructField('Id', IntegerType(), True),
    StructField('Details', StructType([
        StructField('Name', StringType(), True),
        StructField('Age', IntegerType(), True),
        StructField('Address', StructType([
            StructField('City', StringType(), True),
            StructField('State', StringType(), True)
        ]))
    ]))
])

# Build the DataFrame from the nested data using the schema above
df = spark.createDataFrame(data, nested_Schema)

from pyspark.sql.functions import concat, lit

df = df.withColumn("name", df["Details.Name"]).withColumn("Age", df["Details.Age"])
df = df.withColumn("FullAddress", concat(df["Details.Address.City"], lit(" - "), df["Details.Address.State"]))
df.show()
@subedi04 2 months ago
Can you also share the SQL from the Snowflake side to create those tables?
@pysparkpulse 1 month ago
Sure, will share it.
@sathyamoorthy2362 2 months ago
For question 2, it doesn't say to calculate the sum for the previous day and the current day; I believe it should be the sum per policy type, not for specific days. Please check. Also, if you already have the average, why do we need the sum, since sum = 2 * avg for a two-day window?
@pysparkpulse 1 month ago
We can use that too.
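Not from the video itself, just a minimal sketch of what the thread seems to describe: a rolling previous-day-plus-current-day aggregate per policy type (the table and column names are illustrative assumptions). It also shows the commenter's point that, over a full two-row window, the sum and the average carry the same information.

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as sum_, avg
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

claims = spark.createDataFrame(
    [("auto", "2024-01-01", 100.0), ("auto", "2024-01-02", 300.0),
     ("home", "2024-01-01", 50.0),  ("home", "2024-01-02", 150.0)],
    ["policy_type", "claim_date", "amount"],
)

# Previous day + current day per policy type: a 2-row sliding window
w = Window.partitionBy("policy_type").orderBy("claim_date").rowsBetween(-1, 0)

result = (claims.withColumn("two_day_sum", sum_("amount").over(w))
                .withColumn("two_day_avg", avg("amount").over(w)))
# For any full 2-row window, two_day_sum == 2 * two_day_avg, so either
# aggregate answers the question.
result.show()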
@tanmaykapil5362 2 months ago
Expecting many more interview questions from you; so far I have enjoyed every session of yours.
@pysparkpulse 1 month ago
Sure, I am gathering questions and will make a video on them. Glad you liked it!
@ourgourmetkitchen1774 2 months ago
Great video, really helpful.
@pysparkpulse 2 months ago
Glad it was helpful!
@dolstoi4206 3 months ago
Thanks for the video, but where are the map_keys and map_values functions?
@pysparkpulse 2 months ago
Will create another video on it, thanks for highlighting.
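For reference while the video is pending, a minimal sketch of map_keys and map_values (the sample data is illustrative, not from the channel):

from pyspark.sql import SparkSession
from pyspark.sql.functions import map_keys, map_values

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, {"math": 90, "science": 85}), (2, {"math": 70, "history": 95})],
    ["student_id", "scores"],
)

# map_keys returns the keys of a map column as an array, map_values the values
df.select(
    "student_id",
    map_keys("scores").alias("subjects"),
    map_values("scores").alias("marks"),
).show(truncate=False)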
@uandmahesh6096 3 months ago
Hi bro, I like your videos a lot as they are always very informative. I have one doubt: can we answer these questions in SQL in interviews?
@pysparkpulse 2 months ago
Yes
@dineshughade6741 3 months ago
Could you share that PPT with us? That would be great.
@pysparkpulse 2 months ago
Please check the link below: www.linkedin.com/feed/update/urn:li:activity:7160308705211645952?updateEntityUrn=urn%3Ali%3Afs_feedUpdate%3A%28V2%2Curn%3Ali%3Aactivity%3A7160308705211645952%29
@ShubhamRai06 3 months ago
from pyspark.sql.functions import avg

medications_data = [
    (1, "Medication A", 1),
    (2, "Medication B", 1),
    (3, "Medication C", 2),
    (4, "Medication D", 2),
    (5, "Medication E", 3)
]
medication_prices_data = [
    (1, 10.50),
    (2, 20.75),
    (3, 15.25),
    (4, 25.00),
    (5, 12.50)
]

medications_df = spark.createDataFrame(medications_data, ["medication_id", "medication_name", "category_id"])
medication_prices_df = spark.createDataFrame(medication_prices_data, ["medication_id", "price"])
# medications_df.show()
# medication_prices_df.show()

joined_df = medications_df.join(
    medication_prices_df,
    medications_df.medication_id == medication_prices_df.medication_id
).select(
    medications_df.medication_name,
    medications_df.category_id,
    medication_prices_df.medication_id,
    medication_prices_df.price
)
# joined_df.show()
# joined_df.printSchema()

result_df = joined_df.groupBy("category_id").agg(avg("price").alias("Avg Price"))
result_df.show()
@ShubhamRai06 3 months ago
# Find the medications that were prescribed by at least three different doctors.
# Relevant DF:
# df1 = medications (medication_id, medication_name),
# df2 = prescriptions (prescription_id, doctor_id, medication_id)

medications_data = [
    (1, "Medication A"),
    (2, "Medication B"),
    (3, "Medication C"),
    (4, "Medication D"),
    (5, "Medication E")
]
prescriptions_data = [
    (1, 1, 1), (2, 2, 1), (3, 3, 1),
    (4, 1, 2), (5, 2, 2), (6, 3, 2),
    (7, 1, 3),
    (8, 2, 4), (9, 3, 4),
    (10, 4, 5), (11, 5, 5), (12, 6, 5)
]

medications_df = spark.createDataFrame(medications_data, ["medication_id", "medication_name"])
prescriptions_df = spark.createDataFrame(prescriptions_data, ["prescription_id", "doctor_id", "medication_id"])
# medications_df.show()
# medications_df.printSchema()
# prescriptions_df.show()
# prescriptions_df.printSchema()

# Compare the count against a number rather than the string "3"
Most_pres_medication_df = prescriptions_df.groupBy('medication_id').count().filter('count >= 3')
Most_pres_medication_df.cache().count()
medications_df.cache().count()

final_df = Most_pres_medication_df.join(
    medications_df,
    medications_df.medication_id == Most_pres_medication_df.medication_id,
    "inner"
).drop(Most_pres_medication_df.medication_id)
final_df.show()
@mr.chicomalo4003 3 months ago
QUESTION 1
----------
print('INPUT DATAFRAME')
data = [
    (1, "Alice", "123 Main Street,New York,USA"),
    (2, "Bob", "456 Oak Avenue,San Francisco,USA"),
    (3, "Carol", "789 Pine Road,Los Angeles,USA"),
    (4, "David", "321 Elm Lane,Chicago,USA"),
    (5, "Emily", "654 Maple Drive,Miami,USA")
]
schema = ("emp_id", "emp_name", "emp_add")
df = spark.createDataFrame(data, schema)
df.show(truncate=False)

print('OUTPUT DATAFRAME')
from pyspark.sql.functions import split
df1 = df.withColumn("street_name", split("emp_add", ",")[0]) \
        .withColumn("city_name", split("emp_add", ",")[1]) \
        .withColumn("country_name", split("emp_add", ",")[2]) \
        .drop("emp_add").show(truncate=False)

print('O/p by using the getItem() function')
df2 = df.withColumn("street_name", split("emp_add", ",").getItem(0)) \
        .withColumn("city_name", split("emp_add", ",").getItem(1)) \
        .withColumn("country_name", split("emp_add", ",").getItem(2)) \
        .drop("emp_add").show(truncate=False)

THEORY
------
In PySpark, the getItem() function retrieves an element from an array or a value from a map column within a DataFrame. The split() function returns an array column, which can be processed further with array functions or exploded into multiple rows if needed; each element of the array corresponds to a substring resulting from the split. The limit argument of split() specifies the maximum number of splits to perform, which is useful when you want to cap the number of resulting substrings.

QUESTION 2
----------
from pyspark.sql.functions import row_number, col
from pyspark.sql.window import Window

print('INPUT DATASET')
data = [
    (1, 'Math', 90), (1, 'Science', 93), (1, 'History', 85),
    (2, 'Math', 85), (2, 'Science', 79), (2, 'History', 96),
    (3, 'Math', 95), (3, 'Science', 87), (3, 'History', 77),
    (4, 'Math', 78), (4, 'Science', 91), (4, 'History', 90),
    (5, 'Math', 92), (5, 'Science', 84), (5, 'History', 88),
]
schema = ("id", "subject", "Marks")
df = spark.createDataFrame(data, schema)
df.show()

windowSpec = Window.partitionBy("subject").orderBy(col("Marks").desc())
df1 = df.withColumn("row_number", row_number().over(windowSpec))
print("OUTPUT AFTER APPLYING ROW_NUMBER() WINDOW FUNCTION")
df1.show(truncate=False)

print("OUTPUT OF STUDENT ID WHO SCORED TOP IN EACH SUBJECT")
df1.filter(df1.row_number == 1).select("id", "subject").show()

THEORY
------
PySpark window ranking functions:
row_number() gives a sequential row number, starting from 1, within each window partition.
rank() assigns a rank within a window partition and leaves gaps in the ranking when there are ties.
dense_rank() assigns ranks within a window partition without gaps; it differs from rank() only in that rank() leaves gaps on ties.
lag() is the same as the LAG function in SQL: it returns a previous row's column value within the partition at a specified offset, which is helpful for comparative analysis or calculating differences between consecutive rows.
lead() is the same as the LEAD function in SQL: it returns the column value from a following row within the partition at a specified offset, which helps in accessing subsequent row values for comparison or predictive analysis.
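Since the theory above describes lag() and lead() without code, here is a small sketch on the same kind of student-marks data (illustrative, not from the comment):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, lead
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

data = [(1, 'Math', 90), (2, 'Math', 85), (3, 'Math', 95), (4, 'Math', 78), (5, 'Math', 92)]
df = spark.createDataFrame(data, ["id", "subject", "Marks"])

w = Window.partitionBy("subject").orderBy(col("Marks").desc())

# prev_marks pulls the score one row above, next_marks one row below,
# within each subject ordered by Marks descending
df.withColumn("prev_marks", lag("Marks", 1).over(w)) \
  .withColumn("next_marks", lead("Marks", 1).over(w)) \
  .withColumn("gap_to_prev", col("prev_marks") - col("Marks")) \
  .show()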
@rockroll28 4 months ago
Good information. Constructive criticism: you were explaining too fast. The chart explanation could be one video and the practical part another; two videos of 10 to 12 minutes each would have been more helpful. Best of luck 👍🏻
@pysparkpulse 4 months ago
Thank you for your feedback, will keep this in mind ☺️
@abhishekmalvadkar206 4 months ago
Very well explained 👏
@pysparkpulse 4 months ago
Thank you Abhishek
@nidhisingh5674 4 months ago
You're doing a great job👏
@pysparkpulse 4 months ago
Thank you @nidhisingh5674
@bolisettisaisatwik2198 4 months ago
We can also order by date and then filter the latest date.
@pysparkpulse 4 months ago
Yes, right.
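A minimal sketch of the approach mentioned above (order by date, keep the latest row); the table and column names are illustrative assumptions:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

orders = spark.createDataFrame(
    [(101, "2024-01-05", 20.0), (101, "2024-01-12", 35.0), (102, "2024-01-07", 15.0)],
    ["customer_id", "order_date", "amount"],
)

# Order each customer's rows by date descending and keep only the first row,
# i.e. the latest order per customer
w = Window.partitionBy("customer_id").orderBy(col("order_date").desc())
latest = orders.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")
latest.show()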
@Paruu16 5 months ago
Bro, please also put the data in the description; it will be time-saving while practicing.
@pysparkpulse 4 months ago
Yes, sure bro, I did this in the later videos.
@Paruu16 5 months ago
I used this for the full address, however the column shows up with null values:
df = df.withColumn("FullAdress", concat(col("city") + col("Country")))
@pysparkpulse 4 months ago
Hi @Paruu16, please don't use + there, use a comma: concat(col("city"), col("Country")). With +, Spark attempts arithmetic addition on string columns, which is why you get nulls.
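A small sketch of the fix being suggested, passing the columns to concat() as separate arguments (concat_ws is an alternative when you want a separator); the sample data is illustrative:

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, concat_ws, col, lit

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("New York", "USA"), ("London", "UK")], ["city", "Country"])

# col("city") + col("Country") tries numeric addition and yields null for strings;
# concat takes the columns as separate arguments instead
df = df.withColumn("FullAddress", concat(col("city"), lit(" - "), col("Country")))
# concat_ws does the same with a separator argument
df = df.withColumn("FullAddress2", concat_ws(" - ", col("city"), col("Country")))
df.show(truncate=False)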
@Paruu16 5 months ago
Bro, can you please also post the data required for these questions in the description?
@pysparkpulse 4 months ago
Yes sure
@Manojkumar__ 5 months ago
Please try to improve the sound quality 😢
@pysparkpulse 5 months ago
Sure, I will do this. Thanks for your feedback!
@eternalsilvers5961 5 months ago
If execution memory and storage memory both support spilling data to disk, why do OOM issues occur?
@pysparkpulse 5 months ago
There can be multiple reasons for this even if spilling to disk is possible. Some tasks require a certain amount of execution memory, and if it is not available it may lead to OOM. Or, if we cache data that is too big, that may also lead to OOM.
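For context, a short sketch of the executor-memory settings this reply is talking about (the values are illustrative, not recommendations):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Total JVM heap per executor; the unified (execution + storage) region is carved out of this
    .config("spark.executor.memory", "4g")
    # Fraction of (heap - 300 MB) shared by execution and storage memory (default 0.6)
    .config("spark.memory.fraction", "0.6")
    # Portion of the unified region where cached blocks are immune to eviction (default 0.5)
    .config("spark.memory.storageFraction", "0.5")
    .getOrCreate()
)
# Even with spilling, a single task can still fail with OOM if it needs more
# execution memory than it can obtain (e.g. one huge record or a large build side),
# and caching more data than the region can hold also pressures the heap.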
@shivamchandan50 5 months ago
Please make a video on debugging in PySpark.
@pysparkpulse 5 months ago
Sure 😃
@prabhatgupta6415 5 months ago
What technologies are being used in your project?
@pysparkpulse 5 months ago
Currently I am planning to do it with Databricks Community Edition and Snowflake.
@prabhatgupta6415 5 months ago
@@pysparkpulse No, I am asking about your new company.
@sundipperumallapalli 5 months ago
So unionByName is used when the columns are interchanged or swapped between the given DataFrames, isn't it, Sir? So that it will align the columns by name.
@pysparkpulse 5 months ago
Yes, right.
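A small sketch of unionByName resolving columns by name rather than by position (illustrative data, not from the video):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1, "John"), (2, "Alice")], ["id", "name"])
df2 = spark.createDataFrame([("Bob", 3), ("Diana", 4)], ["name", "id"])  # columns swapped

# union() matches by position, so swapped columns would get mixed up;
# unionByName() matches columns by name regardless of their order
df1.unionByName(df2).show()

# allowMissingColumns=True (Spark 3.1+) also tolerates columns missing
# from one side, filling them with nulls
df3 = spark.createDataFrame([(5, "Eve", "HR")], ["id", "name", "dept"])
df1.unionByName(df3, allowMissingColumns=True).show()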
@sundipperumallapalli 5 months ago
@@pysparkpulse Thanks Sir 🔥
@0adarsh101 5 months ago
Keep uploading videos like this.
@pysparkpulse 5 months ago
Sure, thank you for your support 😊
@princyjain9323 5 months ago
Very helpful lecture, sir, thank you so much ❤
@pysparkpulse 5 months ago
Thank you 😊
@AgamJain-vq2ub 5 months ago
Doing a good job, beta 👏🏻😊
@pysparkpulse 5 months ago
Thank you 😊
@avinash7003 5 months ago
Can you do one real-time project?
@pysparkpulse 5 months ago
Yes, I was thinking about the same; will create an end-to-end project in Databricks Community Edition.
@avinash7003 5 months ago
@@pysparkpulse Please make it at least 2 hours; the project should be unique compared to other YouTubers.
@pysparkpulse 5 months ago
Sure, it will definitely be unique.
@0adarsh101 5 months ago
Do you know any good DE projects available on YouTube or Udemy that we can do?
@pysparkpulse 5 months ago
On Udemy there are many projects; you can pick any according to the cloud of your choice.
@0adarsh101 5 months ago
@@pysparkpulse thanks
@0adarsh101 5 months ago
You haven't faced any Python coding questions, but to be on the safer side, if we need to prepare just for Python coding questions, which Python topics would you suggest?
@pysparkpulse 5 months ago
For Python as a DE, you should be comfortable with lists, dictionaries, set operations, and OOP.
@0adarsh101 5 months ago
@@pysparkpulse thanks
@souravdas-kt7gg 5 months ago
The interview questions are good.
@pysparkpulse 5 months ago
Thank you, Sourav.
@prabhatgupta6415 6 months ago
Hey Priyam! Please explain the Thoughtworks process and how you cleared it. Also, regarding the notice period: how should one approach it, and how did you approach it? Thanks, and I appreciate all the content.
@pysparkpulse 5 months ago
@prabhatgupta6415 Thank you for your appreciation. There will be 4 rounds in total. Will create a detailed video on the interview procedure.
@prabhatgupta6415 5 months ago
@@pysparkpulse Do mention how you tackled the notice period. I have the same years of experience as you and need some guidance on that. Thanks 🤗
@pysparkpulse 5 months ago
@@prabhatgupta6415 Sure
@mahir14_ 5 months ago
Can you make a GitHub repo of all the questions, or store them somewhere, so it's easier to get them in one go and practice?
@pysparkpulse 5 months ago
Sure, will do this.
@prabhatgupta6415 6 months ago
Explain your interview process.
@pysparkpulse 5 months ago
Sure
@shwetadawkhar4815 6 months ago
How many interview rounds will there be for Data Engineers?
@pysparkpulse 5 months ago
@shwetadawkhar4815 Will be creating a detailed video on the interview process and how to crack DE interviews.
@aadil8409 6 months ago
Can you please share the PPT also? It will be very helpful for last-minute revision.
@pysparkpulse 5 months ago
Sure, will attach it.
@rawat7203 6 months ago
Hi Sir,

PySpark:

from pyspark.sql.functions import col, count

joinDf = df.alias('a').join(df.alias('b'), col('a.ManagerId') == col('b.Id'))
final_df = joinDf.groupBy('b.Id', 'b.Name') \
                 .agg(count(col('a.Id')).alias('No_of_reportees')) \
                 .filter(col('No_of_reportees') >= 3)
final_df.show()

SQL:

%sql
with cte as (
    select ManagerId, count(*) as no_of_reports
    from office
    group by ManagerId
    having no_of_reports >= 5
)
select o.Name as Emp_Name, c.no_of_reports
from cte c
join office o on c.ManagerId = o.Id

Output:
Emp_Name      no_of_reports
Jane Smith    6
David Lee     5
@pysparkpulse 6 months ago
Great work, keep going 💯
@rawat7203 6 months ago
%sql
with result as (
    select medication_id, count(distinct(doctor_id)) as count_distinct_doc
    from prescriptions
    group by medication_id
    having count_distinct_doc >= 3
)
select m.medication_name
from result r
join medications m on r.medication_id = m.medication_id
@pysparkpulse 6 months ago
Yes correct 💯😀
@rawat7203 6 months ago
Thank you Sir
@pysparkpulse 5 months ago
Most welcome
@rawat7203 6 months ago
Thank you, sir.
@pysparkpulse 6 months ago
Thank you for your appreciation 😊
@rawat7203 6 months ago
Thank you Sir, keep doing the great work
@pysparkpulse 6 months ago
Yes thank you 🙂
@swarajrandhvan9057 6 months ago
Nice Explanation! Thank you!
@pysparkpulse 6 months ago
Thank you @swaraj
@rawat7203 7 months ago
Thank you, Sir, very nice questions.
@pysparkpulse 6 months ago
Thank you @rawat 😊
@rawat7203 7 months ago
Sir, please provide the data.
@rawat7203 7 months ago
Thank you Sir
@pysparkpulse 7 months ago
Thank you 😊
@vishaldeshatwad8690 7 months ago
Thank you, and please make a video on how to explain a project in the interview; it will really help.
@pysparkpulse 7 months ago
Sure 😊
@rawat7203 7 months ago
from pyspark.sql.functions import col, when

df_bonus = df.withColumn(
    "bonus",
    when((col('department') == 'Sales') & (col('salary') > 50000), 0.10 * col('salary'))
    .otherwise(
        when((col('department') == 'Marketing') & (col('salary') > 60000), 0.10 * col('salary'))
        .otherwise(0.5 * col('salary'))
    )
)

high_salary_employees = df.filter(col('salary') > 70000).orderBy(col('salary').desc()).limit(5)
high_salary_employees.show()
@rawat7203 7 months ago
Hi Sir, can you please add the schema and the data in future videos? Thanks.
@pysparkpulse 7 months ago
Hi Rawat, yes, I am adding these; you can check my recent videos.