Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
chore(s3_delta): Add s3_delta (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vixtir authored Jun 8, 2023
1 parent e3e5654 commit 16b1aa7
Show file tree
Hide file tree
Showing 57 changed files with 1,518 additions and 1,042 deletions.
30 changes: 30 additions & 0 deletions config_examples/s3_delta.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# S3 Delta Lake Adapter
default_pulling_interval: 10
token:
platform_host_url: "http://localhost:8080"
plugins:
- type: s3_delta
name: s3_adapter
aws_access_key_id:
aws_secret_access_key:
aws_region:
# aws_session_token: # optional if you want to specify the aws session token
delta_tables:
- bucket: bucket
prefix: delta_data


# Minio S3 Delta Lake Adapter
default_pulling_interval: 10
token:
platform_host_url: "http://localhost:8080"
plugins:
- type: s3_delta
name: s3_adapter
endpoint_url: http://localhost:9000
aws_storage_allow_http: True
aws_access_key_id: minioadmin
aws_secret_access_key: minioadmin
delta_tables:
- bucket: bucket
prefix: delta_data
15 changes: 8 additions & 7 deletions odd_collector_aws/adapters/athena/adapter.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
from dataclasses import dataclass
from typing import Dict, Callable, Union, Any, Iterable, Optional, List
from typing import Any, Callable, Dict, Iterable, List, Optional, Union

import boto3
from more_itertools import flatten
from odd_models.models import DataEntity
from odd_models.models import DataEntityList
from oddrn_generator import AthenaGenerator
from odd_collector_sdk.domain.adapter import AbstractAdapter
from odd_models.models import DataEntity, DataEntityList
from oddrn_generator import AthenaGenerator

from odd_collector_aws.domain.plugin import AthenaPlugin
from odd_collector_aws.domain.paginator_config import PaginatorConfig
from odd_collector_aws.domain.plugin import AthenaPlugin

from .mappers.tables import map_athena_table

Expand Down Expand Up @@ -111,8 +110,10 @@ def __fetch_paginator(self, conf: PaginatorConfig) -> Iterable:
)

