Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
emielver committed Jul 11, 2023
1 parent 85374ba commit 8a1d2df
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 77 deletions.
75 changes: 44 additions & 31 deletions macros/base/base_create_snowplow_events_this_run.sql
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@

{% macro base_create_snowplow_events_this_run(sessions_this_run_table='snowplow_base_sessions_this_run', session_identifiers='{"atomic": "domain_sessionid"}', session_sql=none, session_ctes=none, session_timestamp='load_tstamp', derived_tstamp_partitioned=true, days_late_allowed=3, max_session_days=3, app_ids=[], snowplow_events_database=none, snowplow_events_schema='atomic', snowplow_events_table='events', custom_joins=none) %}
{{ return(adapter.dispatch('base_create_snowplow_events_this_run', 'snowplow_utils')(sessions_this_run_table, session_identifiers, session_sql, session_ctes, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, custom_joins)) }}
{% macro base_create_snowplow_events_this_run(sessions_this_run_table='snowplow_base_sessions_this_run', session_identifiers='{"atomic": "domain_sessionid"}', session_timestamp='load_tstamp', derived_tstamp_partitioned=true, days_late_allowed=3, max_session_days=3, app_ids=[], snowplow_events_database=none, snowplow_events_schema='atomic', snowplow_events_table='events', custom_contexts=none, custom_sql=none) %}
{{ return(adapter.dispatch('base_create_snowplow_events_this_run', 'snowplow_utils')(sessions_this_run_table, session_identifiers, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, custom_contexts, custom_sql)) }}
{% endmacro %}

{% macro default__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers_str, session_sql, session_ctes, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, custom_joins) %}
{% macro default__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers_str, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, custom_contexts_str, custom_sql) %}
{%- set lower_limit, upper_limit = snowplow_utils.return_limits_from_model(ref(sessions_this_run_table),
'start_tstamp',
'end_tstamp') %}
Expand All @@ -17,24 +17,18 @@
{% set events_this_run_query %}
with identified_events AS (
select
{%- if session_identifiers %}
COALESCE(
{% for key, field in session_identifiers.items() %}
{%- if key != 'atomic' -%}
{{ snowplow_utils.get_field(key, field, 'e', dbt.type_string()) }}
{%- else -%}
e.{{field}}
{%- endif -%}
,
{%- endfor -%}
NULL
) as session_identifier,
{%- elif session_sql -%}
{{ session_sql }} as session_identifier,
{%- else -%}
{% do exceptions.raise_compiler_error("Need to specify either session identifiers or custom session code") %}
{%- endif %}
e.*
COALESCE(
{% for key, field in session_identifiers.items() %}
{%- if key != 'atomic' -%}
{{ snowplow_utils.get_field(key, field, 'e', dbt.type_string()) }}
{%- else -%}
e.{{field}}
{%- endif -%}
,
{%- endfor -%}
NULL
) as session_identifier,
e.*
{% if custom_sql %}
, {{ custom_sql }}
{% endif %}
Expand Down Expand Up @@ -74,30 +68,41 @@

{% endmacro %}

{% macro postgres__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers_str, session_sql, session_ctes, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, custom_sql, custom_joins) %}
{% macro postgres__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers_str, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, custom_contexts_str, custom_sql) %}
{%- set lower_limit, upper_limit = snowplow_utils.return_limits_from_model(ref(sessions_this_run_table),
'start_tstamp',
'end_tstamp') %}
{% set session_identifiers = none %}
{% if session_identifiers_str %}
{% set session_identifiers = fromjson(session_identifiers_str) %}
{% endif %}
{% set custom_contexts = none %}
{% if custom_contexts_str %}
{% set custom_contexts = fromjson(custom_contexts_str) %}
{% endif %}
{% set sessions_this_run = ref(sessions_this_run_table) %}
{% set snowplow_events = api.Relation.create(schema=snowplow_events_schema, identifier=snowplow_events_table) %}

{% set events_this_run_query %}
with

{% if session_identifiers -%}
{% for key, field in session_identifiers.items() %}
{% for key, _ in session_identifiers.items() %}
{%- if key != 'atomic' -%}
{{ snowplow_utils.get_sde_or_context('atomic', key, lower_limit, upper_limit, key) }},
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, key, lower_limit, upper_limit, key) }},
{%- endif -%}
{% endfor %}
{% endif %}

