Commit e3eae0d

Merge branch 'master' into augustin/connectors-ci/change-pytest-step-result-evaluation

alafanechere committed Jul 27, 2023
2 parents: 650793f + 1658ecc
Showing 59 changed files with 1,397 additions and 131 deletions.
5 changes: 5 additions & 0 deletions .github/actions/run-dagger-pipeline/action.yml
@@ -47,6 +47,9 @@ inputs:
description: "Bucket name for metadata service"
required: false
default: "prod-airbyte-cloud-connector-metadata-service"
+ sentry_dsn:
+ description: "Sentry DSN"
+ required: false
spec_cache_bucket_name:
description: "Bucket name for GCS spec cache"
required: false
@@ -89,6 +92,7 @@ runs:
shell: bash
run: |
pip install pipx
+ pipx ensurepath
pipx install airbyte-ci/connectors/pipelines/
- name: Run airbyte-ci
shell: bash
@@ -110,6 +114,7 @@ runs:
METADATA_SERVICE_GCS_CREDENTIALS: ${{ inputs.metadata_service_gcs_credentials }}
PRODUCTION: ${{ inputs.production }}
PULL_REQUEST_NUMBER: ${{ github.event.pull_request.number }}
+ SENTRY_DSN: ${{ inputs.sentry_dsn }}
SLACK_WEBHOOK: ${{ inputs.slack_webhook_url }}
SPEC_CACHE_BUCKET_NAME: ${{ inputs.spec_cache_bucket_name }}
SPEC_CACHE_GCS_CREDENTIALS: ${{ inputs.spec_cache_gcs_credentials }}
8 changes: 8 additions & 0 deletions .github/workflows/airbyte_ci_tests.yml
@@ -0,0 +1,8 @@
+ name: Airbyte CI pipeline tests
+
+ on:
+   push:
+     branches:
+       - "!master"
+     paths:
+       - "airbyte-ci/**"
1 change: 1 addition & 0 deletions .github/workflows/connector-performance-command.yml
@@ -113,6 +113,7 @@ jobs:
- name: Install CI scripts
run: |
pip install pipx
+ pipx ensurepath
pipx install airbyte-ci/connectors/ci_credentials
pipx install airbyte-ci/connectors/connector_ops
- name: Source or Destination harness
1 change: 1 addition & 0 deletions .github/workflows/connector_metadata_checks.yml
@@ -20,6 +20,7 @@ jobs:
- name: Install ci-connector-ops package
run: |
pip install pipx
+ pipx ensurepath
pipx install airbyte-ci/connectors/connector_ops/
- name: Check test strictness level
run: check-test-strictness-level
1 change: 1 addition & 0 deletions .github/workflows/connector_teams_review_requirements.yml
@@ -25,6 +25,7 @@ jobs:
- name: Install ci-connector-ops package
run: |
pip install pipx
+ pipx ensurepath
pipx install airbyte-ci/connectors/connector_ops
- name: Write review requirements file
id: write-review-requirements-file
6 changes: 6 additions & 0 deletions .github/workflows/legacy-publish-command.yml
@@ -250,11 +250,14 @@ jobs:
with:
python-version: "3.10"
- name: Install CI scripts
+ shell: bash
run: |
pip install pipx
+ pipx ensurepath
pipx install airbyte-ci/connectors/ci_credentials
pipx install airbyte-ci/connectors/connector_ops
- name: Write Integration Test Credentials for ${{ matrix.connector }}
+ shell: bash
run: |
ci_credentials ${{ matrix.connector }} write-to-storage
# normalization also runs destination-specific tests, so fetch their creds also
@@ -267,6 +270,7 @@
GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
- name: Set Name and Version Environment Vars
if: startsWith(matrix.connector, 'connectors')
+ shell: bash
run: |
source tools/lib/lib.sh
DOCKERFILE=airbyte-integrations/${{ matrix.connector }}/Dockerfile
@@ -282,6 +286,7 @@
- name: Run QA checks for ${{ matrix.connector }}
id: qa_checks
if: always()
+ shell: bash
run: |
run-qa-checks ${{ matrix.connector }}
- name: Publish ${{ matrix.connector }}
@@ -300,6 +305,7 @@
attempt_delay: 5000 # in ms
- name: Update Integration Test Credentials after test run for ${{ github.event.inputs.connector }}
if: always()
+ shell: bash
run: |
ci_credentials ${{ matrix.connector }} update-secrets
# normalization also runs destination-specific tests, so fetch their creds also
1 change: 1 addition & 0 deletions .github/workflows/legacy-test-command.yml
@@ -103,6 +103,7 @@ jobs:
- name: Install CI scripts
run: |
pip install pipx
+ pipx ensurepath
pipx install airbyte-ci/connectors/ci_credentials
pipx install airbyte-ci/connectors/connector_ops
- name: Write Integration Test Credentials for ${{ github.event.inputs.connector }}
2 changes: 2 additions & 0 deletions .github/workflows/publish_connectors.yml
@@ -37,6 +37,7 @@ jobs:
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
metadata_service_gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
+ sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
slack_webhook_url: ${{ secrets.PUBLISH_ON_MERGE_SLACK_WEBHOOK }}
spec_cache_gcs_credentials: ${{ secrets.SPEC_CACHE_SERVICE_ACCOUNT_KEY_PUBLISH }}
subcommand: "connectors --concurrency=1 --execute-timeout=3600 --modified publish --main-release"
@@ -53,6 +54,7 @@
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
metadata_service_gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
+ sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
slack_webhook_url: ${{ secrets.PUBLISH_ON_MERGE_SLACK_WEBHOOK }}
spec_cache_gcs_credentials: ${{ secrets.SPEC_CACHE_SERVICE_ACCOUNT_KEY_PUBLISH }}
subcommand: "connectors ${{ github.event.inputs.connectors-options }} publish ${{ github.event.inputs.publish-options }}"
1 change: 1 addition & 0 deletions .github/workflows/test-performance-command.yml
@@ -92,6 +92,7 @@ jobs:
- name: Install CI scripts
run: |
pip install pipx
+ pipx ensurepath
pipx install airbyte-ci/connectors/ci_credentials
- name: Write Integration Test Credentials for ${{ github.event.inputs.connector }}
run: |
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
- current_version = 0.47.2
+ current_version = 0.47.3
commit = False

[bumpversion:file:setup.py]
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

+ ## 0.47.3
+ Connector Builder: Ensure we return when there are no slices
+
## 0.47.2
low-code: deduplicate query params if they are already encoded in the URL

4 changes: 2 additions & 2 deletions airbyte-cdk/python/Dockerfile
@@ -10,7 +10,7 @@ RUN apk --no-cache upgrade \
&& apk --no-cache add tzdata build-base

# install airbyte-cdk
- RUN pip install --prefix=/install airbyte-cdk==0.47.2
+ RUN pip install --prefix=/install airbyte-cdk==0.47.3

# build a clean environment
FROM base
@@ -32,5 +32,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

# needs to be the same as CDK
- LABEL io.airbyte.version=0.47.2
+ LABEL io.airbyte.version=0.47.3
LABEL io.airbyte.name=airbyte/source-declarative-manifest
(additional changed file — from its contents, the connector builder's message grouper module)

@@ -31,7 +31,6 @@
AirbyteMessage,
AirbyteTraceMessage,
ConfiguredAirbyteCatalog,
- Level,
OrchestratorType,
TraceType,
)
@@ -126,15 +125,14 @@ def _get_message_groups(
current_slice_pages: List[StreamReadPages] = []
current_page_request: Optional[HttpRequest] = None
current_page_response: Optional[HttpResponse] = None
- had_error = False

while records_count < limit and (message := next(messages, None)):
json_object = self._parse_json(message.log) if message.type == MessageType.LOG else None
if json_object is not None and not isinstance(json_object, dict):
raise ValueError(f"Expected log message to be a dict, got {json_object} of type {type(json_object)}")
json_message: Optional[Dict[str, JsonType]] = json_object
if self._need_to_close_page(at_least_one_page_in_group, message, json_message):
- self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records, True)
+ self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records)
current_page_request = None
current_page_response = None

@@ -172,12 +170,9 @@ def _get_message_groups(
current_page_request = self._create_request_from_log_message(json_message)
current_page_response = self._create_response_from_log_message(json_message)
else:
- if message.log.level == Level.ERROR:
-     had_error = True
yield message.log
elif message.type == MessageType.TRACE:
if message.trace.type == TraceType.ERROR:
- had_error = True
yield message.trace
elif message.type == MessageType.RECORD:
current_page_records.append(message.record.data)
@@ -187,8 +182,9 @@
elif message.type == MessageType.CONTROL and message.control.type == OrchestratorType.CONNECTOR_CONFIG:
yield message.control
else:
- self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records, validate_page_complete=not had_error)
- yield StreamReadSlices(pages=current_slice_pages, slice_descriptor=current_slice_descriptor)
+ if current_page_request or current_page_response or current_page_records:
+     self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records)
+     yield StreamReadSlices(pages=current_slice_pages, slice_descriptor=current_slice_descriptor)

@staticmethod
def _need_to_close_page(at_least_one_page_in_group: bool, message: AirbyteMessage, json_message: Optional[Dict[str, Any]]) -> bool:
@@ -224,15 +220,10 @@ def _is_auxiliary_http_request(message: Optional[Dict[str, Any]]) -> bool:
return is_http and message.get("http", {}).get("is_auxiliary", False)

@staticmethod
- def _close_page(current_page_request: Optional[HttpRequest], current_page_response: Optional[HttpResponse], current_slice_pages: List[StreamReadPages], current_page_records: List[Mapping[str, Any]], validate_page_complete: bool) -> None:
+ def _close_page(current_page_request: Optional[HttpRequest], current_page_response: Optional[HttpResponse], current_slice_pages: List[StreamReadPages], current_page_records: List[Mapping[str, Any]]) -> None:
"""
Close a page when parsing message groups
- @param validate_page_complete: in some cases, we expect the CDK to not return a response. As of today, this will only happen before
- an uncaught exception and therefore, the assumption is that `validate_page_complete=True` only on the last page that is being closed
"""
- if validate_page_complete and (not current_page_request or not current_page_response):
-     raise ValueError("Every message grouping should have at least one request and response")

current_slice_pages.append(
StreamReadPages(request=current_page_request, response=current_page_response, records=deepcopy(current_page_records)) # type: ignore
)
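The connector-builder change above removes the `had_error` bookkeeping and the `validate_page_complete` check: instead of raising `ValueError` when the trailing page lacks a request or response, the grouper now closes and yields the final slice only if something was actually accumulated. A minimal sketch of that guard, using simplified stand-in types rather than the CDK's real `HttpRequest`/`StreamReadSlices` classes:

from typing import Any, List, Optional, Tuple

# (request, response, records) — a simplified stand-in for StreamReadPages.
Page = Tuple[Optional[str], Optional[str], List[Any]]

def close_trailing_page(
    request: Optional[str],
    response: Optional[str],
    slice_pages: List[Page],
    page_records: List[Any],
) -> List[Page]:
    # New behavior: emit the trailing page only when a request, response,
    # or records were accumulated; never raise on an incomplete page.
    if request or response or page_records:
        slice_pages.append((request, response, list(page_records)))
    return slice_pages

# A read that produced nothing (e.g. only an auxiliary request) now yields
# no slices instead of raising ValueError:
assert close_trailing_page(None, None, [], []) == []
# A partial page (request seen, no response yet) is still emitted:
assert close_trailing_page("req", None, [], [{"id": 1}]) == [("req", None, [{"id": 1}])]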
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
@@ -21,7 +21,7 @@
name="airbyte-cdk",
# The version of the airbyte-cdk package is used at runtime to validate manifests. That validation must be
# updated if our semver format changes such as using release candidate versions.
- version="0.47.2",
+ version="0.47.3",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
(additional changed file — from its contents, a connector builder handler unit test)

@@ -510,9 +510,7 @@ def check_config_against_spec(self):
response = read_stream(source, TEST_READ_CONFIG, ConfiguredAirbyteCatalog.parse_obj(CONFIGURED_CATALOG), limits)

expected_stream_read = StreamRead(logs=[LogMessage("error_message - a stack trace", "ERROR")],
- slices=[StreamReadSlices(
-     pages=[StreamReadPages(records=[], request=None, response=None)],
-     slice_descriptor=None, state=None)],
+ slices=[],
test_read_limit_reached=False,
auxiliary_requests=[],
inferred_schema=None,
(additional changed file — from its contents, the message grouper unit tests)

@@ -367,25 +367,6 @@ def test_get_grouped_messages_no_records(mock_entrypoint_read: Mock) -> None:
assert actual_page == expected_pages[i]


- @patch('airbyte_cdk.connector_builder.message_grouper.AirbyteEntrypoint.read')
- def test_get_grouped_messages_invalid_group_format(mock_entrypoint_read: Mock) -> None:
-     response = {"status_code": 200, "headers": {"field": "value"}, "body": '{"name": "field"}'}
-
-     mock_source = make_mock_source(mock_entrypoint_read, iter(
-         [
-             response_log_message(response),
-             record_message("hashiras", {"name": "Shinobu Kocho"}),
-             record_message("hashiras", {"name": "Muichiro Tokito"}),
-         ]
-     )
-     )
-
-     api = MessageGrouper(MAX_PAGES_PER_SLICE, MAX_SLICES)
-
-     with pytest.raises(ValueError):
-         api.get_message_groups(source=mock_source, config=CONFIG, configured_catalog=create_configured_catalog("hashiras"))


@pytest.mark.parametrize(
"log_message, expected_response",
[
@@ -588,7 +569,7 @@ def test_given_multiple_control_messages_with_same_timestamp_then_stream_read_ha


@patch('airbyte_cdk.connector_builder.message_grouper.AirbyteEntrypoint.read')
- def test_given_auxiliary_requests_then_return_global_request(mock_entrypoint_read: Mock) -> None:
+ def test_given_auxiliary_requests_then_return_auxiliary_request(mock_entrypoint_read: Mock) -> None:
mock_source = make_mock_source(mock_entrypoint_read, iter(
any_request_and_response_with_a_record() +
[
@@ -603,6 +584,17 @@ def test_given_auxiliary_requests_then_return_auxiliary_request
assert len(stream_read.auxiliary_requests) == 1


+ @patch('airbyte_cdk.connector_builder.message_grouper.AirbyteEntrypoint.read')
+ def test_given_no_slices_then_return_empty_slices(mock_entrypoint_read: Mock) -> None:
+     mock_source = make_mock_source(mock_entrypoint_read, iter([auxiliary_request_log_message()]))
+     connector_builder_handler = MessageGrouper(MAX_PAGES_PER_SLICE, MAX_SLICES)
+     stream_read: StreamRead = connector_builder_handler.get_message_groups(
+         source=mock_source, config=CONFIG, configured_catalog=create_configured_catalog("hashiras")
+     )
+
+     assert len(stream_read.slices) == 0


def make_mock_source(mock_entrypoint_read: Mock, return_value: Iterator[AirbyteMessage]) -> MagicMock:
mock_source = MagicMock()
mock_entrypoint_read.return_value = return_value
11 changes: 10 additions & 1 deletion airbyte-ci/connectors/pipelines/pipelines/__init__.py
@@ -8,6 +8,10 @@

from rich.logging import RichHandler

+ from . import sentry_utils
+
+ sentry_utils.initialize()
+
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
@@ -16,6 +20,11 @@
# RichHandler does not work great in the CI
logging_handlers = [logging.StreamHandler()]

- logging.basicConfig(level=logging.INFO, format="%(name)s: %(message)s", datefmt="[%X]", handlers=logging_handlers)
+ logging.basicConfig(
+     level=logging.INFO,
+     format="%(name)s: %(message)s",
+     datefmt="[%X]",
+     handlers=logging_handlers,
+ )

main_logger = logging.getLogger(__name__)
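`sentry_utils` itself is outside this capture. A plausible minimal `initialize`, assuming the standard `sentry-sdk` package and the `SENTRY_DSN` variable wired through the workflow changes above (a sketch, not the commit's actual file):

# sentry_utils.py — hypothetical sketch.
import os

import sentry_sdk

def initialize() -> None:
    # SENTRY_DSN is injected by the run-dagger-pipeline action; when it is
    # unset, sentry_sdk.init receives no DSN and the SDK stays disabled,
    # so local runs report nothing.
    sentry_sdk.init(dsn=os.environ.get("SENTRY_DSN"))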
(additional changed file — from its contents, the Dagger container-build helpers in the airbyte-ci pipelines package)

@@ -942,7 +942,7 @@ async def with_airbyte_python_connector_full_dagger(context: ConnectorContext, b
.with_mounted_cache("/root/.cache/pip", pip_cache)
.with_exec(["pip", "install", "--upgrade", "pip"])
.with_exec(["apt-get", "install", "-y", "tzdata"])
- .with_file("setup.py", await context.get_connector_dir(include="setup.py").file("setup.py"))
+ .with_file("setup.py", (await context.get_connector_dir(include="setup.py")).file("setup.py"))
)

for dependency_path in setup_dependencies_to_mount:
@@ -957,8 +957,8 @@
.with_file("/usr/localtime", builder.file("/usr/share/zoneinfo/Etc/UTC"))
.with_new_file("/etc/timezone", "Etc/UTC")
.with_exec(["apt-get", "install", "-y", "bash"])
- .with_file("main.py", await context.get_connector_dir(include="main.py").file("main.py"))
- .with_directory(snake_case_name, await context.get_connector_dir(include=snake_case_name).directory(snake_case_name))
+ .with_file("main.py", (await context.get_connector_dir(include="main.py")).file("main.py"))
+ .with_directory(snake_case_name, (await context.get_connector_dir(include=snake_case_name)).directory(snake_case_name))
.with_env_variable("AIRBYTE_ENTRYPOINT", " ".join(entrypoint))
.with_entrypoint(entrypoint)
.with_label("io.airbyte.version", context.metadata["dockerImageTag"])
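The added parentheses in this file are a real bug fix, not a style change: `await` binds more loosely than attribute access and calls, so `await ctx.dir().file(x)` parses as `await (ctx.dir().file(x))` and looks up `.file` on the coroutine object itself. A self-contained illustration with hypothetical classes (not Dagger's API):

import asyncio

class Directory:
    def file(self, name: str) -> str:
        return f"file:{name}"

async def get_dir() -> Directory:
    return Directory()

async def main() -> None:
    # Correct: parentheses force the coroutine to be awaited first.
    assert (await get_dir()).file("setup.py") == "file:setup.py"

    # Broken: `.file` is looked up on the un-awaited coroutine object,
    # raising AttributeError (and leaving the coroutine never awaited).
    try:
        await get_dir().file("setup.py")  # type: ignore[attr-defined]
    except AttributeError as err:
        print(f"as expected: {err}")

asyncio.run(main())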
2 changes: 2 additions & 0 deletions airbyte-ci/connectors/pipelines/pipelines/bases.py
@@ -21,6 +21,7 @@
from connector_ops.utils import console
from dagger import Container, DaggerError, QueryError
from jinja2 import Environment, PackageLoader, select_autoescape
+ from pipelines import sentry_utils
from pipelines.actions import remote_storage
from pipelines.consts import GCS_PUBLIC_DOMAIN, LOCAL_REPORTS_PATH_ROOT, PYPROJECT_TOML_FILE_PATH
from pipelines.utils import check_path_in_workdir, format_duration, get_exec_result, slugify
@@ -137,6 +138,7 @@ async def run_with_completion(self, completion_event: anyio.Event, *args, **kwargs
completion_event.set()
return self._get_timed_out_step_result()

+ @sentry_utils.with_step_context
async def run(self, *args, **kwargs) -> StepResult:
"""Public method to run the step. It outputs a step result.
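`with_step_context` also lives in the unshown `sentry_utils` module. A sketch of what such a decorator typically does — tag Sentry events with the step being run so failures in `Step.run` are attributable (an assumed implementation, not the commit's actual one):

# Hypothetical sketch of sentry_utils.with_step_context.
import functools

import sentry_sdk

def with_step_context(func):
    @functools.wraps(func)
    async def wrapper(self, *args, **kwargs):
        # `self` is the Step instance; its class name identifies the step
        # on any Sentry event emitted while the step runs.
        with sentry_sdk.configure_scope() as scope:
            scope.set_tag("pipeline_step", type(self).__name__)
            return await func(self, *args, **kwargs)
    return wrapper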
(additional changed file — from its contents, the airbyte-ci CLI entrypoint)

@@ -133,7 +133,6 @@ def airbyte_ci(
main_logger.info(f"Pipeline Start Timestamp: {pipeline_start_timestamp}")
main_logger.info(f"Modified Files: {ctx.obj['modified_files']}")

-
airbyte_ci.add_command(connectors)
airbyte_ci.add_command(metadata)

Expand Down
(The remaining changed files in this commit were not loaded in this page capture.)