
Transient Cluster on AWS from Scratch using boto3 | Trigger Spark job from AWS Lambda

This video demonstrates a cost-effective and automated solution for running Spark jobs on an EMR cluster on a daily basis using CloudWatch, Lambda, EMR, and S3 (you can also add SES to send an email once the process completes).
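The daily schedule mentioned above is usually wired with a CloudWatch Events (EventBridge) rule that invokes the Lambda function. The walkthrough below uses an S3 trigger instead, so the following is only a sketch; the rule name, function name, and ARN are assumptions, not values from the video.

import boto3

# Sketch: invoke the Lambda once a day via a CloudWatch Events / EventBridge rule.
# Rule name, function name, account id, and ARNs below are placeholders (assumptions).
events = boto3.client('events', region_name='us-west-2')
lambda_client = boto3.client('lambda', region_name='us-west-2')

rule = events.put_rule(Name='daily-spark-job', ScheduleExpression='rate(1 day)')

# Allow CloudWatch Events to invoke the function, then attach it as the rule target
lambda_client.add_permission(
    FunctionName='emr-trigger-lambda',
    StatementId='daily-spark-job-invoke',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn']
)
events.put_targets(
    Rule='daily-spark-job',
    Targets=[{'Id': '1', 'Arn': 'arn:aws:lambda:us-west-2:123456789012:function:emr-trigger-lambda'}]
)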

Hope this will be helpful!

Steps:
----------
Step 1: Code bucket creation
Step 2: Source bucket creation
Step 3: Destination bucket creation
Step 4: IAM role creation for Lambda (permissions for S3, EMR, and CloudWatch)
Step 5: Lambda function creation with an S3 trigger (a boto3 sketch for the trigger follows this list)
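For Step 5, the S3 trigger can also be attached with boto3 instead of the console. A minimal sketch, assuming the bucket name, function name, and ARNs shown here (they are placeholders, not values from the video):

import boto3

s3 = boto3.client('s3')
lambda_client = boto3.client('lambda')

# Allow the source bucket to invoke the Lambda function (names/ARNs are assumptions)
lambda_client.add_permission(
    FunctionName='emr-trigger-lambda',
    StatementId='s3-invoke-emr-trigger',
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn='arn:aws:s3:::source-bucket-name'
)

# Fire the Lambda whenever a new object lands in the source bucket
s3.put_bucket_notification_configuration(
    Bucket='source-bucket-name',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': 'arn:aws:lambda:us-west-2:123456789012:function:emr-trigger-lambda',
            'Events': ['s3:ObjectCreated:*']
        }]
    }
)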

Lambda Function:
----------------------------
import json
import boto3

# EMR client; inside Lambda the credentials come from the execution role created
# in Step 4, so no access keys need to be hard-coded here.
client = boto3.client('emr', region_name='us-west-2')


def lambda_handler(event, context):
    # The S3 trigger passes the uploaded object's bucket and key in the event
    file_name = event['Records'][0]['s3']['object']['key']
    bucketName = event['Records'][0]['s3']['bucket']['name']
    print("File Name : ", file_name)
    print("Bucket Name : ", bucketName)

    # S3 location of the PySpark script (the "backend code" below)
    backend_code = "{backend_code_location}"

    # spark-submit command run on the cluster via command-runner.jar;
    # the bucket and file name are passed through to the script as sys.argv
    spark_submit = [
        'spark-submit',
        '--master', 'yarn',
        '--deploy-mode', 'cluster',
        backend_code,
        bucketName,
        file_name
    ]
    print("Spark Submit : ", spark_submit)

    # Launch a transient EMR cluster that runs the step and then terminates
    cluster_id = client.run_job_flow(
        Name="transient_demo_testing",
        Instances={
            'InstanceGroups': [
                {
                    'Name': "Master",
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'm1.xlarge',
                    'InstanceCount': 1,
                },
                {
                    'Name': "Slave",
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',
                    'InstanceType': 'm1.xlarge',
                    'InstanceCount': 2,
                }
            ],
            'Ec2KeyName': '{key_name}',
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False,
            'Ec2SubnetId': 'subnet-XXXXXXXX',
        },
        LogUri="{Specify s3 location}",
        ReleaseLabel='{Specify emr version}',
        Steps=[{
            'Name': "testJobGURU",
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': spark_submit
            }
        }],
        BootstrapActions=[],
        VisibleToAllUsers=True,
        JobFlowRole="EMR_EC2_DefaultRole",
        ServiceRole="EMR_DefaultRole",
        Applications=[{'Name': 'Spark'}, {'Name': 'Hive'}]
    )
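The run_job_flow response includes the JobFlowId of the new cluster. If you also want the function to log the cluster id and its current state, a small optional addition right after the call inside lambda_handler could look like this (a sketch, not part of the original code):

    # Optional: log the new cluster's id and state immediately after run_job_flow
    job_flow_id = cluster_id['JobFlowId']
    state = client.describe_cluster(ClusterId=job_flow_id)['Cluster']['Status']['State']
    print("Cluster {} is in state {}".format(job_flow_id, state))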

Code Breakdown:
---------------------------
LogUri - The path to the Amazon S3 location where logs for this cluster are stored.

Applications - The applications installed on this cluster. Hive and Spark have been chosen here. Other applications are available as well, such as Hadoop, Pig, Oozie, ZooKeeper, etc.
Example: If you require Hadoop, Hive, and Spark, you can specify the configuration like this --
Applications = [
{'Name' : 'Hadoop'},
{'Name' : 'Hive'},
{'Name' : 'Spark'}
]

Instances - Describes the Amazon EC2 instances of the job flow.

InstanceGroups - This represents an instance group, which is a group of instances that have a common purpose. For example, the CORE instance group is used for HDFS.

Market - The marketplace to provision instances for this group. Valid values are ON_DEMAND or SPOT.
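For example, the "Slave" core group from the Lambda code above could be provisioned on the Spot market instead of On-Demand. BidPrice is optional and the value shown here is only an assumption (omit it to use the On-Demand price as the maximum):

{
    'Name': "Slave",
    'Market': 'SPOT',
    'BidPrice': '0.10',   # maximum hourly price in USD (assumed value); optional
    'InstanceRole': 'CORE',
    'InstanceType': 'm1.xlarge',
    'InstanceCount': 2,
}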

TerminationProtected - Indicates whether Amazon EMR will lock the cluster to prevent the EC2 instances from being terminated by an API call or user intervention, or in the event of a cluster error.

JobFlowRole - Also called the instance profile or EC2 role. An IAM role for the EMR cluster; the EC2 instances of the cluster assume this role. The default role is EMR_EC2_DefaultRole.

ServiceRole - The IAM role that is assumed by the Amazon EMR service to access AWS resources on your behalf. The default role is EMR_DefaultRole. (If these default roles do not exist in your account yet, they can be created with the AWS CLI command "aws emr create-default-roles".)

VisibleToAllUsers - Indicates whether the cluster is visible to IAM principals in the Amazon Web Services account associated with the cluster.

ReleaseLabel - The Amazon EMR release label, which determines the version of the open-source application packages installed on the cluster (for example, emr-6.3.0).

Ec2KeyName - The name of the EC2 key pair that can be used to connect to the master node over SSH as the user "hadoop".

KeepJobFlowAliveWhenNoSteps - Specifies whether the cluster should remain available after completing all steps. Defaults to true.
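In the Lambda code above it is set to False, which is what makes the cluster transient: EMR terminates it as soon as the step finishes. If it were set to True, the cluster would stay up and further steps could be submitted to it later, for example (a sketch reusing the variables from the Lambda code):

# Only relevant when KeepJobFlowAliveWhenNoSteps=True: submit another step to the running cluster
client.add_job_flow_steps(
    JobFlowId=cluster_id['JobFlowId'],
    Steps=[{
        'Name': 'another_spark_job',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {'Jar': 'command-runner.jar', 'Args': spark_submit}
    }]
)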

For details, you can refer to the boto3 EMR documentation:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html

Backend code:
------------------------
import sys
from pyspark.sql import SparkSession

# One Spark session per application
spark = SparkSession \
    .builder \
    .appName("Oracle_to_snowflake_via_S3") \
    .getOrCreate()


def main():
    # Bucket and key are passed by the Lambda function through spark-submit
    s3_bucket = sys.argv[1]
    s3_file = sys.argv[2]
    s3_location = "s3a://{}/{}".format(s3_bucket, s3_file)

    # Read the uploaded CSV, count rows per class, and write a single CSV
    # file to the destination bucket
    iris = spark.read.format("csv") \
        .option("inferSchema", "true") \
        .option("header", "true") \
        .load(s3_location)
    ms = iris.groupBy("class").count()
    ms.coalesce(1).write.format("csv") \
        .option("header", "true") \
        .save("s3a://{Destination Bucket}/{}".format(s3_file.split('.')[0]))


main()
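As mentioned at the top, SES can be added to send an email once the process completes. A minimal sketch; the region and both addresses are assumptions, and both addresses must be verified identities in SES before sending:

import boto3

def send_completion_mail(file_name):
    # Sketch: email a notification that processing for the given file has completed.
    # Region and addresses are assumed placeholders, not values from the video.
    ses = boto3.client('ses', region_name='us-west-2')
    ses.send_email(
        Source='sender@example.com',
        Destination={'ToAddresses': ['recipient@example.com']},
        Message={
            'Subject': {'Data': 'EMR Spark job finished'},
            'Body': {'Text': {'Data': 'Processing completed for file: {}'.format(file_name)}}
        }
    )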

Check this playlist for more AWS Projects in Big Data domain:
https://youtube.com/playlist?list=PLjfRmoYoxpNopPjdACgS5XTfdjyBcuGku
