Commit

merging conflicts

bindipankhudi committed Mar 13, 2024
2 parents 74e5035 + 2f483ec commit 5a99835
Showing 26 changed files with 657 additions and 77 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/autofix.yml
@@ -5,6 +5,9 @@ on:
repository_dispatch:
types: [autofix-command]

env:
AIRBYTE_ANALYTICS_ID: ${{ vars.AIRBYTE_ANALYTICS_ID }}

jobs:
python-autofix:
runs-on: ubuntu-latest
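Each workflow in this commit gains the same env block, exposing the repository-level AIRBYTE_ANALYTICS_ID variable to every job, presumably so CI runs report telemetry under a shared anonymous ID. As a rough sketch of how a Python process could pick the variable up at runtime (the reading logic here is an assumption for illustration, not PyAirbyte's actual telemetry code):

import os


def get_analytics_id() -> str | None:
    """Return the anonymous analytics ID injected by CI, if present."""
    # AIRBYTE_ANALYTICS_ID matches the workflow env block above; treat an
    # empty string the same as unset.
    return os.environ.get("AIRBYTE_ANALYTICS_ID") or None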
2 changes: 2 additions & 0 deletions .github/workflows/pydoc_preview.yml
@@ -6,6 +6,8 @@ on:
- main
pull_request: {}

env:
AIRBYTE_ANALYTICS_ID: ${{ vars.AIRBYTE_ANALYTICS_ID }}

jobs:
preview_docs:
3 changes: 3 additions & 0 deletions .github/workflows/pydoc_publish.yml
@@ -8,6 +8,9 @@ on:
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:

env:
AIRBYTE_ANALYTICS_ID: ${{ vars.AIRBYTE_ANALYTICS_ID }}

# Sets permissions of the GITHUB_TOKEN to allow deployment to GitHub Pages
permissions:
contents: read
3 changes: 3 additions & 0 deletions .github/workflows/pypi_publish.yml
@@ -5,6 +5,9 @@ on:

workflow_dispatch:

env:
AIRBYTE_ANALYTICS_ID: ${{ vars.AIRBYTE_ANALYTICS_ID }}

jobs:
build:
runs-on: ubuntu-latest
3 changes: 3 additions & 0 deletions .github/workflows/python_lint.yml
@@ -6,6 +6,9 @@ on:
- main
pull_request: {}

env:
AIRBYTE_ANALYTICS_ID: ${{ vars.AIRBYTE_ANALYTICS_ID }}

jobs:
ruff-lint-check:
name: Ruff Lint Check
3 changes: 3 additions & 0 deletions .github/workflows/python_pytest.yml
@@ -13,6 +13,9 @@ on:
- main
pull_request: {}

env:
AIRBYTE_ANALYTICS_ID: ${{ vars.AIRBYTE_ANALYTICS_ID }}

jobs:
pytest-fast:
name: Pytest (Fast)
3 changes: 3 additions & 0 deletions .github/workflows/release_drafter.yml
@@ -5,6 +5,9 @@ on:
branches:
- main

env:
AIRBYTE_ANALYTICS_ID: ${{ vars.AIRBYTE_ANALYTICS_ID }}

permissions:
contents: read

3 changes: 3 additions & 0 deletions .github/workflows/semantic_pr_check.yml
@@ -7,6 +7,9 @@ on:
- edited
- synchronize

env:
AIRBYTE_ANALYTICS_ID: ${{ vars.AIRBYTE_ANALYTICS_ID }}

permissions:
pull-requests: read

3 changes: 3 additions & 0 deletions .github/workflows/slash_command_dispatch.yml
@@ -4,6 +4,9 @@ on:
issue_comment:
types: [created]

env:
AIRBYTE_ANALYTICS_ID: ${{ vars.AIRBYTE_ANALYTICS_ID }}

jobs:
slashCommandDispatch:
runs-on: ubuntu-latest
44 changes: 27 additions & 17 deletions airbyte/_processors/sql/base.py
@@ -13,6 +13,7 @@
import sqlalchemy
import ulid
from overrides import overrides
from pandas import Index
from sqlalchemy import (
Column,
Table,
@@ -29,7 +30,7 @@

from airbyte import exceptions as exc
from airbyte._processors.base import RecordProcessor
from airbyte._util.text_util import lower_case_set
from airbyte._util.name_normalizers import LowerCaseNormalizer
from airbyte.caches._catalog_manager import CatalogManager
from airbyte.datasets._sql import CachedDataset
from airbyte.progress import progress
@@ -73,9 +74,17 @@ class SqlProcessorBase(RecordProcessor):
"""A base class to be used for SQL Caches."""

type_converter_class: type[SQLTypeConverter] = SQLTypeConverter
"""The type converter class to use for converting JSON schema types to SQL types."""

normalizer = LowerCaseNormalizer
"""The name normalizer to user for table and column name normalization."""

file_writer_class: type[FileWriterBase]
"""The file writer class to use for writing files to the cache."""

supports_merge_insert = False
"""True if the database supports the MERGE INTO syntax."""

use_singleton_connection = False # If true, the same connection is used for all operations.

# Constructor:
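The hunks below swap the processor's ad-hoc _normalize_column_name and _normalize_table_name helpers for this shared normalizer. A minimal sketch of the interface those call sites assume, mirroring the lowercase-and-underscore rule of the removed helpers (the real class lives in airbyte/_util/name_normalizers.py and may differ in detail):

from collections.abc import Iterable


class LowerCaseNormalizer:
    """Sketch of the assumed normalizer interface; illustration only."""

    @staticmethod
    def normalize(name: str) -> str:
        # Same rule the removed _normalize_*_name helpers applied.
        return name.lower().replace(" ", "_").replace("-", "_")

    @classmethod
    def normalize_set(cls, names: Iterable[str]) -> set[str]:
        # Stands in for the removed lower_case_set() utility.
        return {cls.normalize(name) for name in names}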
@@ -197,7 +206,7 @@ def get_sql_table_name(

# TODO: Add default prefix based on the source name.

return self._normalize_table_name(
return self.normalizer.normalize(
f"{table_prefix}{stream_name}{self.cache.table_suffix}",
)

@@ -324,7 +333,7 @@ def _get_temp_table_name(
) -> str:
"""Return a new (unique) temporary table name."""
batch_id = batch_id or str(ulid.ULID())
return self._normalize_table_name(f"{stream_name}_{batch_id}")
return self.normalizer.normalize(f"{stream_name}_{batch_id}")

def _fully_qualified(
self,
@@ -414,11 +423,11 @@ def _ensure_compatible_table_schema(
stream_column_names: list[str] = json_schema["properties"].keys()
table_column_names: list[str] = self.get_sql_table(stream_name).columns.keys()

lower_case_table_column_names = lower_case_set(table_column_names)
lower_case_table_column_names = self.normalizer.normalize_set(table_column_names)
missing_columns = [
stream_col
for stream_col in stream_column_names
if stream_col.lower() not in lower_case_table_column_names
if self.normalizer.normalize(stream_col) not in lower_case_table_column_names
]
if missing_columns:
if raise_on_error:
@@ -452,17 +461,12 @@ def _create_table(
"""
_ = self._execute_sql(cmd)

def _normalize_column_name(
self,
raw_name: str,
) -> str:
return raw_name.lower().replace(" ", "_").replace("-", "_")

def _normalize_table_name(
def _get_stream_properties(
self,
raw_name: str,
) -> str:
return raw_name.lower().replace(" ", "_").replace("-", "_")
stream_name: str,
) -> dict[str, dict]:
"""Return the names of the top-level properties for the given stream."""
return self._get_stream_json_schema(stream_name)["properties"]

@final
def _get_sql_column_definitions(
@@ -471,9 +475,9 @@ def _get_sql_column_definitions(
) -> dict[str, sqlalchemy.types.TypeEngine]:
"""Return the column definitions for the given stream."""
columns: dict[str, sqlalchemy.types.TypeEngine] = {}
properties = self._get_stream_json_schema(stream_name)["properties"]
properties = self._get_stream_properties(stream_name)
for property_name, json_schema_property_def in properties.items():
clean_prop_name = self._normalize_column_name(property_name)
clean_prop_name = self.normalizer.normalize(property_name)
columns[clean_prop_name] = self.type_converter.to_sql_type(
json_schema_property_def,
)
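Together, _get_stream_properties and _get_sql_column_definitions map raw JSON-schema property names onto normalized SQL column names. A worked example using the sketch normalizer above and a hypothetical stream schema (actual SQL types come from the cache's type converter):

json_schema = {
    "properties": {
        "Account ID": {"type": "integer"},
        "created-at": {"type": "string", "format": "date-time"},
    }
}

# _get_stream_properties would return the "properties" dict; the column
# definitions would then be keyed by the normalized names:
print([LowerCaseNormalizer.normalize(name) for name in json_schema["properties"]])
# ['account_id', 'created_at']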
@@ -635,6 +639,12 @@ def _write_files_to_new_table(
},
)

# Normalize all column names to lower case.
dataframe.columns = Index(
[LowerCaseNormalizer.normalize(col) for col in dataframe.columns]
)

# Write the data to the table.
dataframe.to_sql(
temp_table_name,
self.get_sql_alchemy_url(),
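The new block lowercases the dataframe's headers just before the to_sql write, so file headers that differ only in case, spaces, or dashes land in the normalized table columns. A small demonstration with hypothetical data, using the sketch normalizer above:

import pandas as pd

df = pd.DataFrame({"Account ID": [1, 2], "created-at": ["2024-03-13", None]})
# Same normalization step as in the hunk above, applied to a throwaway frame.
df.columns = pd.Index([LowerCaseNormalizer.normalize(col) for col in df.columns])
print(list(df.columns))  # ['account_id', 'created_at']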
12 changes: 9 additions & 3 deletions airbyte/_processors/sql/duckdb.py
@@ -84,6 +84,7 @@ def _write_files_to_new_table(
stream_name=stream_name,
batch_id=batch_id,
)
properties_list = list(self._get_stream_properties(stream_name).keys())
columns_list = list(self._get_sql_column_definitions(stream_name=stream_name).keys())
columns_list_str = indent(
"\n, ".join([self._quote_identifier(c) for c in columns_list]),
@@ -93,9 +94,14 @@
columns_type_map = indent(
"\n, ".join(
[
f"{self._quote_identifier(c)}: "
f"{self._get_sql_column_definitions(stream_name)[c]!s}"
for c in columns_list
self._quote_identifier(prop_name)
+ ": "
+ str(
self._get_sql_column_definitions(stream_name)[
self.normalizer.normalize(prop_name)
]
)
for prop_name in properties_list
]
),
" ",
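The rewritten comprehension keeps the raw property name as the struct key, which DuckDB matches against the JSON field names, while looking the SQL type up under the normalized column name. A self-contained illustration with hypothetical names and types, reusing the sketch normalizer from earlier:

column_definitions = {"account_id": "BIGINT", "created_at": "TIMESTAMP"}
properties_list = ["Account ID", "created-at"]

columns_type_map = "\n, ".join(
    # Raw name as the key, type fetched by its normalized form.
    f'"{prop}": {column_definitions[LowerCaseNormalizer.normalize(prop)]}'
    for prop in properties_list
)
print(columns_type_map)
# "Account ID": BIGINT
# , "created-at": TIMESTAMP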
6 changes: 4 additions & 2 deletions airbyte/_processors/sql/snowflake.py
@@ -67,14 +67,16 @@ def _write_files_to_new_table(
]
)
self._execute_sql(put_files_statements)

properties_list: list[str] = list(self._get_stream_properties(stream_name).keys())
columns_list = [
self._quote_identifier(c)
for c in list(self._get_sql_column_definitions(stream_name).keys())
]
files_list = ", ".join([f"'{f.name}'" for f in files])
columns_list_str: str = indent("\n, ".join(columns_list), " " * 12)
variant_cols_str: str = ("\n" + " " * 21 + ", ").join([f"$1:{col}" for col in columns_list])
variant_cols_str: str = ("\n" + " " * 21 + ", ").join(
[f"$1:{col}" for col in properties_list]
)
copy_statement = dedent(
f"""
COPY INTO {temp_table_name}
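Here the column list uses normalized, quoted identifiers while the $1:<name> selectors keep the raw property names, since those must match the JSON keys in the staged files. A quick illustration of the selector string the new code would assemble (hypothetical names; the real output also includes newline and indent padding):

properties_list = ["Account ID", "created-at"]
# Raw property names drive the selectors, not the normalized column names.
variant_cols_str = ", ".join(f"$1:{prop}" for prop in properties_list)
print(variant_cols_str)  # $1:Account ID, $1:created-at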