From 6a9d8d7dfa3505d1ab5e87dead6dbf8654e2cebe Mon Sep 17 00:00:00 2001 From: Victor Perron Date: Thu, 19 Sep 2024 10:00:54 +0200 Subject: [PATCH] chore(pipeline) : Make the main DAG run hourly Having two DAGs running the geocodages model at the same time doesn't play well with the incremental models. Let's switch to running the main DAG hourly, which also kind of makes sense as our pipeline is more frequently updated and is also by far the simpler option, thus the one with the fewest surprises. --- pipeline/dags/compute_hourly.py | 29 ----------------------------- pipeline/dags/dag_utils/dbt.py | 7 ++----- pipeline/dags/main.py | 2 +- 3 files changed, 3 insertions(+), 35 deletions(-) delete mode 100644 pipeline/dags/compute_hourly.py diff --git a/pipeline/dags/compute_hourly.py b/pipeline/dags/compute_hourly.py deleted file mode 100644 index 34e93d14..00000000 --- a/pipeline/dags/compute_hourly.py +++ /dev/null @@ -1,29 +0,0 @@ -import pendulum - -import airflow -from airflow.operators import empty - -from dag_utils import date, marts, notifications -from dag_utils.dbt import ( - get_intermediate_tasks, - get_staging_tasks, -) - -with airflow.DAG( - dag_id="compute_hourly", - start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE), - default_args=notifications.notify_failure_args(), - schedule="@hourly", - catchup=False, - concurrency=4, -) as dag: - start = empty.EmptyOperator(task_id="start") - end = empty.EmptyOperator(task_id="end") - - ( - start - >> get_staging_tasks(schedule="@hourly") - >> get_intermediate_tasks() - >> marts.export_di_dataset_to_s3() - >> end - ) diff --git a/pipeline/dags/dag_utils/dbt.py b/pipeline/dags/dag_utils/dbt.py index 61c0787f..17453f04 100644 --- a/pipeline/dags/dag_utils/dbt.py +++ b/pipeline/dags/dag_utils/dbt.py @@ -48,13 +48,10 @@ def dbt_operator_factory( ) -def get_staging_tasks(schedule=None): +def get_staging_tasks(): task_list = [] - for source_id, src_meta in sorted(SOURCES_CONFIGS.items()): - if schedule and 
"schedule" in src_meta and src_meta["schedule"] != schedule: - continue - + for source_id in sorted(SOURCES_CONFIGS): dbt_source_id = source_id.replace("-", "_") stg_selector = f"path:models/staging/sources/**/stg_{dbt_source_id}__*.sql" diff --git a/pipeline/dags/main.py b/pipeline/dags/main.py index adde7e64..f518f563 100644 --- a/pipeline/dags/main.py +++ b/pipeline/dags/main.py @@ -15,7 +15,7 @@ dag_id="main", start_date=pendulum.datetime(2022, 1, 1, tz=date.TIME_ZONE), default_args=notify_failure_args(), - schedule="0 4 * * *", + schedule="@hourly", catchup=False, concurrency=4, ) as dag: