Skip to content

Commit

Permalink
fix(airbyte-cdk): fix declarative schema refs (#42844)
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Inzhyyants <[email protected]>
  • Loading branch information
artem1205 authored Jul 29, 2024
1 parent b34d531 commit 328be4b
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ definitions:
decoder:
title: Decoder
description: Component decoding the response so records can be extracted.
"$ref": "#/definitions/Decoder"
"$ref": "#/definitions/JsonDecoder"
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -928,10 +928,6 @@ class WaitUntilTimeFromHeader(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class Decoder(BaseModel):
__root__: Any


class AddedFieldDefinition(BaseModel):
type: Literal['AddedFieldDefinition']
path: List[str] = Field(
Expand Down Expand Up @@ -1040,7 +1036,7 @@ class CursorPagination(BaseModel):
],
title='Stop Condition',
)
decoder: Optional[Decoder] = Field(
decoder: Optional[JsonDecoder] = Field(
None,
description='Component decoding the response so records can be extracted.',
title='Decoder',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
import sys
from copy import deepcopy
from pathlib import Path
from typing import Any, List, Mapping
from unittest.mock import call, patch

Expand Down Expand Up @@ -1261,3 +1262,43 @@ def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMess
]
)
return list(source.read(logger, {}, catalog, {}))


def test_declarative_component_schema_valid_ref_links():
def load_yaml(file_path) -> Mapping[str, Any]:
with open(file_path, 'r') as file:
return yaml.safe_load(file)

def extract_refs(data, base_path='#') -> List[str]:
refs = []
if isinstance(data, dict):
for key, value in data.items():
if key == '$ref' and isinstance(value, str) and value.startswith('#'):
ref_path = value
refs.append(ref_path)
else:
refs.extend(extract_refs(value, base_path))
elif isinstance(data, list):
for item in data:
refs.extend(extract_refs(item, base_path))
return refs

def resolve_pointer(data: Mapping[str, Any], pointer: str) -> bool:
parts = pointer.split('/')[1:] # Skip the first empty part due to leading '#/'
current = data
try:
for part in parts:
part = part.replace('~1', '/').replace('~0', '~') # Unescape JSON Pointer
current = current[part]
return True
except (KeyError, TypeError):
return False

def validate_refs(yaml_file: str) -> List[str]:
data = load_yaml(yaml_file)
refs = extract_refs(data)
invalid_refs = [ref for ref in refs if not resolve_pointer(data, ref.replace('#', ''))]
return invalid_refs

yaml_file_path = Path(__file__).resolve().parent.parent.parent.parent / 'airbyte_cdk/sources/declarative/declarative_component_schema.yaml'
assert not validate_refs(yaml_file_path)

0 comments on commit 328be4b

Please sign in to comment.