diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index d601d2a..f13a222 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -115,7 +115,7 @@ jobs: dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }} - name: Run tests - run: ./.scripts/integration_test.sh -d ${{ matrix.warehouse }} + run: ./.scripts/integration_tests.sh -d ${{ matrix.warehouse }} - name: "Post-test: Drop ci schemas" run: | diff --git a/CHANGELOG b/CHANGELOG index 94328d6..500c0e8 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,15 @@ +Snowplow Normalize 0.3.3 (2023-09-29) +--------------------------------------- +## Summary +- Include the new base macro functionality from utils in the package +- Allow users to specify the timestamp used to process events (from the default of `collector_tstamp`) + +## Under the hood +- Simplify the model architecture + +## Upgrading +Bump the snowplow-normalize version in your `packages.yml` file. + Snowplow Normalize 0.3.2 (2023-09-12) --------------------------------------- ## Summary @@ -19,11 +31,11 @@ To upgrade the package, bump the version number in the `packages.yml` file in yo Snowplow Normalize 0.3.0 (2023-03-28) --------------------------------------- ## Summary -This version migrates our models away from the `snowplow_incremental_materialization` and instead move to using the built-in `incremental` with an optimization applied on top. +This version migrates our models away from the `snowplow_incremental_materialization` and instead move to using the built-in `incremental` with an optimization applied on top. 
## 🚨 Breaking Changes 🚨 ### Changes to materialization -To take advantage of the optimization we apply to the `incremental` materialization, users will need to add the following to their `dbt_project.yml` : +To take advantage of the optimization we apply to the `incremental` materialization, users will need to add the following to their `dbt_project.yml` : ```yaml # dbt_project.yml ... @@ -53,7 +65,7 @@ This release allows users to disable the days late data filter to enable normali - Allow disabling of days late filter by setting `snowplow__days_late_allowed` to `-1` (#28) ## Upgrading -To upgrade the package, bump the version number in the packages.yml file in your project. +To upgrade the package, bump the version number in the packages.yml file in your project. Snowplow Normalize 0.2.2 (2023-03-13) --------------------------------------- @@ -122,7 +134,7 @@ Once you have upgraded your config file, the easiest way to ensure your models m - Change the `unique_key` in the config section to `unique_id` - Add a line between the `event_table_name` and `from` lines for each select statement; `, event_id||'-'||'' as unique_id`, with the event table name for that select block. - For your users table: - - Add 3 new values to the start of the macro call, `'user_id','',''`, before the `user_cols` argument. + - Add 3 new values to the start of the macro call, `'user_id','',''`, before the `user_cols` argument. ### Upgrade your filtered events table If you use the master filtered events table, you will need to add a new column for the latest version to work. 
If you have not processed much data yet it may be easier to simply re-run the package from scratch using `dbt run --full-refresh --vars 'snowplow__allow_refresh: true'`, alternatively run the following in your warehouse, replacing the schema/dataset/warehouse and table name for your table: diff --git a/dbt_project.yml b/dbt_project.yml index 63e91e7..fcf5966 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -1,6 +1,6 @@ name: 'snowplow_normalize' -version: '0.3.2' +version: '0.3.3' config-version: 2 require-dbt-version: [">=1.4.0", "<2.0.0"] @@ -38,6 +38,7 @@ vars: snowplow__query_tag: "snowplow_dbt" snowplow__dev_target_name: 'dev' snowplow__allow_refresh: false + snowplow__session_timestamp: 'collector_tstamp' # Variables - Databricks Only # Add the following variable to your dbt project's dbt_project.yml file # Depending on the use case it should either be the catalog (for Unity Catalog users from databricks connector 1.1.1 onwards) or the same value as your snowplow__atomic_schema (unless changed it should be 'atomic') @@ -51,7 +52,7 @@ on-run-start: # Update manifest table with last event consumed per sucessfully executed node/model on-run-end: - - "{{ snowplow_utils.snowplow_incremental_post_hook('snowplow_normalize') }}" + - "{{ snowplow_utils.snowplow_incremental_post_hook('snowplow_normalize', 'snowplow_normalize_incremental_manifest', 'snowplow_normalize_base_events_this_run', var('snowplow__session_timestamp')) }}" # Tag 'snowplow_normalize_incremental' allows snowplow_incremental_post_hook to identify Snowplow models @@ -67,9 +68,4 @@ models: scratch: +schema: "scratch" +tags: "scratch" - bigquery: - enabled: "{{ target.type == 'bigquery' | as_bool() }}" - databricks: - enabled: "{{ target.type in ['databricks', 'spark'] | as_bool() }}" - snowflake: - enabled: "{{ target.type == 'snowflake' | as_bool() }}" + +enabled: "{{ target.type in ['bigquery', 'databricks', 'spark', 'snowflake'] | as_bool() }}" diff --git 
a/integration_tests/.scripts/integration_test.sh b/integration_tests/.scripts/integration_tests.sh similarity index 100% rename from integration_tests/.scripts/integration_test.sh rename to integration_tests/.scripts/integration_tests.sh diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index 697876b..2b79367 100644 --- a/integration_tests/dbt_project.yml +++ b/integration_tests/dbt_project.yml @@ -1,5 +1,5 @@ name: 'snowplow_normalize_integration_tests' -version: '0.3.2' +version: '0.3.3' config-version: 2 profile: 'integration_tests' diff --git a/models/base/manifest/snowplow_normalize_incremental_manifest.sql b/models/base/manifest/snowplow_normalize_incremental_manifest.sql index 19c3302..07703a9 100644 --- a/models/base/manifest/snowplow_normalize_incremental_manifest.sql +++ b/models/base/manifest/snowplow_normalize_incremental_manifest.sql @@ -8,13 +8,4 @@ -- Boilerplate to generate table. -- Table updated as part of end-run hook -with prep as ( - select - cast(null as {{ snowplow_utils.type_max_string() }}) model, - cast('1970-01-01' as {{ type_timestamp() }}) as last_success -) - -select * - -from prep -where false +{{ snowplow_utils.base_create_snowplow_incremental_manifest() }} diff --git a/models/base/scratch/bigquery/snowplow_normalize_base_events_this_run.sql b/models/base/scratch/bigquery/snowplow_normalize_base_events_this_run.sql deleted file mode 100644 index 5ad20a6..0000000 --- a/models/base/scratch/bigquery/snowplow_normalize_base_events_this_run.sql +++ /dev/null @@ -1,36 +0,0 @@ -{{ - config( - tags=["this_run"] - ) -}} - -{%- set lower_limit, upper_limit, session_start_limit = snowplow_utils.return_base_new_event_limits(ref('snowplow_normalize_base_new_event_limits')) %} - --- without downstream joins, it's safe to dedupe by picking the first event_id found. 
-select - array_agg(e order by e.collector_tstamp limit 1)[offset(0)].* - -from ( - - select - a.* - - from {{ var('snowplow__events') }} as a - - where - {# dvce_sent_tstamp is an optional field and not all trackers/webhooks populate it, this means this filter needs to be optional #} - {% if var("snowplow__days_late_allowed") == -1 %} - 1 = 1 - {% else %} - a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', var("snowplow__days_late_allowed", 3), 'a.dvce_created_tstamp') }} - {% endif %} - and a.collector_tstamp >= {{ lower_limit }} - and a.collector_tstamp <= {{ upper_limit }} - {% if var('snowplow__derived_tstamp_partitioned', true) and target.type == 'bigquery' | as_bool() %} - and a.derived_tstamp >= {{ snowplow_utils.timestamp_add('hour', -1, lower_limit) }} - and a.derived_tstamp <= {{ upper_limit }} - {% endif %} - and {{ snowplow_utils.app_id_filter(var("snowplow__app_id",[])) }} - -) e -group by e.event_id diff --git a/models/base/scratch/databricks/snowplow_normalize_base_events_this_run.sql b/models/base/scratch/databricks/snowplow_normalize_base_events_this_run.sql deleted file mode 100644 index 08f1b1d..0000000 --- a/models/base/scratch/databricks/snowplow_normalize_base_events_this_run.sql +++ /dev/null @@ -1,25 +0,0 @@ -{{ - config( - tags=["this_run"] - ) -}} - -{%- set lower_limit, upper_limit, session_start_limit = snowplow_utils.return_base_new_event_limits(ref('snowplow_normalize_base_new_event_limits')) %} - -select - a.* - -from {{ var('snowplow__events') }} as a - -where - {# dvce_sent_tstamp is an optional field and not all trackers/webhooks populate it, this means this filter needs to be optional #} - {% if var("snowplow__days_late_allowed") == -1 %} - 1 = 1 - {% else %} - a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', var("snowplow__days_late_allowed", 3), 'a.dvce_created_tstamp') }} - {% endif %} - and a.collector_tstamp >= {{ lower_limit }} - and a.collector_tstamp <= {{ upper_limit }} - and {{ 
snowplow_utils.app_id_filter(var("snowplow__app_id",[])) }} - -qualify row_number() over (partition by a.event_id order by a.collector_tstamp, a.etl_tstamp) = 1 diff --git a/models/base/scratch/snowflake/snowplow_normalize_base_events_this_run.sql b/models/base/scratch/snowflake/snowplow_normalize_base_events_this_run.sql deleted file mode 100644 index 9e3c728..0000000 --- a/models/base/scratch/snowflake/snowplow_normalize_base_events_this_run.sql +++ /dev/null @@ -1,27 +0,0 @@ -{{ - config( - tags=["this_run"], - sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')) - ) -}} - -{%- set lower_limit, upper_limit, session_start_limit = snowplow_utils.return_base_new_event_limits(ref('snowplow_normalize_base_new_event_limits')) %} - -select - a.* - -from {{ var('snowplow__events') }} as a - - -where - {# dvce_sent_tstamp is an optional field and not all trackers/webhooks populate it, this means this filter needs to be optional #} - {% if var("snowplow__days_late_allowed") == -1 %} - 1 = 1 - {% else %} - a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', var("snowplow__days_late_allowed", 3), 'a.dvce_created_tstamp') }} - {% endif %} - and a.collector_tstamp >= {{ lower_limit }} - and a.collector_tstamp <= {{ upper_limit }} - and {{ snowplow_utils.app_id_filter(var("snowplow__app_id",[])) }} - -qualify row_number() over (partition by a.event_id order by a.collector_tstamp) = 1 diff --git a/models/base/scratch/snowplow_normalize_base_events_this_run.sql b/models/base/scratch/snowplow_normalize_base_events_this_run.sql new file mode 100644 index 0000000..38e7bbb --- /dev/null +++ b/models/base/scratch/snowplow_normalize_base_events_this_run.sql @@ -0,0 +1,30 @@ +{{ + config( + tags=["this_run"], + sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')) + ) +}} + +{%- set lower_limit, upper_limit, session_start_limit = 
snowplow_utils.return_base_new_event_limits(ref('snowplow_normalize_base_new_event_limits')) %} + +select + a.* + +from {{ var('snowplow__events') }} as a + +where + {# dvce_sent_tstamp is an optional field and not all trackers/webhooks populate it, this means this filter needs to be optional #} + {% if var("snowplow__days_late_allowed") == -1 %} + 1 = 1 + {% else %} + a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', var("snowplow__days_late_allowed", 3), 'a.dvce_created_tstamp') }} + {% endif %} + and a.{{ var('snowplow__session_timestamp', 'collector_tstamp') }} >= {{ lower_limit }} + and a.{{ var('snowplow__session_timestamp', 'collector_tstamp') }} <= {{ upper_limit }} + {% if var('snowplow__derived_tstamp_partitioned', true) and target.type == 'bigquery' | as_bool() %} + and a.derived_tstamp >= {{ snowplow_utils.timestamp_add('hour', -1, lower_limit) }} + and a.derived_tstamp <= {{ upper_limit }} + {% endif %} + and {{ snowplow_utils.app_id_filter(var("snowplow__app_id",[])) }} + +qualify row_number() over (partition by a.event_id order by a.collector_tstamp{% if target.type in ['databricks', 'spark'] -%}, a.etl_tstamp {%- endif %}) = 1 diff --git a/models/base/scratch/snowplow_normalize_base_new_event_limits.sql b/models/base/scratch/snowplow_normalize_base_new_event_limits.sql index 72078ed..4238cf3 100644 --- a/models/base/scratch/snowplow_normalize_base_new_event_limits.sql +++ b/models/base/scratch/snowplow_normalize_base_new_event_limits.sql @@ -10,14 +10,15 @@ {% set min_last_success, max_last_success, models_matched_from_manifest, - has_matched_all_models = snowplow_utils.get_incremental_manifest_status(ref('snowplow_normalize_incremental_manifest'), models_in_run) -%} + has_matched_all_models = snowplow_utils.get_incremental_manifest_status(ref('snowplow_normalize_incremental_manifest'), + models_in_run) -%} {% set run_limits_query = snowplow_utils.get_run_limits(min_last_success, - max_last_success, - models_matched_from_manifest, - 
has_matched_all_models, - var("snowplow__start_date","2020-01-01")) -%} + max_last_success, + models_matched_from_manifest, + has_matched_all_models, + var("snowplow__start_date","2020-01-01")) -%} {{ run_limits_query }} diff --git a/packages.yml b/packages.yml index 74baac8..98cdce5 100644 --- a/packages.yml +++ b/packages.yml @@ -1,3 +1,3 @@ packages: - package: snowplow/snowplow_utils - version: [">=0.14.0", "<0.16.0"] + version: [">=0.15.1", "<0.16.0"]