Oct 10, 2025

Snowflake database pipeline, task, procedure, etc sample

 //create or replace database scd1db


CREATE OR REPLACE STORAGE INTEGRATION SCD1_INT


  TYPE = EXTERNAL_STAGE


  STORAGE_PROVIDER = 'S3'


  ENABLED = TRUE


  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::848922562186:role/scd1role'


  STORAGE_ALLOWED_LOCATIONS = ('s3://scd1databucket-ambi/data/');


// to describe the database integration details between S3 bucket to snowflake

  DESC INTEGRATION SCD1_INT;



  CREATE OR REPLACE STAGE SCD1_DB.PUBLIC.SCD1_STAGE


STORAGE_INTEGRATION = SCD1_INT


URL= 's3://scd1databucket-ambi/data/';



CREATE OR REPLACE PIPE SCD1_DB.PUBLIC.SCD1PIPE 


AUTO_INGEST = TRUE AS


COPY INTO SCD1_DB.PUBLIC.CUSTOMER_SOURCE


FROM 


(


SELECT


$1 AS CUSTOMERNAME,


$2 AS PHONE,


$3 ASADDRESSLINE1,


$4 AS ADDRESSLINE2,


$5 AS CITY,


$6 AS STATE,


$7 AS POSTALCODE,


$8 AS COUNTRY,


$9 AS TERRITORY,


$10 AS CONTACTFIRSTNAME,


$11 AS CONTACTLASTNAME


FROM @SCD1_DB.PUBLIC.SCD1_STAGE)


FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER=1 FIELD_OPTIONALLY_ENCLOSED_BY ='"');




USE SCHEMA SCD1_DB.PUBLIC;


SHOW PIPES;




USE ROLE ACCOUNTADMIN;


 


USE WAREHOUSE COMPUTE_WH;


 


USE SCHEMA SCD1_DB.PUBLIC;



CREATE OR REPLACE STREAM SCD1_DB.PUBLIC.CUSTOMER_STREAM


ON TABLE SCD1_DB.PUBLIC.CUSTOMER_SOURCE


APPEND_ONLY = TRUE;




SELECT * FROM SCD1_DB.PUBLIC.CUSTOMER_STREAM;




CREATE OR REPLACE TABLE SCD1_DB.PUBLIC.CUSTOMER


(


CONTACTFIRSTNAME STRING,


CONTACTLASTNAME STRING,


CUSTOMERNAME STRING,


PHONE STRING,


ADDRESSLINE1 STRING,


ADDRESSLINE2 STRING,


CITY STRING,


STATE STRING,


POSTALCODE STRING,


COUNTRY STRING,


TERRITORY STRING,


INSERT_DTS TIMESTAMP(6),


UPDATE_DTS TIMESTAMP(6)


);





CREATE OR REPLACE PROCEDURE SCD1_DB.PUBLIC.CUSTOMER_SP()


RETURNS VARCHAR(50)


LANGUAGE JAVASCRIPT


EXECUTE AS CALLER


AS


$$


