// Prerequisite: the SCD1_DB database must exist before running this script, e.g. CREATE DATABASE IF NOT EXISTS SCD1_DB
// Storage integration for external stages on S3: uses the IAM role below and
// is restricted to a single allowed bucket prefix.
CREATE OR REPLACE STORAGE INTEGRATION SCD1_INT
    TYPE                      = EXTERNAL_STAGE
    STORAGE_PROVIDER          = 'S3'
    ENABLED                   = TRUE
    STORAGE_AWS_ROLE_ARN      = 'arn:aws:iam::848922562186:role/scd1role'
    STORAGE_ALLOWED_LOCATIONS = ('s3://scd1databucket-ambi/data/');
// Show the properties of the storage integration that links the S3 bucket to Snowflake.
DESCRIBE INTEGRATION SCD1_INT;
// External stage over the S3 data prefix, accessed through the SCD1_INT
// storage integration.
CREATE OR REPLACE STAGE SCD1_DB.PUBLIC.SCD1_STAGE
    STORAGE_INTEGRATION = SCD1_INT
    URL                 = 's3://scd1databucket-ambi/data/';
// Snowpipe that auto-ingests CSV files arriving in SCD1_STAGE into CUSTOMER_SOURCE.
// File fields are addressed by position ($1..$11). The explicit target column
// list binds each field to a named column, so the load does not silently depend
// on the physical column order of CUSTOMER_SOURCE.
// The first line of every file is skipped as a header; fields may be wrapped in
// double quotes.
CREATE OR REPLACE PIPE SCD1_DB.PUBLIC.SCD1PIPE
    AUTO_INGEST = TRUE
AS
COPY INTO SCD1_DB.PUBLIC.CUSTOMER_SOURCE
(
    CUSTOMERNAME,
    PHONE,
    ADDRESSLINE1,
    ADDRESSLINE2,
    CITY,
    STATE,
    POSTALCODE,
    COUNTRY,
    TERRITORY,
    CONTACTFIRSTNAME,
    CONTACTLASTNAME
)
FROM
(
    SELECT
        $1  AS CUSTOMERNAME,
        $2  AS PHONE,
        $3  AS ADDRESSLINE1,
        $4  AS ADDRESSLINE2,
        $5  AS CITY,
        $6  AS STATE,
        $7  AS POSTALCODE,
        $8  AS COUNTRY,
        $9  AS TERRITORY,
        $10 AS CONTACTFIRSTNAME,
        $11 AS CONTACTLASTNAME
    FROM @SCD1_DB.PUBLIC.SCD1_STAGE
)
FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '"');
// Make SCD1_DB.PUBLIC the current schema for the statements that follow.
USE SCHEMA SCD1_DB.PUBLIC;
// List pipes, e.g. to confirm that SCD1PIPE was created.
SHOW PIPES;
// Switch the session to the ACCOUNTADMIN role and the COMPUTE_WH warehouse.
USE ROLE ACCOUNTADMIN;
USE WAREHOUSE COMPUTE_WH;
// NOTE(review): repeats the USE SCHEMA above; looks redundant — confirm before removing.
USE SCHEMA SCD1_DB.PUBLIC;
// Append-only change stream on the landing table CUSTOMER_SOURCE; it is the
// input read by CUSTOMER_SP and polled by CUSTOMER_TASK.
CREATE OR REPLACE STREAM SCD1_DB.PUBLIC.CUSTOMER_STREAM
    ON TABLE SCD1_DB.PUBLIC.CUSTOMER_SOURCE
    APPEND_ONLY = TRUE;

