Oct 10, 2025

Snowflake sample: storage integration, stage, pipe, stream, stored procedure, and task

 //create or replace database scd1db


CREATE OR REPLACE STORAGE INTEGRATION SCD1_INT


  TYPE = EXTERNAL_STAGE


  STORAGE_PROVIDER = 'S3'


  ENABLED = TRUE


  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::848922562186:role/scd1role'


  STORAGE_ALLOWED_LOCATIONS = ('s3://scd1databucket-ambi/data/');


// to describe the database integration details between S3 bucket to snowflake

  DESC INTEGRATION SCD1_INT;



  CREATE OR REPLACE STAGE SCD1_DB.PUBLIC.SCD1_STAGE


STORAGE_INTEGRATION = SCD1_INT


URL= 's3://scd1databucket-ambi/data/';



CREATE OR REPLACE PIPE SCD1_DB.PUBLIC.SCD1PIPE 


AUTO_INGEST = TRUE AS


COPY INTO SCD1_DB.PUBLIC.CUSTOMER_SOURCE


FROM 


(


SELECT


$1 AS CUSTOMERNAME,


$2 AS PHONE,


$3 ASADDRESSLINE1,


$4 AS ADDRESSLINE2,


$5 AS CITY,


$6 AS STATE,


$7 AS POSTALCODE,


$8 AS COUNTRY,


$9 AS TERRITORY,


$10 AS CONTACTFIRSTNAME,


$11 AS CONTACTLASTNAME


FROM @SCD1_DB.PUBLIC.SCD1_STAGE)


FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER=1 FIELD_OPTIONALLY_ENCLOSED_BY ='"');




USE SCHEMA SCD1_DB.PUBLIC;


SHOW PIPES;




USE ROLE ACCOUNTADMIN;


 


USE WAREHOUSE COMPUTE_WH;


 


USE SCHEMA SCD1_DB.PUBLIC;



CREATE OR REPLACE STREAM SCD1_DB.PUBLIC.CUSTOMER_STREAM


ON TABLE SCD1_DB.PUBLIC.CUSTOMER_SOURCE


APPEND_ONLY = TRUE;




SELECT * FROM SCD1_DB.PUBLIC.CUSTOMER_STREAM;




CREATE OR REPLACE TABLE SCD1_DB.PUBLIC.CUSTOMER


(


CONTACTFIRSTNAME STRING,


CONTACTLASTNAME STRING,


CUSTOMERNAME STRING,


PHONE STRING,


ADDRESSLINE1 STRING,


ADDRESSLINE2 STRING,


CITY STRING,


STATE STRING,


POSTALCODE STRING,


COUNTRY STRING,


TERRITORY STRING,


INSERT_DTS TIMESTAMP(6),


UPDATE_DTS TIMESTAMP(6)


);





CREATE OR REPLACE PROCEDURE SCD1_DB.PUBLIC.CUSTOMER_SP()


RETURNS VARCHAR(50)


LANGUAGE JAVASCRIPT


EXECUTE AS CALLER


AS


$$