{% if custom_contexts -%}
{% for key, single_entity in custom_contexts.items() %}
{%- if single_entity is not boolean -%}
{% do exceptions.raise_compiler_error("Need to specify a boolean value for each custom context denoting `True` if it is a single entity and `False` if it is not") %}
{%- endif -%}
{%- if key != 'atomic' -%}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, key, lower_limit, upper_limit, key, single_entity) }},
{%- endif -%}
{% endfor %}
{% elif session_ctes %}
{{ session_ctes }},
{% else %}
{% do exceptions.raise_compiler_error("Need to specify either session identifiers or custom session cte") %}
{% endif %}

identified_events AS (
Expand All @@ -123,18 +128,26 @@
{% endif %}
{% if custom_sql %}
, {{ custom_sql }}
{% elif custom_contexts %}
{% for key, _ in custom_contexts.items() %}
, {{key}}.*
{%- endfor-%}
{% endif %}

from {{ snowplow_events }} e
{% if session_identifiers %}
{% for key, _ in session_identifiers.items() %}
{%- if key != 'atomic' -%}
left join {{key}} on e.event_id = {{key}}.{{key}}_id and e.collector_tstamp = {{key}}.{{key}}_tstamp
left join {{snowplow_events_schema}}.{{key}} on e.event_id = {{key}}.{{key}}_id and e.collector_tstamp = {{key}}.{{key}}_tstamp
{%- endif -%}
{% endfor %}
{% endif %}
{% if custom_joins %}
{{ custom_joins }}
{% if custom_contexts %}
{% for key, _ in custom_contexts.items() %}
{%- if key != 'atomic' -%}
left join {{snowplow_events_schema}}.{{key}} on e.event_id = {{key}}.{{key}}_id and e.collector_tstamp = {{key}}.{{key}}_tstamp
{%- endif -%}
{% endfor %}
{% endif %}

), events_this_run as (
Expand Down
67 changes: 29 additions & 38 deletions macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

{% set sessions_lifecycle_manifest_query %}

with new_events_session_ids as (
with new_events_session_ids_init as (
select
{%- if session_identifiers|length > 0 %}
COALESCE(
Expand Down Expand Up @@ -65,9 +65,6 @@

where
domain_sessionid is not null
{% if quarantined_sessions %}
and not exists (select 1 from {{ ref(quarantined_sessions) }} as a where a.session_identifier = e.session_identifier) -- don't continue processing v.long sessions
{%- endif %}
and dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'dvce_created_tstamp') }} -- don't process data that's too late
and {{ session_timestamp }} >= {{ lower_limit }}
and {{ session_timestamp }} <= {{ upper_limit }}
Expand All @@ -79,6 +76,13 @@
{% endif %}

group by 1
), new_events_session_ids as (
select *
from new_events_session_ids_init e
{% if quarantined_sessions %}
where not exists (select 1 from {{ ref(quarantined_sessions) }} as a where a.session_identifier = e.session_identifier) -- don't continue processing v.long sessions
{%- endif %}

)
{% if is_incremental() %}

Expand Down Expand Up @@ -152,30 +156,18 @@

with

{% if session_identifiers|length > 0 -%}
{% for key, field in session_identifiers.items() %}
{%- if key != 'atomic' -%}
{{ snowplow_utils.get_sde_or_context('atomic', key, lower_limit, upper_limit, 'event') }},
{%- endif -%}
{% endfor %}
{% elif session_ctes %}
{{ session_ctes }},
{% else %}
{% do exceptions.raise_compiler_error("Need to specify either session identifiers or custom session cte") %}
{% endif %}
{% if user_identifiers|length > 0 %}
{% for key, field in user_identifiers.items() %}
{%- if key != 'atomic' -%}
{{ snowplow_utils.get_sde_or_context('atomic', key, lower_limit, upper_limit, 'event') }},
{%- endif -%}
{% endfor %}
{% elif user_ctes %}
{{ user_ctes }},
{% else %}
{% do exceptions.raise_compiler_error("Need to specify either user identifiers or custom user cte") %}
{% endif %}

new_events_session_ids as (
{% for key, _ in session_identifiers.items() %}
{%- if key != 'atomic' -%}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, key, lower_limit, upper_limit, 'event') }},
{%- endif -%}
{% endfor %}

