Oct 10, 2025

Snowflake SCD Type 1 pipeline sample: storage integration, stage, pipe, stream, stored procedure, and task

 // Create the database that holds all objects below
CREATE OR REPLACE DATABASE SCD1_DB;


CREATE OR REPLACE STORAGE INTEGRATION SCD1_INT
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::848922562186:role/scd1role'
  STORAGE_ALLOWED_LOCATIONS = ('s3://scd1databucket-ambi/data/');


// Describe the integration to get the AWS IAM user and external ID needed for the S3 role's trust policy
DESC INTEGRATION SCD1_INT;



CREATE OR REPLACE STAGE SCD1_DB.PUBLIC.SCD1_STAGE
  STORAGE_INTEGRATION = SCD1_INT
  URL = 's3://scd1databucket-ambi/data/';
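
The pipe below copies into SCD1_DB.PUBLIC.CUSTOMER_SOURCE, which these notes never create. A minimal sketch of that landing table, assuming all-STRING columns matching the COPY select list:

// Assumed landing table; column names inferred from the COPY INTO select list below
CREATE OR REPLACE TABLE SCD1_DB.PUBLIC.CUSTOMER_SOURCE
(
    CUSTOMERNAME STRING,
    PHONE STRING,
    ADDRESSLINE1 STRING,
    ADDRESSLINE2 STRING,
    CITY STRING,
    STATE STRING,
    POSTALCODE STRING,
    COUNTRY STRING,
    TERRITORY STRING,
    CONTACTFIRSTNAME STRING,
    CONTACTLASTNAME STRING
);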



// Pipe auto-ingests new files from the stage into the source table
// (AUTO_INGEST = TRUE requires an S3 event notification pointed at the pipe's SQS queue)
CREATE OR REPLACE PIPE SCD1_DB.PUBLIC.SCD1PIPE
AUTO_INGEST = TRUE AS
COPY INTO SCD1_DB.PUBLIC.CUSTOMER_SOURCE
FROM
(
    SELECT
        $1  AS CUSTOMERNAME,
        $2  AS PHONE,
        $3  AS ADDRESSLINE1,
        $4  AS ADDRESSLINE2,
        $5  AS CITY,
        $6  AS STATE,
        $7  AS POSTALCODE,
        $8  AS COUNTRY,
        $9  AS TERRITORY,
        $10 AS CONTACTFIRSTNAME,
        $11 AS CONTACTLASTNAME
    FROM @SCD1_DB.PUBLIC.SCD1_STAGE
)
FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '"');




// SHOW PIPES exposes the notification_channel (SQS ARN) to use for the S3 event notification
USE SCHEMA SCD1_DB.PUBLIC;
SHOW PIPES;
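
Once the S3 event notification points at that channel, the pipe state can be checked with:

SELECT SYSTEM$PIPE_STATUS('SCD1_DB.PUBLIC.SCD1PIPE');  // returns JSON with executionState, pendingFileCount, etc.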




USE ROLE ACCOUNTADMIN;
USE WAREHOUSE COMPUTE_WH;
USE SCHEMA SCD1_DB.PUBLIC;



// Append-only stream: captures only the rows inserted into CUSTOMER_SOURCE
CREATE OR REPLACE STREAM SCD1_DB.PUBLIC.CUSTOMER_STREAM
ON TABLE SCD1_DB.PUBLIC.CUSTOMER_SOURCE
APPEND_ONLY = TRUE;




SELECT * FROM SCD1_DB.PUBLIC.CUSTOMER_STREAM;




// Target table for the SCD1 merge, with insert/update audit timestamps
CREATE OR REPLACE TABLE SCD1_DB.PUBLIC.CUSTOMER
(
    CONTACTFIRSTNAME STRING,
    CONTACTLASTNAME STRING,
    CUSTOMERNAME STRING,
    PHONE STRING,
    ADDRESSLINE1 STRING,
    ADDRESSLINE2 STRING,
    CITY STRING,
    STATE STRING,
    POSTALCODE STRING,
    COUNTRY STRING,
    TERRITORY STRING,
    INSERT_DTS TIMESTAMP(6),
    UPDATE_DTS TIMESTAMP(6)
);





CREATE OR REPLACE PROCEDURE SCD1_DB.PUBLIC.CUSTOMER_SP()
RETURNS VARCHAR(50)
LANGUAGE JAVASCRIPT
EXECUTE AS CALLER
AS
$$
try {

    // BEGIN starts a transaction in the current session
    snowflake.execute({sqlText: `BEGIN TRANSACTION;`});

    // Load data from the customer stream into a temp table
    // (consuming the stream in a CTAS advances its offset)
    snowflake.execute({sqlText: `CREATE OR REPLACE TEMPORARY TABLE SCD1_DB.PUBLIC.CUSTOMER_TEMP AS
        SELECT
            CONTACTFIRSTNAME,
            CONTACTLASTNAME,
            CUSTOMERNAME,
            PHONE,
            ADDRESSLINE1,
            ADDRESSLINE2,
            CITY,
            STATE,
            POSTALCODE,
            COUNTRY,
            TERRITORY,
            CURRENT_TIMESTAMP(6) AS INSERT_DTS,
            CURRENT_TIMESTAMP(6) AS UPDATE_DTS
        FROM SCD1_DB.PUBLIC.CUSTOMER_STREAM;`});

    // Perform the SCD1 logic on the customer target table,
    // keyed on the contact first/last name
    snowflake.execute({sqlText: `MERGE INTO SCD1_DB.PUBLIC.CUSTOMER TGT
        USING SCD1_DB.PUBLIC.CUSTOMER_TEMP TMP
        ON  TGT.CONTACTFIRSTNAME = TMP.CONTACTFIRSTNAME
        AND TGT.CONTACTLASTNAME  = TMP.CONTACTLASTNAME
        WHEN MATCHED THEN UPDATE SET
            TGT.CUSTOMERNAME = TMP.CUSTOMERNAME,
            TGT.PHONE        = TMP.PHONE,
            TGT.ADDRESSLINE1 = TMP.ADDRESSLINE1,
            TGT.ADDRESSLINE2 = TMP.ADDRESSLINE2,
            TGT.CITY         = TMP.CITY,
            TGT.STATE        = TMP.STATE,
            TGT.POSTALCODE   = TMP.POSTALCODE,
            TGT.COUNTRY      = TMP.COUNTRY,
            TGT.TERRITORY    = TMP.TERRITORY,
            TGT.UPDATE_DTS   = TMP.UPDATE_DTS
        WHEN NOT MATCHED THEN INSERT
        (
            CONTACTFIRSTNAME, CONTACTLASTNAME, CUSTOMERNAME, PHONE,
            ADDRESSLINE1, ADDRESSLINE2, CITY, STATE, POSTALCODE,
            COUNTRY, TERRITORY, INSERT_DTS, UPDATE_DTS
        )
        VALUES
        (
            TMP.CONTACTFIRSTNAME, TMP.CONTACTLASTNAME, TMP.CUSTOMERNAME, TMP.PHONE,
            TMP.ADDRESSLINE1, TMP.ADDRESSLINE2, TMP.CITY, TMP.STATE, TMP.POSTALCODE,
            TMP.COUNTRY, TMP.TERRITORY, TMP.INSERT_DTS, TMP.UPDATE_DTS
        );`});

    // COMMIT closes the open transaction in the current session
    snowflake.execute({sqlText: `COMMIT;`});

    // Statement returned for info and debugging purposes
    return "Stored procedure executed successfully";
}
catch (err) {
    // Roll back the open transaction before surfacing the error
    var result = 'Error: ' + err;
    snowflake.execute({sqlText: `ROLLBACK;`});
    throw result;
}
$$;
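
Before wiring up the task, the procedure can be exercised by hand once the stream has rows:

CALL SCD1_DB.PUBLIC.CUSTOMER_SP();
SELECT * FROM SCD1_DB.PUBLIC.CUSTOMER;  // confirm the merge landed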





// Run every minute, but only when the stream actually has data
CREATE OR REPLACE TASK SCD1_DB.PUBLIC.CUSTOMER_TASK
  WAREHOUSE = COMPUTE_WH
  SCHEDULE = '1 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('SCD1_DB.PUBLIC.CUSTOMER_STREAM')
AS CALL SCD1_DB.PUBLIC.CUSTOMER_SP();

// Tasks are created suspended; RESUME starts the schedule
ALTER TASK SCD1_DB.PUBLIC.CUSTOMER_TASK RESUME;

SHOW TASKS;
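
To verify the task is actually firing, Snowflake's TASK_HISTORY table function shows recent runs; for example:

// Recent runs of CUSTOMER_TASK over the last day, newest first
SELECT name, state, scheduled_time, error_message
FROM TABLE(SCD1_DB.INFORMATION_SCHEMA.TASK_HISTORY(
       SCHEDULED_TIME_RANGE_START => DATEADD('DAY', -1, CURRENT_TIMESTAMP()),
       TASK_NAME => 'CUSTOMER_TASK'))
ORDER BY scheduled_time DESC;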

Oct 4, 2025

Glue job sample: Python script to load a file from GitHub into S3

 import boto3
import requests
from botocore.exceptions import NoCredentialsError, PartialCredentialsError

def fetch_data_from_github_and_upload_to_s3(github_url, bucket_name, s3_key):
    try:
        # Fetch the file content from GitHub (raw URL or GitHub API URL)
        response = requests.get(github_url)

        # Check if the request was successful
        if response.status_code == 200:
            # For a raw URL the response body is already the file content (text or binary)
            file_content = response.content

            # Initialize the S3 client and upload the file content
            s3_client = boto3.client('s3')
            s3_client.put_object(Bucket=bucket_name, Key=s3_key, Body=file_content)
            print(f"File uploaded successfully to s3://{bucket_name}/{s3_key}")
        else:
            print(f"Error: Failed to fetch file from GitHub. Status code {response.status_code}")
    except (NoCredentialsError, PartialCredentialsError) as e:
        print(f"Error: AWS credentials are missing or incomplete. {e}")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching the file from GitHub: {e}")
    except Exception as e:
        print(f"Error uploading file to S3: {e}")

# Example usage
github_url = "https://raw.githubusercontent.com/deacademygit/project-data/refs/heads/main/country_details.json"  # Raw file URL from GitHub
bucket_name = "sampleS3bucketname"  # The S3 bucket where the file will be uploaded
s3_key = "data/country_details.json"  # The key (path) in the S3 bucket

fetch_data_from_github_and_upload_to_s3(github_url, bucket_name, s3_key)
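
A quick follow-up check (not part of the original job) to confirm the object landed, using the same boto3 client:

s3 = boto3.client('s3')
head = s3.head_object(Bucket=bucket_name, Key=s3_key)  # raises ClientError if the key is missing
print(f"Uploaded {head['ContentLength']} bytes to s3://{bucket_name}/{s3_key}")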

Jul 20, 2025

Python script sample: top 10 genres by frequency

 # Build a DataFrame with only the title_name and genres columns, stripping brackets and spaces from genres
df = dim_titles_roku_python[['title_name', 'genres']].copy()  # copy to avoid modifying the original DataFrame

# '[' and ']' are regex metacharacters, so remove brackets and spaces with one character class
df.loc[:, 'genres'] = df['genres'].str.replace(r'[\[\] ]', '', regex=True)

# Use `apply` to split the comma-separated genres so each genre becomes its own row
df = df.apply(
    lambda x: pd.Series(x['genres'].split(',')),
    axis=1).stack().reset_index(level=1, drop=True)

# `reset_index` turns the stacked Series into a DataFrame with one genre per row
df = df.reset_index(name='genre')

# Use `groupby` and `size` to count the number of occurrences of each genre
df = df.groupby('genre').size().reset_index(name='frequency')

# Sort by frequency in descending order and keep the top 10
df = df.sort_values(by='frequency', ascending=False).head(10)

print(df)
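
For what it's worth, the split/stack steps can be expressed more directly with `str.split` plus `DataFrame.explode` (pandas >= 0.25); a sketch assuming the same dim_titles_roku_python input:

alt = dim_titles_roku_python[['title_name', 'genres']].copy()
alt['genres'] = alt['genres'].str.replace(r'[\[\] ]', '', regex=True).str.split(',')
alt = alt.explode('genres')  # one row per (title, genre) pair
print(alt.groupby('genres').size()
         .sort_values(ascending=False).head(10)
         .reset_index(name='frequency'))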

Jul 18, 2025

Top & Bottom Categories each Year for Men and Women based on Product Reviews

 df = dim_category_nike_python.copy()
product = dim_product_nike_python.copy()

# Join products to their categories and keep the columns needed for ranking
product = product.merge(df, on='category_id', how='inner')
product = product[['order_date', 'gender', 'category_name', 'product_reviews']]

# Reduce order_date to the year, then average reviews per year/gender/category
product['order_date'] = pd.to_datetime(product['order_date']).dt.year
product = product.groupby(['order_date', 'gender', 'category_name'])['product_reviews'].mean().reset_index()

# Rank categories within each year/gender; method='first' breaks ties so exactly one row gets rank 1
product['top_rank'] = product.groupby(['order_date', 'gender'])['product_reviews'].rank(ascending=False, method='first')
product['bottom_rank'] = product.groupby(['order_date', 'gender'])['product_reviews'].rank(ascending=True, method='first')

# Keep only the best and worst category per year/gender and label them
new_df = product[(product['top_rank'] == 1) | (product['bottom_rank'] == 1)].copy()
new_df['Category_type'] = new_df.apply(lambda row: 'Top' if row['top_rank'] == 1 else 'Bottom', axis=1)

grouped = new_df.groupby(['order_date', 'gender', 'Category_type'])['category_name'].first().reset_index()

# Pivot to one row per year with (gender, Top/Bottom) column pairs, then flatten the column names
pivot_df = grouped.pivot(index='order_date', columns=['gender', 'Category_type'], values='category_name')
pivot_df.columns = [f"{gender}_{category_type}_Category" for gender, category_type in pivot_df.columns]
pivot_df = pivot_df.reset_index()

# Rename to the final report column names
column = {'men_Bottom_Category': 'Men_Bottom_Category',
          'woman_Top_Category': 'Woman_Top_Category',
          'woman_Bottom_Category': 'Woman_Bottom_Category',
          'men_Top_Category': 'Men_Top_Category',
          'order_date': 'order_year'}
pivot_df = pivot_df.rename(columns=column)

print(pivot_df)
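
An equivalent, arguably simpler way to pick the top and bottom category per year/gender is `groupby` + `idxmax`/`idxmin` on the averaged frame; a sketch assuming the aggregated `product` frame from above:

# Row labels of the max/min average review per (year, gender), then select those rows
top = product.loc[product.groupby(['order_date', 'gender'])['product_reviews'].idxmax()]
bottom = product.loc[product.groupby(['order_date', 'gender'])['product_reviews'].idxmin()]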

Python script sample to merge, aggregate, round, and rename

 df = fact_orders_grubhub_python.merge(dim_delivery_person_grubhub_python, on='Delivery_person_ID')

# Use `groupby` and `agg` to compute the average age and rating for each city_type
df = df.groupby('city_type').agg({
    'Delivery_person_Age': 'mean',
    'Delivery_person_Ratings': 'mean'
}).reset_index()

# Round the age and rating columns to 2 decimal places
df['Delivery_person_Age'] = df['Delivery_person_Age'].round(2)
df['Delivery_person_Ratings'] = df['Delivery_person_Ratings'].round(2)

# Rename the columns to match the names in the SQL query
df.rename(columns={
    'Delivery_person_Age': 'avg_age',
    'Delivery_person_Ratings': 'avg_rating'
}, inplace=True)

print(df.head(5))
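
For reference, the same merge-aggregate-round-rename flow collapses into one chained groupby using pandas named aggregation (pandas >= 0.25); a sketch over the same input frames:

alt = fact_orders_grubhub_python.merge(dim_delivery_person_grubhub_python, on='Delivery_person_ID')
alt = (alt.groupby('city_type')
          .agg(avg_age=('Delivery_person_Age', 'mean'),
               avg_rating=('Delivery_person_Ratings', 'mean'))
          .round(2)
          .reset_index())
print(alt.head(5))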