Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Utils for Spark ( Iceberg ) Support RC2 #182

Open
wants to merge 5 commits into
base: release/snowplow-utils/0.17
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/pr_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ jobs:
- name: Configure Docker credentials
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
username: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_USERNAME }}
password: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_PASSWORD }}
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
Expand Down
6 changes: 2 additions & 4 deletions integration_tests/ci/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ integration_tests:
host: "{{ env_var('SPARK_MASTER_HOST', 'localhost') }}"
port: 10000
user: "{{ env_var('SPARK_USER', 'spark') }}"
schema: "{{ env_var('SPARK_SCHEMA', 'default') }}"
schema: "gh_sp_utils_dbt_{{ env_var('SCHEMA_SUFFIX') }}"
connect_retries: 5
connect_timeout: 60
threads: 1
vars:
snowplow__datalake_file_format: iceberg
threads: 1
8 changes: 1 addition & 7 deletions integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,7 @@ models:
snowplow_utils_integration_tests:
+schema: "snplw_utils_int_tests"
+incremental_strategy: "{{ none if target.type not in ['spark'] else 'merge' }}"
+file_format: >
{{
var(
'snowplow__datalake_file_format',
'delta' if target.type != 'spark' else 'iceberg'
)
}}
+file_format: "{{ 'delta' if target.type not in ['spark'] else 'iceberg'}}"
materializations:
snowflake_delete_insert:
enabled: "{{ target.type == 'snowflake' | as_bool() }}"
Expand Down
20 changes: 12 additions & 8 deletions macros/utils/get_schemas_by_pattern.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ This program is licensed to you under the Snowplow Personal and Academic License
and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0.
You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/
#}
{% macro get_schemas_by_pattern(schema_pattern) %}
{% macro get_schemas_by_pattern(schema_pattern=target.schema) %}
{{ return(adapter.dispatch('get_schemas_by_pattern', 'snowplow_utils')
(schema_pattern)) }}
{% endmacro %}
Expand All @@ -19,16 +19,20 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{% endmacro %}

{% macro spark__get_schemas_by_pattern(schema_pattern) %}
{# databricks/spark uses a regex on SHOW SCHEMAS and doesn't have an information schema in hive_metastore #}
{%- set schema_pattern= dbt.replace(schema_pattern, "%", "*") -%}
{#
Databricks/Spark uses a regex on SHOW SCHEMAS and doesn't have an information schema in hive_metastore.
Replace '%' with '*' for Spark's pattern matching.
#}
{%- set adjusted_schema_pattern = schema_pattern | replace("%", "*") -%}

{# Get all schemas with the target.schema prefix #}
{%- set get_schemas_sql -%}
SHOW SCHEMAS LIKE '{{schema_pattern}}';
{%- endset -%}
{# Construct the SHOW SCHEMAS LIKE query #}
{%- set get_schemas_sql = "SHOW SCHEMAS LIKE '" ~ adjusted_schema_pattern ~ "'" -%}

{# Execute the query and fetch results #}
{% set results = run_query(get_schemas_sql) %}
{% set schemas = results|map(attribute='databaseName')|unique|list %}

{# Extract schema names from the results #}
{% set schemas = results.columns[0].values() | unique | list %}

{{ return(schemas) }}

Expand Down
87 changes: 86 additions & 1 deletion macros/utils/post_ci_cleanup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,56 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{# Destructive macro. Use with care! #}

{% macro post_ci_cleanup(schema_pattern=target.schema) %}
{{ return(adapter.dispatch('post_ci_cleanup', 'snowplow_utils')(schema_pattern)) }}
{% endmacro %}


{% macro default__post_ci_cleanup(schema_pattern=target.schema) %}

{# Get all schemas with the target.schema prefix #}
{% set schemas = snowplow_utils.get_schemas_by_pattern(schema_pattern~'%') %}

{% if schemas|length %}

{%- if target.type in ['databricks'] -%}
{# Generate sql to drop all identified schemas #}
{% for schema in schemas -%}
{%- set drop_schema_sql -%}
DROP SCHEMA IF EXISTS {{schema}} CASCADE;
{%- endset -%}

{% do run_query(drop_schema_sql) %}

{% endfor %}

{%- else -%}
{# Generate sql to drop all identified schemas #}
{% set drop_schema_sql -%}

{% for schema in schemas -%}
DROP SCHEMA IF EXISTS {{schema}} CASCADE;
{% endfor %}

{%- endset %}

{# Drop schemas #}
{% do run_query(drop_schema_sql) %}

{%- endif -%}

{% endif %}

{% endmacro %}


{% macro databricks__post_ci_cleanup(schema_pattern=target.schema) %}

{# Get all schemas with the target.schema prefix #}
{% set schemas = snowplow_utils.get_schemas_by_pattern(schema_pattern~'%') %}

{% if schemas|length %}

{%- if target.type in ['databricks', 'spark'] -%}
{%- if target.type in ['databricks'] -%}
{# Generate sql to drop all identified schemas #}
{% for schema in schemas -%}
{%- set drop_schema_sql -%}
Expand Down Expand Up @@ -42,3 +85,45 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{% endif %}

{% endmacro %}

{#
Spark-specific implementation for post CI cleanup.
#}

{% macro spark__post_ci_cleanup(schema_pattern=target.schema) %}
{# Retrieve all schemas matching the pattern #}
{% set schemas = snowplow_utils.get_schemas_by_pattern(schema_pattern ~ "%") %}

{% if schemas | length > 0 %}
{% for schema in schemas %}
{{ log("Processing schema: " ~ schema, info=True) }}

{# Step 1: List all tables in the current schema #}
{% set tables_query = "SHOW TABLES IN " ~ schema %}
{% set tables_result = run_query(tables_query) %}

{# Initialize an empty list for tables #}
{% set table_list = [] %}

{% if tables_result and tables_result.rows %}
{% for row in tables_result.rows %}
{% set table = row[1] %}
{% do table_list.append(table) %}
{% endfor %}

{# Step 2: Drop each table individually #}
{% for table in table_list %}
{% set drop_table_sql = "DROP TABLE IF EXISTS " ~ schema ~ "." ~ table ~ ";" %}
{% do adapter.execute(drop_table_sql) %}
{% endfor %}
{% else %}
{% endif %}

{# Step 3: Drop the schema #}
{% set drop_schema_sql = "DROP SCHEMA IF EXISTS " ~ schema ~ ";" %}
{% do adapter.execute(drop_schema_sql) %}
{% endfor %}
{% else %}
{{ log("No schemas found matching pattern: " ~ schema_pattern, info=True) }}
{% endif %}
{% endmacro %}
Loading