Skip to content

Commit

Permalink
Fix config instantiation for ConcurrentDeclarativeSource to have the …
Browse files Browse the repository at this point in the history
…right config
  • Loading branch information
maxi297 committed Nov 8, 2024
1 parent 80607bd commit 7d2baea
Showing 1 changed file with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ def __init__(self, email: str, password: str):

class SourceZendeskSupport(YamlDeclarativeSource):
def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs):
if config and not config.get("start_date", None):
config["start_date"] = SourceZendeskSupport.get_default_start_date()
super().__init__(catalog=catalog, config=config, state=state, **{"path_to_yaml": "manifest.yaml"})
# Before 2024-11-11, the config was being modified in `streams`. We can't do that anymore because `ConcurrentDeclarativeSource` use
# the config to make concurrent components. Hence, the "main" config needs to be the declarative one and if Python sources need
# something a bit different, it needs to deduce it from the declarative config
declarative_config = SourceZendeskSupport.convert_config_to_declarative_stream_args(config) if config else None
super().__init__(catalog=catalog, config=declarative_config, state=state, **{"path_to_yaml": "manifest.yaml"})

@classmethod
def get_default_start_date(cls) -> str:
Expand Down Expand Up @@ -112,13 +114,12 @@ def convert_config2stream_args(cls, config: Mapping[str, Any]) -> Mapping[str, A

@classmethod
def convert_config_to_declarative_stream_args(cls, config: Mapping[str, Any]) -> Mapping[str, Any]:
"""Convert input configs to parameters of the future streams
This function is used by unit tests too
"""
Sets defaults to `start_date` and `ignore_pagination`.
"""
return {
"subdomain": config["subdomain"],
"start_date": config.get("start_date", cls.get_default_start_date()),
"auth_type": config.get("auth_type"),
"credentials": config.get("credentials"),
"ignore_pagination": config.get("ignore_pagination", False),
}
Expand Down Expand Up @@ -169,8 +170,7 @@ def check_enterprise_streams(self, declarative_streams: List[Stream]) -> List[St
return all_streams

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
args = self.convert_config_to_declarative_stream_args(config)
declarative_streams = super().streams(args)
declarative_streams = super().streams(config)

nested_streams = self.get_nested_streams(config)
declarative_streams.extend(nested_streams)
Expand Down

0 comments on commit 7d2baea

Please sign in to comment.