try {


 


//Create statement BEGIN, Begins a transaction in the current session


 


snowflake.execute({sqlText:`BEGIN TRANSACTION;`});


 


//load data from Customer Stream to a temp table


    


snowflake.execute({sqlText:`CREATE OR REPLACE TEMPORARY TABLE SCD1_DB.PUBLIC.CUSTOMER_TEMP


AS


SELECT


CONTACTFIRSTNAME,


CONTACTLASTNAME,


CUSTOMERNAME,


PHONE,


ADDRESSLINE1,


ADDRESSLINE2,


CITY,


STATE,


POSTALCODE,


COUNTRY,


TERRITORY,


CURRENT_TIMESTAMP(6) AS INSERT_DTS,


CURRENT_TIMESTAMP(6) AS UPDATE_DTS


FROM 


SCD1_DB.PUBLIC.CUSTOMER_STREAM;`});


    


//Perfom the required SCD1 logic on the Customer Target table based on the primary column


 


snowflake.execute({sqlText:`MERGE INTO SCD1_DB.PUBLIC.CUSTOMER TGT


USING SCD1_DB.PUBLIC.CUSTOMER_TEMP TMP


ON TGT.CONTACTFIRSTNAME = TMP.CONTACTFIRSTNAME


AND TGT.CONTACTLASTNAME = TMP.CONTACTLASTNAME


 


WHEN MATCHED THEN UPDATE SET    


TGT.CUSTOMERNAME = TMP.CUSTOMERNAME,


TGT.PHONE = TMP.PHONE,


TGT.ADDRESSLINE1 = TMP.ADDRESSLINE1,


TGT.ADDRESSLINE2 = TMP.ADDRESSLINE2,


TGT.CITY = TMP.CITY,


TGT.STATE = TMP.STATE,


TGT.POSTALCODE = TMP.POSTALCODE,


TGT.COUNTRY = TMP.COUNTRY,


TGT.TERRITORY = TMP.TERRITORY,


TGT.UPDATE_DTS = TMP.UPDATE_DTS


            


WHEN NOT MATCHED THEN INSERT 


(


CONTACTFIRSTNAME,


CONTACTLASTNAME,


CUSTOMERNAME,


PHONE,


ADDRESSLINE1,


ADDRESSLINE2,


CITY,


STATE,


POSTALCODE,


COUNTRY,


TERRITORY,


INSERT_DTS,


UPDATE_DTS


)


VALUES 


(


TMP.CONTACTFIRSTNAME,


TMP.CONTACTLASTNAME,


TMP.CUSTOMERNAME,


TMP.PHONE,


TMP.ADDRESSLINE1,


TMP.ADDRESSLINE2,


TMP.CITY,


TMP.STATE,


TMP.POSTALCODE,


TMP.COUNTRY,


TMP.TERRITORY,


TMP.INSERT_DTS,


TMP.UPDATE_DTS


);`});


 


//Create statement COMMIT, Commits an open transaction in the current session


 


snowflake.execute({sqlText:`COMMIT;`});


 


//Statement returned for info and debuging purposes


 


return "Store Procedure Executed Successfully";


}


 


catch (err)


{


    result = 'Error: ' + err;


    snowflake.execute({sqlText:`ROLLBACK;`});


    throw result;


}


$$;





CREATE OR REPLACE TASK SCD1_DB.PUBLIC.CUSTOMER_TASK


WAREHOUSE = COMPUTE_WH


SCHEDULE = '1 MINUTE'


WHEN SYSTEM$STREAM_HAS_DATA('SCD1_DB.PUBLIC.CUSTOMER_STREAM')


AS CALL SCD1_DB.PUBLIC.CUSTOMER_SP();




ALTER TASK SCD1_DB.PUBLIC.CUSTOMER_TASK RESUME;



SHOW TASKS;

Oct 4, 2025

Glue job sample data load python script

 import boto3

import requests

import base64

from botocore.exceptions import NoCredentialsError, PartialCredentialsError

# Example usage

github_url = "https://raw.githubusercontent.com/deacademygit/project-data/refs/heads/main/country_details.json"  # Raw file URL from GitHub

bucket_name = "sampleS3bucketname"  # The S3 bucket where the file will be uploaded

s3_key = "data/country_details.json"  # The key (path) in the S3 bucket

def fetch_data_from_github_and_upload_to_s3(github_url, bucket_name, s3_key):

    try:

        # Fetch the file content from GitHub (raw URL or GitHub API URL)

        response = requests.get(github_url)

        # Check if the request was successful

        if response.status_code == 200:

            # Get the content from the response (for raw URL, it's already plain text or binary)

            file_content = response.content

            # Initialize the S3 client

            s3_client = boto3.client('s3')

            # Upload the file content to S3

            s3_client.put_object(Bucket=bucket_name, Key=s3_key, Body=file_content)

            print(f"File uploaded successfully to s3://{bucket_name}/{s3_key}")

        else:

            print(f"Error: Failed to fetch file from GitHub. Status code {response.status_code}")

    except (NoCredentialsError, PartialCredentialsError) as e:

        print(f"Error: AWS credentials are missing or incomplete. {e}")

    except requests.exceptions.RequestException as e:

        print(f"Error fetching the file from GitHub: {e}")

    except Exception as e:

        print(f"Error uploading file to S3: {e}")

fetch_data_from_github_and_upload_to_s3(github_url, bucket_name, s3_key)