Skip to content

Commit

Permalink
fix: adjust macro name, create schema if not exist
Browse files Browse the repository at this point in the history
  • Loading branch information
pierrebzl committed May 27, 2024
1 parent 2eb788e commit d90b20a
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 9 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ athena:
- For incremental models using insert overwrite strategy on hive table
- Replace the __dbt_tmp suffix used as temporary table name suffix by a unique uuid
- Useful if you are looking to run multiple dbt build inserting in the same table in parallel
- `tmp_schema` (`default=none`)
- For incremental models, it allows to define a schema to hold temporary create statements used in incremental model runs
- Schema will be created in the model target database if does not exist
- `lf_tags_config` (`default=none`)
- [AWS Lake Formation](#aws-lake-formation-integration) tags to associate with the table and columns
- `enabled` (`default=False`) whether LF tags management is enabled for a model
Expand Down
3 changes: 2 additions & 1 deletion dbt/include/athena/macros/adapters/relation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
{%- endcall %}
{%- endmacro %}

{% macro make_temp_relation(base_relation, suffix, temp_schema=none) %}
{% macro athena__make_temp_relation(base_relation, suffix, temp_schema=none) %}
{%- set temp_identifier = base_relation.identifier ~ suffix -%}
{%- set temp_relation = base_relation.incorporate(path={"identifier": temp_identifier}) -%}

Expand All @@ -45,6 +45,7 @@
"identifier": temp_identifier,
"schema": temp_schema
}) -%}
{%- do athena__create_schema(temp_relation) -%}
{% endif %}

{{ return(temp_relation) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
{% set old_tmp_relation = adapter.get_relation(identifier=target_relation.identifier ~ tmp_table_suffix,
schema=schema,
database=database) %}
{% set tmp_relation = make_temp_relation(target_relation, suffix=tmp_table_suffix, temp_schema=tmp_schema) %}
{% set tmp_relation = athena__make_temp_relation(target_relation, suffix=tmp_table_suffix, temp_schema=tmp_schema) %}

-- If no partitions are used with insert_overwrite, we fall back to append mode.
{% if partitioned_by is none and strategy == 'insert_overwrite' %}
Expand Down
18 changes: 11 additions & 7 deletions tests/functional/adapter/test_incremental_tmp_schema.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
import yaml
from tests.functional.adapter.utils.parse_dbt_run_output import (
extract_create_statement_table_names,
extract_running_create_statements,
Expand All @@ -12,7 +13,7 @@
materialized='incremental',
incremental_strategy='insert_overwrite',
partitioned_by=['date_column'],
tmp_schema='test_temporary_schema_used_to_hold_tmp_tables'
tmp_schema=var('tmp_schema_name')
)
}}
select
Expand All @@ -28,18 +29,22 @@ def models(self):

def test__schema_tmp(self, project, capsys):
relation_name = "schema_tmp"
tmp_schema_name = "test_temporary_schema_used_to_hold_tmp_tables"
create_tmp_schema = f"create schema if not exists `{tmp_schema_name}`"
tmp_schema_name = f"{project.test_schema}_tmp"
drop_tmp_schema = f"drop schema if exists `{tmp_schema_name}` cascade"
model_run_result_row_count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"

vars_dict = {
"tmp_schema_name": tmp_schema_name,
"logical_date": "2024-01-01",
}

first_model_run = run_dbt(
[
"run",
"--select",
relation_name,
"--vars",
'{"logical_date": "2024-01-01"}',
yaml.safe_dump(vars_dict),
"--log-level",
"debug",
"--log-format",
Expand All @@ -66,15 +71,14 @@ def test__schema_tmp(self, project, capsys):

assert tmp_schema_name not in incremental_model_run_result_table_name

project.run_sql(create_tmp_schema)

vars_dict["logical_date"] = "2024-01-02"
incremental_model_run = run_dbt(
[
"run",
"--select",
relation_name,
"--vars",
'{"logical_date": "2024-01-02"}',
yaml.safe_dump(vars_dict),
"--log-level",
"debug",
"--log-format",
Expand Down

0 comments on commit d90b20a

Please sign in to comment.