try {


 


//Create statement BEGIN, Begins a transaction in the current session


 


snowflake.execute({sqlText:`BEGIN TRANSACTION;`});


 


//load data from Customer Stream to a temp table


    


snowflake.execute({sqlText:`CREATE OR REPLACE TEMPORARY TABLE SCD1_DB.PUBLIC.CUSTOMER_TEMP


AS


SELECT


CONTACTFIRSTNAME,


CONTACTLASTNAME,


CUSTOMERNAME,


PHONE,


ADDRESSLINE1,


ADDRESSLINE2,


CITY,


STATE,


POSTALCODE,


COUNTRY,


TERRITORY,


CURRENT_TIMESTAMP(6) AS INSERT_DTS,


CURRENT_TIMESTAMP(6) AS UPDATE_DTS


FROM 


SCD1_DB.PUBLIC.CUSTOMER_STREAM;`});


    


//Perfom the required SCD1 logic on the Customer Target table based on the primary column


 


snowflake.execute({sqlText:`MERGE INTO SCD1_DB.PUBLIC.CUSTOMER TGT


USING SCD1_DB.PUBLIC.CUSTOMER_TEMP TMP


ON TGT.CONTACTFIRSTNAME = TMP.CONTACTFIRSTNAME


AND TGT.CONTACTLASTNAME = TMP.CONTACTLASTNAME


 


WHEN MATCHED THEN UPDATE SET    


TGT.CUSTOMERNAME = TMP.CUSTOMERNAME,


TGT.PHONE = TMP.PHONE,


TGT.ADDRESSLINE1 = TMP.ADDRESSLINE1,


TGT.ADDRESSLINE2 = TMP.ADDRESSLINE2,


TGT.CITY = TMP.CITY,


TGT.STATE = TMP.STATE,


TGT.POSTALCODE = TMP.POSTALCODE,


TGT.COUNTRY = TMP.COUNTRY,


TGT.TERRITORY = TMP.TERRITORY,


TGT.UPDATE_DTS = TMP.UPDATE_DTS


            


WHEN NOT MATCHED THEN INSERT 


(


CONTACTFIRSTNAME,


CONTACTLASTNAME,


CUSTOMERNAME,


PHONE,


ADDRESSLINE1,


ADDRESSLINE2,


CITY,


STATE,


POSTALCODE,


COUNTRY,


TERRITORY,


INSERT_DTS,


UPDATE_DTS


)


VALUES 


(


TMP.CONTACTFIRSTNAME,


TMP.CONTACTLASTNAME,


TMP.CUSTOMERNAME,


TMP.PHONE,


TMP.ADDRESSLINE1,


TMP.ADDRESSLINE2,


TMP.CITY,


TMP.STATE,


TMP.POSTALCODE,


TMP.COUNTRY,


TMP.TERRITORY,


TMP.INSERT_DTS,


TMP.UPDATE_DTS


);`});


 


//Create statement COMMIT, Commits an open transaction in the current session


 


snowflake.execute({sqlText:`COMMIT;`});


 


//Statement returned for info and debuging purposes


 


return "Store Procedure Executed Successfully";


}


 


catch (err)


{


    result = 'Error: ' + err;


    snowflake.execute({sqlText:`ROLLBACK;`});


    throw result;


}


$$;





CREATE OR REPLACE TASK SCD1_DB.PUBLIC.CUSTOMER_TASK


WAREHOUSE = COMPUTE_WH


SCHEDULE = '1 MINUTE'


WHEN SYSTEM$STREAM_HAS_DATA('SCD1_DB.PUBLIC.CUSTOMER_STREAM')


AS CALL SCD1_DB.PUBLIC.CUSTOMER_SP();




ALTER TASK SCD1_DB.PUBLIC.CUSTOMER_TASK RESUME;



SHOW TASKS;

Oct 4, 2025

Glue job sample data load python script

 import boto3

import requests

import base64

from botocore.exceptions import NoCredentialsError, PartialCredentialsError

# Example usage

github_url = "https://raw.githubusercontent.com/deacademygit/project-data/refs/heads/main/country_details.json"  # Raw file URL from GitHub

bucket_name = "sampleS3bucketname"  # The S3 bucket where the file will be uploaded

s3_key = "data/country_details.json"  # The key (path) in the S3 bucket

def fetch_data_from_github_and_upload_to_s3(github_url, bucket_name, s3_key):

    try:

        # Fetch the file content from GitHub (raw URL or GitHub API URL)

        response = requests.get(github_url)

        # Check if the request was successful

        if response.status_code == 200:

            # Get the content from the response (for raw URL, it's already plain text or binary)

            file_content = response.content

            # Initialize the S3 client

            s3_client = boto3.client('s3')

            # Upload the file content to S3

            s3_client.put_object(Bucket=bucket_name, Key=s3_key, Body=file_content)

            print(f"File uploaded successfully to s3://{bucket_name}/{s3_key}")

        else:

            print(f"Error: Failed to fetch file from GitHub. Status code {response.status_code}")

    except (NoCredentialsError, PartialCredentialsError) as e:

        print(f"Error: AWS credentials are missing or incomplete. {e}")

    except requests.exceptions.RequestException as e:

        print(f"Error fetching the file from GitHub: {e}")

    except Exception as e:

        print(f"Error uploading file to S3: {e}")

fetch_data_from_github_and_upload_to_s3(github_url, bucket_name, s3_key)

Jul 20, 2025

python script sample

 # Create a new DataFrame that includes only the title_name and genres columns, and remove the square brackets and spaces from the genres column

df = dim_titles_roku_python[['title_name', 'genres']].copy()  # make a copy to avoid modifying the original DataFrame