// Ad-hoc inspection of the rows currently pending in the stream.
SELECT *
FROM SCD1_DB.PUBLIC.CUSTOMER_STREAM;
// SCD Type 1 target table. CUSTOMER_SP merges into it, matching rows on
// (CONTACTFIRSTNAME, CONTACTLASTNAME).
CREATE OR REPLACE TABLE SCD1_DB.PUBLIC.CUSTOMER
(
    // Match key used by the MERGE in CUSTOMER_SP
    CONTACTFIRSTNAME    STRING,
    CONTACTLASTNAME     STRING,

    // Customer attributes, overwritten in place on every matching load
    CUSTOMERNAME        STRING,
    PHONE               STRING,
    ADDRESSLINE1        STRING,
    ADDRESSLINE2        STRING,
    CITY                STRING,
    STATE               STRING,
    POSTALCODE          STRING,
    COUNTRY             STRING,
    TERRITORY           STRING,

    // Audit columns: set on insert / refreshed on update by CUSTOMER_SP
    INSERT_DTS          TIMESTAMP(6),
    UPDATE_DTS          TIMESTAMP(6)
);
// SCD Type 1 load: upserts the rows pending in CUSTOMER_STREAM into CUSTOMER.
//   Match key : CONTACTFIRSTNAME + CONTACTLASTNAME
//   Matched   : overwrite all attribute columns and refresh UPDATE_DTS
//   Unmatched : insert the row with INSERT_DTS = UPDATE_DTS = load time
//   Returns   : "Store Procedure Executed Successfully" on success
//   Throws    : the string 'Error: <original error>' after attempting a ROLLBACK
// NOTE(review): the MERGE assumes at most one stream row per match key per run;
// several rows for the same key in one batch may make the MERGE fail as
// nondeterministic — confirm against the feed.
CREATE OR REPLACE PROCEDURE SCD1_DB.PUBLIC.CUSTOMER_SP()
RETURNS VARCHAR(50)
LANGUAGE JAVASCRIPT
EXECUTE AS CALLER
AS
$$
try {
    // Open an explicit transaction in the current (caller's) session.
    snowflake.execute({sqlText: `BEGIN TRANSACTION;`});

    // Read the stream directly inside the MERGE (a DML statement). The stream
    // offset then advances only when this transaction commits, so a failed
    // MERGE leaves the change rows in the stream for the next run.
    // Staging the rows first with CREATE TABLE ... AS SELECT would consume the
    // stream in a separate, auto-committed DDL transaction that the ROLLBACK
    // below could not undo.
    snowflake.execute({sqlText: `MERGE INTO SCD1_DB.PUBLIC.CUSTOMER TGT
USING (
    SELECT
        CONTACTFIRSTNAME,
        CONTACTLASTNAME,
        CUSTOMERNAME,
        PHONE,
        ADDRESSLINE1,
        ADDRESSLINE2,
        CITY,
        STATE,
        POSTALCODE,
        COUNTRY,
        TERRITORY,
        CURRENT_TIMESTAMP(6) AS INSERT_DTS,
        CURRENT_TIMESTAMP(6) AS UPDATE_DTS
    FROM SCD1_DB.PUBLIC.CUSTOMER_STREAM
) TMP
ON  TGT.CONTACTFIRSTNAME = TMP.CONTACTFIRSTNAME
AND TGT.CONTACTLASTNAME  = TMP.CONTACTLASTNAME
WHEN MATCHED THEN UPDATE SET
    TGT.CUSTOMERNAME = TMP.CUSTOMERNAME,
    TGT.PHONE        = TMP.PHONE,
    TGT.ADDRESSLINE1 = TMP.ADDRESSLINE1,
    TGT.ADDRESSLINE2 = TMP.ADDRESSLINE2,
    TGT.CITY         = TMP.CITY,
    TGT.STATE        = TMP.STATE,
    TGT.POSTALCODE   = TMP.POSTALCODE,
    TGT.COUNTRY      = TMP.COUNTRY,
    TGT.TERRITORY    = TMP.TERRITORY,
    TGT.UPDATE_DTS   = TMP.UPDATE_DTS
WHEN NOT MATCHED THEN INSERT
(
    CONTACTFIRSTNAME,
    CONTACTLASTNAME,
    CUSTOMERNAME,
    PHONE,
    ADDRESSLINE1,
    ADDRESSLINE2,
    CITY,
    STATE,
    POSTALCODE,
    COUNTRY,
    TERRITORY,
    INSERT_DTS,
    UPDATE_DTS
)
VALUES
(
    TMP.CONTACTFIRSTNAME,
    TMP.CONTACTLASTNAME,
    TMP.CUSTOMERNAME,
    TMP.PHONE,
    TMP.ADDRESSLINE1,
    TMP.ADDRESSLINE2,
    TMP.CITY,
    TMP.STATE,
    TMP.POSTALCODE,
    TMP.COUNTRY,
    TMP.TERRITORY,
    TMP.INSERT_DTS,
    TMP.UPDATE_DTS
);`});

    // Commit the MERGE and, with it, the advance of the stream offset.
    snowflake.execute({sqlText: `COMMIT;`});

    // Status text returned for info and debugging purposes.
    return "Store Procedure Executed Successfully";
}
catch (err) {
    // Declared locally so no implicit global is created.
    var result = 'Error: ' + err;
    try {
        snowflake.execute({sqlText: `ROLLBACK;`});
    }
    catch (rollbackErr) {
        // Ignore a failing ROLLBACK so the original error is the one reported.
    }
    throw result;
}
$$;
// Scheduled every minute on COMPUTE_WH: calls CUSTOMER_SP, but only when
// CUSTOMER_STREAM reports that it has data.
CREATE OR REPLACE TASK SCD1_DB.PUBLIC.CUSTOMER_TASK
    WAREHOUSE = COMPUTE_WH
    SCHEDULE  = '1 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('SCD1_DB.PUBLIC.CUSTOMER_STREAM')
AS
    CALL SCD1_DB.PUBLIC.CUSTOMER_SP();

// Resume the task so that its schedule starts running.
ALTER TASK SCD1_DB.PUBLIC.CUSTOMER_TASK RESUME;

// List tasks, e.g. to check the state of CUSTOMER_TASK.
SHOW TASKS;