Snowflake Native Apps Sample Code
Note
Make sure you have your license key ready before running the sample code. See License Key for more information, or contact our Sales Representative.
Snowsight
Below is sample Python code that runs a job in Snowflake Workspaces.
Step 1: Set up your Jupyter notebook
Add a new Jupyter notebook in Snowflake Workspaces. Make sure the notebook is connected to a service, or create a new service if needed.
Step 2: Create and run a Python cell
Add a new Python cell to your notebook, then paste the sample code below into it to run a job in Snowflake Workspaces.
The sample code shows how to set up logging, load the configuration, create a temporary input view for each instance, call the batch stored procedure for every instance in parallel, and log the results and timing information. Replace the placeholders (such as <REPLACE_WITH_YOUR_LICENSE_KEY> and the app and table names in INSTANCES) with your own values.
Click to see the sample code
import logging
import time
from datetime import datetime, timedelta
import json
import concurrent.futures
from snowflake.snowpark import Session
import logging
import time
from datetime import datetime, timedelta
import json
import concurrent.futures
from snowflake.snowpark import Session
# Reuse the Snowpark session the notebook is already connected to.
session = Session.getActiveSession()

# ──────────────────────────────────────────────────────
# Logging setup
# job_logger writes formatted lines to the console (StreamHandler -> stderr).
# In Notebooks in Workspaces this output is presumably collected into the
# account's event table — confirm against:
# https://docs.snowflake.com/en/user-guide/ui-snowsight/notebooks-in-workspaces/notebooks-in-workspaces-observability-logging
# ──────────────────────────────────────────────────────
logging.getLogger().setLevel(logging.INFO)

job_logger = logging.getLogger("personator sample job")
job_logger.setLevel(logging.INFO)
# Do not also hand these records to the root logger's handlers.
job_logger.propagate = False
# getLogger returns the same object on every run of the cell, so drop any
# handler left over from a previous run instead of stacking another one.
job_logger.handlers.clear()

