diff --git a/ods_tools/odtf/runner/pandas.py b/ods_tools/odtf/runner/pandas.py index a0b475c..9072174 100644 --- a/ods_tools/odtf/runner/pandas.py +++ b/ods_tools/odtf/runner/pandas.py @@ -413,8 +413,8 @@ def transform( self, extractor: BaseConnector, mapping: BaseMapping ) -> Iterable[Dict[str, Any]]: """ - Transforms the data. - It receives it in batches from the extractor, calculates which + Orchestrates the data transformation. + It receives the data in batches from the extractor, calculates which transformations can be applied to it, applies them, and yields the transformed rows. @@ -422,7 +422,17 @@ def transform( :param mapping: The mapping object defining transformations. :return: An iterable of dictionaries, each representing a transformed row. """ - validator = PandasValidator(search_paths=([os.path.dirname(self.config.path)] if self.config.path else [])) + # Initial search paths + search_paths = [os.path.dirname(self.config.path)] if self.config.path else [] + + # Add search paths from the config, if provided + config_search_paths = self.config.get('mapping', {}).get('options', {}).get('search_paths', []) + if isinstance(config_search_paths, str): + config_search_paths = [config_search_paths] + search_paths.extend(config_search_paths) + + # Instantiate the validator with the updated search paths + validator = PandasValidator(search_paths=search_paths) runner_config = self.config.config.get('runner', None) batch_size = runner_config.get('batch_size', 100000) total_rows = 0 @@ -438,13 +448,28 @@ def transform( logger.info( f"Running transformation set {transformations[0].input_format} -> {transformations[-1].output_format} [{extractor.name}]") - validator.run(self.coerce_row_types(batch, transformations[0].types), - mapping.input_format.name, mapping.input_format.version, mapping.file_type) + # Validate input data + try: + validator.run(self.coerce_row_types(batch, transformations[0].types), + mapping.input_format.name, mapping.input_format.version, mapping.file_type) + except KeyError as e: + logger.warning(f"Validation failed due to a missing column: {e}") + except Exception as e: + logger.warning(f"Validation failed: {e}") + # Apply the transformations to the batch for transformation in transformations: batch = self.apply_transformation_set(batch, transformation) - validator.run(batch, mapping.output_format.name, mapping.output_format.version, mapping.file_type) + # Validate output data + try: + validator.run(batch, mapping.output_format.name, mapping.output_format.version, mapping.file_type) + except KeyError as e: + logger.warning(f"Validation failed due to a missing column: {e}") + except Exception as e: + logger.warning(f"Validation failed: {e}") + + # Log the transformation progress total_rows += len(batch) logger.info(f"Processed {len(batch)} rows in the current batch (total: {total_rows})")