Skip to content

Commit

Permalink
Merge pull request #14 from Snowflake-Labs/sfc-gh-ghernandez-patch-3
Browse files Browse the repository at this point in the history
Update 07_deploy_task_dag.py
  • Loading branch information
sfc-gh-ghernandez authored Jun 27, 2024
2 parents 0239130 + 9ee1d43 commit 69fd84c
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions steps/07_deploy_task_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from snowflake.core import Root
from snowflake.core.task import Task
from snowflake.core.task.dagv1 import DAGOperation, DAG, DAGTask
from snowflake.core._common import CreateMode


def create_tasks_procedurally(session: Session) -> str:
database_name = "HOL_DB"
Expand Down Expand Up @@ -36,8 +38,9 @@ def create_tasks_procedurally(session: Session) -> str:
task3 = tasks.create(task3_entity, mode="orReplace")

# Set task dependencies
task2.predecessors = [task1.name]
task3.predecessors = [task2.name]
# Use fully qualified names
task2.predecessors = [f"{task1.database.name}{task1.schema.name}{task1.name}"]
task3.predecessors = [f"{task2.database.name}.{task2.schema.name}.{task2.name}"]

# List the tasks in Snowflake
for t in tasks.iter(like="%task"):
Expand All @@ -51,13 +54,11 @@ def main(session: Session) -> str:
warehouse_name = "HOL_WH"

api_root = Root(session)
schema = api_root.databases[database_name].schemas[schema_name]
tasks = schema.tasks
dag_op = DAGOperation(schema)
# tasks = schema.tasks

# Define the DAG
dag_name = "HOL_DAG"
dag = DAG(dag_name, schedule=timedelta(days=1), warehouse=warehouse_name)
dag = DAG(dag_name, schedule=timedelta(days=1), use_func_return_value=True, warehouse=warehouse_name)
with dag:
dag_task1 = DAGTask("LOAD_ORDER_DETAIL_TASK", definition="CALL LOAD_EXCEL_WORKSHEET_TO_TABLE_SP(BUILD_SCOPED_FILE_URL(@FROSTBYTE_RAW_STAGE, 'intro/order_detail.xlsx'), 'order_detail', 'ORDER_DETAIL')", warehouse=warehouse_name)
dag_task2 = DAGTask("LOAD_LOCATION_TASK", definition="CALL LOAD_EXCEL_WORKSHEET_TO_TABLE_SP(BUILD_SCOPED_FILE_URL(@FROSTBYTE_RAW_STAGE, 'intro/location.xlsx'), 'location', 'LOCATION')", warehouse=warehouse_name)
Expand All @@ -67,7 +68,10 @@ def main(session: Session) -> str:
dag_task3 >> dag_task2

# Create the DAG in Snowflake
dag_op.deploy(dag, mode="orreplace")
schema = api_root.databases[database_name].schemas[schema_name]
dag_op = DAGOperation(schema)

dag_op.deploy(dag, mode=CreateMode.or_replace)

dagiter = dag_op.iter_dags(like='hol_dag%')
for dag_name in dagiter:
Expand Down

0 comments on commit 69fd84c

Please sign in to comment.