Skip to content

Commit

Permalink
Merge pull request #13 from Snowflake-Labs/sfc-gh-ghernandez-patch-2
Browse files Browse the repository at this point in the history
Update 07_deploy_task_dag.py
  • Loading branch information
sfc-gh-ghernandez authored Jun 27, 2024
2 parents 9380099 + 9df3333 commit 0239130
Showing 1 changed file with 6 additions and 36 deletions.
42 changes: 6 additions & 36 deletions steps/07_deploy_task_dag.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,9 @@
#------------------------------------------------------------------------------
# Hands-On Lab: Intro to Data Engineering with Snowpark Python
# Script: 07_deploy_task_dag.py
# Author: Jeremiah Hansen
# Last Updated: 9/26/2023
#------------------------------------------------------------------------------

# SNOWFLAKE ADVANTAGE: Snowpark Python API
# SNOWFLAKE ADVANTAGE: Snowpark Python Task DAG API


from datetime import timedelta

#from snowflake.connector import connect
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task
from snowflake.core.task import Task
from snowflake.core.task.dagv1 import DAGOperation, DAG, DAGTask


# Alternative way to create the tasks
def create_tasks_procedurally(session: Session) -> str:
database_name = "HOL_DB"
schema_name = "HOL_SCHEMA"
Expand All @@ -46,31 +29,22 @@ def create_tasks_procedurally(session: Session) -> str:
definition="CALL LOAD_DAILY_CITY_METRICS_SP()",
warehouse=warehouse_name
)
task2_entity.predecessors = [task1_entity.name]
task3_entity.predecessors = [task2_entity.name]

# Create the tasks in Snowflake
task1 = tasks.create(task1_entity, mode="orReplace")
task2 = tasks.create(task2_entity, mode="orReplace")
task3 = tasks.create(task3_entity, mode="orReplace")

# Set task dependencies
task2.predecessors = [task1.name]
task3.predecessors = [task2.name]

# List the tasks in Snowflake
for t in tasks.iter(like="%task"):
print(f"Definition of {t.name}: \n\n", t.name, t.definition, sep="", end="\n\n--------------------------\n\n")

task1.execute()

# task1.get_current_graphs()

# task1.suspend()
# task2.suspend()
# task3.suspend()
# task3.delete()
# task2.delete()
# task1.delete()


# Create the tasks using the DAG API
def main(session: Session) -> str:
database_name = "HOL_DB"
schema_name = "HOL_SCHEMA"
Expand All @@ -89,7 +63,7 @@ def main(session: Session) -> str:
dag_task2 = DAGTask("LOAD_LOCATION_TASK", definition="CALL LOAD_EXCEL_WORKSHEET_TO_TABLE_SP(BUILD_SCOPED_FILE_URL(@FROSTBYTE_RAW_STAGE, 'intro/location.xlsx'), 'location', 'LOCATION')", warehouse=warehouse_name)
dag_task3 = DAGTask("LOAD_DAILY_CITY_METRICS_TASK", definition="CALL LOAD_DAILY_CITY_METRICS_SP()", warehouse=warehouse_name)

dag_task3 >> dag_task1
dag_task2 >> dag_task1
dag_task3 >> dag_task2

# Create the DAG in Snowflake
Expand All @@ -101,13 +75,9 @@ def main(session: Session) -> str:

dag_op.run(dag)

# dag_op.delete(dag)

return f"Successfully created and started the DAG"


# For local debugging
# Be aware you may need to type-convert arguments if you add input parameters
if __name__ == '__main__':
import os, sys
# Add the utils package to our path and import the snowpark_utils function
Expand Down

0 comments on commit 0239130

Please sign in to comment.