Skip to content

Commit

Permalink
Adds integration tests and converts types to use Type and Orchestrato…
Browse files Browse the repository at this point in the history
…rType classes
  • Loading branch information
tcboles committed Nov 5, 2024
1 parent 957f7eb commit 99a4c44
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 3 deletions.
7 changes: 4 additions & 3 deletions airbyte/_connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from airbyte_protocol.models import (
AirbyteMessage,
ConnectorSpecification,
OrchestratorType,
Status,
TraceType,
Type,
Expand Down Expand Up @@ -384,11 +385,11 @@ def _peek_airbyte_message(
return

if (
message.type == "CONTROL"
and message.control.type == "CONNECTOR_CONFIG"
message.type == Type.CONTROL
and message.control.type == OrchestratorType.CONNECTOR_CONFIG
and self.config_change_callback is not None
):
self.config_change_callback(message.control.config)
self.config_change_callback(message.control.connectorConfig.config)
return

def _execute(
Expand Down
87 changes: 87 additions & 0 deletions tests/integration_tests/test_config_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

"""Integration tests which test destination capabilities using the JSONL destination (docker-based)."""


from __future__ import annotations

import pytest
from unittest.mock import patch
from airbyte import get_source, get_destination
from airbyte.destinations.base import Destination
from airbyte_protocol.models import AirbyteMessage, Type, AirbyteControlMessage, OrchestratorType, AirbyteControlConnectorConfigMessage


def config_change_callback(config: dict[str, Any]) -> None:
print(f"Updated config: {config}")

@pytest.fixture
def new_duckdb_destination() -> Destination:
"""Return a new JSONL destination."""
return get_destination(
name="destination-duckdb",
config={
# This path is relative to the container:
"destination_path": "/local/temp/db.duckdb",
},
config_change_callback=config_change_callback
)


@pytest.fixture
def new_source_faker() -> Source:
return get_source(
"source-faker",
config={
"count": 100,
"seed": 1234,
"parallelism": 16,
},
install_if_missing=True,
streams=["products"],
config_change_callback=config_change_callback,
)


def test_source_config_callback(
new_duckdb_destination: Destination,
new_source_faker: Source,
) -> None:
with patch.object(new_source_faker, 'config_change_callback') as mock_config_change_callback:
updated_config = {
"count": 1000,
"seed": 1234,
"parallelism": 16,
}
airbyte_source_control_message = AirbyteMessage(
type=Type.CONTROL,
control=AirbyteControlMessage(
type=OrchestratorType.CONNECTOR_CONFIG,
emitted_at=0,
connectorConfig=AirbyteControlConnectorConfigMessage(config=updated_config),
),
)

new_source_faker._peek_airbyte_message(airbyte_source_control_message)
mock_config_change_callback.assert_called_once_with(updated_config)


def test_destination_config_callback(
new_duckdb_destination: Destination,
new_source_faker: Source,
) -> None:
with patch.object(new_duckdb_destination, 'config_change_callback') as mock_config_change_callback:
updated_config = {
"destination_path": "/local/temp/db.duckdb",
}
airbyte_destination_control_message = AirbyteMessage(
type=Type.CONTROL,
control=AirbyteControlMessage(
type=OrchestratorType.CONNECTOR_CONFIG,
emitted_at=0,
connectorConfig=AirbyteControlConnectorConfigMessage(config=updated_config),
),
)

new_duckdb_destination._peek_airbyte_message(airbyte_destination_control_message)
mock_config_change_callback.assert_called_once_with(updated_config)

0 comments on commit 99a4c44

Please sign in to comment.