Skip to content

Commit

Permalink
Instantiate ConcurrentDeclarativeSource with args for remote manifest…
Browse files Browse the repository at this point in the history
… sources
  • Loading branch information
brianjlai committed Nov 5, 2024
1 parent 0bb74a0 commit 3834113
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from __future__ import annotations

import json
Expand All @@ -11,21 +12,21 @@
from pathlib import Path
from typing import Any, List, Mapping, Optional

from airbyte_cdk import TState
from airbyte_cdk.connector import BaseConnector
from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
from airbyte_cdk.models import (
AirbyteErrorTraceMessage,
AirbyteMessage,
AirbyteMessageSerializer,
AirbyteStateMessage,
AirbyteTraceMessage,
ConfiguredAirbyteCatalog,
ConnectorSpecificationSerializer,
TraceType,
Type,
)
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.concurrent_declarative_source import ConcurrentDeclarativeSource
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.source import TState
from orjson import orjson


Expand Down Expand Up @@ -61,16 +62,10 @@ def handle_command(args: List[str]) -> None:
handle_remote_manifest_command(args)


def _get_local_yaml_source(args: List[str]):
catalog_path = AirbyteEntrypoint.extract_catalog(args)
config_path = AirbyteEntrypoint.extract_config(args)
state_path = AirbyteEntrypoint.extract_state(args)
def _get_local_yaml_source(args: List[str]) -> SourceLocalYaml:
try:
return SourceLocalYaml(
SourceLocalYaml.read_catalog(catalog_path) if catalog_path else None,
SourceLocalYaml.read_config(config_path) if config_path else None,
SourceLocalYaml.read_state(state_path) if state_path else None,
)
config, catalog, state = _parse_inputs_into_config_catalog_state(args)
return SourceLocalYaml(config=config, catalog=catalog, state=state)
except Exception as error:
print(
orjson.dumps(
Expand Down Expand Up @@ -112,24 +107,56 @@ def handle_remote_manifest_command(args: List[str]) -> None:
message = AirbyteMessage(type=Type.SPEC, spec=spec)
print(AirbyteEntrypoint.airbyte_message_to_string(message))
else:
source = create_manifest(args)
source = create_declarative_source(args)
launch(source, args)


def create_manifest(args: List[str]) -> ManifestDeclarativeSource:
def create_declarative_source(args: List[str]) -> ConcurrentDeclarativeSource:
"""Creates the source with the injected config.
This essentially does what other low-code sources do at build time, but at runtime,
with a user-provided manifest in the config. This better reflects what happens in the
connector builder.
"""
parsed_args = AirbyteEntrypoint.parse_args(args)
config = BaseConnector.read_config(parsed_args.config)
if "__injected_declarative_manifest" not in config:
raise ValueError(
f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}"
try:
config, catalog, state = _parse_inputs_into_config_catalog_state(args)
if "__injected_declarative_manifest" not in config:
raise ValueError(
f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}"
)
return ConcurrentDeclarativeSource(
config=config, catalog=catalog, state=state, source_config=config.get("__injected_declarative_manifest")
)
return ManifestDeclarativeSource(config.get("__injected_declarative_manifest"))
except Exception as error:
print(
orjson.dumps(
AirbyteMessageSerializer.dump(
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.ERROR,
emitted_at=int(datetime.now().timestamp() * 1000),
error=AirbyteErrorTraceMessage(
message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
stack_trace=traceback.format_exc(),
),
),
)
)
).decode()
)
raise error


def _parse_inputs_into_config_catalog_state(
args: List[str],
) -> (Optional[Mapping[str, Any]], Optional[ConfiguredAirbyteCatalog], List[AirbyteStateMessage]):
parsed_args = AirbyteEntrypoint.parse_args(args)
config = ConcurrentDeclarativeSource.read_config(parsed_args.config) if hasattr(parsed_args, "config") else None
catalog = ConcurrentDeclarativeSource.read_catalog(parsed_args.catalog) if hasattr(parsed_args, "catalog") else None
state = ConcurrentDeclarativeSource.read_state(parsed_args.state) if hasattr(parsed_args, "state") else []

return config, catalog, state


def run():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import json
from unittest.mock import patch

import pytest
Expand All @@ -10,28 +11,35 @@

POKEAPI_JSON_SPEC_SUBSTRING = '"required":["pokemon_name"]'
SUCCESS_CHECK_SUBSTRING = '"connectionStatus":{"status":"SUCCEEDED"}'
FAILED_CHECK_SUBSTRING = '"connectionStatus":{"status":"FAILED"}'


@pytest.fixture(autouse=True)
def setup(valid_local_manifest_yaml):
with patch('source_declarative_manifest.run._is_local_manifest_command', return_value=True):
with patch('source_declarative_manifest.run.YamlDeclarativeSource._read_and_parse_yaml_file', return_value=valid_local_manifest_yaml):
yield


def test_spec_is_poke_api(capsys):
run.handle_command(["spec"])
stdout = capsys.readouterr()
assert POKEAPI_JSON_SPEC_SUBSTRING in stdout.out


def test_invalid_yaml_throws(capsys, invalid_local_manifest_yaml):
with patch('source_declarative_manifest.run.YamlDeclarativeSource._read_and_parse_yaml_file', return_value=invalid_local_manifest_yaml):
with pytest.raises(ValidationError):
run.handle_command(["spec"])

def test_given_invalid_config_then_raise_value_error(invalid_local_config_file):
with pytest.raises(ValueError):
run.create_manifest(["check", "--config", str(invalid_local_config_file)])

def test_given_invalid_config_then_raise_value_error(capsys, valid_local_config_file):
def test_given_invalid_config_then_unsuccessful_check(capsys, invalid_local_config_file):
run.handle_command(["check", "--config", str(invalid_local_config_file)])
stdout = capsys.readouterr()
assert json.loads(stdout.out).get("connectionStatus").get("status") == "FAILED"


def test_given_valid_config_with_successful_check(capsys, valid_local_config_file):
run.handle_command(["check", "--config", str(valid_local_config_file)])
stdout = capsys.readouterr()
assert SUCCESS_CHECK_SUBSTRING in stdout.out
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import pytest
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from source_declarative_manifest.run import create_manifest, handle_command
from source_declarative_manifest.run import create_declarative_source, handle_command

REMOTE_MANIFEST_SPEC_SUBSTRING = '"required":["__injected_declarative_manifest"]'

Expand All @@ -17,9 +17,9 @@ def test_spec_does_not_raise_value_error(capsys):

def test_given_no_injected_declarative_manifest_then_raise_value_error(invalid_remote_config):
with pytest.raises(ValueError):
create_manifest(["check", "--config", str(invalid_remote_config)])
create_declarative_source(["check", "--config", str(invalid_remote_config)])


def test_given_injected_declarative_manifest_then_return_declarative_manifest(valid_remote_config):
source = create_manifest(["check", "--config", str(valid_remote_config)])
source = create_declarative_source(["check", "--config", str(valid_remote_config)])
assert isinstance(source, ManifestDeclarativeSource)

0 comments on commit 3834113

Please sign in to comment.