Oct 10, 2025

Snowflake database pipeline, task, procedure, etc sample

 //create or replace database scd1db


CREATE OR REPLACE STORAGE INTEGRATION SCD1_INT


  TYPE = EXTERNAL_STAGE


  STORAGE_PROVIDER = 'S3'


  ENABLED = TRUE


  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::848922562186:role/scd1role'


  STORAGE_ALLOWED_LOCATIONS = ('s3://scd1databucket-ambi/data/');


// to describe the database integration details between S3 bucket to snowflake

  DESC INTEGRATION SCD1_INT;



  CREATE OR REPLACE STAGE SCD1_DB.PUBLIC.SCD1_STAGE


STORAGE_INTEGRATION = SCD1_INT


URL= 's3://scd1databucket-ambi/data/';



CREATE OR REPLACE PIPE SCD1_DB.PUBLIC.SCD1PIPE 


AUTO_INGEST = TRUE AS


COPY INTO SCD1_DB.PUBLIC.CUSTOMER_SOURCE


FROM 


(


SELECT


$1 AS CUSTOMERNAME,


$2 AS PHONE,


$3 ASADDRESSLINE1,


$4 AS ADDRESSLINE2,


$5 AS CITY,


$6 AS STATE,


$7 AS POSTALCODE,


$8 AS COUNTRY,


$9 AS TERRITORY,


$10 AS CONTACTFIRSTNAME,


$11 AS CONTACTLASTNAME


FROM @SCD1_DB.PUBLIC.SCD1_STAGE)


FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER=1 FIELD_OPTIONALLY_ENCLOSED_BY ='"');




USE SCHEMA SCD1_DB.PUBLIC;


SHOW PIPES;




USE ROLE ACCOUNTADMIN;


 


USE WAREHOUSE COMPUTE_WH;


 


USE SCHEMA SCD1_DB.PUBLIC;



CREATE OR REPLACE STREAM SCD1_DB.PUBLIC.CUSTOMER_STREAM


ON TABLE SCD1_DB.PUBLIC.CUSTOMER_SOURCE


APPEND_ONLY = TRUE;




SELECT * FROM SCD1_DB.PUBLIC.CUSTOMER_STREAM;




CREATE OR REPLACE TABLE SCD1_DB.PUBLIC.CUSTOMER


(


CONTACTFIRSTNAME STRING,


CONTACTLASTNAME STRING,


CUSTOMERNAME STRING,


PHONE STRING,


ADDRESSLINE1 STRING,


ADDRESSLINE2 STRING,


CITY STRING,


STATE STRING,


POSTALCODE STRING,


COUNTRY STRING,


TERRITORY STRING,


INSERT_DTS TIMESTAMP(6),


UPDATE_DTS TIMESTAMP(6)


);





CREATE OR REPLACE PROCEDURE SCD1_DB.PUBLIC.CUSTOMER_SP()


RETURNS VARCHAR(50)


LANGUAGE JAVASCRIPT


EXECUTE AS CALLER


AS


$$


try {


 


//Create statement BEGIN, Begins a transaction in the current session


 


snowflake.execute({sqlText:`BEGIN TRANSACTION;`});


 


//load data from Customer Stream to a temp table


    


snowflake.execute({sqlText:`CREATE OR REPLACE TEMPORARY TABLE SCD1_DB.PUBLIC.CUSTOMER_TEMP


AS


SELECT


CONTACTFIRSTNAME,


CONTACTLASTNAME,


CUSTOMERNAME,


PHONE,


ADDRESSLINE1,


ADDRESSLINE2,


CITY,


STATE,


POSTALCODE,


COUNTRY,


TERRITORY,


CURRENT_TIMESTAMP(6) AS INSERT_DTS,


CURRENT_TIMESTAMP(6) AS UPDATE_DTS


FROM 


SCD1_DB.PUBLIC.CUSTOMER_STREAM;`});


    


//Perfom the required SCD1 logic on the Customer Target table based on the primary column


 


snowflake.execute({sqlText:`MERGE INTO SCD1_DB.PUBLIC.CUSTOMER TGT


USING SCD1_DB.PUBLIC.CUSTOMER_TEMP TMP


ON TGT.CONTACTFIRSTNAME = TMP.CONTACTFIRSTNAME


AND TGT.CONTACTLASTNAME = TMP.CONTACTLASTNAME


 


WHEN MATCHED THEN UPDATE SET    


TGT.CUSTOMERNAME = TMP.CUSTOMERNAME,


TGT.PHONE = TMP.PHONE,


TGT.ADDRESSLINE1 = TMP.ADDRESSLINE1,


TGT.ADDRESSLINE2 = TMP.ADDRESSLINE2,


TGT.CITY = TMP.CITY,


TGT.STATE = TMP.STATE,


TGT.POSTALCODE = TMP.POSTALCODE,


TGT.COUNTRY = TMP.COUNTRY,


TGT.TERRITORY = TMP.TERRITORY,


TGT.UPDATE_DTS = TMP.UPDATE_DTS


            


WHEN NOT MATCHED THEN INSERT 


(


CONTACTFIRSTNAME,


CONTACTLASTNAME,


CUSTOMERNAME,


PHONE,


ADDRESSLINE1,


ADDRESSLINE2,


CITY,


STATE,


POSTALCODE,


COUNTRY,


TERRITORY,


INSERT_DTS,


UPDATE_DTS


)


VALUES 


(


TMP.CONTACTFIRSTNAME,


TMP.CONTACTLASTNAME,


TMP.CUSTOMERNAME,


TMP.PHONE,


TMP.ADDRESSLINE1,


TMP.ADDRESSLINE2,


TMP.CITY,


TMP.STATE,


TMP.POSTALCODE,


TMP.COUNTRY,


TMP.TERRITORY,


TMP.INSERT_DTS,


TMP.UPDATE_DTS


);`});


 


//Create statement COMMIT, Commits an open transaction in the current session


 


snowflake.execute({sqlText:`COMMIT;`});


 


//Statement returned for info and debuging purposes


 


return "Store Procedure Executed Successfully";


}


 


catch (err)


{


    result = 'Error: ' + err;


    snowflake.execute({sqlText:`ROLLBACK;`});


    throw result;


}


$$;





CREATE OR REPLACE TASK SCD1_DB.PUBLIC.CUSTOMER_TASK


WAREHOUSE = COMPUTE_WH


SCHEDULE = '1 MINUTE'


WHEN SYSTEM$STREAM_HAS_DATA('SCD1_DB.PUBLIC.CUSTOMER_STREAM')


AS CALL SCD1_DB.PUBLIC.CUSTOMER_SP();




ALTER TASK SCD1_DB.PUBLIC.CUSTOMER_TASK RESUME;



SHOW TASKS;