
Building Serverless Data Stream pipeline using Kinesis data streams and Firehose for Snowflake 

Knowledge Amplifier

One of the toughest challenges data professionals face today is streaming data for real-time analytics.
A major barrier to real-time insights is the complexity of the data itself: many companies lack the tools and infrastructure to ingest and process both structured and unstructured data.
Organizations need a data warehouse that can handle all data types and scale quickly as data grows.
This video walks through one complete pipeline for the Velocity component of Big Data, showing how to build a streaming pipeline from scratch.
Steps:
-----------
Step 1: Create the Lambda role
Step 2: Create the Lambda function that reads data from API Gateway and puts it into the Kinesis Data Stream
Step 3: Create the API Gateway and integrate it with the Lambda function created in Step 2
Step 4: Create the Kinesis Data Stream that receives data from the Lambda function created in Step 2
Step 5: Create a Lambda function to process the data before it is written to S3
Step 6: Create the S3 bucket that serves as the Firehose destination
Step 7: Create the Kinesis Firehose
Step 8: Create the Snowflake role
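To exercise the pipeline once Steps 1 to 4 are in place, a client has to POST a JSON record to the API Gateway endpoint. The sketch below only builds such a request. The URL is a hypothetical placeholder, and the Name and Age fields are sample values chosen to match the columns queried in the Snowflake code further down.

```python
import json
import urllib.request

# Hypothetical invoke URL: replace it with the one API Gateway shows after deployment.
API_URL = "https://example.execute-api.us-east-1.amazonaws.com/dev/ingest"


def build_request(record: dict, url: str = API_URL) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request carrying one record."""
    body = json.dumps(record).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    req = build_request({"Name": "Alice", "Age": 30})
    # Uncomment to actually send the record once the endpoint exists:
    # with urllib.request.urlopen(req) as resp:
    #     print(resp.status, resp.read().decode("utf-8"))
    print(req.get_method(), req.full_url, req.data)
```

Building the request separately from sending it makes it possible to inspect the payload without a deployed endpoint.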
Lambda for Step 2:
-------------------------------
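import json
import boto3

# Kinesis client, created once and reused across invocations
client = boto3.client('kinesis')

def lambda_handler(event, context):
    # Serialize the request body received from API Gateway
    data = json.dumps(event['body'])
    # A constant partition key sends every record to the same shard
    client.put_record(StreamName="hellotesting", Data=data, PartitionKey="1")
    print("Data Inserted")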
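The handler can be tried locally without AWS by swapping in a fake client. The sketch below is not part of the original setup: FakeKinesis and make_handler are hypothetical helpers, and it assumes API Gateway passes the body as a JSON string, as the Lambda proxy integration does. The returned statusCode is likewise an addition that only matters for a proxy integration.

```python
import json


class FakeKinesis:
    """Hypothetical stand-in for boto3.client('kinesis'); keeps records in memory."""

    def __init__(self):
        self.records = []

    def put_record(self, StreamName, Data, PartitionKey):
        self.records.append(
            {"StreamName": StreamName, "Data": Data, "PartitionKey": PartitionKey}
        )
        return {"ShardId": "shardId-000000000000", "SequenceNumber": "0"}


def make_handler(client, stream_name="hellotesting"):
    """Return a handler with the same logic as above, using the given client."""

    def lambda_handler(event, context):
        data = json.dumps(event["body"])
        client.put_record(StreamName=stream_name, Data=data, PartitionKey="1")
        return {"statusCode": 200, "body": "Data Inserted"}

    return lambda_handler


if __name__ == "__main__":
    fake = FakeKinesis()
    handler = make_handler(fake)
    handler({"body": '{"Name": "Alice", "Age": 30}'}, None)
    stored = fake.records[0]["Data"]
    print(stored)                          # a JSON string that wraps the original JSON text
    print(json.loads(json.loads(stored)))  # decoding twice recovers the dict
```

Under that assumption, json.dumps is applied to a value that is already a string, so the stored record is a quoted JSON string rather than a JSON object. That would be consistent with the Snowflake query below calling parse_json on VALUE before reading the fields.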
Lambda for Firehose Transformation (Step 5):
-------------------------------------------------------------------------
import base64

def lambda_handler(event, context):
    print(event)
    # Build the list per invocation; a module-level list would keep records
    # from earlier invocations while the execution environment is reused
    output = []
    for record in event['records']:
        # Firehose delivers each record base64-encoded
        payload = base64.b64decode(record['data']).decode('utf-8')
        print('payload:', payload)
        # Append a newline so each record lands on its own line in S3
        row_w_newline = payload + "\n"
        # Return a base64 string; raw bytes cannot be serialized in the response
        encoded_row = base64.b64encode(row_w_newline.encode('utf-8')).decode('utf-8')
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': encoded_row
        }
        output.append(output_record)
    print('Processed {} records.'.format(len(event['records'])))
    return {'records': output}
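The transformation can also be checked locally. The sketch below restates the per-record logic as a pure function and feeds it a hand-built event. The transform and sample_event names and the sample rows are hypothetical, and the event contains only the recordId and data fields that the handler reads.

```python
import base64
import json


def transform(event):
    """Append a newline to every record and return it in the Firehose response shape."""
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"]).decode("utf-8")
        row = payload + "\n"
        output.append(
            {
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(row.encode("utf-8")).decode("utf-8"),
            }
        )
    return {"records": output}


def sample_event(rows):
    """Build a minimal Firehose-style event from a list of dicts (test helper)."""
    return {
        "records": [
            {
                "recordId": str(i),
                "data": base64.b64encode(json.dumps(row).encode("utf-8")).decode("utf-8"),
            }
            for i, row in enumerate(rows)
        ]
    }


if __name__ == "__main__":
    event = sample_event([{"Name": "Alice", "Age": 30}, {"Name": "Bob", "Age": 25}])
    result = transform(event)
    # Concatenating the decoded records mimics the content of the delivered S3 object
    s3_object = "".join(
        base64.b64decode(r["data"]).decode("utf-8") for r in result["records"]
    )
    print(s3_object)
```

Because every record ends with a newline, the concatenated output is newline-delimited JSON, one record per line.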
Snowflake Code:
---------------------------
--Specify the role
use role ACCOUNTADMIN;
drop database if exists s3_to_snowflake;
--Database Creation
create database if not exists s3_to_snowflake;
--Specify the active/current database for the session.
use s3_to_snowflake;
--Storage Integration Creation
create or replace storage integration s3_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '{}'
  STORAGE_ALLOWED_LOCATIONS = ('s3://{}')
  COMMENT = 'Testing Snowflake getting refresh or not';
--Describe the Integration Object
DESC INTEGRATION s3_int;
--External Stage Creation
create stage mystage
  url = 's3://{}'
  storage_integration = s3_int;
list @mystage;
--File Format Creation
create or replace file format my_json_format
  type = json;
--Table Creation
create or replace external table s3_to_snowflake.PUBLIC.Person
  with location = @mystage
  file_format = 'my_json_format';
--Query the table
select
  parse_json(VALUE):Age as Age,
  trim(parse_json(VALUE):Name, '"') as Name
from s3_to_snowflake.PUBLIC.Person;
Note:
----------
1) Delete all AWS resources when they are no longer in use; otherwise they will keep incurring charges!
2) Because this is a POC, I granted full access when creating many of the roles. When moving to production, grant only the access that is actually required!
3) parse_json in Snowflake interprets an input string as a JSON document, producing a VARIANT value.
Check this playlist for more AWS projects in the Big Data domain:
• Demystifying Data Engi...
Learn more about AWS Kinesis:
---------------------------------------------------
aws.amazon.com...

Published: 5 Sep 2024

Comments: 17
@shayankabasi160 2 years ago
Your channel is a gem for learning real-time data engineering.
@KnowledgeAmplifier1 2 years ago
Thank you for your inspiring comment, Shayan Kabasi! Happy Learning :-)
@srinivasch5661 2 years ago
It's a detailed video with all the explanation. I learnt how to stream, thank you so much.
@KnowledgeAmplifier1 2 years ago
Glad to hear that, srinivas ch! Happy Learning :-)
@lexact1497 1 year ago
You are amazing, thank you so much for sharing these videos.
@KnowledgeAmplifier1 1 year ago
Thank you, Lexact! Happy Learning
@manojt7012 2 years ago
Thanks a lot. It was so helpful.
@KnowledgeAmplifier1 2 years ago
Glad to know the video helped you, Manoj T! Happy Learning :-)
@anandbabu01 10 months ago
AWS API Gateway can write directly to Kinesis streams; the Lambda function is not mandatory.
@fullstackdataengineering7199 2 years ago
Really a nice video on streaming. One basic question I have is whether OLAP systems are used for real-time streaming. Generally we use OLTP or NoSQL databases for quick responses. Please let me know whether my understanding is correct.
@Buzzingfact 2 years ago
Hello Sir, what minimum set of tools should we learn alongside Snowflake for better career growth?
@KnowledgeAmplifier1 2 years ago
There is no limit in software engineering 🙂 Learn as many of the integration tools that work with Snowflake as possible. Its advanced integration features, such as Snowpipe for streaming data ingestion and the Snowflake connectors for Python, Spark, etc., are also used in many pipelines.
@parikshithshivaprakash5523 2 years ago
Can we send images or binary files through the stream?
@chinrasufamily 2 years ago
Yes. You can ingest real-time data such as video, audio, application logs, website clickstreams, and IoT telemetry data.
@parikshithshivaprakash5523 2 years ago
@chinrasufamily bro, how do we send large files to S3 using an API?
@chinrasufamily 2 years ago
@parikshithshivaprakash5523 By having an API call that triggers a Lambda function to return a pre-signed URL to the client, which allows uploading directly to S3. That way you won't have to deal with the limit in API Gateway.
@parikshithshivaprakash5523 2 years ago
@chinrasufamily bro, it can handle up to 5 GB and that's it; above that it cannot.