{% for key, _ in user_identifiers.items() %}
{%- if key != 'atomic' -%}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, key, lower_limit, upper_limit, 'event') }},
{%- endif -%}
{% endfor %}
new_events_session_ids_init as (
select
{% if session_identifiers|length > 0 %}
COALESCE(
Expand Down Expand Up @@ -220,26 +212,19 @@
{% if session_identifiers|length > 0 %}
{% for key, _ in session_identifiers.items() %}
{%- if key != 'atomic' -%}
left join {{key}} on e.event_id = {{key}}.event_id and e.collector_tstamp = {{key}}.event_tstamp
left join {{snowplow_events_schema}}.{{key}} on e.event_id = {{key}}.event_id and e.collector_tstamp = {{key}}.event_tstamp
{%- endif -%}
{% endfor %}
{% endif %}
{% if user_identifiers|length > 0 %}
{% for key, _ in user_identifiers.items() %}
{%- if key != 'atomic' -%}
left join {{key}} on e.event_id = {{key}}.event_id and e.collector_tstamp = {{key}}.event_tstamp
left join {{snowplow_events_schema}}.{{key}} on e.event_id = {{key}}.event_id and e.collector_tstamp = {{key}}.event_tstamp
{%- endif -%}
{% endfor %}
{% endif %}
{% if identifier_joins %}
{{ identifier_joins }}
{% endif %}

where
domain_sessionid is not null
{% if quarantined_sessions %}
and not exists (select 1 from {{ ref(quarantined_sessions) }} as a where a.session_identifier = e.session_identifier) -- don't continue processing v.long sessions
{%- endif %}
and dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'dvce_created_tstamp') }} -- don't process data that's too late
and {{ session_timestamp }} >= {{ lower_limit }}
and {{ session_timestamp }} <= {{ upper_limit }}
Expand All @@ -251,7 +236,13 @@
{% endif %}

group by 1
)
), new_events_session_ids as (
select *
from new_events_session_ids_init e
{% if quarantined_sessions %}
where not exists (select 1 from {{ ref(quarantined_sessions) }} as a where a.session_identifier = e.session_identifier) -- don't continue processing v.long sessions
{%- endif %}
)

{% if is_incremental() %}

Expand Down
11 changes: 3 additions & 8 deletions macros/incremental_hooks/get_enabled_snowplow_models.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{# Returns an array of enabled models tagged with `<package_name>_incremental`, using dbt's graph object.
Throws an error if untagged models are found that depend on the base_events_this_run model. #}
{% macro get_enabled_snowplow_models(package_name, graph_object=none, models_to_run=var("models_to_run", "")) -%}
{% macro get_enabled_snowplow_models(package_name, graph_object=none, models_to_run=var("models_to_run", ""), base_events_table_name='snowplow_base_events_this_run') -%}

{# Override dbt graph object if graph_object is passed. Testing purposes #}
{% if graph_object is not none %}
Expand All @@ -17,8 +17,7 @@
{% set enabled_models = [] %}
{% set untagged_snowplow_models = [] %}
{% set snowplow_model_tag = package_name+'_incremental' %}
{% set snowplow_events_this_run_path = 'model.'+project_name+'.'+package_name+'_base_events_this_run' %}

{% set snowplow_events_this_run_path = 'model.'+package_name+'.'+base_events_table_name %}
{% if execute %}

{% set nodes = graph.nodes.values() | selectattr("resource_type", "equalto", "model") %}
Expand Down Expand Up @@ -49,15 +48,11 @@
Without this tagging these models will not be inserted into the manifest, breaking the incremental logic.
Only catches first-degree dependencies rather than all downstream models.
#}
{%- do exceptions.raise_compiler_error("Snowplow Warning: Untagged models referencing '"+package_name+"_base_events_this_run'. Please refer to the Snowplow docs on tagging. "
{%- do exceptions.raise_compiler_error("Snowplow Warning: Untagged models referencing '"+snowplow_events_this_run_path+"'. Please refer to the Snowplow docs on tagging. "
+ "Models: "+ ', '.join(untagged_snowplow_models)) -%}

{% endif %}

{% if enabled_models|length == 0 %}
{%- do exceptions.raise_compiler_error("No enabled models identified.") -%}
{% endif %}

{% endif %}

{{ return(enabled_models) }}
Expand Down

0 comments on commit 8a1d2df

Please sign in to comment.