airbyte-cdk: only update airbyte-protocol-models to pydantic v2 (#39524)
## What

Migrate the protocol message models to Pydantic V2 to speed up emitting records. This gives us a 2.5x boost over V1.

Close airbytehq/airbyte-internal-issues#8333

## How
- Switch to the protocol models generated for Pydantic V2, published in a new (temporary) package, `airbyte-protocol-models-pdv2`.
- Update the CDK's pydantic dependency to v2 accordingly.
- For minimal impact, keep using the `pydantic.v1` compatibility layer in all airbyte-cdk pydantic code that does not interact with the protocol models.

## Review guide
1. Check out the code and clear your CDK virtual env (either `rm -rf .venv && python -m venv .venv` or `poetry env list; poetry env remove <env>`). This is necessary to fully clean out the `airbyte_protocol` library, for some reason. Then run `poetry lock --no-update && poetry install --all-extras`, which should install the CDK with the new models.
2. Run unit tests on the CDK
3. Take your favorite connector, point its `pyproject.toml` at the local CDK (see the example in `source-s3`), and try running its unit tests and regression tests.

## User Impact

> [!warning]
> This is a major CDK change due to the pydantic dependency change - if connectors use pydantic 1.10, they will break and will need to do similar `from pydantic.v1` updates to get running again. Therefore, we should release this as a major CDK version bump.
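
The required connector-side fix is mostly mechanical import rewriting, much like the substitution this PR's codegen post-processing applies to generated models. A stdlib sketch of such a rewrite (the function name and regex are illustrative assumptions, not tooling shipped with the CDK):

```python
import re

def rewrite_pydantic_imports(source: str) -> str:
    # Point v1-style imports at pydantic's bundled compatibility layer,
    # e.g. "from pydantic import BaseModel" -> "from pydantic.v1 import BaseModel".
    # Imports already targeting pydantic.v1 are left untouched.
    return re.sub(r"\bfrom pydantic(?!\.v1\b)", "from pydantic.v1", source)

print(rewrite_pydantic_imports("from pydantic import BaseModel, Field"))
print(rewrite_pydantic_imports("from pydantic.error_wrappers import ValidationError"))
```

Submodule imports such as `pydantic.error_wrappers` need the same treatment, since they move under `pydantic.v1` as well.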

## Can this PR be safely reverted and rolled back?
- [x] YES 💚
- [ ] NO ❌

Even if sources migrate to this version, state format should not change, so a revert should be possible.

## Follow-up work - Ella to move into issues

<details>

### Source-s3 - turn this into an issue
- [ ] Update source s3 CDK version and any required code changes
- [ ] Fix source-s3 unit tests
- [ ] Run source-s3 regression tests
- [ ] Merge and release source-s3 by June 21st

### Docs
- [ ] Update documentation on how to build with the CDK

### CDK pieces
- [ ] Update file-based CDK format validation to use Pydantic V2
  - This is doable, but requires a breaking change to `OneOfOptionConfig`. A few unhandled test cases present issues we're unsure how to handle so far.
- [ ] Update low-code component generators to use Pydantic V2
  - This is doable; a few issues around custom component generation remain unhandled.

### Further CDK performance work - create issues for these
- [ ] Research if we can replace prints with buffered output (write to byte buffer and then flush to stdout)
- [ ] Replace `json` with `orjson`
...
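
The buffered-output idea above can be prototyped with the stdlib alone: batch serialized messages into an in-memory buffer and write in chunks rather than once per record. All names below are illustrative sketches, not CDK APIs:

```python
import io
import json

def emit_buffered(records, out, flush_every: int = 1000) -> None:
    # Accumulate serialized messages and write them in batches,
    # cutting per-record write overhead on stdout.
    pending = []
    for record in records:
        pending.append(json.dumps(record))
        if len(pending) >= flush_every:
            out.write("\n".join(pending) + "\n")
            pending.clear()
    if pending:
        out.write("\n".join(pending) + "\n")

out = io.StringIO()  # stand-in for sys.stdout
emit_buffered(({"type": "RECORD", "record": {"data": {"i": i}}} for i in range(3)), out)
print(out.getvalue(), end="")
```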

</details>
erohmensing authored Jun 20, 2024
1 parent 6d42eca commit fc12432
Showing 43 changed files with 506 additions and 454 deletions.
6 changes: 3 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/destinations/destination.py
@@ -14,7 +14,7 @@
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from pydantic import ValidationError
from pydantic import ValidationError as V2ValidationError

logger = logging.getLogger("airbyte")

@@ -37,7 +37,7 @@ def _parse_input_stream(self, input_stream: io.TextIOWrapper) -> Iterable[Airbyt
for line in input_stream:
try:
yield AirbyteMessage.parse_raw(line)
except ValidationError:
except V2ValidationError:
logger.info(f"ignoring input which can't be deserialized as Airbyte Message: {line}")

def _run_write(
@@ -112,7 +112,7 @@ def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
wrapped_stdin = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8")
yield from self._run_write(config=config, configured_catalog_path=parsed_args.catalog, input_stream=wrapped_stdin)

def run(self, args: List[str]):
def run(self, args: List[str]) -> None:
init_uncaught_exception_handler(logger)
parsed_args = self.parse_args(args)
output_messages = self.run_cmd(parsed_args)
@@ -7,7 +7,7 @@
import dpath
from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
from airbyte_cdk.utils.spec_schema_transformations import resolve_refs
from pydantic import BaseModel, Field
from pydantic.v1 import BaseModel, Field


class SeparatorSplitterConfigModel(BaseModel):
2 changes: 0 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/models/__init__.py
@@ -48,10 +48,8 @@
Boolean,
Date,
Integer,
IntegerEnum,
Model,
Number,
NumberEnum,
String,
TimestampWithoutTimezone,
TimestampWithTimezone,
6 changes: 3 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/sources/config.py
@@ -5,7 +5,7 @@
from typing import Any, Dict

from airbyte_cdk.sources.utils.schema_helpers import expand_refs, rename_key
from pydantic import BaseModel
from pydantic.v1 import BaseModel


class BaseConfig(BaseModel):
@@ -17,10 +17,10 @@ class BaseConfig(BaseModel):
"""

@classmethod
def schema(cls, *args, **kwargs) -> Dict[str, Any]:
def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
"""We're overriding the schema classmethod to enable some post-processing"""
schema = super().schema(*args, **kwargs)
rename_key(schema, old_key="anyOf", new_key="oneOf") # UI supports only oneOf
expand_refs(schema)
schema.pop("description", None) # description added from the docstring
return schema
return schema # type: ignore[no-any-return]
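
The `rename_key` call in this hunk rewrites `anyOf` to `oneOf` because the UI supports only `oneOf`. The real helper lives in `schema_helpers`; a hedged stdlib-only sketch of what such a recursive rename can look like:

```python
from typing import Any, Dict

def rename_key(schema: Dict[str, Any], old_key: str, new_key: str) -> None:
    """Recursively rename old_key to new_key in place, e.g. anyOf -> oneOf."""
    if not isinstance(schema, dict):
        return
    if old_key in schema:
        schema[new_key] = schema.pop(old_key)
    for value in schema.values():
        if isinstance(value, dict):
            rename_key(value, old_key, new_key)
        elif isinstance(value, list):
            for item in value:
                rename_key(item, old_key, new_key)

spec = {"properties": {"credentials": {"anyOf": [{"type": "string"}]}}}
rename_key(spec, old_key="anyOf", new_key="oneOf")
print(spec)
```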
@@ -16,7 +16,7 @@
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.streams import Stream
from pydantic import Extra
from pydantic import ConfigDict as V2ConfigDict


class HashableStreamDescriptor(StreamDescriptor):
@@ -25,9 +25,7 @@ class HashableStreamDescriptor(StreamDescriptor):
freezes its fields so that it be used as a hash key. This is only marked public because we use it outside for unit tests.
"""

class Config:
extra = Extra.allow
frozen = True
model_config = V2ConfigDict(extra="allow", frozen=True)


class ConnectorStateManager:
@@ -67,7 +67,7 @@ class DatetimeBasedCursor(DeclarativeCursor):
partition_field_end: Optional[str] = None
lookback_window: Optional[Union[InterpolatedString, str]] = None
message_repository: Optional[MessageRepository] = None
is_compare_strictly: bool = False
is_compare_strictly: Optional[bool] = False
cursor_datetime_formats: List[str] = field(default_factory=lambda: [])

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
@@ -6,7 +6,7 @@
from enum import Enum
from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel, Extra, Field
from pydantic.v1 import BaseModel, Extra, Field
from typing_extensions import Literal


@@ -142,7 +142,7 @@
from airbyte_cdk.sources.types import Config
from airbyte_cdk.sources.utils.transform import TypeTransformer
from isodate import parse_duration
from pydantic import BaseModel
from pydantic.v1 import BaseModel

ComponentDefinition = Mapping[str, Any]

10 changes: 5 additions & 5 deletions airbyte-cdk/python/airbyte_cdk/sources/declarative/spec/spec.py
@@ -5,7 +5,7 @@
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Optional

from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification # type: ignore [attr-defined]
from airbyte_cdk.models.airbyte_protocol import AdvancedAuth, ConnectorSpecification # type: ignore [attr-defined]
from airbyte_cdk.sources.declarative.models.declarative_component_schema import AuthFlow


@@ -29,14 +29,14 @@ def generate_spec(self) -> ConnectorSpecification:
Returns the connector specification according the spec block defined in the low code connector manifest.
"""

obj: dict[str, Mapping[str, Any] | str | AuthFlow] = {"connectionSpecification": self.connection_specification}
obj: dict[str, Mapping[str, Any] | str | AdvancedAuth] = {"connectionSpecification": self.connection_specification}

if self.documentation_url:
obj["documentationUrl"] = self.documentation_url
if self.advanced_auth:
obj["advanced_auth"] = self.advanced_auth
# Get enum value
obj["advanced_auth"].auth_flow_type = obj["advanced_auth"].auth_flow_type.value # type: ignore # We know this is always assigned to an AuthFlow which has the auth_flow_type field
self.advanced_auth.auth_flow_type = self.advanced_auth.auth_flow_type.value # type: ignore # We know this is always assigned to an AuthFlow which has the auth_flow_type field
# Map CDK AuthFlow model to protocol AdvancedAuth model
obj["advanced_auth"] = AdvancedAuth.parse_obj(self.advanced_auth.dict())

# We remap these keys to camel case because that's the existing format expected by the rest of the platform
return ConnectorSpecification.parse_obj(obj)
@@ -9,7 +9,7 @@
import dpath
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.utils import schema_helpers
from pydantic import AnyUrl, BaseModel, Field
from pydantic.v1 import AnyUrl, BaseModel, Field


class AbstractFileBasedSpec(BaseModel):
@@ -4,7 +4,7 @@


from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
from pydantic import BaseModel, Field
from pydantic.v1 import BaseModel, Field


class AvroFormat(BaseModel):
@@ -7,7 +7,8 @@
from typing import Any, Dict, List, Optional, Set, Union

from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
from pydantic import BaseModel, Field, ValidationError, root_validator, validator
from pydantic.v1 import BaseModel, Field, root_validator, validator
from pydantic.v1.error_wrappers import ValidationError


class InferenceType(Enum):
@@ -12,7 +12,7 @@
from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError
from airbyte_cdk.sources.file_based.schema_helpers import type_mapping_to_jsonschema
from pydantic import BaseModel, Field, validator
from pydantic.v1 import BaseModel, Field, validator

PrimaryKeyType = Optional[Union[str, List[str]]]

@@ -3,7 +3,7 @@
#

from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
from pydantic import BaseModel, Field
from pydantic.v1 import BaseModel, Field


class JsonlFormat(BaseModel):
@@ -4,7 +4,7 @@


from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
from pydantic import BaseModel, Field
from pydantic.v1 import BaseModel, Field


class ParquetFormat(BaseModel):
@@ -5,7 +5,7 @@
from typing import List, Literal, Optional, Union

from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
from pydantic import BaseModel, Field
from pydantic.v1 import BaseModel, Field


class LocalProcessingConfigModel(BaseModel):
@@ -44,7 +44,7 @@
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.utils.analytics_message import create_analytics_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from pydantic.error_wrappers import ValidationError
from pydantic.v1.error_wrappers import ValidationError

DEFAULT_CONCURRENCY = 100
MAX_CONCURRENCY = 100
@@ -174,6 +174,10 @@ def _to_output_value(avro_format: AvroFormat, record_type: Mapping[str, Any], re
return str(record_value)
elif record_type.get("logicalType") == "date":
return record_value.isoformat()
elif record_type.get("logicalType") == "timestamp-millis":
return record_value.isoformat(sep="T", timespec="milliseconds")
elif record_type.get("logicalType") == "timestamp-micros":
return record_value.isoformat(sep="T", timespec="microseconds")
elif record_type.get("logicalType") == "local-timestamp-millis":
return record_value.isoformat(sep="T", timespec="milliseconds")
elif record_type.get("logicalType") == "local-timestamp-micros":
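
The new branches lean on `datetime.isoformat`'s `timespec` argument, a stdlib feature that fixes the fractional-second precision:

```python
from datetime import datetime, timezone

dt = datetime(2024, 6, 20, 12, 30, 45, 123456, tzinfo=timezone.utc)

# timestamp-millis: fractional seconds truncated to three digits
print(dt.isoformat(sep="T", timespec="milliseconds"))  # 2024-06-20T12:30:45.123+00:00
# timestamp-micros: full six-digit microsecond precision
print(dt.isoformat(sep="T", timespec="microseconds"))  # 2024-06-20T12:30:45.123456+00:00
```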
@@ -126,7 +126,7 @@ def _scalar_to_python_value(parquet_value: Scalar, parquet_format: ParquetFormat

if pa.types.is_decimal(parquet_value.type):
if parquet_format.decimal_as_float:
return parquet_value.as_py()
return float(parquet_value.as_py())
else:
return str(parquet_value.as_py())

@@ -5,7 +5,7 @@
from datetime import datetime
from typing import Optional

from pydantic import BaseModel
from pydantic.v1 import BaseModel


class RemoteFile(BaseModel):
@@ -14,7 +14,7 @@
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from jsonschema import RefResolver, validate
from jsonschema.exceptions import ValidationError
from pydantic import BaseModel, Field
from pydantic.v1 import BaseModel, Field


class JsonFileLoader:
@@ -187,7 +187,7 @@ class InternalConfig(BaseModel):
def dict(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
kwargs["by_alias"] = True
kwargs["exclude_unset"] = True
return super().dict(*args, **kwargs)
return super().dict(*args, **kwargs) # type: ignore[no-any-return]

def is_limit_reached(self, records_counter: int) -> bool:
"""
14 changes: 7 additions & 7 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/schema_models.py
@@ -5,9 +5,9 @@
from typing import Any, Dict, Optional, Type

from airbyte_cdk.sources.utils.schema_helpers import expand_refs
from pydantic import BaseModel, Extra
from pydantic.main import ModelMetaclass
from pydantic.typing import resolve_annotations
from pydantic.v1 import BaseModel, Extra
from pydantic.v1.main import ModelMetaclass
from pydantic.v1.typing import resolve_annotations


class AllOptional(ModelMetaclass):
@@ -28,7 +28,7 @@ class MyModel(BaseModel):
It would make code more clear and eliminate a lot of manual work.
"""

def __new__(mcs, name, bases, namespaces, **kwargs):
def __new__(mcs, name, bases, namespaces, **kwargs): # type: ignore[no-untyped-def] # super().__new__ is also untyped
"""
Iterate through fields and wrap then with typing.Optional type.
"""
@@ -37,7 +37,7 @@ def __new__(mcs, name, bases, namespaces, **kwargs):
annotations = {**annotations, **getattr(base, "__annotations__", {})}
for field in annotations:
if not field.startswith("__"):
annotations[field] = Optional[annotations[field]]
annotations[field] = Optional[annotations[field]] # type: ignore[assignment]
namespaces["__annotations__"] = annotations
return super().__new__(mcs, name, bases, namespaces, **kwargs)
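
The annotation-wrapping trick in `AllOptional` is not pydantic-specific; stripped of `ModelMetaclass`, the same idea works with a plain metaclass (a simplified sketch, not the CDK's class):

```python
from typing import Optional

class AllOptionalSketch(type):
    """Metaclass that wraps every class-level annotation in Optional."""

    def __new__(mcs, name, bases, namespaces, **kwargs):
        annotations = namespaces.get("__annotations__", {})
        # Merge annotations inherited from base classes.
        for base in bases:
            annotations = {**annotations, **getattr(base, "__annotations__", {})}
        for field in annotations:
            if not field.startswith("__"):
                annotations[field] = Optional[annotations[field]]
        namespaces["__annotations__"] = annotations
        return super().__new__(mcs, name, bases, namespaces, **kwargs)

class Record(metaclass=AllOptionalSketch):
    id: int
    name: str

print(Record.__annotations__)
```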

@@ -77,8 +77,8 @@ def schema_extra(cls, schema: Dict[str, Any], model: Type[BaseModel]) -> None:
prop["oneOf"] = [{"type": "null"}, {"$ref": ref}]

@classmethod
def schema(cls, *args, **kwargs) -> Dict[str, Any]:
def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
"""We're overriding the schema classmethod to enable some post-processing"""
schema = super().schema(*args, **kwargs)
expand_refs(schema)
return schema
return schema # type: ignore[no-any-return]
6 changes: 3 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/test/entrypoint_wrapper.py
@@ -36,14 +36,14 @@
TraceType,
Type,
)
from pydantic.error_wrappers import ValidationError
from pydantic import ValidationError as V2ValidationError


class EntrypointOutput:
def __init__(self, messages: List[str], uncaught_exception: Optional[BaseException] = None):
try:
self._messages = [self._parse_message(message) for message in messages]
except ValidationError as exception:
except V2ValidationError as exception:
raise ValueError("All messages are expected to be AirbyteMessage") from exception

if uncaught_exception:
@@ -53,7 +53,7 @@ def __init__(self, messages: List[str], uncaught_exception: Optional[BaseExcepti
def _parse_message(message: str) -> AirbyteMessage:
try:
return AirbyteMessage.parse_obj(json.loads(message))
except (json.JSONDecodeError, ValidationError):
except (json.JSONDecodeError, V2ValidationError):
# The platform assumes that logs that are not of AirbyteMessage format are log messages
return AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=message))
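
The parse-or-log fallback in `_parse_message` can be sketched without pydantic; `Message` below is a hypothetical stand-in for `AirbyteMessage`:

```python
import json
from dataclasses import dataclass
from typing import Any, Dict

@dataclass
class Message:
    type: str
    payload: Dict[str, Any]

def parse_message(line: str) -> Message:
    try:
        data = json.loads(line)
        if not isinstance(data, dict) or "type" not in data:
            raise ValueError("not a protocol message")
        return Message(type=data["type"], payload=data)
    except (json.JSONDecodeError, ValueError):
        # Anything that isn't a protocol message becomes an INFO log,
        # mirroring how the platform treats unrecognized connector output.
        return Message(type="LOG", payload={"level": "INFO", "message": line})

print(parse_message('{"type": "RECORD", "record": {}}').type)  # RECORD
print(parse_message("plain stderr noise").type)                # LOG
```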

5 changes: 5 additions & 0 deletions airbyte-cdk/python/bin/generate-component-manifest-dagger.sh
@@ -1,5 +1,10 @@
#!/usr/bin/env bash

# We need to run this script in a docker container because we need to use a
# specific version of datamodel-codegen that generates pydantic v1 models (correctly).
# The newer datamodel-codegen's "pydantic v1" models are different than those v1 models
# generated by the older version of datamodel-codegen.

set -e

pip install dagger-io==0.9.6
@@ -34,7 +34,7 @@ async def post_process_codegen(codegen_container: dagger.Container):
if generated_file.endswith(".py"):
original_content = await codegen_container.file(f"/generated/{generated_file}").contents()
# the space before _parameters is intentional to avoid replacing things like `request_parameters:` with `requestparameters:`
post_processed_content = original_content.replace(" _parameters:", " parameters:")
post_processed_content = original_content.replace(" _parameters:", " parameters:").replace("from pydantic", "from pydantic.v1")
codegen_container = codegen_container.with_new_file(
f"/generated_post_processed/{generated_file}", contents=post_processed_content
)