From 8a1d2dfec3fc7f020dce02e74188c6975c353b6b Mon Sep 17 00:00:00 2001 From: Emiel Date: Tue, 11 Jul 2023 17:05:28 -0400 Subject: [PATCH] PR feedback --- .../base_create_snowplow_events_this_run.sql | 75 +++++++++++-------- ...e_snowplow_sessions_lifecycle_manifest.sql | 67 +++++++---------- .../get_enabled_snowplow_models.sql | 11 +-- 3 files changed, 76 insertions(+), 77 deletions(-) diff --git a/macros/base/base_create_snowplow_events_this_run.sql b/macros/base/base_create_snowplow_events_this_run.sql index 06fac89e..f727d407 100644 --- a/macros/base/base_create_snowplow_events_this_run.sql +++ b/macros/base/base_create_snowplow_events_this_run.sql @@ -1,9 +1,9 @@ -{% macro base_create_snowplow_events_this_run(sessions_this_run_table='snowplow_base_sessions_this_run', session_identifiers='{"atomic": "domain_sessionid"}', session_sql=none, session_ctes=none, session_timestamp='load_tstamp', derived_tstamp_partitioned=true, days_late_allowed=3, max_session_days=3, app_ids=[], snowplow_events_database=none, snowplow_events_schema='atomic', snowplow_events_table='events', custom_joins=none) %} - {{ return(adapter.dispatch('base_create_snowplow_events_this_run', 'snowplow_utils')(sessions_this_run_table, session_identifiers, session_sql, session_ctes, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, custom_joins)) }} +{% macro base_create_snowplow_events_this_run(sessions_this_run_table='snowplow_base_sessions_this_run', session_identifiers='{"atomic": "domain_sessionid"}', session_timestamp='load_tstamp', derived_tstamp_partitioned=true, days_late_allowed=3, max_session_days=3, app_ids=[], snowplow_events_database=none, snowplow_events_schema='atomic', snowplow_events_table='events', custom_contexts=none, custom_sql=none) %} + {{ return(adapter.dispatch('base_create_snowplow_events_this_run', 'snowplow_utils')(sessions_this_run_table, session_identifiers, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, custom_contexts, custom_sql)) }} {% endmacro %} -{% macro default__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers_str, session_sql, session_ctes, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, custom_joins) %} +{% macro default__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers_str, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, custom_contexts_str, custom_sql) %} {%- set lower_limit, upper_limit = snowplow_utils.return_limits_from_model(ref(sessions_this_run_table), 'start_tstamp', 'end_tstamp') %} @@ -17,24 +17,18 @@ {% set events_this_run_query %} with identified_events AS ( select - {%- if session_identifiers %} - COALESCE( - {% for key, field in session_identifiers.items() %} - {%- if key != 'atomic' -%} - {{ snowplow_utils.get_field(key, field, 'e', dbt.type_string()) }} - {%- else -%} - e.{{field}} - {%- endif -%} - , - {%- endfor -%} - NULL - ) as session_identifier, - {%- elif session_sql -%} - {{ session_sql }} as session_identifier, - {%- else -%} - {% do exceptions.raise_compiler_error("Need to specify either session identifiers or custom session code") %} - {%- endif %} - e.* + COALESCE( + {% for key, field in session_identifiers.items() %} + {%- if key != 'atomic' -%} + {{ snowplow_utils.get_field(key, field, 'e', dbt.type_string()) }} + {%- else -%} + e.{{field}} + {%- endif -%} + , + {%- endfor -%} + NULL + ) as session_identifier, + e.* {% if custom_sql %} , {{ custom_sql }} {% endif %} @@ -74,7 +68,7 @@ {% endmacro %} -{% macro postgres__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers_str, session_sql, session_ctes, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, custom_sql, custom_joins) %} +{% macro postgres__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers_str, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, custom_contexts_str, custom_sql) %} {%- set lower_limit, upper_limit = snowplow_utils.return_limits_from_model(ref(sessions_this_run_table), 'start_tstamp', 'end_tstamp') %} @@ -82,6 +76,10 @@ {% if session_identifiers_str %} {% set session_identifiers = fromjson(session_identifiers_str) %} {% endif %} + {% set custom_contexts = none %} + {% if custom_contexts_str %} + {% set custom_contexts = fromjson(custom_contexts_str) %} + {% endif %} {% set sessions_this_run = ref(sessions_this_run_table) %} {% set snowplow_events = api.Relation.create(schema=snowplow_events_schema, identifier=snowplow_events_table) %} @@ -89,15 +87,22 @@ with {% if session_identifiers -%} - {% for key, field in session_identifiers.items() %} + {% for key, _ in session_identifiers.items() %} {%- if key != 'atomic' -%} - {{ snowplow_utils.get_sde_or_context('atomic', key, lower_limit, upper_limit, key) }}, + {{ snowplow_utils.get_sde_or_context(snowplow_events_schema, key, lower_limit, upper_limit, key) }}, + {%- endif -%} + {% endfor %} + {% endif %} + + {% if custom_contexts -%} + {% for key, single_entity in custom_contexts.items() %} + {%- if single_entity is not boolean -%} + {% do exceptions.raise_compiler_error("Need to specify a boolean value for each custom context denoting `True` if it is a single entity and `False` if it is not") %} + {%- endif -%} + {%- if key != 'atomic' -%} + {{ snowplow_utils.get_sde_or_context(snowplow_events_schema, key, lower_limit, upper_limit, key, single_entity) }}, {%- endif -%} {% endfor %} - {% elif session_ctes %} - {{ session_ctes }}, - {% else %} - {% do exceptions.raise_compiler_error("Need to specify either session identifiers or custom session cte") %} {% endif %} identified_events AS ( @@ -123,18 +128,26 @@ {% endif %} {% if custom_sql %} , {{ custom_sql }} + {% elif custom_contexts %} + {% for key, _ in custom_contexts.items() %} + , {{key}}.* + {%- endfor-%} {% endif %} from {{ snowplow_events }} e {% if session_identifiers %} {% for key, _ in session_identifiers.items() %} {%- if key != 'atomic' -%} - left join {{key}} on e.event_id = {{key}}.{{key}}_id and e.collector_tstamp = {{key}}.{{key}}_tstamp + left join {{snowplow_events_schema}}.{{key}} on e.event_id = {{key}}.{{key}}_id and e.collector_tstamp = {{key}}.{{key}}_tstamp {%- endif -%} {% endfor %} {% endif %} - {% if custom_joins %} - {{ custom_joins }} + {% if custom_contexts %} + {% for key, _ in custom_contexts.items() %} + {%- if key != 'atomic' -%} + left join {{snowplow_events_schema}}.{{key}} on e.event_id = {{key}}.{{key}}_id and e.collector_tstamp = {{key}}.{{key}}_tstamp + {%- endif -%} + {% endfor %} {% endif %} ), events_this_run as ( diff --git a/macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql b/macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql index a902893e..5c40f1d0 100644 --- a/macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql +++ b/macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql @@ -20,7 +20,7 @@ {% set sessions_lifecycle_manifest_query %} - with new_events_session_ids as ( + with new_events_session_ids_init as ( select {%- if session_identifiers|length > 0 %} COALESCE( @@ -65,9 +65,6 @@ where domain_sessionid is not null - {% if quarantined_sessions %} - and not exists (select 1 from {{ ref(quarantined_sessions) }} as a where a.session_identifier = e.session_identifier) -- don't continue processing v.long sessions - {%- endif %} and dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'dvce_created_tstamp') }} -- don't process data that's too late and {{ session_timestamp }} >= {{ lower_limit }} and {{ session_timestamp }} <= {{ upper_limit }} @@ -79,6 +76,13 @@ {% endif %} group by 1 + ), new_events_session_ids as ( + select * + from new_events_session_ids_init e + {% if quarantined_sessions %} + where not exists (select 1 from {{ ref(quarantined_sessions) }} as a where a.session_identifier = e.session_identifier) -- don't continue processing v.long sessions + {%- endif %} + ) {% if is_incremental() %} @@ -152,30 +156,18 @@ with - {% if session_identifiers|length > 0 -%} - {% for key, field in session_identifiers.items() %} - {%- if key != 'atomic' -%} - {{ snowplow_utils.get_sde_or_context('atomic', key, lower_limit, upper_limit, 'event') }}, - {%- endif -%} - {% endfor %} - {% elif session_ctes %} - {{ session_ctes }}, - {% else %} - {% do exceptions.raise_compiler_error("Need to specify either session identifiers or custom session cte") %} - {% endif %} - {% if user_identifiers|length > 0 %} - {% for key, field in user_identifiers.items() %} - {%- if key != 'atomic' -%} - {{ snowplow_utils.get_sde_or_context('atomic', key, lower_limit, upper_limit, 'event') }}, - {%- endif -%} - {% endfor %} - {% elif user_ctes %} - {{ user_ctes }}, - {% else %} - {% do exceptions.raise_compiler_error("Need to specify either user identifiers or custom user cte") %} - {% endif %} - - new_events_session_ids as ( + {% for key, _ in session_identifiers.items() %} + {%- if key != 'atomic' -%} + {{ snowplow_utils.get_sde_or_context(snowplow_events_schema, key, lower_limit, upper_limit, 'event') }}, + {%- endif -%} + {% endfor %} + + {% for key, _ in user_identifiers.items() %} + {%- if key != 'atomic' -%} + {{ snowplow_utils.get_sde_or_context(snowplow_events_schema, key, lower_limit, upper_limit, 'event') }}, + {%- endif -%} + {% endfor %} + new_events_session_ids_init as ( select {% if session_identifiers|length > 0 %} COALESCE( @@ -220,26 +212,19 @@ {% if session_identifiers|length > 0 %} {% for key, _ in session_identifiers.items() %} {%- if key != 'atomic' -%} - left join {{key}} on e.event_id = {{key}}.event_id and e.collector_tstamp = {{key}}.event_tstamp + left join {{snowplow_events_schema}}.{{key}} on e.event_id = {{key}}.event_id and e.collector_tstamp = {{key}}.event_tstamp {%- endif -%} {% endfor %} {% endif %} {% if user_identifiers|length > 0 %} {% for key, _ in user_identifiers.items() %} {%- if key != 'atomic' -%} - left join {{key}} on e.event_id = {{key}}.event_id and e.collector_tstamp = {{key}}.event_tstamp + left join {{snowplow_events_schema}}.{{key}} on e.event_id = {{key}}.event_id and e.collector_tstamp = {{key}}.event_tstamp {%- endif -%} {% endfor %} {% endif %} - {% if identifier_joins %} - {{ identifier_joins }} - {% endif %} - where domain_sessionid is not null - {% if quarantined_sessions %} - and not exists (select 1 from {{ ref(quarantined_sessions) }} as a where a.session_identifier = e.session_identifier) -- don't continue processing v.long sessions - {%- endif %} and dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'dvce_created_tstamp') }} -- don't process data that's too late and {{ session_timestamp }} >= {{ lower_limit }} and {{ session_timestamp }} <= {{ upper_limit }} @@ -251,7 +236,13 @@ {% endif %} group by 1 - ) + ), new_events_session_ids as ( + select * + from new_events_session_ids_init e + {% if quarantined_sessions %} + where not exists (select 1 from {{ ref(quarantined_sessions) }} as a where a.session_identifier = e.session_identifier) -- don't continue processing v.long sessions + {%- endif %} + ) {% if is_incremental() %} diff --git a/macros/incremental_hooks/get_enabled_snowplow_models.sql b/macros/incremental_hooks/get_enabled_snowplow_models.sql index 9e8ef004..d2054ae7 100644 --- a/macros/incremental_hooks/get_enabled_snowplow_models.sql +++ b/macros/incremental_hooks/get_enabled_snowplow_models.sql @@ -1,6 +1,6 @@ {# Returns an array of enabled models tagged with snowplow_web_incremental using dbts graph object. Throws an error if untagged models are found that depend on the base_events_this_run model#} -{% macro get_enabled_snowplow_models(package_name, graph_object=none, models_to_run=var("models_to_run", "")) -%} +{% macro get_enabled_snowplow_models(package_name, graph_object=none, models_to_run=var("models_to_run", ""), base_events_table_name='snowplow_base_events_this_run') -%} {# Override dbt graph object if graph_object is passed. Testing purposes #} {% if graph_object is not none %} @@ -17,8 +17,7 @@ {% set enabled_models = [] %} {% set untagged_snowplow_models = [] %} {% set snowplow_model_tag = package_name+'_incremental' %} - {% set snowplow_events_this_run_path = 'model.'+project_name+'.'+package_name+'_base_events_this_run' %} - + {% set snowplow_events_this_run_path = 'model.'+package_name+'.'+base_events_table_name %} {% if execute %} {% set nodes = graph.nodes.values() | selectattr("resource_type", "equalto", "model") %} @@ -49,15 +48,11 @@ Without this tagging these models will not be inserted into the manifest, breaking the incremental logic. Only catches first degree dependencies rather than all downstream models #} - {%- do exceptions.raise_compiler_error("Snowplow Warning: Untagged models referencing '"+package_name+"_base_events_this_run'. Please refer to the Snowplow docs on tagging. " + {%- do exceptions.raise_compiler_error("Snowplow Warning: Untagged models referencing '"+snowplow_events_this_run_path+"'. Please refer to the Snowplow docs on tagging. " + "Models: "+ ', '.join(untagged_snowplow_models)) -%} {% endif %} - {% if enabled_models|length == 0 %} - {%- do exceptions.raise_compiler_error("No enabled models identified.") -%} - {% endif %} - {% endif %} {{ return(enabled_models) }}