
Transient Cluster on AWS from Scratch using boto3 | Trigger Spark job from AWS Lambda 

Knowledge Amplifier

This video demonstrates a cost-effective, automated way to run Spark jobs on a transient EMR cluster every day using CloudWatch, Lambda, EMR, and S3 (you can also add SES to send an email once the process completes).
Hope this will be helpful!
Steps:
----------
Step 1: Create the code bucket
Step 2: Create the source bucket
Step 3: Create the destination bucket
Step 4: Create an IAM role for Lambda (S3, EMR, CloudWatch)
Step 5: Create the Lambda function with an S3 trigger
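For Step 4, here is a rough sketch of what the role's policy could contain. It is not the policy used in the video: the function name, the account ID, and the choice of statements are assumptions for illustration. It leaves out S3 permissions because the function below only reads the event it receives, and it includes iam:PassRole because RunJobFlow hands the two EMR roles over to the cluster.

```python
import json


def build_lambda_policy(emr_role_arns):
    """Return a minimal IAM policy document (as a dict) for the Lambda role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Lets the function launch the transient cluster.
                "Effect": "Allow",
                "Action": ["elasticmapreduce:RunJobFlow"],
                "Resource": "*",
            },
            {
                # RunJobFlow passes these roles to EMR, which needs iam:PassRole.
                "Effect": "Allow",
                "Action": ["iam:PassRole"],
                "Resource": list(emr_role_arns),
            },
            {
                # Lets the function write its own logs to CloudWatch Logs.
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
                "Resource": "*",
            },
        ],
    }


# Hypothetical account ID; substitute the ARNs of your own roles.
example_arns = [
    "arn:aws:iam::123456789012:role/EMR_DefaultRole",
    "arn:aws:iam::123456789012:role/EMR_EC2_DefaultRole",
]
print(json.dumps(build_lambda_policy(example_arns), indent=2))
```

The printed JSON can be pasted into the IAM console as an inline policy. Tighten the "Resource" entries as needed.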
Lambda Function:
----------------------------
import json
import boto3

# Fill in the keys, or drop both key arguments to use the credentials of the
# Lambda execution role instead.
client = boto3.client('emr', region_name='us-west-2', aws_access_key_id='', aws_secret_access_key='')


def lambda_handler(event, context):
    # The S3 trigger passes the bucket and object key of the uploaded file.
    file_name = event['Records'][0]['s3']['object']['key']
    bucketName = event['Records'][0]['s3']['bucket']['name']
    print("File Name : ", file_name)
    print("Bucket Name : ", bucketName)

    # S3 location of the PySpark script (placeholder).
    backend_code = "{backend_code_location}"
    spark_submit = [
        'spark-submit',
        '--master', 'yarn',
        '--deploy-mode', 'cluster',
        backend_code,
        bucketName,
        file_name
    ]
    print("Spark Submit : ", spark_submit)

    # Launch the cluster; it terminates on its own once the step finishes.
    cluster_id = client.run_job_flow(
        Name="transient_demo_testing",
        Instances={
            'InstanceGroups': [
                {
                    'Name': "Master",
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'm1.xlarge',
                    'InstanceCount': 1,
                },
                {
                    'Name': "Slave",
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',
                    'InstanceType': 'm1.xlarge',
                    'InstanceCount': 2,
                }
            ],
            'Ec2KeyName': '{key_name}',
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False,
            'Ec2SubnetId': 'subnet-XXXXXXXX',
        },
        LogUri="{Specify s3 location}",
        ReleaseLabel='{Specify emr version}',
        Steps=[{
            "Name": "testJobGURU",
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': spark_submit
            }
        }],
        BootstrapActions=[],
        VisibleToAllUsers=True,
        JobFlowRole="EMR_EC2_DefaultRole",
        ServiceRole="EMR_DefaultRole",
        Applications=[{'Name': 'Spark'}, {'Name': 'Hive'}])
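One detail the handler above glosses over: S3 event notifications deliver the object key URL-encoded, so a file name with spaces or brackets reaches Lambda as something like "iris+data%282021%29.csv". A small sketch of how the event parsing could be made safe follows. The helper names parse_s3_event and build_spark_submit are made up for this example and are not part of the video's code.

```python
from urllib.parse import unquote_plus


def parse_s3_event(event):
    """Return (bucket, key) for the first record of an S3 event notification."""
    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    # Keys arrive URL-encoded ("+" for spaces, %XX for special characters).
    key = unquote_plus(record["object"]["key"])
    return bucket, key


def build_spark_submit(script_uri, bucket, key):
    """Build the argument list handed to command-runner.jar."""
    return [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", "cluster",
        script_uri,
        bucket,
        key,
    ]


# A trimmed-down sample event with hypothetical bucket and file names.
sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "my-source-bucket"},
                "object": {"key": "in/iris+data%282021%29.csv"}}}
    ]
}
bucket, key = parse_s3_event(sample_event)
print(build_spark_submit("s3://my-code-bucket/job.py", bucket, key))
```

Also worth knowing: run_job_flow returns a dictionary, and the new cluster's ID is under its "JobFlowId" key.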
Code Breakdown:
---------------------------
LogUri - The path to the Amazon S3 location where the logs for this cluster are stored.
Applications - The applications installed on this cluster. Hive and Spark have been chosen here. Other available applications include Hadoop, Pig, Oozie, ZooKeeper, etc.
Example: if you require Hadoop, Hive, and Spark, you can specify the configuration like this --
Applications = [
    {'Name': 'Hadoop'},
    {'Name': 'Hive'},
    {'Name': 'Spark'}
]
Instances - Describes the Amazon EC2 instances of the job flow.
InstanceGroups - Each entry represents an instance group, which is a group of instances that have a common purpose. For example, the CORE instance group is used for HDFS.
Market - The marketplace to provision instances for this group. Valid values are ON_DEMAND or SPOT.
TerminationProtected - Indicates whether Amazon EMR will lock the cluster to prevent the EC2 instances from being terminated by an API call or user intervention, or in the event of a cluster error.
JobFlowRole - Also called instance profile and EC2 role. An IAM role for an EMR cluster. The EC2 instances of the cluster assume this role. The default role is EMR_EC2_DefaultRole.
ServiceRole - The IAM role that will be assumed by the Amazon EMR service to access AWS resources on your behalf.
VisibleToAllUsers - Indicates whether the cluster is visible to IAM principals in the Amazon Web Services account associated with the cluster.
ReleaseLabel - The Amazon EMR release label, which determines the version of open-source application packages installed on the cluster.
Ec2KeyName - The name of the EC2 key pair that can be used to connect to the master node using SSH as the user called "hadoop".
KeepJobFlowAliveWhenNoSteps - Specifies whether the cluster should remain available after completing all steps. Defaults to true. It is set to False here so that the cluster terminates once its steps finish, which is what makes it transient.
For details, you can refer to this documentation --
boto3.amazonaws.com/v1/docume...
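Since Market accepts SPOT as well as ON_DEMAND, the instance groups can be generated by a small helper so that switching the core nodes to Spot is a one-word change. This is only a sketch: the helper name instance_group and the m5.xlarge instance type are assumptions for illustration. BidPrice is an optional string field of an instance group. When it is left out for a Spot group, EMR caps the price at the On-Demand price.

```python
def instance_group(name, role, instance_type, count,
                   market="ON_DEMAND", bid_price=None):
    """Build one entry for Instances['InstanceGroups'] in run_job_flow."""
    if market not in ("ON_DEMAND", "SPOT"):
        raise ValueError("market must be 'ON_DEMAND' or 'SPOT'")
    group = {
        "Name": name,
        "Market": market,
        "InstanceRole": role,
        "InstanceType": instance_type,
        "InstanceCount": count,
    }
    # BidPrice is only meaningful for Spot groups and must be a string (USD).
    if market == "SPOT" and bid_price is not None:
        group["BidPrice"] = str(bid_price)
    return group


# Master stays On-Demand; the core nodes run on Spot capacity.
instance_groups = [
    instance_group("Master", "MASTER", "m5.xlarge", 1),
    instance_group("Core", "CORE", "m5.xlarge", 2, market="SPOT", bid_price="0.10"),
]
print(instance_groups)
```

The resulting list would be passed as Instances={'InstanceGroups': instance_groups, ...}. Keep in mind that Spot nodes can be reclaimed mid-job.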
Backend code:
------------------------
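import sys
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Oracle_to_snowflake_via_S3") \
    .getOrCreate()


def main():
    # Bucket and file name passed by the Lambda function through spark-submit.
    s3_bucket = sys.argv[1]
    s3_file = sys.argv[2]
    s3_location = "s3a://{}/{}".format(s3_bucket, s3_file)
    # Read the CSV with a header row and let Spark infer the column types.
    iris = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(s3_location)
    # Count the rows of each class.
    ms = iris.groupBy("class").count()
    # Replace the placeholder with the name of the destination bucket.
    destination_bucket = "{Destination Bucket}"
    output_location = "s3a://{}/{}".format(destination_bucket, s3_file.split('.')[0])
    # coalesce(1) writes the result as a single CSV part file.
    ms.coalesce(1).write.format("csv").option("header", "true").save(output_location)


main()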
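A note on the output path: s3_file.split('.')[0] cuts the key at the first dot, which is fine for a name like iris.csv but truncates keys such as 2021.09/iris.data.csv. A possible alternative, sketched with a hypothetical helper name, is to strip only the final extension:

```python
import posixpath


def output_prefix(key):
    """Return the S3 key without its final extension."""
    root, _extension = posixpath.splitext(key)
    return root


# The first dot is not always the extension separator.
print("2021.09/iris.data.csv".split(".")[0])
print(output_prefix("2021.09/iris.data.csv"))
```

The first print shows the truncated result of the split approach, the second the full key minus ".csv".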
Check this playlist for more AWS Projects in Big Data domain:
• Demystifying Data Engi...

Science

Published: 18 Sep 2021

Comments: 48
@nishantsingh8477 2 years ago
Your transient node was amazing. This would be an underrated video
@KnowledgeAmplifier1 2 years ago
Thank you Nishant Singh! Happy Learning :-)
@user-ee6hu4ec5v 29 days ago
How come this video has so few views? This is so well explained. Salute to your effort, brother.
@KnowledgeAmplifier1 29 days ago
Thank you for your kind words @user-ee6hu4ec5v! Happy Learning
@parasgadhiya5356 1 year ago
What an effort, Brother... Really Appreciate it.
@KnowledgeAmplifier1 1 year ago
Thank you PARAS GADHIYA! Happy Learning
@singhsVP 8 months ago
Great Explanation and demo, thank you
@KnowledgeAmplifier1 8 months ago
You are welcome! If you want to implement the same using Airflow, have a look at this video too -- ru-vid.com/video/%D0%B2%D0%B8%D0%B4%D0%B5%D0%BE-hK4kPvJawv8.htmlsi=FpmG3YjcAp-nB8Hc Happy Learning!
@aniketjadhav32 1 year ago
Great tutorial
@amarnadhchithari8992 2 years ago
very good explanation.....thank you
@KnowledgeAmplifier1 2 years ago
You are welcome Amarnadh Chithari! Happy Learning :-)
@manojt7012 2 years ago
Hey bro.. The content was really useful. Thanks a lot
@KnowledgeAmplifier1 2 years ago
Glad it helped! Happy Learning :-)
@atulbisht9019 1 year ago
thank you so much for this video
@KnowledgeAmplifier1 10 months ago
Most welcome 😊
@thekamaalpashashow2091 1 year ago
Great video. Top notch content. Thank you so much for these!
@KnowledgeAmplifier1 1 year ago
Thank you The Kamaal Pasha Show for your inspiring comment! Happy Learning
@simonzhang8668 2 years ago
nice!
@KnowledgeAmplifier1 2 years ago
Thank you Simon Zhang! Happy Learning :-)
@alexbessette232 2 years ago
Awesome video, very helpful. Only note is that when initializing the client all you need is boto3.client('emr'); AWS will automatically do the rest.
@KnowledgeAmplifier1 2 years ago
Yes, correct Alex, using Lambda roles we can avoid the access key and secret key part.
@joseneto6558 1 year ago
Thanks a lot. You Indian guys are the smartest in the world. I just have a question. How can I choose to use Spot instances when launching the EMR cluster from the Lambda trigger code? Is there a parameter for that?
@vishalrana302 2 years ago
Hi, how can I use the BootstrapActions parameter in this Lambda code?
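On the bootstrap question: BootstrapActions in run_job_flow takes a list of dictionaries, each with a Name and a ScriptBootstrapAction that points at a script in S3. A minimal sketch follows. The helper name and the S3 path are assumptions for illustration. The script itself (for example a shell script that runs pip install) has to be uploaded beforehand.

```python
def script_bootstrap_action(name, script_s3_uri, args=None):
    """Build one entry for the BootstrapActions list of run_job_flow."""
    return {
        "Name": name,
        "ScriptBootstrapAction": {
            "Path": script_s3_uri,      # script that runs on every node at startup
            "Args": list(args or []),   # optional arguments passed to the script
        },
    }


# Hypothetical script location and argument.
bootstrap_actions = [
    script_bootstrap_action(
        "install_python_libs",
        "s3://my-code-bucket/bootstrap/install_libs.sh",
        ["pandas"],
    )
]
print(bootstrap_actions)
```

The list would replace the empty BootstrapActions=[] in the Lambda code.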
@duskbbd 1 year ago
How do I run two spark-submits one after the other in the same cluster? I don't want to create the cluster again for the second spark-submit.
@KnowledgeAmplifier1 10 months ago
Hello @duskbbd, you can submit two Spark jobs sequentially in the same EMR cluster by using the following approach: Submit the first Spark job using 'spark-submit' on your EMR cluster. After submitting the first job, you can poll its status to check if it has completed. You can use AWS CLI, AWS SDKs, or EMR APIs to monitor the status of the EMR step associated with your Spark job. Once you receive confirmation that the first job has completed, you can then submit the second Spark job using 'spark-submit' on the same EMR cluster. This way, you can run multiple Spark jobs one after the other without the need to create a new EMR cluster for each job. To implement this, you can use AWS Step Functions or Airflow for orchestration. For details you can refer to this video -- ru-vid.com/video/%D0%B2%D0%B8%D0%B4%D0%B5%D0%BE-hK4kPvJawv8.htmlsi=WqUMzesnPzxiZjWi Happy Learning
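A simpler alternative to polling, assuming both jobs are already known when the cluster is launched: put both steps in the Steps list. By default EMR runs the steps of a cluster one at a time, in the order given. The sketch below uses a hypothetical helper and hypothetical script locations. TERMINATE_CLUSTER on the first step means the second one is not attempted if the first fails.

```python
def spark_step(name, script_uri, script_args, action_on_failure="TERMINATE_CLUSTER"):
    """Build one spark-submit step for the Steps list of run_job_flow."""
    return {
        "Name": name,
        "ActionOnFailure": action_on_failure,
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--master", "yarn",
                "--deploy-mode", "cluster",
                script_uri,
                *script_args,
            ],
        },
    }


# Two jobs on the same transient cluster, executed in list order.
steps = [
    spark_step("first_job", "s3://my-code-bucket/first_job.py",
               ["my-source-bucket", "input.csv"]),
    spark_step("second_job", "s3://my-code-bucket/second_job.py",
               ["my-source-bucket", "input.csv"], action_on_failure="CONTINUE"),
]
print([step["Name"] for step in steps])
```

If the second job is only known later, the same dictionaries can be sent to a running cluster with client.add_job_flow_steps(JobFlowId=..., Steps=[...]).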
@raghavendrahs7695 2 years ago
Thank you so much. This is very helpful. I have one question regarding the job flow. How is Lambda getting triggered automatically after we place the file into the S3 bucket? Can we modify this to just run the job only once a day and process all the files available in the S3 bucket?
@KnowledgeAmplifier1 2 years ago
Hello Raghavendra H S, here are the answers to your questions.
Ques.) How is Lambda getting triggered automatically after we place the file into the S3 bucket?
Ans.) Amazon S3 can send an event to a Lambda function when an object is created or deleted, and the Lambda code is executed based on that event trigger. For details you can check this link: docs.aws.amazon.com/lambda/latest/dg/with-s3.html
Ques.) Can we modify this to just run the job only once a day and process all the files available in the S3 bucket?
Ans.) Yes, you can do that. You can schedule Lambda using CloudWatch or EventBridge. For details, you can check this video -- ru-vid.com/video/%D0%B2%D0%B8%D0%B4%D0%B5%D0%BE-2cLwSTsBzJQ.html
Hope this will be helpful! Happy Learning 😊✌
@raghavendrahs7695 2 years ago
@@KnowledgeAmplifier1 Thanks for your quick response. The details are very helpful and it is very clear to me now.
@KnowledgeAmplifier1 2 years ago
@@raghavendrahs7695 Glad to know the resources were helpful to you! Happy Learning :-)
@joseneto6558 1 year ago
Is there anyone with clusters taking too long to start? My clusters are stuck for more than 15 minutes in the "Master: bootstrapping" state. How can I fix that?
@Gaurav-wy2wm 2 years ago
Hey, I have tried this but I am getting the error “Provided region_name ‘us_west_2’ doesn’t match a supported format”
@Gaurav-wy2wm 2 years ago
I have also put in the correct Ec2SubnetId
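The region in that error message is written with underscores, while AWS region names use hyphens (us-west-2). A small guard like the one below can catch this before the client is created. The pattern is only an approximation written for this example, not the check boto3 itself performs.

```python
import re

# Rough shape of a region name such as us-west-2 or us-gov-west-1 (an assumption).
REGION_PATTERN = re.compile(r"^[a-z]{2}(-[a-z]+)+-\d+$")


def normalize_region(name):
    """Replace underscores with hyphens and validate the overall shape."""
    candidate = name.strip().lower().replace("_", "-")
    if not REGION_PATTERN.match(candidate):
        raise ValueError("not a valid-looking region name: {!r}".format(name))
    return candidate


print(normalize_region("us_west_2"))
```

The returned value is what should go into region_name when calling boto3.client('emr', region_name=...).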
@ipsitachatterjee2173 2 years ago
Hi Sir, do you give any classes on AWS and its integration with other services?
@KnowledgeAmplifier1 2 years ago
Hello Ipsita, no, I don't hold any private classes, but you can check this link for the Complete Snowflake with AWS Course -- doc.clickup.com/37466271/d/h/13qc4z-104/d4346819bd8d510 And for Data Engineering with AWS, you can check this playlist -- ru-vid.com/group/PLjfRmoYoxpNopPjdACgS5XTfdjyBcuGku Hope this will be helpful! Happy Learning :-)
@ipsitachatterjee2173 2 years ago
@@KnowledgeAmplifier1 Great, I might post some queries if I face any issues in real time
@uvannarayanan1048 2 years ago
"ClientError: An error occurred (ValidationException) when calling the RunJobFlow operation: Invalid InstanceProfile: EMR_EC2_DefaultRole" I am getting this error in the CloudWatch log events, what should I do?
@KnowledgeAmplifier1 2 years ago
You should have EMR_EC2_DefaultRole and EMR_DefaultRole in the Roles section of your AWS account. Please make sure they are there, or else create the roles. As a shortcut, you can launch one EMR cluster manually: AWS will automatically generate the roles. Then terminate the cluster and use those roles when launching the transient cluster :-)
@uvannarayanan1048 2 years ago
@@KnowledgeAmplifier1 Thank you for your immediate reply. Now I'm getting another error: "when calling the RunJobFlow operation: User: arn:aws:iam::671876216699:user/caplambda is not authorized to perform: elasticmapreduce:RunJobFlow on resource: arn:aws:elasticmapreduce:us-east-1:671876216699:cluster/* because no identity-based policy allows the elasticmapreduce:RunJobFlow action". Here caplambda is the user that I created to get the access key ID and secret access key. I have given S3 full access, EMR full access and CloudWatch full access (for the caplambda user). Help me sir
@uvannarayanan1048 2 years ago
@@KnowledgeAmplifier1 Actually, after searching on Google, I got to know that we need to add the "AmazonElasticMapReduce" policy as well. Now my cluster is running but I'm not getting the output. I tried with PySpark commands, for example:
customer_data = spark.read.format("csv").option("inferSchema","true").option("header","true").load(s3_location)
customer_data = customer_data.withColumn("transactionAmount", customer_data.transactionAmount.substr(2,6))
from pyspark.sql.types import FloatType
customer_data = customer_data.withColumn("transactionAmount", customer_data["transactionAmount"].cast(FloatType()))
I gave commands like this and the cluster failed. Is there any other way that I can give PySpark commands, or can I upload a Python notebook (.ipynb) file? If you have any suggestions please let me know
@KnowledgeAmplifier1 2 years ago
@@uvannarayanan1048 You can upload the PySpark code to S3 as I demonstrated in the video. The best way to run the code in a transient cluster is to first do regression testing of the same code in a dummy persistent EMR cluster, make sure it is working as expected, and then try running it in the transient cluster.
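For the "Invalid InstanceProfile" error at the start of this thread, a quick check along these lines can show which of the default EMR entities is missing before run_job_flow is called. It is a sketch with a made-up function name. It relies on the IAM calls get_role and get_instance_profile, so the caller needs iam:GetRole and iam:GetInstanceProfile permissions.

```python
def missing_emr_defaults(iam_client,
                         service_role="EMR_DefaultRole",
                         instance_profile="EMR_EC2_DefaultRole"):
    """Return the names of the default EMR IAM entities that do not exist."""
    missing = []
    try:
        iam_client.get_role(RoleName=service_role)
    except iam_client.exceptions.NoSuchEntityException:
        missing.append(service_role)
    try:
        # JobFlowRole is looked up as an instance profile, not as a plain role.
        iam_client.get_instance_profile(InstanceProfileName=instance_profile)
    except iam_client.exceptions.NoSuchEntityException:
        missing.append(instance_profile)
    return missing
```

Usage would be missing_emr_defaults(boto3.client("iam")). If the returned list is not empty, running "aws emr create-default-roles" from the AWS CLI creates the default roles and instance profile, as an alternative to launching a cluster by hand.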
@joseneto6558 1 year ago
Note that the Ec2KeyName must be the name of an EC2 key pair that you created beforehand.
@umangsinghal9320 1 year ago
I followed your code step by step. The cluster is also running, but the job fails with this error: Exception in thread "main" org.apache.spark.SparkException: Application application_1666264815714_0001 finished with failed status. Could you help us understand why this is happening?
@jatinkr300 7 months ago
If someone knows the answer please share, as I am also facing the same issue
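That message only says that the YARN application failed. The actual Python or Spark error usually sits in the step and container logs under the cluster's LogUri. One way to get a first pointer, sketched here with a hypothetical helper, is to read the failure details that EMR's describe_step call returns for the step.

```python
def summarize_step_failure(describe_step_response):
    """Pull the failure fields out of an EMR describe_step response."""
    status = describe_step_response["Step"]["Status"]
    details = status.get("FailureDetails", {})
    return {
        "state": status.get("State"),
        "reason": details.get("Reason"),
        "message": details.get("Message"),
        "log_file": details.get("LogFile"),  # S3 path of the relevant log, if reported
    }


# Illustrative response shape with made-up values, not real output.
sample_response = {
    "Step": {
        "Status": {
            "State": "FAILED",
            "FailureDetails": {
                "Reason": "Unknown Error.",
                "LogFile": "s3://my-log-bucket/logs/j-EXAMPLE/steps/s-EXAMPLE/stderr.gz",
            },
        }
    }
}
print(summarize_step_failure(sample_response))
```

With a real cluster the input would come from client.describe_step(ClusterId=..., StepId=...). Because the job runs in cluster deploy mode, the driver's own traceback is normally in the YARN container logs for the application ID, in the containers folder under the LogUri location.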