Skip to content

Commit

Permalink
Look for validation files in custom paths (#138)
Browse files
* add optional folders to validator paths

* capture specific errors in validation, comments

* pep

* pep

* remove lark version pin
  • Loading branch information
ncerutti authored Sep 19, 2024
1 parent 611dec6 commit 5c9a9ed
Showing 1 changed file with 31 additions and 6 deletions.
37 changes: 31 additions & 6 deletions ods_tools/odtf/runner/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,16 +413,26 @@ def transform(
self, extractor: BaseConnector, mapping: BaseMapping
) -> Iterable[Dict[str, Any]]:
"""
Transforms the data.
It receives it in batches from the extractor, calculates which
Orchestrates the data transformation.
It receives the data in batches from the extractor, calculates which
transformations can be applied to it, applies them, and yields the
transformed rows.
:param extractor: The data extractor object (e.g., CSV or database connector).
:param mapping: The mapping object defining transformations.
:return: An iterable of dictionaries, each representing a transformed row.
"""
validator = PandasValidator(search_paths=([os.path.dirname(self.config.path)] if self.config.path else []))
# Initial search paths
search_paths = [os.path.dirname(self.config.path)] if self.config.path else []

# Add search paths from the config, if provided
config_search_paths = self.config.get('mapping', {}).get('options', {}).get('search_paths', [])
if isinstance(config_search_paths, str):
config_search_paths = [config_search_paths]
search_paths.extend(config_search_paths)

# Instantiate the validator with the updated search paths
validator = PandasValidator(search_paths=search_paths)
runner_config = self.config.config.get('runner', None)
batch_size = runner_config.get('batch_size', 100000)
total_rows = 0
Expand All @@ -438,13 +448,28 @@ def transform(
logger.info(
f"Running transformation set {transformations[0].input_format} -> {transformations[-1].output_format} [{extractor.name}]")

validator.run(self.coerce_row_types(batch, transformations[0].types),
mapping.input_format.name, mapping.input_format.version, mapping.file_type)
# Validate input data
try:
validator.run(self.coerce_row_types(batch, transformations[0].types),
mapping.input_format.name, mapping.input_format.version, mapping.file_type)
except KeyError as e:
logger.warning(f"Validation failed due to a missing column: {e}")
except Exception as e:
logger.warning(f"Validation failed: {e}")

# Apply the transformations to the batch
for transformation in transformations:
batch = self.apply_transformation_set(batch, transformation)

validator.run(batch, mapping.output_format.name, mapping.output_format.version, mapping.file_type)
# Validate output data
try:
validator.run(batch, mapping.output_format.name, mapping.output_format.version, mapping.file_type)
except KeyError as e:
logger.warning(f"Validation failed due to a missing column: {e}")
except Exception as e:
logger.warning(f"Validation failed: {e}")

# Log the transformation progress
total_rows += len(batch)
logger.info(f"Processed {len(batch)} rows in the current batch (total: {total_rows})")

Expand Down

0 comments on commit 5c9a9ed

Please sign in to comment.