df.loc[:, 'genres'] = df['genres'].str.replace('[', '', regex=True).str.replace(']', '', regex=True).str.replace(' ', '', regex=True)


# df['genres'] = df['genres'].str.replace('[', '').str.replace(']', '').str.replace(' ', '')

# Use the `apply` function to split the genres column into a list of individual genres


df = df.apply(

    lambda x: pd.Series(x['genres'].split(',')),

    axis=1).stack().reset_index(level=1, drop=True)


# Use the `reset_index` function to create a new DataFrame where each genre is a separate row


df = df.reset_index(name='genre')


# Use the `groupby` and `size` functions to count the number of occurrences of each genre


df = df.groupby('genre').size().reset_index(name='frequency')


# Sort the resulting DataFrame by the frequency column in descending order


df = df.sort_values(by='frequency', ascending=False).head(10)

print(df)

Jul 18, 2025

Top & Bottom Categories each Year for Men and Women based on Product Reviews

 df = dim_category_nike_python.copy()

#print(df)


product = dim_product_nike_python.copy()


product = product.merge(df,on='category_id',how='inner')


product = product[['order_date','gender','category_name','product_reviews']]


product['order_date'] = pd.to_datetime(product['order_date']).dt.year

product=product.groupby(['order_date','gender','category_name'])['product_reviews'].mean().reset_index()


product['top_rank']=product.groupby(['order_date','gender'])['product_reviews'].rank(ascending=False)


product['bottom_rank']=product.groupby(['order_date','gender'])['product_reviews'].rank(ascending=True)


new_df = product[(product['top_rank']==1)|(product['bottom_rank']==1)]



new_df = new_df.copy()


new_df.loc[:,'Category_type'] = new_df.apply(lambda row: 'Top' if row['top_rank']== 1 else 'Bottom',axis=1 )



grouped  =new_df.groupby(['order_date','gender','Category_type'])['category_name'].first().reset_index()



pivot_df = grouped.pivot(index='order_date',columns=['gender','Category_type'],values='category_name')



pivot_df.columns = [  f"{gender}_{abc}_Category"  for gender,abc in pivot_df.columns ]


pivot_df = pivot_df.reset_index()



column = {'men_Bottom_Category':'Men_Bottom_Category',

    'woman_Top_Category':'Woman_Top_Category',

    'woman_Bottom_Category':'Woman_Bottom_Category',

    'men_Top_Category':'Men_Top_Category',

    'order_date':'order_year'

    

}


pivot_df=pivot_df.rename(columns = column)



#print(product[product['order_date']==2020])


print(pivot_df)

python script to aggregate , merge, sort etc

 df= fact_orders_grubhub_python.merge(dim_delivery_person_grubhub_python, on='Delivery_person_ID')

# Use the `groupby` and `agg` functions to compute the average age and rating for each city_type

df = df.groupby('city_type').agg({

    'Delivery_person_Age': 'mean',

    'Delivery_person_Ratings': 'mean'

}).reset_index()

# Use the `round` function to round the age and rating columns to 2 decimal places

df['Delivery_person_Age'] = df['Delivery_person_Age'].round(2)

df['Delivery_person_Ratings'] = df['Delivery_person_Ratings'].round(2)


# Rename the columns to match the names in the SQL query

df.rename(columns={

    'Delivery_person_Age': 'avg_age',

    'Delivery_person_Ratings': 'avg_rating'

}, inplace=True)


print(df.head(5))

Jul 15, 2025

Qlik connection string switch by environment

 

 Environment var.

LET v.SYS.ServerName = UPPER(ComputerName());

 

//storing environment name into a varibale

LET v.SYS.EnvironmentName = if(index('$(v.SYS.ServerName)','DV'),'dev connection string name', 

                            if(index('$(v.SYS.ServerName)','UA'),'UAT connection string name'                   ));

LIB CONNECT TO  $(v.SYS.EnvironmentName);

Jun 2, 2025

Customer rating using SQL

 WITH CustomerRatings AS (

    SELECT

        customer_id,

        AVG(rating) AS avg_rating

    FROM

        wlmt_reviews

    GROUP BY

        customer_id

)

SELECT

    CASE

        WHEN avg_rating >= 4 THEN 'Promoter'

        ELSE 'Neutral' END AS segment,

    COUNT(*) AS customer_count,

    ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM CustomerRatings),2) AS percentage

