Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
chore(s3_delta): recursively search delta tables (#39)
Browse files Browse the repository at this point in the history
* feat(s3_delta): added filter for objects base name
  • Loading branch information
Vixtir authored Jun 9, 2023
1 parent 16b1aa7 commit aa17d71
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 89 deletions.
19 changes: 18 additions & 1 deletion config_examples/s3_delta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,25 @@ plugins:
# aws_session_token: # optional if you want to specify the aws session token
delta_tables:
- bucket: bucket
prefix: delta_data
prefix: delta_data # Prefix to DeltaTable or directory where delta tables are stored

# S3 Delta Lake Adapter with Filter
default_pulling_interval: 10
token:
platform_host_url: "http://localhost:8080"
plugins:
- type: s3_delta
name: s3_adapter
aws_access_key_id:
aws_secret_access_key:
aws_region:
# aws_session_token: # optional if you want to specify the aws session token
delta_tables:
- bucket: bucket
prefix: delta_data
object_filter: # excludes every folder whose name ends with "_pii" from ingestion
exclude:
- ".*_pii"

# Minio S3 Delta Lake Adapter
default_pulling_interval: 10
Expand Down
7 changes: 5 additions & 2 deletions odd_collector_aws/adapters/s3/file_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,5 +155,8 @@ def get_folder(self, path: str, recursive: bool = True) -> Folder:

def remove_protocol(path: str) -> str:
    """Return ``path`` without a leading S3 URI scheme.

    Recognised schemes are ``s3://``, ``s3a://`` and ``s3n://``. Any other
    input (including a path with no scheme at all) is returned unchanged.

    @param path: S3 URI or bare ``bucket/key`` path
    @return: the path with the scheme removed
    """
    # The slice length is derived from the scheme itself so that adding a
    # scheme to the tuple never requires touching a hard-coded offset.
    for scheme in ("s3://", "s3a://", "s3n://"):
        if path.startswith(scheme):
            return path[len(scheme):]
    return path
6 changes: 3 additions & 3 deletions odd_collector_aws/adapters/s3_delta/adapter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Union

from funcy import lmap, partial
from funcy import lmap, mapcat, partial
from odd_collector_sdk.domain.adapter import BaseAdapter
from odd_models.models import DataEntityList
from oddrn_generator.generators import Generator, S3Generator
Expand Down Expand Up @@ -30,10 +30,10 @@ def create_generator(self) -> Generator:
def get_data_entity_list(self) -> DataEntityList:
    """Collect delta tables for every configured entry and map them to data entities.

    @return: DataEntityList whose items are the mapped delta tables and whose
        data source oddrn comes from this adapter's generator
    """
    logger.debug(f"Getting data entity list for {self.config.delta_tables}")

    # A single configured entry may resolve to several tables (the client
    # call is used as a one-to-many mapping), so the per-entry results are
    # flattened with mapcat instead of being collected with lmap.
    tables = mapcat(self.client.get_table, self.config.delta_tables)

    # lmap materialises the result as a list, so no extra list() is needed.
    data_entities = lmap(partial(map_delta_table, self.generator), tables)

    return DataEntityList(
        data_source_oddrn=self.generator.get_data_source_oddrn(),
        items=data_entities,
    )
120 changes: 82 additions & 38 deletions odd_collector_aws/adapters/s3_delta/client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import datetime
import traceback as tb
from dataclasses import asdict, dataclass
from typing import Optional
from typing import Any, Iterable, Optional

from deltalake import DeltaTable
from funcy import complement, isnone, last, partial, select_values, silent, walk

from odd_collector_aws.domain.plugin import DeltaTableConfig, S3DeltaPlugin
from odd_collector_aws.filesystem.pyarrow_fs import FileSystem

from .logger import logger
from .models.table import DTable
Expand Down Expand Up @@ -51,58 +52,101 @@ def to_dict(self) -> dict[str, str]:
return select_values(complement(isnone), asdict(self))


class IsNotDeltaTable(Exception):
    """Raised when a path could not be loaded as a delta table."""


class DeltaClient:
    """Loads delta tables from S3, descending into sub-folders when needed."""

    def __init__(self, config: S3DeltaPlugin) -> None:
        """Prepare storage options for deltalake and a filesystem for listing.

        @param config: plugin configuration both helpers are built from
        """
        self.storage_options: StorageOptions = StorageOptions.from_config(config)
        self.fs = FileSystem(config)

    def load_delta_table(self, delta_table_config: DeltaTableConfig) -> DeltaTable:
        """Open the delta table located at ``delta_table_config.path``.

        @param delta_table_config: location of the candidate table
        @return: the opened DeltaTable
        @raise IsNotDeltaTable: when DeltaTable construction fails for any reason
        """
        try:
            return DeltaTable(
                delta_table_config.path,
                storage_options=self.storage_options.to_dict(),
            )
        except Exception as e:
            # NOTE(review): every failure is reported as "not a delta table",
            # which makes get_table fall back to scanning sub-folders.
            raise IsNotDeltaTable() from e

    def handle_folder(self, config: DeltaTableConfig) -> Iterable[DTable]:
        """Yield the delta tables found under the folders directly inside ``config.path``.

        Files are skipped, as are folders whose base name is rejected by
        ``config.allow``.

        @param config: configuration pointing at the folder to scan
        @return: tables found under the allowed sub-folders
        """
        logger.debug(f"Getting delta tables from folder {config.path}")

        entries = self.fs.get_file_info(remove_protocol(config.path))

        for entry in entries:
            if entry.is_file or not config.allow(entry.base_name):
                continue

            # Always derive the child from the parent ``config``. Rebinding
            # ``config`` here would make each sibling inherit the previous
            # sibling's prefix (p/a, then p/a/b instead of p/b).
            child_config = config.append_prefix(entry.base_name)
            yield from self.get_table(child_config)

    def get_table(self, delta_table_config: DeltaTableConfig) -> Iterable[DTable]:
        """Yield the table at the configured path, or the tables found below it.

        When the path cannot be loaded as a delta table, it is treated as a
        folder and scanned through ``handle_folder``.

        @param delta_table_config: location of a delta table or of a folder
        @return: generator of DTable objects
        @raise Exception: when loading succeeds but building the DTable fails
        """
        # sourcery skip: raise-specific-error
        try:
            logger.debug(f"Getting delta table {delta_table_config.path}")
            table = self.load_delta_table(delta_table_config)

            metadata = get_metadata(table)

            yield DTable(
                table_uri=table.table_uri,
                schema=table.schema(),
                num_rows=metadata.get("num_records"),
                metadata=metadata,
                created_at=silent(from_ms)(metadata.get("created_time")),
                updated_at=silent(add_utc_timezone)(metadata.get("modification_time")),
            )
        except IsNotDeltaTable:
            logger.warning(
                f"Path {delta_table_config.path} is not a delta table. Searching for"
                " delta tables in subfolders."
            )
            yield from self.handle_folder(delta_table_config)
        except Exception as e:
            raise Exception(
                f"Failed to get delta table {delta_table_config.path}. {e}"
            ) from e


def get_metadata(table: DeltaTable) -> dict[str, Any]:
    """Collect best-effort metadata for a delta table.

    Two independent sources are queried: the table's add actions and the
    table's own metadata. A failure in either one is logged and skipped, so
    the returned dict may hold only part of the keys, or be empty.

    @param table: an opened delta table
    @return: dict that may contain ``size_bytes``, ``num_records``,
        ``modification_time``, ``id``, ``name``, ``description``,
        ``partition_columns``, ``configuration`` and ``created_time``
    """
    metadata: dict[str, Any] = {}

    try:
        logger.debug(f"Getting actions list for {table.table_uri}")
        actions = table.get_add_actions(flatten=True).to_pydict()

        # presumably handle_values applies each aggregate (sum / last) to the
        # matching column of ``actions`` — verify against its definition
        metadata |= walk(
            partial(handle_values, actions),
            {"size_bytes": sum, "num_records": sum, "modification_time": last},
        )
    except Exception as e:
        # Report the cause the same way the metadata branch below does, so a
        # failure here is diagnosable from the logs.
        logger.debug(tb.format_exc())
        logger.error(f"Failed to get actions list for {table.table_uri}. {e}")

    try:
        logger.debug(f"Getting metadata for {table.table_uri}")
        delta_metadata = table.metadata()
        metadata |= {
            "id": delta_metadata.id,
            "name": delta_metadata.name,
            "description": delta_metadata.description,
            "partition_columns": ",".join(delta_metadata.partition_columns),
            "configuration": delta_metadata.configuration,
            "created_time": delta_metadata.created_time,
        }
    except Exception as e:
        logger.debug(tb.format_exc())
        logger.error(f"Failed to get metadata for {table.table_uri}. {e}")

    return metadata


def remove_protocol(path: str) -> str:
    """Drop a leading ``s3://``, ``s3a://`` or ``s3n://`` from ``path``.

    Paths that start with none of these schemes are returned as they are.
    """
    known_schemes = ("s3://", "s3a://", "s3n://")

    for scheme in known_schemes:
        if path.startswith(scheme):
            return path[len(scheme):]

    return path
13 changes: 13 additions & 0 deletions odd_collector_aws/domain/plugin.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Any, List, Literal, Optional

from odd_collector_sdk.domain.filter import Filter
from odd_collector_sdk.domain.plugin import Plugin
from odd_collector_sdk.types import PluginFactory
from pydantic import BaseModel, Field
Expand Down Expand Up @@ -41,11 +42,23 @@ class DeltaTableConfig(BaseModel):
scheme: str = Field(default="s3", alias="schema")
bucket: str
prefix: str
object_filter: Optional[Filter] = None

@property
def path(self) -> str:
    """URI of the configured location: ``<scheme>://<bucket>/<prefix>``.

    Leading and trailing slashes of the prefix are not part of the result.
    """
    trimmed_prefix = self.prefix.strip("/")
    return f"{self.scheme}://{self.bucket}/{trimmed_prefix}"

def append_prefix(self, path: str) -> "DeltaTableConfig":
    """Return a new config that points at ``path`` nested under this prefix.

    Scheme, bucket and object filter are carried over unchanged; this
    instance is not modified.

    @param path: name of the child folder to append
    @return: a new DeltaTableConfig for the nested location
    """
    # A prefix configured with a trailing slash would otherwise produce
    # "prefix//child", and an empty prefix a leading "/"; join only the
    # non-empty parts.
    parent = self.prefix.rstrip("/")
    nested_prefix = "/".join(part for part in (parent, path) if part)

    return DeltaTableConfig(
        schema=self.scheme,
        bucket=self.bucket,
        prefix=nested_prefix,
        object_filter=self.object_filter,
    )

def allow(self, name: str) -> bool:
    """Tell whether an object with the given name should be ingested.

    @param name: base name of the object to check
    @return: True when no filter is configured, otherwise the filter's verdict
    """
    # object_filter is optional and defaults to None; with no filter
    # configured nothing is excluded.
    if self.object_filter is None:
        return True
    return self.object_filter.is_allowed(name)


class S3DeltaPlugin(AwsPlugin):
type: Literal["s3_delta"]
Expand Down
34 changes: 34 additions & 0 deletions odd_collector_aws/filesystem/pyarrow_fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from pyarrow._fs import FileInfo, FileSelector
from pyarrow.fs import S3FileSystem

from odd_collector_aws.domain.plugin import AwsPlugin


class FileSystem:
    """
    Small facade that keeps pyarrow.fs details out of the calling code.
    """

    def __init__(self, config: AwsPlugin):
        """
        Build the underlying S3FileSystem from the plugin configuration.
        @param config: plugin settings; only truthy values are forwarded
        """
        # S3FileSystem keyword -> value taken from the plugin configuration
        candidate_options = {
            "access_key": config.aws_access_key_id,
            "secret_key": config.aws_secret_access_key,
            "session_token": config.aws_session_token,
            "region": config.aws_region,
            "endpoint_override": config.endpoint_url,
        }
        s3_options = {
            option: value for option, value in candidate_options.items() if value
        }

        self.fs = S3FileSystem(**s3_options)

    def get_file_info(self, path: str) -> list[FileInfo]:
        """
        List the entries found under a path.
        @param path: s3 path used as the selector's base directory
        @return: list of FileInfo, as returned by the underlying filesystem
        """
        selector = FileSelector(base_dir=path)
        return self.fs.get_file_info(selector)
Loading

0 comments on commit aa17d71

Please sign in to comment.