[file-based cdk]: add config option to limit number of files for schema discover (#39317)

Co-authored-by: askarpets <[email protected]>
Co-authored-by: Serhii Lazebnyi <[email protected]>
Co-authored-by: Serhii Lazebnyi <[email protected]>
4 people authored Jul 11, 2024
1 parent 9e23b3f commit 6c439a8
Showing 6 changed files with 185 additions and 22 deletions.
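
In practice, the new option caps how many files a file-based source reads when inferring a stream's schema during `discover`. A hedged sketch of a stream entry using it (the stream name, glob, and validation policy are illustrative, taken from the test scenario added further down):

```python
# Illustrative stream entry for a file-based source config.
# Only "recent_n_files_to_read_for_schema_discovery" is new in this commit.
stream_config = {
    "name": "stream1",
    "format": {"filetype": "csv"},
    "globs": ["*"],
    "validation_policy": "Emit Record",
    # Infer the schema from at most the 3 most recently modified files.
    "recent_n_files_to_read_for_schema_discovery": 3,
}
```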
@@ -64,6 +64,12 @@ class FileBasedStreamConfig(BaseModel):
description="When enabled, syncs will not validate or structure records against the stream's schema.",
default=False,
)
recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(
title="Files To Read For Schema Discover",
description="The number of resent files which will be used to discover the schema for this stream.",
default=None,
gt=0,
)

@validator("input_schema", pre=True)
def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
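
As a side note, a minimal sketch (not part of the commit) of how the new Pydantic field behaves: `gt=0` rejects zero and negative values, while `default=None` leaves the limit disabled unless the user sets it. `StreamConfigSketch` is a hypothetical stand-in for `FileBasedStreamConfig`.

```python
from typing import Optional

from pydantic import BaseModel, Field, ValidationError


class StreamConfigSketch(BaseModel):
    # Hypothetical stand-in reduced to the new field.
    recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(default=None, gt=0)


print(StreamConfigSketch().recent_n_files_to_read_for_schema_discovery)   # None -> limit disabled
print(StreamConfigSketch(recent_n_files_to_read_for_schema_discovery=3))  # accepted

try:
    StreamConfigSketch(recent_n_files_to_read_for_schema_discovery=0)     # rejected by gt=0
except ValidationError as exc:
    print(exc.errors()[0]["msg"])
```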
@@ -32,7 +32,7 @@ class AbstractFileBasedStream(Stream):
files in the stream.
- A DiscoveryPolicy that controls the number of concurrent requests sent to the source
during discover, and the number of files used for schema discovery.
-- A dictionary of FileType:Parser that holds all of the file types that can be handled
+- A dictionary of FileType:Parser that holds all the file types that can be handled
by the stream.
"""

@@ -70,7 +70,7 @@ def list_files(self) -> List[RemoteFile]:
List all files that belong to the stream.
The output of this method is cached so we don't need to list the files more than once.
-This means we won't pick up changes to the files during a sync. This meethod uses the
+This means we won't pick up changes to the files during a sync. This method uses the
get_files method which is implemented by the concrete stream class.
"""
return list(self.get_files())
@@ -191,30 +191,40 @@ def _get_raw_json_schema(self) -> JsonSchema:
            return schemaless_schema
        else:
            files = self.list_files()
-           total_n_files = len(files)
-
-           if total_n_files == 0:
-               self.logger.warning(msg=f"No files were identified in the stream {self.name}. Setting default schema for the stream.")
-               return schemaless_schema
-
-           max_n_files_for_schema_inference = self._discovery_policy.get_max_n_files_for_schema_inference(self.get_parser())
-           if total_n_files > max_n_files_for_schema_inference:
-               # Use the most recent files for schema inference, so we pick up schema changes during discovery.
-               files = sorted(files, key=lambda x: x.last_modified, reverse=True)[:max_n_files_for_schema_inference]
-               self.logger.warn(
-                   msg=f"Refusing to infer schema for all {total_n_files} files; using {max_n_files_for_schema_inference} files."
-               )
-
-           inferred_schema = self.infer_schema(files)
-
-           if not inferred_schema:
-               raise InvalidSchemaError(
-                   FileBasedSourceError.INVALID_SCHEMA_ERROR,
-                   details=f"Empty schema. Please check that the files are valid for format {self.config.format}",
-                   stream=self.name,
-               )
-
-           schema = {"type": "object", "properties": inferred_schema}
+           first_n_files = len(files)
+
+           if self.config.recent_n_files_to_read_for_schema_discovery:
+               self.logger.info(
+                   msg=(
+                       f"Only first {self.config.recent_n_files_to_read_for_schema_discovery} files will be used to infer schema "
+                       f"for stream {self.name} due to limitation in config."
+                   )
+               )
+               first_n_files = self.config.recent_n_files_to_read_for_schema_discovery
+
+           if first_n_files == 0:
+               self.logger.warning(msg=f"No files were identified in the stream {self.name}. Setting default schema for the stream.")
+               return schemaless_schema
+
+           max_n_files_for_schema_inference = self._discovery_policy.get_max_n_files_for_schema_inference(self.get_parser())
+
+           if first_n_files > max_n_files_for_schema_inference:
+               # Use the most recent files for schema inference, so we pick up schema changes during discovery.
+               self.logger.warning(msg=f"Refusing to infer schema for {first_n_files} files; using {max_n_files_for_schema_inference} files.")
+               first_n_files = max_n_files_for_schema_inference
+
+           files = sorted(files, key=lambda x: x.last_modified, reverse=True)[:first_n_files]
+
+           inferred_schema = self.infer_schema(files)
+
+           if not inferred_schema:
+               raise InvalidSchemaError(
+                   FileBasedSourceError.INVALID_SCHEMA_ERROR,
+                   details=f"Empty schema. Please check that the files are valid for format {self.config.format}",
+                   stream=self.name,
+               )
+
+           schema = {"type": "object", "properties": inferred_schema}

        return schema

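To summarize the new behavior in one place: the user-configured limit is applied first, the discovery policy's cap still acts as an upper bound, and the most recently modified files are the ones inspected. A self-contained sketch under those assumptions (`RemoteFileStub`, `config_limit`, and `discovery_policy_max` are stand-in names, not CDK identifiers):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class RemoteFileStub:
    """Minimal stand-in for the CDK's RemoteFile."""
    uri: str
    last_modified: datetime


def select_files_for_schema_inference(
    files: List[RemoteFileStub],
    config_limit: Optional[int],  # recent_n_files_to_read_for_schema_discovery
    discovery_policy_max: int,    # discovery policy's max files for schema inference
) -> List[RemoteFileStub]:
    n = len(files)
    if config_limit:
        n = config_limit              # user-configured cap is applied first
    n = min(n, discovery_policy_max)  # never exceed the discovery policy's cap
    # Prefer the most recently modified files so schema changes are picked up.
    return sorted(files, key=lambda f: f.last_modified, reverse=True)[:n]


files = [RemoteFileStub(f"file{i}.csv", datetime(2023, 6, i + 1)) for i in range(10)]
print([f.uri for f in select_files_for_schema_inference(files, config_limit=3, discovery_policy_max=10)])
# ['file9.csv', 'file8.csv', 'file7.csv']
```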
@@ -419,6 +419,12 @@
"default": False,
"type": "boolean",
},
"recent_n_files_to_read_for_schema_discovery": {
"title": "Files To Read For Schema Discover",
"description": "The number of resent files which will be used to discover the schema for this stream.",
"exclusiveMinimum": 0,
"type": "integer",
}
},
"required": ["name", "format"],
},
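
For completeness, a small illustration (not from the commit) of how the `exclusiveMinimum: 0` constraint in the connector spec behaves when a config is validated with the `jsonschema` library; the property definition is copied from the spec fragment above.

```python
import jsonschema

schema = {
    "type": "object",
    "properties": {
        "recent_n_files_to_read_for_schema_discovery": {
            "type": "integer",
            "exclusiveMinimum": 0,
        }
    },
}

jsonschema.validate({"recent_n_files_to_read_for_schema_discovery": 3}, schema)  # passes

try:
    jsonschema.validate({"recent_n_files_to_read_for_schema_discovery": 0}, schema)
except jsonschema.ValidationError as err:
    print(err.message)  # e.g. "0 is less than or equal to the minimum of 0"
```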
@@ -815,6 +821,144 @@
)
).build()

multi_csv_stream_n_file_exceeds_config_limit_for_inference = (
TestScenarioBuilder[InMemoryFilesSource]()
.set_name("multi_csv_stream_n_file_exceeds_config_limit_for_inference")
.set_config(
{
"streams": [
{
"name": "stream1",
"format": {"filetype": "csv"},
"globs": ["*"],
"validation_policy": "Emit Record",
"recent_n_files_to_read_for_schema_discovery": 3,
}
]
}
)
.set_source_builder(
FileBasedSourceBuilder()
.set_files(
{
"a.csv": {
"contents": [
("col1", "col2"),
("val11a", "val12a"),
("val21a", "val22a"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
"b.csv": {
"contents": [
("col1", "col2", "col3"),
("val11b", "val12b", "val13b"),
("val21b", "val22b", "val23b"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
"c.csv": {
"contents": [
("col1", "col2", "col3", "col4"),
("val11c", "val12c", "val13c", "val14c"),
("val21c", "val22c", "val23c", "val24c"),
],
"last_modified": "2023-06-06T03:54:07.000Z",
},
}
)
.set_file_type("csv")
)
.set_expected_catalog(
{
"streams": [
{
"default_cursor_field": ["_ab_source_file_last_modified"],
"json_schema": {
"type": "object",
"properties": {
"col1": {"type": ["null", "string"]},
"col2": {"type": ["null", "string"]},
"col3": {"type": ["null", "string"]},
"col4": {"type": ["null", "string"]},
"_ab_source_file_last_modified": {"type": "string"},
"_ab_source_file_url": {"type": "string"},
},
},
"name": "stream1",
"source_defined_cursor": True,
"supported_sync_modes": ["full_refresh", "incremental"],
"is_resumable": True,
}
]
}
)
.set_expected_records(
[
{
"data": {
"col1": "val11a",
"col2": "val12a",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.csv",
},
"stream": "stream1",
},
{
"data": {
"col1": "val21a",
"col2": "val22a",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.csv",
},
"stream": "stream1",
},
{
"data": {
"col1": "val11b",
"col2": "val12b",
"col3": "val13b",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "b.csv",
},
"stream": "stream1",
},
{
"data": {
"col1": "val21b",
"col2": "val22b",
"col3": "val23b",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "b.csv",
},
"stream": "stream1",
},
{
"data": {
"col1": "val11c",
"col2": "val12c",
"col3": "val13c",
"col4": "val14c",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "c.csv",
},
"stream": "stream1",
},
{
"data": {
"col1": "val21c",
"col2": "val22c",
"col3": "val23c",
"col4": "val24c",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "c.csv",
},
"stream": "stream1",
},
]
)
).build()
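
For intuition about the expected catalog above: with the limit set to 3 and exactly three files present, every file still contributes to inference, so the inferred schema is the union of all headers (col1 through col4), each typed as a nullable string. A rough illustration of that union step (illustrative only, not the CDK's CSV inference code):

```python
from typing import Dict, List, Tuple


def union_csv_header_schema(headers: List[Tuple[str, ...]]) -> Dict[str, dict]:
    # Rough stand-in for CSV schema inference: every column seen in any
    # inspected file becomes a nullable string property.
    schema: Dict[str, dict] = {}
    for header in headers:
        for column in header:
            schema[column] = {"type": ["null", "string"]}
    return schema


print(union_csv_header_schema([
    ("col1", "col2"),                  # a.csv
    ("col1", "col2", "col3"),          # b.csv
    ("col1", "col2", "col3", "col4"),  # c.csv
]))
# Matches the properties in the expected catalog: col1-col4 as ["null", "string"].
```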

invalid_csv_scenario: TestScenario[InMemoryFilesSource] = (
TestScenarioBuilder[InMemoryFilesSource]()
.set_name("invalid_csv_scenario") # too many values for the number of headers
@@ -181,6 +181,7 @@ def test_override_max_n_files_for_schema_inference_is_respected(self) -> None:
self._parser.infer_schema.return_value = {"data": {"type": "string"}}
files = [RemoteFile(uri=f"file{i}", last_modified=self._NOW) for i in range(10)]
self._stream_reader.get_matching_files.return_value = files
self._stream.config.recent_n_files_to_read_for_schema_discovery = 3

schema = self._stream.get_json_schema()

@@ -77,6 +77,7 @@
invalid_csv_multi_scenario,
invalid_csv_scenario,
multi_csv_scenario,
multi_csv_stream_n_file_exceeds_config_limit_for_inference,
multi_csv_stream_n_file_exceeds_limit_for_inference,
multi_stream_custom_format,
schemaless_csv_multi_stream_scenario,
@@ -167,6 +168,7 @@
single_csv_scenario,
multi_csv_scenario,
multi_csv_stream_n_file_exceeds_limit_for_inference,
multi_csv_stream_n_file_exceeds_config_limit_for_inference,
single_csv_input_state_is_earlier_scenario,
single_csv_no_input_state_scenario,
single_csv_input_state_is_later_scenario,
