# Medications prescribed by at least three distinct doctors.
#
# The query is written as a triple-quoted string: the previous single-line
# string contained "\ " sequences, which are invalid Python escapes and left
# literal backslashes inside the SQL text handed to Spark.
#
# NOTE(review): the output alias "diff_docot_count" looks like a typo for
# "diff_doctor_count"; it is kept unchanged so the result column name that
# downstream code may read does not change.
query = spark.sql(
    """
    WITH cte AS (
        SELECT p.prescription_id,
               p.doctor_id,
               p.medication_id,
               m.medication_name
        FROM medications AS m
        INNER JOIN prescriptions AS p
            ON m.medication_id = p.medication_id
    )
    SELECT medication_name,
           COUNT(DISTINCT doctor_id) AS diff_docot_count
    FROM cte
    GROUP BY medication_name
    HAVING COUNT(DISTINCT doctor_id) >= 3
    """
)
# Data & Code
#
# Total sales per customer, counting only orders whose status is 'Complete'.
# Functions are accessed through the `F` namespace so that the Spark `sum`
# is used explicitly instead of relying on a wildcard import shadowing the
# Python builtin.
from pyspark.sql import functions as F

orders_data = [
    (1, 101, '2024-01-01', 'Complete'),
    (2, 102, '2024-01-05', 'Pending'),
    (3, 103, '2024-01-07', 'Complete'),
    (4, 104, '2024-01-10', 'Complete'),
    (5, 105, '2024-01-12', 'Pending'),
    (6, 106, '2024-01-14', 'Complete'),
    (7, 107, '2024-01-15', 'Complete'),
    (8, 108, '2024-01-18', 'Pending'),
    (9, 109, '2024-01-20', 'Pending'),
    (10, 110, '2024-01-22', 'Complete'),
]
orders_data_schema = ["order_id", "customer_id", "order_date", "order_status"]
orders_df = spark.createDataFrame(orders_data, orders_data_schema)

order_items_data = [
    (1, 1, 501, 2, 25.00),
    (2, 1, 502, 1, 15.00),
    (3, 2, 503, 3, 10.00),
    (4, 3, 504, 5, 20.00),
    (5, 4, 505, 2, 30.00),
    (6, 5, 506, 1, 50.00),
    (7, 5, 507, 4, 12.50),
    (8, 6, 508, 2, 40.00),
    (9, 7, 509, 1, 25.00),
    (10, 8, 510, 3, 35.00),
]
order_items_data_schema = ["order_item_id", "order_id", "product_id", "quantity", "unit_price"]
orders_items_df = spark.createDataFrame(order_items_data, schema=order_items_data_schema)

orders_df.show()
orders_items_df.show()

# Left join keeps orders that have no line items (orders 9 and 10 here).
joined_df = orders_df.join(orders_items_df, on='order_id', how='left')
joined_df.show()

# Variant 1: DataFrame API. The fallback is a numeric 0.0 (it used to be the
# string '0.00') so both CASE branches are numeric.
line_total = F.col('quantity') * F.col('unit_price')
res_df = joined_df.groupBy('customer_id').agg(
    F.sum(
        F.when(F.col('order_status') == 'Complete', line_total).otherwise(F.lit(0.0))
    ).alias('TotalSales')
)
res_df.show()

# Variant 2: the same aggregation expressed as a SQL expression.
res_df1 = joined_df.groupBy('customer_id').agg(
    F.sum(F.expr("IF(order_status=='Complete', quantity*unit_price , 0)")).alias("TotalRevenue")
)
res_df1.show()
# Could we not use orderBy(col("Salary").desc())
# Here is my code:
#
# Adds a BonusAmount column, keeps employees whose salary plus bonus exceeds
# 70000, and shows the five highest-paid of them.
# `when` is imported explicitly; the snippet previously imported only `col`.
from pyspark.sql.functions import col, when

data = [
    (1, "John", 50000.00, "HR", "2022-01-15"),
    (2, "Alice", 80000.00, "Sales", "2021-11-25"),
    (3, "Bob", 60000.00, "Marketing", "2020-05-12"),
    (4, "Diana", 75000.00, "Sales", "2019-06-18"),
    (5, "Eve", 85000.00, "Marketing", "2023-03-22"),
    (6, "Frank", 45000.00, "IT", "2022-07-14"),
    (7, "Grace", 90000.00, "Sales", "2021-12-01"),
    (8, "Hank", 30000.00, None, "2020-08-09"),
    (9, "Ivy", 120000.00, "Sales", "2022-09-10"),
    (10, "Jake", 110000.00, "Marketing", None),
]
schema = ["EmployeeID", "Name", "Salary", "Dept", "Date"]

# Creating DataFrame
emp_df = spark.createDataFrame(data, schema)
emp_df.show()

# NOTE(review): the Marketing rate is 0.50 here, which is far above the
# Sales (0.10) and default (0.05) rates - confirm against the exercise.
emp_df = emp_df.withColumn(
    "BonusAmount",
    when((col('Dept') == 'Sales') & (col("Salary") > 50000), col("Salary") * 0.10)
    .when((col('Dept') == 'Marketing') & (col('Salary') > 60000), col("Salary") * 0.50)
    .otherwise(col("Salary") * 0.05),
)
emp_df.show()

high_salary_employees = emp_df.filter(col("Salary") + col("BonusAmount") > 70000)
high_salary_employees.show()

high_salary_employees_sorted = high_salary_employees.orderBy(col("Salary").desc())
high_salary_employees_sorted.limit(5).show()
# Sample Data:
data = [
    (1, "John", 5000.00, "HR", "2022-01-15"),
    (2, "Alice", 5500.00, None, "2023-03-22"),
    (3, "Bob", 6000.00, "IT", None),
    (4, "Diana", 5200.00, "Finance", "2024-07-19"),
    (5, "Eve", 4800.00, None, "2024-09-01"),
    (6, "Frank", 4900.00, "Marketing", None),
]
schema = ["empid", "name", "salary", "dept", "hire_date"]
emp_df = spark.createDataFrame(data, schema=schema)
emp_df.show()

# Code:
# Explicit imports replace the duplicated `from pyspark.sql.functions import *`,
# which shadows Python builtins such as sum/min/max.
from pyspark.sql.functions import col, count, mean, when

# One output column per input column, holding the number of NULLs in it:
# when() yields a non-null value only for NULL cells and count() skips NULLs.
null_counts = emp_df.agg(
    *(count(when(col(c).isNull(), c)).alias(c) for c in emp_df.columns)
)
# null_counts.show()
display(null_counts)

# Average salary pulled to the driver as a plain Python value.
mean_salary = emp_df.select(mean("salary")).collect()[0][0]
display(mean_salary)

# Replace missing salaries with the average salary.
emp_df = emp_df.withColumn(
    "salary",
    when(col('salary').isNull(), mean_salary).otherwise(col("salary")),
)
emp_df.show()

# Replace missing departments with the label "Others".
emp_df = emp_df.withColumn(
    "dept",
    when(col('dept').isNull(), "Others").otherwise(col("dept")),
)
emp_df.show()
# Flatten a nested struct column: pull Name/Age out of `Details` and build a
# "City - State" string from `Details.Address`.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import concat, concat_ws, lit

data = [
    (1, ("John", 25, ("New York", "USA"))),
    (2, ("Alice", 30, ("London", "UK"))),
    (3, ("Bob", 28, ("Sydney", "Australia"))),
]

nested_Schema = StructType([
    StructField('Id', IntegerType(), True),
    StructField('Details', StructType([
        StructField('Name', StringType(), True),
        StructField('Age', IntegerType(), True),
        StructField('Address', StructType([
            StructField('City', StringType(), True),
            StructField('State', StringType(), True),
        ])),
    ])),
])

# The snippet used `df` without ever creating it; build it from the data and
# schema defined above.
df = spark.createDataFrame(data, nested_Schema)

df = df.withColumn("name", df["Details.Name"]).withColumn("Age", df["Details.Age"])
df = df.withColumn(
    "FullAddress",
    concat(df["Details.Address.City"], lit(" - "), df["Details.Address.State"]),
)
df.show()
can you also share SQL from snowflake side to create those tables
Sure will share
For question 2, it doesn't mention calculating the sum for the previous day and the current day. I believe it should be the sum per policy type, not for specific days; please check. Also, if you already have the average, why do we need the sum, since sum = 2 * avg?
We can use that too
expecting many more interview questions from you till now i have enjoyed every session of yours.
Sure I am trying to gather questions and make video on that. Glad you liked it
great video, really helpful
Glad it was helpful!
thanks for the video, but where are the map_key and map_value functions?
Will create another video on it thanks for highlighting
Hi bro, I like your videos a lot as they are always very informative. I have one doubt: can we answer this question in SQL in interviews?
Could you share that ppt with us it would be great then
Please check the below link www.linkedin.com/feed/update/urn:li:activity:7160308705211645952?updateEntityUrn=urn%3Ali%3Afs_feedUpdate%3A%28V2%2Curn%3Ali%3Aactivity%3A7160308705211645952%29
# Average medication price per category.
from pyspark.sql.functions import avg

medications_data = [
    (1, "Medication A", 1),
    (2, "Medication B", 1),
    (3, "Medication C", 2),
    (4, "Medication D", 2),
    (5, "Medication E", 3),
]
medication_prices_data = [
    (1, 10.50),
    (2, 20.75),
    (3, 15.25),
    (4, 25.00),
    (5, 12.50),
]

medications_df = spark.createDataFrame(
    medications_data, ["medication_id", "medication_name", "category_id"]
)
medication_prices_df = spark.createDataFrame(
    medication_prices_data, ["medication_id", "price"]
)
# medications_df.show()
# medication_prices_df.show()

# Join each medication to its price row, then keep one copy of every column
# by selecting through the originating DataFrame.
same_medication = medications_df.medication_id == medication_prices_df.medication_id
joined_df = medications_df.join(medication_prices_df, same_medication).select(
    medications_df.medication_name,
    medications_df.category_id,
    medication_prices_df.medication_id,
    medication_prices_df.price,
)
# joined_df.show()
# joined_df.printSchema()

result_df = joined_df.groupBy("category_id").agg(avg("price").alias("Avg Price"))
result_df.show()
# Find the medications that were prescribed by at least three different doctors.
# Relevant DF:
#   df1 = medications (medication_id, medication_name),
#   df2 = prescriptions (prescription_id, doctor_id, medication_id)
from pyspark.sql.functions import col, countDistinct

medications_data = [
    (1, "Medication A"),
    (2, "Medication B"),
    (3, "Medication C"),
    (4, "Medication D"),
    (5, "Medication E"),
]
prescriptions_data = [
    (1, 1, 1), (2, 2, 1), (3, 3, 1),
    (4, 1, 2), (5, 2, 2), (6, 3, 2),
    (7, 1, 3),
    (8, 2, 4), (9, 3, 4),
    (10, 4, 5), (11, 5, 5), (12, 6, 5),
]

medications_df = spark.createDataFrame(medications_data, ["medication_id", "medication_name"])
prescriptions_df = spark.createDataFrame(
    prescriptions_data, ["prescription_id", "doctor_id", "medication_id"]
)
# medications_df.show()
# medications_df.printSchema()
# prescriptions_df.show()
# prescriptions_df.printSchema()

# Count DISTINCT doctors per medication. A plain row count would also count
# repeat prescriptions written by the same doctor. The column keeps the name
# "count" so the shape of the result is unchanged, and the threshold is
# compared as a number rather than as the string "3".
Most_pres_medication_df = (
    prescriptions_df.groupBy('medication_id')
    .agg(countDistinct('doctor_id').alias('count'))
    .filter(col('count') >= 3)
)

# Materialise both sides in the cache before joining.
Most_pres_medication_df.cache().count()
medications_df.cache().count()

# Attach the medication name and drop the duplicate join key.
final_df = Most_pres_medication_df.join(
    medications_df,
    medications_df.medication_id == Most_pres_medication_df.medication_id,
    "inner",
).drop(Most_pres_medication_df.medication_id)
final_df.show()
# QUESTION1
# -------------------
# Split a comma-separated address column into street, city and country.
from pyspark.sql.functions import split

print('INPUT DATAFRAME')
data = [
    (1, "Alice", "123 Main Street,New York,USA"),
    (2, "Bob", "456 Oak Avenue,San Francisco,USA"),
    (3, "Carol", "789 Pine Road,Los Angeles,USA"),
    (4, "David", "321 Elm Lane,Chicago,USA"),
    (5, "Emily", "654 Maple Drive,Miami,USA"),
]
schema = ("emp_id", "emp_name", "emp_add")
df = spark.createDataFrame(data, schema)
df.show(truncate=False)

print('OUTPUT DATAFRAME')
# Build the DataFrame first and show it afterwards: chaining .show() onto the
# assignment stored None in df1, because show() returns nothing.
df1 = (
    df.withColumn("street_name", split("emp_add", ",")[0])
    .withColumn("city_name", split("emp_add", ",")[1])
    .withColumn("country_name", split("emp_add", ",")[2])
    .drop("emp_add")
)
df1.show(truncate=False)

print('O/p by using getItem() function')
print('In PySpark, the getItem() function is used to retrieve an element from an array or a value from a map column within a DataFrame')
# Same result, indexing the split array with getItem() instead of [].
df2 = (
    df.withColumn("street_name", split("emp_add", ",").getItem(0))
    .withColumn("city_name", split("emp_add", ",").getItem(1))
    .withColumn("country_name", split("emp_add", ",").getItem(2))
    .drop("emp_add")
)
df2.show(truncate=False)

# THEORY
# --------------
# In PySpark, the getItem() function is used to retrieve an element from an
# array or a value from a map column within a DataFrame.
#
# The split() function returns an array column in PySpark, which can be
# further processed using array functions or exploded into multiple rows if
# needed. Each element of the array corresponds to a substring resulting from
# the split operation. The limit argument in split() specifies the maximum
# number of splits to perform. This is particularly useful when you want to
# limit the number of resulting substrings after splitting the original
# string.
# QUESTION2
# --------------------
# Find the student who scored highest in each subject.
from pyspark.sql.functions import row_number, col
from pyspark.sql.window import Window

print('INPUT DATASET')
data = [
    (1, 'Math', 90), (1, 'Science', 93), (1, 'History', 85),
    (2, 'Math', 85), (2, 'Science', 79), (2, 'History', 96),
    (3, 'Math', 95), (3, 'Science', 87), (3, 'History', 77),
    (4, 'Math', 78), (4, 'Science', 91), (4, 'History', 90),
    (5, 'Math', 92), (5, 'Science', 84), (5, 'History', 88),
]
schema = ("id", "subject", "Marks")
df = spark.createDataFrame(data, schema)
df.show()

# Number the rows inside every subject, best mark first.
marks_descending = col("Marks").desc()
windowSpec = Window.partitionBy("subject").orderBy(marks_descending)
df1 = df.withColumn("row_number", row_number().over(windowSpec))

print("OUTPUT AFTER APPLYING ROW_NUMBER() WINDOW FUNCTION")
df1.show(truncate=False)

print("OUTPUT OF STUDENT ID WHO SCORED TOP IN EACH SUBJECT")
# Row number 1 is the top scorer of its subject.
top_scorers = df1.filter(df1.row_number == 1)
top_scorers.select("id", "subject").show()

# THEORY
# -------------
# PySpark Window Ranking functions
#
# row_number(): gives the sequential row number, starting from 1, to the
#   result of each window partition.
# rank(): provides a rank to the result within a window partition. This
#   function leaves gaps in rank when there are ties.
# dense_rank(): gives the rank of rows within a window partition without any
#   gaps. It is similar to rank(), the difference being that rank() leaves
#   gaps in rank when there are ties.
# lag(): the same as the LAG function in SQL. It allows you to access a
#   previous row's value within the partition based on a specified offset. It
#   retrieves the column value from the previous row, which can be helpful
#   for comparative analysis or calculating differences between consecutive
#   rows.
# lead(): the same as the LEAD function in SQL. Similar to lag(), it retrieves
#   the column value from the following row within the partition based on a
#   specified offset. It helps in accessing subsequent row values for
#   comparison or predictive analysis.
Good information. Constructive criticism: You were explaining too fast. Chart explained can be part of 1 video and practical can be in another video. This way 2 videos of 10 to 12 minutes could have been helpful. Best of luck 👍🏻
Thank you for your feedback will keep this in mind ☺️
very well explained 👏
Thank you Abhishek
You're doing a great job👏
Thank you @nidhisingh5674
We can also order by date and then filter the latest date.
yes right
bhai data bhi description pr dal dia kro will be timesaviing while practicing
Yes sure bro in the later videos i did this
I used this for full address however the column shows up with null values. df=df.withColumn("FullAdress",concat(col("city")+col("Country")))
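Hi @paruu16, please don't use + inside concat; separate the columns with a comma instead, e.g. concat(col("city"), col("Country")). This is not a syntax error: with +, Spark tries to add the two string columns numerically, which is why the result is null.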
bro can you please also post the data required to do these questions as well in the description.
Yes sure
pls try to improve sound quality.. 😢
Sure I will do this thank for your feedback
If Execution Memory and Storage Memory both supports data spilling into Disks why does the OOM issues occurs?
There can be multiple reasons for this, even though spilling to disk is possible. Some tasks require a certain amount of execution memory, and if it is not available this may lead to OOM. Caching very large data may also lead to OOM.
plz make video on debugging in pyspark
Sure 😃
What tech r being used in your project?
Currently i am planning to do it with community Databricks and snowflake
@@pysparkpulse No I am asking..in ur new company..
So UnionBy is used when there is Interchanged or Switched/Swapped Columns in a given DataFrames isn't it Sir?. So that It will arrange Columns Linearly
Yes right
@@pysparkpulse Thanks Sir 🔥
keep uploading videos like this.
Sure thank you for your support 😊
Very helpful lecture sir thank you so much ❤
Thank you 😊
Doing good job beta 👏🏻😊
Thank you 😊
can you do one real time project
Yes I was also thinking about the same will create a project end to end in community Databricks
@@pysparkpulse Please make it at least a 2-hour project, and it should be unique from other YouTubers.
Sure definitely it will be unique
Do you know any good DE projects available on YouTube or Udemy which we can do?
In udemy there are many projects you can pick any according to the cloud of your choice
@@pysparkpulse thanks
you haven't faced any python coding question but still on safer side if we need to prepare for just python coding questions then what all topics of python u will suggest?
For python as DE you should be aware about list, dictionary, set functions and oops
@@pysparkpulse thanks
Interview Questions are good
Thank you sourav
Hey Priyam! Explain the process of thoughtwork. How u cleared? Regarding notice period,How to approach, How did you approach. Thanks and appreciated for all the contents
@prabhatgupta6415 Thank You for your appreciation. There will be in total 4 rounds. Will create a detailed video on interview procedure and all.
@@pysparkpulse do mention how did u tackle np. I do have same year of exp as you. Need some guidance on that. Thanks🤗
@@prabhatgupta6415 Sure
Can you make GitHub repo of all questions or store somewhere so it's better to get at one go and practice
Sure will do this
Explain ur interview process
@pysparkpulse 5 месяцев назад
can you pls share the ppt also. it will be very helpful for last time rivison.
Sure Will attach it
# Hi Sir,
# pyspark:
# Self-join the employee frame: alias 'a' is the reportee side, alias 'b' is
# the manager side.
reportee_side = df.alias('a')
manager_side = df.alias('b')
joinDf = reportee_side.join(manager_side, col('a.ManagerId') == col('b.Id'))

# NOTE(review): this threshold is 3 while the SQL version below uses 5 -
# confirm which one the exercise asks for.
reportee_counts = joinDf.groupBy('b.Id', 'b.Name').agg(
    count(col('a.Id')).alias('No_of_reportees')
)
final_df = reportee_counts.filter(col('No_of_reportees') >= 3)
final_df.show()

# COMMAND ----------

# MAGIC %sql
# MAGIC with cte as (
# MAGIC   select ManagerId, count(*) as no_of_reports
# MAGIC   from office
# MAGIC   group by ManagerId
# MAGIC   having no_of_reports >=5
# MAGIC )
# MAGIC select o.Name as Emp_Name, c.no_of_reports
# MAGIC from cte c
# MAGIC join office o on c.ManagerId = o.Id

# Output posted with the SQL version:
#   Emp_Name     no_of_reports
#   Jane Smith   6
#   David Lee    5
Great work keep going 💯
# SQL cell: names of medications prescribed by at least three distinct doctors.

# MAGIC %sql
# MAGIC with result as (
# MAGIC   select medication_id, count(distinct(doctor_id)) as count_distinct_doc
# MAGIC   from prescriptions
# MAGIC   group by medication_id
# MAGIC   having count_distinct_doc >=3
# MAGIC )
# MAGIC select m.medication_name
# MAGIC from result r
# MAGIC join medications m on r.medication_id = m.medication_id
Yes correct 💯😀
Thank you Sir
Most welcome
Thankyou sir
Thank you for your appreciation 😊
Thank you Sir, keep doing the great work
Yes thank you 🙂
Nice Explanation! Thank you!
Thank you @swaraj
Thank you Sir, very nice Qs
Thank you @rawat 😊
Sir please provide data
Thank you Sir
Thank you 😊
thank you and please make a video on how to explain project in the interview it will really help
Sure 😊
# Add a bonus column, then show the five highest-paid employees earning more
# than 70000.
#
# The when/otherwise nesting is flattened into one chained when(); branches
# are still evaluated in the same order.
# NOTE(review): Sales and Marketing both get 0.10 while everyone else gets
# 0.5 - these rates look swapped or mistyped; confirm against the exercise.
df_bonus = df.withColumn(
    "bonus",
    when((col('department') == 'Sales') & (col('salary') > 50000), 0.10 * col('salary'))
    .when((col('department') == 'Marketing') & (col('salary') > 60000), 0.10 * col('salary'))
    .otherwise(0.5 * col('salary')),
)

# Build the result from df_bonus rather than df: previously the bonus column
# was computed and then discarded because the filter ran on the original frame.
high_salary_employees = (
    df_bonus.filter(col('salary') > 70000)
    .orderBy(col('salary').desc())
    .limit(5)
)
high_salary_employees.show()
Hi Sir, can you please add the schema and the data in future videos .. Thanks
Hi Rawat, yes I am adding these you can check my recent videos