FROM

    CustomerRatings

GROUP BY

    segment

ORDER BY

    percentage DESC;

    

    

    

   

May 31, 2025

Find top 3 rated Netflix series using CTE

 WITH FiveRatedseries AS

    (

    SELECT

            c.series_id,

            series_name,

            COUNT(DISTINCT r.profile_id) AS TotalProfileCount,

            AVG(rating) AS AvgRating

    FROM            content c 

    INNER JOIN      series s ON s.series_id=c.series_id

    INNER JOIN      reviews_ntf r   ON r.content_id=c.content_id

    GROUP  BY c.series_id,

            series_name

   

    )

SELECT series_name,TotalProfileCount,ROUND(AvgRating,2)AS AvgRating

FROM FiveRatedseries

WHERE TotalProfileCount >=  5

ORDER BY AvgRating DESC


LIMIT 3


 

May 29, 2025

Find the maximum difference in order quantity for the same customer (SQL)

 WITH CustomerData AS(


SELECT      customer_id,

            oz.order_id, 

            product_id,

            SUM(quantity)  AS purchase_quantity, 

            SUM(quantity) - LAG(SUM(quantity)) OVER(PARTITION BY customer_id,product_id ORDER BY order_id) quantity_diff

FROM        orders_amz oz

INNER JOIN  orderdetails o ON o.order_id=oz.order_id

GROUP BY customer_id,oz.order_id, product_id)


SELECT      c.customer_id, 

            p.product_id, 

            product_name

FROM        products_amz p 

INNER JOIN  CustomerData c on c.product_id=p.product_id

ORDER BY    quantity_diff DESC

LIMIT 1


 

CTE query to find top selling products

 WITH MonthlySales AS (

    SELECT

        p.product_name,

        s.store_id,

        SUM(s.units_sold) as units_sold,

        SUM(s.units_sold) OVER (PARTITION BY p.product_id) as total_units,

        RANK() OVER (PARTITION BY p.product_id ORDER BY SUM(s.units_sold) DESC) as store_rank

    FROM

        product p

    JOIN

        sales s ON p.product_id = s.product_id

    WHERE

        YEAR(p.launch_date)=2022

        AND s.sale_date BETWEEN p.launch_date AND DATE_ADD(p.launch_date, INTERVAL 1 MONTH)

    GROUP BY

        p.product_name, s.store_id

)


SELECT

    ms.product_name,

    st.location_city,

    st.location_country,

    ms.units_sold,

    ms.total_units,

    ROUND(100.0 * ms.units_sold / ms.total_units, 2) as store_influence_percentage

FROM

    MonthlySales ms

JOIN

    stores st ON ms.store_id = st.store_id

WHERE

    ms.store_rank = 1

ORDER BY

    store_influence_percentage DESC;

May 28, 2025

Year part extract in SQL

 SELECT 

p.product_name,

p.category,

SUM(s.units_sold) AS total_quantity_sold,

SUM(s.units_sold*s.unit_price) AS total_revenue


FROM product p

INNER JOIN sales s ON s.product_id=p.product_id

WHERE EXTRACT(YEAR FROM s.sale_date) = 2023

GROUP BY p.product_name

ORDER BY total_quantity_sold DESC 

LIMIT 5

May 18, 2025

Find the top-performing employee in terms of both revenue generated and projects completed in January 2023.

 SELECT e.employee_id, e.role

FROM employee_dimension e

JOIN employee_performance_fact ep 

ON e.employee_id = ep.employee_id

WHERE ep.month = 'January' AND ep.year = 2023

AND (ep.revenue_generated, ep.projects_completed) = (

    SELECT MAX(revenue_generated), MAX(projects_completed)

    FROM employee_performance_fact

    WHERE month = 'January' AND year = 2023)

Customer with the Highest Total Spending in Each City

 SELECT city, name, SUM(amount) AS TotalSpending

FROM customers_prc c

JOIN orders_prc o ON c.customerid = o.customer_id

GROUP BY city, name

HAVING TotalSpending = (SELECT MAX(TotalSpending) 

FROM (SELECT city, name, SUM(amount) AS TotalSpending 

        FROM customers_prc c JOIN orders_prc o ON c.customerid = o.customer_id GROUP BY city, name) AS Subquery 

WHERE Subquery.city = c.city);