for entity in sdk_response.build_full_result()[conf.list_fetch_key]:
yield entity if conf.mapper is None else conf.mapper(
entity, conf.mapper_args
yield (
entity
if conf.mapper is None
else conf.mapper(entity, conf.mapper_args)
)

if sdk_response.resume_token is None:
Expand Down
6 changes: 2 additions & 4 deletions odd_collector_aws/adapters/athena/mappers/columns/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import List, Dict, Any
from typing import Any, Dict, List

from lark import Lark, LarkError
from odd_models.models import DataSetField, DataSetFieldType
Expand Down Expand Up @@ -58,9 +58,7 @@ def __map_column(
name = (
column_name
if column_name is not None
else type_parsed["field_name"]
if "field_name" in type_parsed
else athena_type
else type_parsed["field_name"] if "field_name" in type_parsed else athena_type
)

resource_name = "keys" if is_key else "values" if is_value else "subcolumns"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from lark import Transformer, Token
from lark import Token, Transformer


class AthenaFieldTypeTransformer(Transformer):
Expand Down
2 changes: 1 addition & 1 deletion odd_collector_aws/adapters/athena/mappers/metadata.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from typing import Any, Dict, Set, Callable
from typing import Any, Callable, Dict, Set

from humps.main import decamelize

Expand Down
3 changes: 1 addition & 2 deletions odd_collector_aws/adapters/athena/mappers/tables.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Any
from typing import Any, Dict

from more_itertools import flatten
from odd_models.models import DataEntity, DataEntityType, DataSet
Expand All @@ -15,7 +15,6 @@ def map_athena_table(
database_name: str,
oddrn_generator: AthenaGenerator,
) -> DataEntity:

data_entity_type = TABLE_TYPES_SQL_TO_ODD.get(
raw_table_data["TableType"], DataEntityType.UNKNOWN
)
Expand Down
1 change: 1 addition & 0 deletions odd_collector_aws/adapters/athena/mappers/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Dict

from odd_models.models import DataEntityType

TABLE_TYPES_SQL_TO_ODD: Dict[str, DataEntityType] = {
Expand Down
16 changes: 11 additions & 5 deletions odd_collector_aws/adapters/dms/adapter.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from typing import Iterable, Dict, Any
from typing import Any, Dict, Iterable

from odd_collector_sdk.domain.adapter import AbstractAdapter
from odd_collector_aws.domain.plugin import DmsPlugin
from oddrn_generator.generators import DmsGenerator
from odd_models.models import DataEntityList
from odd_collector_aws.domain.paginator_config import PaginatorConfig
from oddrn_generator.generators import DmsGenerator

from odd_collector_aws.domain.fetch_paginator import fetch_paginator
from odd_collector_aws.domain.paginator_config import PaginatorConfig
from odd_collector_aws.domain.plugin import DmsPlugin

from .client import DMSClient
from .mappers.tasks import map_dms_task

Expand Down Expand Up @@ -68,4 +71,7 @@ def _get_endpoints_nodes(self) -> Iterable:

def _get_endpoints_nodes_arn_dict(self) -> Dict[str, Any]:
endpoint_nodes = self._get_endpoints_nodes()
return {endpoint_node.get('EndpointArn'): endpoint_node for endpoint_node in endpoint_nodes}
return {
endpoint_node.get("EndpointArn"): endpoint_node
for endpoint_node in endpoint_nodes
}
2 changes: 1 addition & 1 deletion odd_collector_aws/adapters/dms/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from odd_collector_aws.domain.plugin import AwsPlugin
from odd_collector_aws.aws.aws_client import AwsClient
from odd_collector_aws.domain.plugin import AwsPlugin


class DMSClient:
Expand Down
69 changes: 44 additions & 25 deletions odd_collector_aws/adapters/dms/mappers/endpoints.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from abc import abstractmethod
from typing import Any, Dict, List, Type

from oddrn_generator.generators import (
MssqlGenerator,
Generator,
S3Generator,
MongoGenerator,
MssqlGenerator,
MysqlGenerator,
PostgresqlGenerator,
MongoGenerator,
S3Generator,
)
from typing import Dict, Any, Type, List
from abc import abstractmethod


class EndpointEngine:
Expand All @@ -28,11 +29,15 @@ def get_oddrn_for_schema_name(self, generator: Generator, schema_name: str) -> s
pass

@abstractmethod
def get_oddrn_for_table_schema_names(self, generator: Generator, schema_name: str, table_name: str) -> str:
def get_oddrn_for_table_schema_names(
self, generator: Generator, schema_name: str, table_name: str
) -> str:
pass

@abstractmethod
def extend_schema_oddrn_with_table_name(self, schema_oddrn: str, table_name: str) -> str:
def extend_schema_oddrn_with_table_name(
self, schema_oddrn: str, table_name: str
) -> str:
pass


Expand All @@ -45,19 +50,25 @@ def get_oddrn_for_schema_name(self, generator: Generator, schema_name: str) -> s
generator.set_oddrn_paths(**{self.schemas_path_name: schema_name})
return generator.get_oddrn_by_path(self.schemas_path_name)

def get_oddrn_for_table_schema_names(self, generator: Generator, schema_name: str, table_name: str) -> str:
generator.set_oddrn_paths(**{self.schemas_path_name: schema_name, self.tables_path_name: table_name})
def get_oddrn_for_table_schema_names(
self, generator: Generator, schema_name: str, table_name: str
) -> str:
generator.set_oddrn_paths(
**{self.schemas_path_name: schema_name, self.tables_path_name: table_name}
)
return generator.get_oddrn_by_path(self.tables_path_name)

def extend_schema_oddrn_with_table_name(self, schema_oddrn: str, table_name: str) -> str:
def extend_schema_oddrn_with_table_name(
self, schema_oddrn: str, table_name: str
) -> str:
return f"{schema_oddrn}/{self.tables_path_name}/{table_name}"


class MongodbEngine(JdbcEngine):
engine_name = "mongodb"
settings_node_name = "MongoDbSettings"
schemas_path_name = 'schemas'
tables_path_name = 'collections'
schemas_path_name = "schemas"
tables_path_name = "collections"

def get_generator(self) -> Generator:
return MongoGenerator(
Expand All @@ -69,8 +80,8 @@ def get_generator(self) -> Generator:
class MysqlEngine(JdbcEngine):
engine_name = "mysql"
settings_node_name = "MySQLSettings"
schemas_path_name = 'schemas'
tables_path_name = 'tables'
schemas_path_name = "schemas"
tables_path_name = "tables"

def get_generator(self) -> Generator:
return MysqlGenerator(
Expand All @@ -82,8 +93,8 @@ def get_generator(self) -> Generator:
class MariadbEngine(JdbcEngine):
engine_name = "mariadb"
settings_node_name = "MySQLSettings"
schemas_path_name = 'schemas'
tables_path_name = 'tables'
schemas_path_name = "schemas"
tables_path_name = "tables"

def get_generator(self) -> Generator:
return MysqlGenerator(
Expand All @@ -95,8 +106,8 @@ def get_generator(self) -> Generator:
class PostgresqlEngine(JdbcEngine):
engine_name = "postgres"
settings_node_name = "PostgreSQLSettings"
schemas_path_name = 'schemas'
tables_path_name = 'tables'
schemas_path_name = "schemas"
tables_path_name = "tables"

def get_generator(self) -> Generator:
return PostgresqlGenerator(
Expand All @@ -108,8 +119,8 @@ def get_generator(self) -> Generator:
class MssqlEngine(JdbcEngine):
engine_name = "sqlserver"
settings_node_name = "MicrosoftSQLServerSettings"
schemas_path_name = 'schemas'
tables_path_name = 'tables'
schemas_path_name = "schemas"
tables_path_name = "tables"

def get_generator(self) -> Generator:
return MssqlGenerator(
Expand All @@ -135,17 +146,25 @@ def source_oddrn(self):
def get_oddrn_for_schema_name(self, generator: Generator, schema_name: str) -> str:
return f"{self.source_oddrn}\\{schema_name}"

def get_oddrn_for_table_schema_names(self, generator: Generator, schema_name: str, table_name: str) -> str:
def get_oddrn_for_table_schema_names(
self, generator: Generator, schema_name: str, table_name: str
) -> str:
return f"{self.source_oddrn}\\{schema_name}\\{table_name}"

def extend_schema_oddrn_with_table_name(self, schema_oddrn: str, table_name: str) -> str:
def extend_schema_oddrn_with_table_name(
self, schema_oddrn: str, table_name: str
) -> str:
return f"{schema_oddrn}\\{table_name}"


engines: List[Type[EndpointEngine]] = [
MssqlEngine, S3Engine, MysqlEngine,
PostgresqlEngine, MariadbEngine, MongodbEngine,
]
MssqlEngine,
S3Engine,
MysqlEngine,
PostgresqlEngine,
MariadbEngine,
MongodbEngine,
]

engines_factory: Dict[str, Type[EndpointEngine]] = {
engine.engine_name: engine for engine in engines
Expand Down
3 changes: 2 additions & 1 deletion odd_collector_aws/adapters/dms/mappers/metadata.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Any, Dict, List

from odd_models.models import MetadataExtension
from typing import List, Dict, Any


def create_metadata_extension_list(
Expand Down
Loading

0 comments on commit 16b1aa7

Please sign in to comment.