Skip to content

Commit

Permalink
Merge branch 'main' into fix/656
Browse files Browse the repository at this point in the history
  • Loading branch information
antonysouthworth-halter authored Jun 12, 2024
2 parents c4e08f6 + 1e23de0 commit 922936a
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 16 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,8 @@ def model(dbt, spark_session):
runs on Trino.
- Snapshot materializations are not supported.
- Spark can only reference tables within the same catalog.
- For tables created outside of the dbt tool, be sure to populate the location field or dbt will throw an error
when trying to create the table.

[pre-installed list]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark-preinstalled-python-libraries.html
[imported manually]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-import-files-libraries.html
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/athena/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.8.1"
version = "1.8.2"
22 changes: 15 additions & 7 deletions dbt/include/athena/macros/materializations/models/table/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -132,21 +132,29 @@
{% endcall %}
{%- endif -%}

{%- set old_relation_table_type = adapter.get_glue_table_type(old_relation) -%}
{%- set old_relation_table_type = adapter.get_glue_table_type(old_relation).value if old_relation else none -%}

{%- if old_relation_table_type == 'iceberg' -%}
{{ rename_relation(old_relation, old_bkp_relation) }}
-- we cannot use old_bkp_relation, because it returns None if the relation doesn't exist
-- we need to create a python object via the make_temp_relation instead
{%- set old_relation_bkp = make_temp_relation(old_relation, '__bkp') -%}

{%- if old_relation_table_type == 'iceberg_table' -%}
{{ rename_relation(old_relation, old_relation_bkp) }}
{%- else -%}
{%- do drop_relation_glue(old_relation) -%}
{%- endif -%}

-- publish the target table doing a final renaming
{{ rename_relation(tmp_relation, target_relation) }}

-- old_bkp_relation might not exists in case we have a switch from hive to iceberg
-- we prevent to drop something that doesn't exist even if drop_relation is able to deal with not existing tables
{%- if old_bkp_relation is not none -%}
{%- do drop_relation(old_bkp_relation) -%}
-- if old relation is iceberg_table, we have a backup
-- therefore we can drop the old relation backup, in all other cases there is nothing to do
-- in case of a switch from hive to iceberg the backup table does not exist
-- in case of a first run, the backup table does not exist
{%- if old_relation_table_type == 'iceberg_table' -%}
{%- do drop_relation(old_relation_bkp) -%}
{%- endif -%}

{%- endif -%}
{%- endif -%}

Expand Down
4 changes: 2 additions & 2 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ dbt-tests-adapter~=1.8.0
flake8~=7.0
Flake8-pyproject~=1.2
isort~=5.13
moto~=5.0.8
moto~=5.0.9
pre-commit~=3.5
pyparsing~=3.1.2
pytest~=8.2
pytest-cov~=5.0
pytest-dotenv~=0.5
pytest-xdist~=3.6
pyupgrade~=3.15
pyupgrade~=3.16
32 changes: 26 additions & 6 deletions tests/functional/adapter/test_ha_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,37 @@ class TestTableIcebergTableUnique:
def models(self):
return {"table_iceberg_table_unique.sql": models__table_iceberg_naming_table_unique}

def test__table_creation(self, project):
def test__table_creation(self, project, capsys):
relation_name = "table_iceberg_table_unique"
model_run_result_row_count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"

model_run = run_dbt(["run", "--select", relation_name])
model_run_result = model_run.results[0]
assert model_run_result.status == RunStatus.Success
first_model_run = run_dbt(["run", "--select", relation_name])
first_model_run_result = first_model_run.results[0]
assert first_model_run_result.status == RunStatus.Success

first_models_records_count = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]

assert first_models_records_count == 2

second_model_run = run_dbt(["run", "-d", "--select", relation_name])
second_model_run_result = second_model_run.results[0]
assert second_model_run_result.status == RunStatus.Success

out, _ = capsys.readouterr()
# in case of 2nd run we expect that the target table is renamed to __bkp
alter_statement = (
f"alter table `awsdatacatalog`.`{project.test_schema}`.`{relation_name}` "
f"rename to `{project.test_schema}`.`{relation_name}__bkp`"
)
delete_bkp_table_log = (
f'Deleted table from glue catalog: "awsdatacatalog"."{project.test_schema}"."{relation_name}__bkp"'
)
assert alter_statement in out
assert delete_bkp_table_log in out

models_records_count = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]
second_models_records_count = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]

assert models_records_count == 2
assert second_models_records_count == 2


# in case s3_data_naming=table for iceberg a compile error must be raised
Expand Down

0 comments on commit 922936a

Please sign in to comment.