_formatter = logging.Formatter(
    fmt="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

_consoleHandler = logging.StreamHandler()
_consoleHandler.setFormatter(_formatter)
job_logger.addHandler(_consoleHandler)
# ──────────────────────────────────────────────────────
# Job configuration
# The general settings are passed to BATCH_PROCEDURE for every instance;
# move them into the per-instance entries if instances need different values.
# ──────────────────────────────────────────────────────
# Settings shared by every instance call
ACTIONS = "Check,Append"
COLUMNS = "Plus4"
OUTPUT_TABLE_FIELDS = "RecordID,Results,Plus4,MelissaAddressKey"
DUPLICATE_CHECK = "FALSE"

# One entry per app instance: the application to call, the table it reads
# from, and the output table name handed to the procedure.
INSTANCES = [
    dict(
        app_name="APPNAME_INSTANCE_1",
        input_table="TESTDB.TESTSCHEMA.INPUT_TABLE_1",
        output_table="OUT_INST1_TIMESTAMP",
    ),
    dict(
        app_name="APPNAME_INSTANCE_2",
        input_table="TESTDB.TESTSCHEMA.INPUT_TABLE_2",
        output_table="OUT_INST2_TIMESTAMP",
    ),
]
# ──────────────────────────────────────────────────────
# Helper functions
# ──────────────────────────────────────────────────────
def _format_call_result(result: list) -> str:
    """Return the first cell of a procedure result as readable text for logging.

    A JSON string is pretty-printed. Anything else (no rows, a non-string
    value, or text that is not valid JSON) is returned via ``str`` instead of
    raising. A procedure call that succeeded is therefore never reported as a
    failure just because its result could not be pretty-printed.
    """
    if not result:
        return "<no result rows>"
    raw = result[0][0]
    try:
        return json.dumps(json.loads(raw), indent=2)
    except (TypeError, ValueError):  # JSONDecodeError is a ValueError subclass
        return str(raw)


def run_batch_job(job_name: str, instance: dict) -> dict:
    """Run ``BATCH_PROCEDURE`` on one app instance and return timing info.

    Creates (or replaces) the temporary view ``<app>.CORE.INPUT_RECORDS`` over
    the instance's input table, counts its rows, and calls
    ``<app>.CORE.BATCH_PROCEDURE`` with the module-level settings (``ACTIONS``,
    ``COLUMNS``, ``OUTPUT_TABLE_FIELDS``, ``DUPLICATE_CHECK``). It then logs a
    summary line. All SQL runs on the module-level ``session``.

    Args:
        job_name: Label prefixed to every log line (e.g. ``"SAMPLE JOB"``).
        instance: One ``INSTANCES`` entry with the keys ``app_name``,
            ``input_table`` and ``output_table``.

    Returns:
        A dict with ``app_name``, ``input_table``, ``output_table``,
        ``row_count``, ``start`` / ``end`` (``datetime``), ``elapsed_seconds``
        and ``result`` (the rows collected from the procedure call).

    Raises:
        Any error from ``session.sql(...).collect()`` (view creation, row
        count, or the procedure call) propagates to the caller.
    """
    app = instance["app_name"]
    input_table = instance["input_table"]
    output_table = instance["output_table"]

    job_logger.info(f"[{job_name}] [{app}] Start instance call")
    # perf_counter is monotonic, so elapsed time is immune to wall-clock changes.
    start_counter = time.perf_counter()
    start_datetime = datetime.now()

    # Temporary view over the input table. RECID is exposed as RECORDID,
    # LOCALITY as CITY and ADMINISTRATIVEAREA as STATE, and NULLs become ''.
    create_view_sql = f"""
        CREATE OR REPLACE TEMPORARY VIEW {app}.CORE.INPUT_RECORDS AS
        SELECT
            DISTINCT
            RECID AS RECORDID,
            COALESCE(ADDRESSLINE1, '') AS ADDRESSLINE1,
            COALESCE(ADDRESSLINE2, '') AS ADDRESSLINE2,
            COALESCE(LOCALITY, '') AS CITY,
            COALESCE(ADMINISTRATIVEAREA, '') AS STATE,
            COALESCE(POSTALCODE,'') AS POSTALCODE,
            COALESCE(COUNTRY,'') AS COUNTRY
        FROM {input_table}
        LIMIT 30 -- Remove or adjust the limit as needed
    """
    session.sql(create_view_sql).collect()

    # Rows the procedure will see (after DISTINCT and LIMIT in the view).
    row_count = session.sql(
        f"SELECT COUNT(*) AS REC_COUNT FROM {app}.CORE.INPUT_RECORDS"
    ).collect()[0]["REC_COUNT"]
    job_logger.info(f"[{job_name}] [{app}] Source table {input_table}. Row count: {row_count:,}")

    call_sql = f"""
        CALL {app}.CORE.BATCH_PROCEDURE(
            LICENSE => '<REPLACE_WITH_YOUR_LICENSE_KEY>'
            ,INPUT_TABLE_NAME => TABLE({app}.CORE.INPUT_RECORDS)
            ,OUTPUT_TABLE_NAME => '{output_table}'
            ,OUTPUT_TABLE_FIELDS => '{OUTPUT_TABLE_FIELDS}'
            ,ACTIONS => '{ACTIONS}'
            ,COLUMNS => '{COLUMNS}'
            ,DUPLICATE_CHECK => '{DUPLICATE_CHECK}'
        )
    """
    result = session.sql(call_sql).collect()

    elapsed = time.perf_counter() - start_counter
    end_datetime = datetime.now()
    job_logger.info(f"[{job_name}] [{app}] [Instance Summary] Row count: {row_count}. Instance elapsed: {timedelta(seconds=int(elapsed))}. Result: {_format_call_result(result)}")
    job_logger.info(f"[{job_name}] [{app}] End instance call")

    return {
        "app_name": app,
        "input_table": input_table,
        "output_table": output_table,
        "row_count": row_count,
        "start": start_datetime,
        "end": end_datetime,
        "elapsed_seconds": elapsed,
        "result": result,
    }
# ──────────────────────────────────────────────────────
# Main
# ──────────────────────────────────────────────────────
def main(session: Session):
    """Run ``run_batch_job`` for every ``INSTANCES`` entry in parallel and log a summary.

    Each instance runs in its own worker thread. A failing instance is logged
    (with traceback) and does not stop the others. Only successful instances
    contribute to the total row count.

    Args:
        session: The active Snowpark session. NOTE(review): it is not
            forwarded; ``run_batch_job`` uses the module-level ``session``.

    Returns:
        The summary string (configured instance count, total rows of the
        successful instances, total elapsed time), which is also logged.
    """
    job_name = "SAMPLE JOB"
    job_logger.info(f"[{job_name}] [MAIN] Start job")
    total_start = time.perf_counter()

    results = []
    failed_apps = []
    # ThreadPoolExecutor raises ValueError for max_workers=0, so keep at least
    # one worker even when INSTANCES is empty.
    worker_count = max(1, len(INSTANCES))
    with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as executor:
        futures = {
            executor.submit(run_batch_job, job_name, inst): inst["app_name"]
            for inst in INSTANCES
        }
        for future in concurrent.futures.as_completed(futures):
            app_name = futures[future]
            try:
                results.append(future.result())
            except Exception as e:
                # Tag the error like every other job message so the event-table
                # query filtering on "[SAMPLE JOB]" returns failures as well,
                # and keep the traceback for diagnosis.
                failed_apps.append(app_name)
                job_logger.exception(f"[{job_name}] [{app_name}] Error on instance call: {e}")

    total_elapsed = time.perf_counter() - total_start
    total_rows = sum(r["row_count"] for r in results)

    if failed_apps:
        job_logger.warning(
            f"[{job_name}] [MAIN] {len(failed_apps)} of {len(INSTANCES)} instance(s) failed: "
            f"{', '.join(sorted(failed_apps))}"
        )

    # Summary
    summary_message = f"Total instance counts: {len(INSTANCES)}. Total row counts: {total_rows}. Total elapsed: {timedelta(seconds=int(total_elapsed))}"
    job_logger.info(f"[{job_name}] [MAIN] [JOB SUMMARY] {summary_message}")
    job_logger.info(f"[{job_name}] [MAIN] End job")
    return summary_message
# A notebook cell (like a script run directly) executes with __name__ set to
# "__main__", so running the cell starts the job. The summary string returned
# by main is discarded here; main also logs it.
if __name__ == "__main__":
    main(session=session)
Step 3: View log messages
Add a new SQL cell to your Jupyter notebook and run the queries below to view the log messages generated by the sample code above.
Find the event table configured for your account:
-- Look up the account-level EVENT_TABLE parameter; use its value as the
-- event table name in the queries below.
SHOW PARAMETERS
    LIKE 'event_table'
    IN ACCOUNT;
View the logs written to the event table by the sample job above.
-- Log records containing the "[SAMPLE JOB]" tag (the job_name used by the
-- sample code), newest first.
SELECT
    VALUE,
    TIMESTAMP,
    RECORD_TYPE,
    RECORD,
    RECORD_ATTRIBUTES
FROM <REPLACE_WITH_YOUR_EVENT_TABLE> -- Replace with the actual event table name in your account
WHERE RECORD_TYPE = 'LOG'
  AND VALUE LIKE '%[SAMPLE JOB]%'
ORDER BY TIMESTAMP DESC
LIMIT 100;
View logs for each instance call.
-- Span events, labelled with the application that emitted them, newest first.
-- This is not filtered to one job; add a condition on APP_NAME to narrow it.
SELECT
    RESOURCE_ATTRIBUTES:"snow.application.name"::STRING AS APP_NAME,
    RECORD,
    RECORD_ATTRIBUTES,
    RECORD_TYPE,
    TIMESTAMP
FROM <REPLACE_WITH_YOUR_EVENT_TABLE> -- Replace with the actual event table name in your account
WHERE RECORD_TYPE LIKE 'SPAN_EVENT%'
ORDER BY TIMESTAMP DESC
LIMIT 100;
Snowflake CLI
If you prefer a command-line interface, you can use Snowflake CLI to run the scripts provided in our GitHub repository.
Language |
GitHub Repository |
|---|---|
Python3 |