From 7d2baeac27b05cd0f2b1511aef3e773b044b10d8 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Fri, 8 Nov 2024 09:51:55 -0500 Subject: [PATCH] Fix config instantiation for ConcurrentDeclarativeSource to have the right config --- .../source_zendesk_support/source.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/source.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/source.py index 9b744703febe..3c551a039c4e 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/source.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/source.py @@ -44,9 +44,11 @@ def __init__(self, email: str, password: str): class SourceZendeskSupport(YamlDeclarativeSource): def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs): - if config and not config.get("start_date", None): - config["start_date"] = SourceZendeskSupport.get_default_start_date() - super().__init__(catalog=catalog, config=config, state=state, **{"path_to_yaml": "manifest.yaml"}) + # Before 2024-11-11, the config was being modified in `streams`. We can't do that anymore because `ConcurrentDeclarativeSource` use + # the config to make concurrent components. Hence, the "main" config needs to be the declarative one and if Python sources need + # something a bit different, it needs to deduce it from the declarative config + declarative_config = SourceZendeskSupport.convert_config_to_declarative_stream_args(config) if config else None + super().__init__(catalog=catalog, config=declarative_config, state=state, **{"path_to_yaml": "manifest.yaml"}) @classmethod def get_default_start_date(cls) -> str: @@ -112,13 +114,12 @@ def convert_config2stream_args(cls, config: Mapping[str, Any]) -> Mapping[str, A @classmethod def convert_config_to_declarative_stream_args(cls, config: Mapping[str, Any]) -> Mapping[str, Any]: - """Convert input configs to parameters of the future streams - This function is used by unit tests too + """ + Sets defaults to `start_date` and `ignore_pagination`. """ return { "subdomain": config["subdomain"], "start_date": config.get("start_date", cls.get_default_start_date()), - "auth_type": config.get("auth_type"), "credentials": config.get("credentials"), "ignore_pagination": config.get("ignore_pagination", False), } @@ -169,8 +170,7 @@ def check_enterprise_streams(self, declarative_streams: List[Stream]) -> List[St return all_streams def streams(self, config: Mapping[str, Any]) -> List[Stream]: - args = self.convert_config_to_declarative_stream_args(config) - declarative_streams = super().streams(args) + declarative_streams = super().streams(config) nested_streams = self.get_nested_streams(config) declarative_streams.extend(nested_streams)