diff --git a/airbyte/_batch_handles.py b/airbyte/_batch_handles.py new file mode 100644 index 00000000..289d19e3 --- /dev/null +++ b/airbyte/_batch_handles.py @@ -0,0 +1,84 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Batch handle class.""" + +from __future__ import annotations + +from contextlib import suppress +from typing import IO, TYPE_CHECKING, Callable + + +if TYPE_CHECKING: + from pathlib import Path + + +class BatchHandle: + """A handle for a batch of records.""" + + def __init__( + self, + stream_name: str, + batch_id: str, + files: list[Path], + file_opener: Callable[[Path], IO[bytes]], + ) -> None: + """Initialize the batch handle.""" + self._stream_name = stream_name + self._batch_id = batch_id + self._files = files + self._record_count = 0 + assert self._files, "A batch must have at least one file." + self._open_file_writer: IO[bytes] = file_opener(self._files[0]) + + # Marker for whether the batch has been finalized. + self.finalized: bool = False + + @property + def files(self) -> list[Path]: + """Return the files.""" + return self._files + + @property + def batch_id(self) -> str: + """Return the batch ID.""" + return self._batch_id + + @property + def stream_name(self) -> str: + """Return the stream name.""" + return self._stream_name + + @property + def record_count(self) -> int: + """Return the record count.""" + return self._record_count + + def increment_record_count(self) -> None: + """Increment the record count.""" + self._record_count += 1 + + @property + def open_file_writer(self) -> IO[bytes] | None: + """Return the open file writer, if any, or None.""" + return self._open_file_writer + + def close_files(self) -> None: + """Close the file writer.""" + if self.open_file_writer is None: + return + + with suppress(Exception): + self.open_file_writer.close() + + def delete_files(self) -> None: + """Delete the files. + + If any files are open, they will be closed first. + If any files are missing, they will be ignored. + """ + self.close_files() + for file in self.files: + file.unlink(missing_ok=True) + + def __del__(self) -> None: + """Upon deletion, close the file writer.""" + self.close_files() diff --git a/airbyte/_processors/base.py b/airbyte/_processors/base.py index bde69494..754c7eb0 100644 --- a/airbyte/_processors/base.py +++ b/airbyte/_processors/base.py @@ -10,16 +10,11 @@ from __future__ import annotations import abc -import contextlib import io import sys from collections import defaultdict from typing import TYPE_CHECKING, Any, cast, final -import pandas as pd -import pyarrow as pa -import ulid - from airbyte_protocol.models import ( AirbyteMessage, AirbyteRecordMessage, @@ -32,35 +27,32 @@ ) from airbyte import exceptions as exc -from airbyte._util import protocol_util from airbyte.caches.base import CacheBase -from airbyte.progress import progress from airbyte.strategies import WriteStrategy -from airbyte.types import _get_pyarrow_type if TYPE_CHECKING: - from collections.abc import Generator, Iterable, Iterator + from collections.abc import Iterable, Iterator + from airbyte._batch_handles import BatchHandle from airbyte.caches._catalog_manager import CatalogManager -DEFAULT_BATCH_SIZE = 10_000 DEBUG_MODE = False # Set to True to enable additional debug logging. 
-class BatchHandle: - pass - - class AirbyteMessageParsingError(Exception): """Raised when an Airbyte message is invalid or cannot be parsed.""" class RecordProcessor(abc.ABC): - """Abstract base class for classes which can process input records.""" + """Abstract base class for classes which can process Airbyte messages from a source. - skip_finalize_step: bool = False + This class is responsible for all aspects of handling Airbyte protocol. + + The class leverages the cache's catalog manager class to store and retrieve metadata. + + """ def __init__( self, @@ -81,9 +73,6 @@ def __init__( self.source_catalog: ConfiguredAirbyteCatalog | None = None self._source_name: str | None = None - self._pending_batches: dict[str, dict[str, Any]] = defaultdict(dict, {}) - self._finalized_batches: dict[str, dict[str, Any]] = defaultdict(dict, {}) - self._pending_state_messages: dict[str, list[AirbyteStateMessage]] = defaultdict(list, {}) self._finalized_state_messages: dict[ str, @@ -120,17 +109,13 @@ def register_source( def process_stdin( self, write_strategy: WriteStrategy = WriteStrategy.AUTO, - *, - max_batch_size: int = DEFAULT_BATCH_SIZE, ) -> None: """Process the input stream from stdin. Return a list of summaries for testing. """ input_stream = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8") - self.process_input_stream( - input_stream, write_strategy=write_strategy, max_batch_size=max_batch_size - ) + self.process_input_stream(input_stream, write_strategy=write_strategy) @final def _airbyte_messages_from_buffer( @@ -145,8 +130,6 @@ def process_input_stream( self, input_stream: io.TextIOBase, write_strategy: WriteStrategy = WriteStrategy.AUTO, - *, - max_batch_size: int = DEFAULT_BATCH_SIZE, ) -> None: """Parse the input stream and process data in batches. @@ -156,16 +139,26 @@ def process_input_stream( self.process_airbyte_messages( messages, write_strategy=write_strategy, - max_batch_size=max_batch_size, ) + @abc.abstractmethod + def process_record_message( + self, + record_msg: AirbyteRecordMessage, + ) -> None: + """Write a record to the cache. + + This method is called for each record message. + + In most cases, the SQL processor will not perform any action, but will pass this along to to + the file processor. + """ + @final def process_airbyte_messages( self, messages: Iterable[AirbyteMessage], write_strategy: WriteStrategy, - *, - max_batch_size: int = DEFAULT_BATCH_SIZE, ) -> None: """Process a stream of Airbyte messages.""" if not isinstance(write_strategy, WriteStrategy): @@ -174,20 +167,11 @@ def process_airbyte_messages( context={"write_strategy": write_strategy}, ) - stream_batches: dict[str, list[dict]] = defaultdict(list, {}) # Process messages, writing to batches as we go for message in messages: if message.type is Type.RECORD: record_msg = cast(AirbyteRecordMessage, message.record) - stream_name = record_msg.stream - stream_batch = stream_batches[stream_name] - stream_batch.append(protocol_util.airbyte_record_message_to_dict(record_msg)) - if len(stream_batch) >= max_batch_size: - batch_df = pd.DataFrame(stream_batch) - record_batch = pa.Table.from_pandas(batch_df) - self._process_batch(stream_name, record_batch) - progress.log_batch_written(stream_name, len(stream_batch)) - stream_batch.clear() + self.process_record_message(record_msg) elif message.type is Type.STATE: state_msg = cast(AirbyteStateMessage, message.state) @@ -203,150 +187,44 @@ def process_airbyte_messages( # Type.LOG, Type.TRACE, Type.CONTROL, etc. pass - # We are at the end of the stream. 
Process whatever else is queued. - for stream_name, stream_batch in stream_batches.items(): - batch_df = pd.DataFrame(stream_batch) - record_batch = pa.Table.from_pandas(batch_df) - self._process_batch(stream_name, record_batch) - progress.log_batch_written(stream_name, len(stream_batch)) - - all_streams = list(self._pending_batches.keys()) - # Add empty streams to the streams list, so we create a destination table for it - for stream_name in self.expected_streams: - if stream_name not in all_streams: - if DEBUG_MODE: - print(f"Stream {stream_name} has no data") - all_streams.append(stream_name) - - # Finalize any pending batches - for stream_name in all_streams: - self._finalize_batches(stream_name, write_strategy=write_strategy) - progress.log_stream_finalized(stream_name) - - @final - def _process_batch( - self, - stream_name: str, - record_batch: pa.Table, - ) -> tuple[str, Any, Exception | None]: - """Process a single batch. - - Returns a tuple of the batch ID, batch handle, and an exception if one occurred. - """ - batch_id = self._new_batch_id() - batch_handle = self._write_batch( - stream_name, - batch_id, - record_batch, - ) or self._get_batch_handle(stream_name, batch_id) + self.write_all_stream_data( + write_strategy=write_strategy, + ) - if self.skip_finalize_step: - self._finalized_batches[stream_name][batch_id] = batch_handle - else: - self._pending_batches[stream_name][batch_id] = batch_handle + # Clean up files, if requested. + if self.cache.cleanup: + self.cleanup_all() - return batch_id, batch_handle, None + def write_all_stream_data(self, write_strategy: WriteStrategy) -> None: + """Finalize any pending writes.""" + for stream_name in self.expected_streams: + self.write_stream_data(stream_name, write_strategy=write_strategy) @abc.abstractmethod - def _write_batch( - self, - stream_name: str, - batch_id: str, - record_batch: pa.Table, - ) -> BatchHandle: - """Process a single batch. - - Returns a batch handle, such as a path or any other custom reference. - """ - - def _cleanup_batch( # noqa: B027 # Intentionally empty, not abstract - self, - stream_name: str, - batch_id: str, - batch_handle: BatchHandle, - ) -> None: - """Clean up the cache. - - This method is called after the given batch has been finalized. - - For instance, file writers can override this method to delete the files created. Caches, - similarly, can override this method to delete any other temporary artifacts. - """ - pass - - def _new_batch_id(self) -> str: - """Return a new batch handle.""" - return str(ulid.ULID()) - - def _get_batch_handle( - self, - stream_name: str, - batch_id: str | None = None, # ULID of the batch - ) -> str: - """Return a new batch handle. - - By default this is a concatenation of the stream name and batch ID. - However, any Python object can be returned, such as a Path object. - """ - batch_id = batch_id or self._new_batch_id() - return f"{stream_name}_{batch_id}" - - def _finalize_batches( + def write_stream_data( self, stream_name: str, write_strategy: WriteStrategy, - ) -> dict[str, BatchHandle]: - """Finalize all uncommitted batches. - - Returns a mapping of batch IDs to batch handles, for processed batches. - - This is a generic implementation, which can be overridden. 
- """ - _ = write_strategy # Unused - with self._finalizing_batches(stream_name) as batches_to_finalize: - if batches_to_finalize and not self.skip_finalize_step: - raise NotImplementedError( - "Caches need to be finalized but no _finalize_batch() method " - f"exists for class {self.__class__.__name__}", - ) + ) -> list[BatchHandle]: + """Write pending stream data to the cache.""" + ... - return batches_to_finalize - - @abc.abstractmethod def _finalize_state_messages( self, stream_name: str, state_messages: list[AirbyteStateMessage], ) -> None: - """Handle state messages. - Might be a no-op if the processor doesn't handle incremental state.""" - pass - - @final - @contextlib.contextmanager - def _finalizing_batches( - self, - stream_name: str, - ) -> Generator[dict[str, BatchHandle], str, None]: - """Context manager to use for finalizing batches, if applicable. - - Returns a mapping of batch IDs to batch handles, for those processed batches. - """ - batches_to_finalize = self._pending_batches[stream_name].copy() - state_messages_to_finalize = self._pending_state_messages[stream_name].copy() - self._pending_batches[stream_name].clear() - self._pending_state_messages[stream_name].clear() - - progress.log_batches_finalizing(stream_name, len(batches_to_finalize)) - yield batches_to_finalize - self._finalize_state_messages(stream_name, state_messages_to_finalize) - progress.log_batches_finalized(stream_name, len(batches_to_finalize)) - - self._finalized_batches[stream_name].update(batches_to_finalize) - self._finalized_state_messages[stream_name] += state_messages_to_finalize - - for batch_id, batch_handle in batches_to_finalize.items(): - self._cleanup_batch(stream_name, batch_id, batch_handle) + """Handle state messages by passing them to the catalog manager.""" + if not self._catalog_manager: + raise exc.AirbyteLibInternalError( + message="Catalog manager should exist but does not.", + ) + if state_messages and self._source_name: + self._catalog_manager.save_state( + source_name=self._source_name, + stream_name=stream_name, + state=state_messages[-1], + ) def _setup(self) -> None: # noqa: B027 # Intentionally empty, not abstract """Create the database. @@ -356,30 +234,12 @@ def _setup(self) -> None: # noqa: B027 # Intentionally empty, not abstract """ pass - def _teardown(self) -> None: - """Teardown the processor resources. - - By default, the base implementation simply calls _cleanup_batch() for all pending batches. 
- """ - for stream_name, pending_batches in self._pending_batches.items(): - for batch_id, batch_handle in pending_batches.items(): - self._cleanup_batch( - stream_name=stream_name, - batch_id=batch_id, - batch_handle=batch_handle, - ) - - @final - def __del__(self) -> None: - """Teardown temporary resources when instance is unloaded from memory.""" - self._teardown() - @final def _get_stream_config( self, stream_name: str, ) -> ConfiguredAirbyteStream: - """Return the column definitions for the given stream.""" + """Return the definition of the given stream.""" if not self._catalog_manager: raise exc.AirbyteLibInternalError( message="Catalog manager should exist but does not.", @@ -395,16 +255,9 @@ def _get_stream_json_schema( """Return the column definitions for the given stream.""" return self._get_stream_config(stream_name).stream.json_schema - def _get_stream_pyarrow_schema( - self, - stream_name: str, - ) -> pa.Schema: - """Return the column definitions for the given stream.""" - return pa.schema( - fields=[ - pa.field(prop_name, _get_pyarrow_type(prop_def)) - for prop_name, prop_def in self._get_stream_json_schema(stream_name)[ - "properties" - ].items() - ] - ) + def cleanup_all(self) -> None: # noqa: B027 # Intentionally empty, not abstract + """Clean up all resources. + + The default implementation is a no-op. + """ + pass diff --git a/airbyte/_processors/file/__init__.py b/airbyte/_processors/file/__init__.py index 26c25484..2ef9b9a4 100644 --- a/airbyte/_processors/file/__init__.py +++ b/airbyte/_processors/file/__init__.py @@ -3,14 +3,13 @@ from __future__ import annotations -from .base import FileWriterBase, FileWriterBatchHandle -from .jsonl import JsonlWriter -from .parquet import ParquetWriter +from airbyte._batch_handles import BatchHandle +from airbyte._processors.file.base import FileWriterBase +from airbyte._processors.file.jsonl import JsonlWriter __all__ = [ - "FileWriterBatchHandle", + "BatchHandle", "FileWriterBase", "JsonlWriter", - "ParquetWriter", ] diff --git a/airbyte/_processors/file/base.py b/airbyte/_processors/file/base.py index 9954008f..96cc6ab8 100644 --- a/airbyte/_processors/file/base.py +++ b/airbyte/_processors/file/base.py @@ -4,72 +4,182 @@ from __future__ import annotations import abc -from dataclasses import dataclass, field -from typing import TYPE_CHECKING, cast, final +from collections import defaultdict +from pathlib import Path +from typing import IO, TYPE_CHECKING, final -from overrides import overrides +import ulid -from airbyte._processors.base import BatchHandle, RecordProcessor +from airbyte import exceptions as exc +from airbyte._batch_handles import BatchHandle +from airbyte._util.protocol_util import airbyte_record_message_to_dict +from airbyte.progress import progress if TYPE_CHECKING: - from pathlib import Path - - import pyarrow as pa - from airbyte_protocol.models import ( - AirbyteStateMessage, + AirbyteRecordMessage, ) + from airbyte.caches.base import CacheBase + DEFAULT_BATCH_SIZE = 10000 -# The batch handle for file writers is a list of Path objects. 
-@dataclass -class FileWriterBatchHandle(BatchHandle): - """The file writer batch handle is a list of Path objects.""" +class FileWriterBase(abc.ABC): + """A generic base implementation for a file-based cache.""" - files: list[Path] = field(default_factory=list) + default_cache_file_suffix: str = ".batch" + MAX_BATCH_SIZE: int = DEFAULT_BATCH_SIZE -class FileWriterBase(RecordProcessor, abc.ABC): - """A generic base implementation for a file-based cache.""" + def __init__( + self, + cache: CacheBase, + ) -> None: + """Initialize the file writer.""" + self.cache = cache - @abc.abstractmethod - @overrides - def _write_batch( + self._active_batches: dict[str, BatchHandle] = {} + self._completed_batches: dict[str, list[BatchHandle]] = defaultdict(list, {}) + + def _get_new_cache_file_path( self, stream_name: str, - batch_id: str, - record_batch: pa.Table, - ) -> FileWriterBatchHandle: - """Process a record batch. + batch_id: str | None = None, # ULID of the batch + ) -> Path: + """Return a new cache file path for the given stream.""" + batch_id = batch_id or str(ulid.ULID()) + target_dir = Path(self.cache.cache_dir) + target_dir.mkdir(parents=True, exist_ok=True) + return target_dir / f"{stream_name}_{batch_id}{self.default_cache_file_suffix}" + + def _open_new_file( + self, + file_path: Path, + ) -> IO[bytes]: + """Open a new file for writing.""" + return file_path.open("wb") + + def _flush_active_batch( + self, + stream_name: str, + ) -> None: + """Flush the active batch for the given stream. - Return a list of paths to one or more cache files. + This entails moving the active batch to the pending batches, closing any open files, and + logging the batch as written. """ - ... + if stream_name not in self._active_batches: + return - @final - def write_batch( + batch_handle: BatchHandle = self._active_batches[stream_name] + batch_handle.close_files() + del self._active_batches[stream_name] + + self._completed_batches[stream_name].append(batch_handle) + progress.log_batch_written( + stream_name=stream_name, + batch_size=batch_handle.record_count, + ) + + def _new_batch( self, stream_name: str, - batch_id: str, - record_batch: pa.Table, - ) -> FileWriterBatchHandle: - """Write a batch of records to the cache. + ) -> BatchHandle: + """Create and return a new batch handle. + + The base implementation creates and opens a new file for writing so it is ready to receive + records. + + This also flushes the active batch if one already exists for the given stream. + """ + if stream_name in self._active_batches: + self._flush_active_batch(stream_name) + + batch_id = self._new_batch_id() + new_file_path = self._get_new_cache_file_path(stream_name) + + batch_handle = BatchHandle( + stream_name=stream_name, + batch_id=batch_id, + files=[new_file_path], + file_opener=self._open_new_file, + ) + self._active_batches[stream_name] = batch_handle + return batch_handle + + def _close_batch( + self, + batch_handle: BatchHandle, + ) -> None: + """Close the current batch.""" + if not batch_handle.open_file_writer: + return + + batch_handle.close_files() + + @final + def cleanup_all(self) -> None: + """Clean up the cache. + + For file writers, this means deleting the files created and declared in the batch. This method is final because it should not be overridden. - Subclasses should override `_write_batch` instead. + Subclasses should override `_cleanup_batch` instead. 
""" - return self._write_batch(stream_name, batch_id, record_batch) + for batch_handle in self._active_batches.values(): + self._cleanup_batch(batch_handle) + + for batch_list in self._completed_batches.values(): + for batch_handle in batch_list: + self._cleanup_batch(batch_handle) + + def process_record_message( + self, + record_msg: AirbyteRecordMessage, + ) -> None: + """Write a record to the cache. + + This method is called for each record message, before the batch is written. + + Returns: + A tuple of the stream name and the batch handle. + """ + stream_name = record_msg.stream + + batch_handle: BatchHandle + if stream_name not in self._active_batches: + batch_handle = self._new_batch(stream_name=stream_name) + + else: + batch_handle = self._active_batches[stream_name] + + if batch_handle.record_count + 1 > self.MAX_BATCH_SIZE: + # Already at max batch size, so start a new batch. + batch_handle = self._new_batch(stream_name=stream_name) + + if batch_handle.open_file_writer is None: + raise exc.AirbyteLibInternalError(message="Expected open file writer.") + + self._write_record_dict( + record_dict=airbyte_record_message_to_dict(record_message=record_msg), + open_file_writer=batch_handle.open_file_writer, + ) + batch_handle.increment_record_count() + + def flush_active_batches( + self, + ) -> None: + """Flush active batches for all streams.""" + streams = list(self._active_batches.keys()) + for stream_name in streams: + self._flush_active_batch(stream_name) - @overrides def _cleanup_batch( self, - stream_name: str, - batch_id: str, batch_handle: BatchHandle, ) -> None: """Clean up the cache. @@ -78,36 +188,46 @@ def _cleanup_batch( This method is a no-op if the `cleanup` config option is set to False. """ + self._close_batch(batch_handle) + if self.cache.cleanup: - batch_handle = cast(FileWriterBatchHandle, batch_handle) - _ = stream_name, batch_id - for file_path in batch_handle.files: - file_path.unlink() + batch_handle.delete_files() + + def _new_batch_id(self) -> str: + """Return a new batch handle.""" + return str(ulid.ULID()) + + # Destructor @final - def cleanup_batch( + def __del__(self) -> None: + """Teardown temporary resources when instance is unloaded from memory.""" + if self.cache.cleanup: + self.cleanup_all() + + # Abstract methods + + @abc.abstractmethod + def _write_record_dict( self, - stream_name: str, - batch_id: str, - batch_handle: BatchHandle, + record_dict: dict, + open_file_writer: IO[bytes], ) -> None: - """Clean up the cache. + """Write one record to a file.""" + raise NotImplementedError("No default implementation.") - For file writers, this means deleting the files created and declared in the batch. + # Public methods (for use by Cache and SQL Processor classes) - This method is final because it should not be overridden. + def get_active_batch(self, stream_name: str) -> BatchHandle | None: + """Return the active batch for a specific stream name.""" + return self._active_batches.get(stream_name, None) - Subclasses should override `_cleanup_batch` instead. 
- """ - self._cleanup_batch(stream_name, batch_id, batch_handle) + def get_pending_batches(self, stream_name: str) -> list[BatchHandle]: + """Return the pending batches for a specific stream name.""" + return [ + batch for batch in self._completed_batches.get(stream_name, []) if not batch.finalized + ] - @overrides - def _finalize_state_messages( - self, - stream_name: str, - state_messages: list[AirbyteStateMessage], - ) -> None: - """ - State messages are not used in file writers, so this method is a no-op. - """ - pass + def get_finalized_batches(self, stream_name: str) -> list[BatchHandle]: + """Return the finalized batches for a specific stream name.""" + return [batch for batch in self._completed_batches.get(stream_name, []) if batch.finalized] diff --git a/airbyte/_processors/file/jsonl.py b/airbyte/_processors/file/jsonl.py index 1ea66bde..0578fded 100644 --- a/airbyte/_processors/file/jsonl.py +++ b/airbyte/_processors/file/jsonl.py @@ -4,55 +4,36 @@ from __future__ import annotations import gzip -from pathlib import Path -from typing import TYPE_CHECKING +from typing import IO, TYPE_CHECKING, cast import orjson -import ulid -from overrides import overrides from airbyte._processors.file.base import ( FileWriterBase, - FileWriterBatchHandle, ) if TYPE_CHECKING: - import pyarrow as pa + from pathlib import Path + + pass class JsonlWriter(FileWriterBase): """A Jsonl cache implementation.""" - def get_new_cache_file_path( + default_cache_file_suffix = ".jsonl.gz" + + def _open_new_file( self, - stream_name: str, - batch_id: str | None = None, # ULID of the batch - ) -> Path: - """Return a new cache file path for the given stream.""" - batch_id = batch_id or str(ulid.ULID()) - target_dir = Path(self.cache.cache_dir) - target_dir.mkdir(parents=True, exist_ok=True) - return target_dir / f"{stream_name}_{batch_id}.jsonl.gz" - - @overrides - def _write_batch( + file_path: Path, + ) -> IO[bytes]: + """Open a new file for writing.""" + return cast(IO[bytes], gzip.open(file_path, "w")) + + def _write_record_dict( self, - stream_name: str, - batch_id: str, - record_batch: pa.Table, - ) -> FileWriterBatchHandle: - """Process a record batch. - - Return the path to the cache file. - """ - _ = batch_id # unused - output_file_path = self.get_new_cache_file_path(stream_name) - - with gzip.open(output_file_path, "w") as jsonl_file: - for record in record_batch.to_pylist(): - jsonl_file.write(orjson.dumps(record) + b"\n") - - batch_handle = FileWriterBatchHandle() - batch_handle.files.append(output_file_path) - return batch_handle + record_dict: dict, + open_file_writer: gzip.GzipFile | IO[bytes], + ) -> None: + open_file_writer.write(orjson.dumps(record_dict) + b"\n") diff --git a/airbyte/_processors/file/parquet.py b/airbyte/_processors/file/parquet.py deleted file mode 100644 index 5d1f83c0..00000000 --- a/airbyte/_processors/file/parquet.py +++ /dev/null @@ -1,98 +0,0 @@ -# Copyright (c) 2023 Airbyte, Inc., all rights reserved -"""A Parquet file writer implementation. - -NOTE: Parquet is a strongly typed columnar storage format, which has known issues when applied to -variable schemas, schemas with indeterminate types, and schemas that have empty data nodes. -This implementation is deprecated for now in favor of jsonl.gz, and may be removed or revamped in -the future. 
-""" -from __future__ import annotations - -from pathlib import Path -from typing import cast - -import pyarrow as pa -import ulid -from overrides import overrides -from pyarrow import parquet - -from airbyte import exceptions as exc -from airbyte._processors.file.base import ( - FileWriterBase, - FileWriterBatchHandle, -) -from airbyte._util.text_util import lower_case_set - - -class ParquetWriter(FileWriterBase): - """A Parquet cache implementation.""" - - def get_new_cache_file_path( - self, - stream_name: str, - batch_id: str | None = None, # ULID of the batch - ) -> Path: - """Return a new cache file path for the given stream.""" - batch_id = batch_id or str(ulid.ULID()) - target_dir = Path(self.cache.cache_dir) - target_dir.mkdir(parents=True, exist_ok=True) - return target_dir / f"{stream_name}_{batch_id}.parquet" - - def _get_missing_columns( - self, - stream_name: str, - record_batch: pa.Table, - ) -> list[str]: - """Return a list of columns that are missing in the batch. - - The comparison is based on a case-insensitive comparison of the column names. - """ - if not self._catalog_manager: - raise exc.AirbyteLibInternalError(message="Catalog manager should exist but does not.") - stream = self._catalog_manager.get_stream_config(stream_name) - stream_property_names = stream.stream.json_schema["properties"].keys() - return [ - col - for col in stream_property_names - if col.lower() not in lower_case_set(record_batch.schema.names) - ] - - @overrides - def _write_batch( - self, - stream_name: str, - batch_id: str, - record_batch: pa.Table, - ) -> FileWriterBatchHandle: - """Process a record batch. - - Return the path to the cache file. - """ - _ = batch_id # unused - output_file_path = self.get_new_cache_file_path(stream_name) - - missing_columns = self._get_missing_columns(stream_name, record_batch) - if missing_columns: - # We need to append columns with the missing column name(s) and a null type - null_array = cast(pa.Array, pa.array([None] * len(record_batch), type=pa.null())) - for col in missing_columns: - record_batch = record_batch.append_column(col, null_array) - - try: - with parquet.ParquetWriter(output_file_path, schema=record_batch.schema) as writer: - writer.write_table(record_batch) - except Exception as e: - raise exc.AirbyteLibInternalError( - message=f"Failed to write record batch to Parquet file: {e}", - context={ - "stream_name": stream_name, - "batch_id": batch_id, - "output_file_path": output_file_path, - "schema": record_batch.schema, - "record_batch": record_batch, - }, - ) from e - - batch_handle = FileWriterBatchHandle() - batch_handle.files.append(output_file_path) - return batch_handle diff --git a/airbyte/_processors/sql/base.py b/airbyte/_processors/sql/base.py index 682aa103..9beb0b97 100644 --- a/airbyte/_processors/sql/base.py +++ b/airbyte/_processors/sql/base.py @@ -3,10 +3,11 @@ from __future__ import annotations +import contextlib import enum from contextlib import contextmanager from functools import cached_property -from typing import TYPE_CHECKING, cast, final +from typing import TYPE_CHECKING, final import pandas as pd import sqlalchemy @@ -27,11 +28,11 @@ from sqlalchemy.sql.elements import TextClause from airbyte import exceptions as exc -from airbyte._processors.base import BatchHandle, RecordProcessor -from airbyte._processors.file.base import FileWriterBase, FileWriterBatchHandle +from airbyte._processors.base import RecordProcessor from airbyte._util.text_util import lower_case_set from airbyte.caches._catalog_manager import 
CatalogManager from airbyte.datasets._sql import CachedDataset +from airbyte.progress import progress from airbyte.strategies import WriteStrategy from airbyte.types import SQLTypeConverter @@ -40,17 +41,19 @@ from collections.abc import Generator from pathlib import Path - import pyarrow as pa from sqlalchemy.engine import Connection, Engine from sqlalchemy.engine.cursor import CursorResult from sqlalchemy.engine.reflection import Inspector from sqlalchemy.sql.base import Executable from airbyte_protocol.models import ( + AirbyteRecordMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog, ) + from airbyte._batch_handles import BatchHandle + from airbyte._processors.file.base import FileWriterBase from airbyte.caches.base import CacheBase @@ -90,9 +93,7 @@ def __init__( engine=self.get_sql_engine(), table_name_resolver=lambda stream_name: self.get_sql_table_name(stream_name), ) - self.file_writer = file_writer or self.file_writer_class( - cache, catalog_manager=self._catalog_manager - ) + self.file_writer = file_writer or self.file_writer_class(cache) self.type_converter = self.type_converter_class() self._cached_table_definitions: dict[str, sqlalchemy.Table] = {} @@ -160,11 +161,6 @@ def register_source( This method is called by the source when it is initialized. """ self._source_name = source_name - self.file_writer.register_source( - source_name, - incoming_source_catalog, - stream_names=stream_names, - ) self._ensure_schema_exists() super().register_source( source_name, @@ -233,6 +229,19 @@ def get_pandas_dataframe( engine = self.get_sql_engine() return pd.read_sql_table(table_name, engine) + def process_record_message( + self, + record_msg: AirbyteRecordMessage, + ) -> None: + """Write a record to the cache. + + This method is called for each record message, before the batch is written. + + In most cases, the SQL processor will not perform any action, but will pass this along to to + the file processor. + """ + self.file_writer.process_record_message(record_msg) + # Protected members (non-public interface): def _init_connection_settings(self, connection: Connection) -> None: @@ -474,43 +483,15 @@ def _get_sql_column_definitions( # columns["_airbyte_loaded_at"] = sqlalchemy.TIMESTAMP() return columns - @overrides - def _write_batch( - self, - stream_name: str, - batch_id: str, - record_batch: pa.Table, - ) -> FileWriterBatchHandle: - """Process a record batch. - - Return the path to the cache file. - """ - return self.file_writer.write_batch(stream_name, batch_id, record_batch) - - def _cleanup_batch( - self, - stream_name: str, - batch_id: str, - batch_handle: BatchHandle, - ) -> None: - """Clean up the cache. - - For SQL caches, we only need to call the cleanup operation on the file writer. - - Subclasses should call super() if they override this method. - """ - self.file_writer.cleanup_batch(stream_name, batch_id, batch_handle) - @final - @overrides - def _finalize_batches( + def write_stream_data( self, stream_name: str, write_strategy: WriteStrategy, - ) -> dict[str, BatchHandle]: + ) -> list[BatchHandle]: """Finalize all uncommitted batches. - This is a generic 'final' implementation, which should not be overridden. + This is a generic 'final' SQL implementation, which should not be overridden. Returns a mapping of batch IDs to batch handles, for those processed batches. @@ -518,7 +499,10 @@ def _finalize_batches( Some sources will send us duplicate records within the same stream, although this is a fairly rare edge case we can ignore in V1. 
""" - with self._finalizing_batches(stream_name) as batches_to_finalize: + # Flush any pending writes + self.file_writer.flush_active_batches() + + with self.finalizing_batches(stream_name) as batches_to_finalize: # Make sure the target schema and target table exist. self._ensure_schema_exists() final_table_name = self._ensure_final_table_exists( @@ -532,15 +516,14 @@ def _finalize_batches( if not batches_to_finalize: # If there are no batches to finalize, return after ensuring the table exists. - return {} + return [] files: list[Path] = [] # Get a list of all files to finalize from all pending batches. - for batch_handle in batches_to_finalize.values(): - batch_handle = cast(FileWriterBatchHandle, batch_handle) + for batch_handle in batches_to_finalize: files += batch_handle.files # Use the max batch ID as the batch ID for table names. - max_batch_id = max(batches_to_finalize.keys()) + max_batch_id = max([batch.batch_id for batch in batches_to_finalize]) temp_table_name = self._write_files_to_new_table( files=files, @@ -557,26 +540,45 @@ def _finalize_batches( finally: self._drop_temp_table(temp_table_name, if_exists=True) - # Return the batch handles as measure of work completed. - return batches_to_finalize + # Return the batch handles as measure of work completed. + return batches_to_finalize - @overrides - def _finalize_state_messages( + @final + def cleanup_all(self) -> None: + """Clean resources.""" + self.file_writer.cleanup_all() + + # Finalizing context manager + + @final + @contextlib.contextmanager + def finalizing_batches( self, stream_name: str, - state_messages: list[AirbyteStateMessage], - ) -> None: - """Handle state messages by passing them to the catalog manager.""" - if not self._catalog_manager: - raise exc.AirbyteLibInternalError( - message="Catalog manager should exist but does not.", - ) - if state_messages and self._source_name: - self._catalog_manager.save_state( - source_name=self._source_name, - stream_name=stream_name, - state=state_messages[-1], - ) + ) -> Generator[list[BatchHandle], str, None]: + """Context manager to use for finalizing batches, if applicable. + + Returns a mapping of batch IDs to batch handles, for those processed batches. + """ + batches_to_finalize: list[BatchHandle] = self.file_writer.get_pending_batches(stream_name) + state_messages_to_finalize: list[AirbyteStateMessage] = self._pending_state_messages[ + stream_name + ].copy() + self._pending_state_messages[stream_name].clear() + + progress.log_batches_finalizing(stream_name, len(batches_to_finalize)) + yield batches_to_finalize + self._finalize_state_messages(stream_name, state_messages_to_finalize) + progress.log_batches_finalized(stream_name, len(batches_to_finalize)) + + for batch_handle in batches_to_finalize: + batch_handle.finalized = True + + self._finalized_state_messages[stream_name] += state_messages_to_finalize + + if self.cache.cleanup: + for batch_handle in batches_to_finalize: + batch_handle.delete_files() def _execute_sql(self, sql: str | TextClause | Executable) -> CursorResult: """Execute the given SQL statement.""" diff --git a/airbyte/_processors/sql/duckdb.py b/airbyte/_processors/sql/duckdb.py index ba27f03e..a31f39a3 100644 --- a/airbyte/_processors/sql/duckdb.py +++ b/airbyte/_processors/sql/duckdb.py @@ -77,12 +77,8 @@ def _write_files_to_new_table( ) -> str: """Write a file(s) to a new table. 
- We use DuckDB's `read_parquet` function to efficiently read the files and insert + We use DuckDB native SQL functions to efficiently read the files and insert them into the table in a single operation. - - Note: This implementation is fragile in regards to column ordering. However, since - we are inserting into a temp table we have just created, there should be no - drift between the table schema and the file schema. """ temp_table_name = self._create_table_for_loading( stream_name=stream_name, diff --git a/airbyte/caches/base.py b/airbyte/caches/base.py index f39f5c28..7f5f34f6 100644 --- a/airbyte/caches/base.py +++ b/airbyte/caches/base.py @@ -5,10 +5,12 @@ import abc from pathlib import Path -from typing import TYPE_CHECKING, Any, Optional, final +from typing import TYPE_CHECKING, Any, Optional, cast, final from pydantic import BaseModel, PrivateAttr +from airbyte import exceptions as exc +from airbyte.caches._catalog_manager import CatalogManager from airbyte.datasets._sql import CachedDataset @@ -76,13 +78,43 @@ def streams( """Return a temporary table name.""" result = {} stream_names = self.processor.expected_streams - if self.processor._catalog_manager is not None: # noqa: SLF001 - stream_names |= set(self.processor._catalog_manager.stream_names) # noqa: SLF001 + if self._has_catalog_manager: + stream_names |= set(self._catalog_manager.stream_names) for stream_name in stream_names: result[stream_name] = CachedDataset(self, stream_name) return result + def _get_state( + self, + source_name: str, + streams: list[str] | None, + ) -> list[dict[str, Any]] | None: + return self._catalog_manager.get_state( + source_name=source_name, + streams=streams, + ) + + @property + def _has_catalog_manager( + self, + ) -> bool: + """Return whether the cache has a catalog manager.""" + # Member is private until we have a public API for it. + return self.processor._catalog_manager is not None # noqa: SLF001 + + @property + def _catalog_manager( + self, + ) -> CatalogManager: + if not self._has_catalog_manager: + raise exc.AirbyteLibInternalError( + message="Catalog manager should exist but does not.", + ) + + # Member is private until we have a public API for it. + return cast(CatalogManager, self.processor._catalog_manager) # noqa: SLF001 + def __getitem__(self, stream: str) -> DatasetBase: return self.streams[stream] diff --git a/airbyte/datasets/_sql.py b/airbyte/datasets/_sql.py index 7911fbfb..b23cccad 100644 --- a/airbyte/datasets/_sql.py +++ b/airbyte/datasets/_sql.py @@ -39,7 +39,7 @@ def __init__( self._stream_name: str = stream_name self._query_statement: Selectable = query_statement super().__init__( - stream_metadata=cache.processor._get_stream_config( # noqa: SLF001 # Member is private until we have a public API for it. + stream_metadata=cache.processor._get_stream_config( # noqa: SLF001 # Member is private until we have a public API for it. stream_name=stream_name ), ) diff --git a/airbyte/source.py b/airbyte/source.py index c20cc3b8..96d8bdc2 100644 --- a/airbyte/source.py +++ b/airbyte/source.py @@ -615,11 +615,9 @@ def read( incoming_source_catalog=self.configured_catalog, stream_names=set(self._selected_stream_names), ) - if not cache.processor._catalog_manager: # noqa: SLF001 - raise exc.AirbyteLibInternalError(message="Catalog manager should exist but does not.") state = ( - cache.processor._catalog_manager.get_state( # noqa: SLF001 + cache._get_state( # noqa: SLF001 # Private method until we have a public API for it. 
source_name=self.name, streams=self._selected_stream_names, ) diff --git a/airbyte/types.py b/airbyte/types.py index 64322bda..a95dbf59 100644 --- a/airbyte/types.py +++ b/airbyte/types.py @@ -5,7 +5,6 @@ from typing import cast -import pyarrow as pa import sqlalchemy from rich import print @@ -81,61 +80,6 @@ def _get_airbyte_type( # noqa: PLR0911 # Too many return statements raise SQLTypeConversionError(err_msg) -def _get_pyarrow_type( # noqa: PLR0911 # Too many return statements - json_schema_property_def: dict[str, str | dict | list], -) -> pa.DataType: - json_schema_type = json_schema_property_def.get("type", None) - json_schema_format = json_schema_property_def.get("format", None) - - # if json_schema_type is an array of two strings with one of them being null, pick the other one - # this strategy is often used by connectors to indicate a field might not be set all the time - if isinstance(json_schema_type, list): - non_null_types = [t for t in json_schema_type if t != "null"] - if len(non_null_types) == 1: - json_schema_type = non_null_types[0] - - if json_schema_type == "string": - if json_schema_format == "date": - return pa.date64() - - if json_schema_format == "date-time": - return pa.timestamp("ns") - - if json_schema_format == "time": - return pa.timestamp("ns") - - if json_schema_type == "string": - return pa.string() - - if json_schema_type == "number": - return pa.float64() - - if json_schema_type == "integer": - return pa.int64() - - if json_schema_type == "boolean": - return pa.bool_() - - if json_schema_type == "object": - return pa.struct( - fields={ - k: _get_pyarrow_type(v) - for k, v in cast(dict, json_schema_property_def.get("properties", {})).items() - } - ) - - if json_schema_type == "array": - items_def = json_schema_property_def.get("items", None) - if isinstance(items_def, dict): - subtype: pa.DataType = _get_pyarrow_type(items_def) - return pa.list_(subtype) - - return pa.list_(pa.string()) - - err_msg = f"Could not determine PyArrow type from JSON schema type: {json_schema_property_def}" - raise SQLTypeConversionError(err_msg) - - class SQLTypeConverter: """A base class to perform type conversions.""" diff --git a/airbyte/validate.py b/airbyte/validate.py index 9b3520b5..dea7d2f4 100644 --- a/airbyte/validate.py +++ b/airbyte/validate.py @@ -91,7 +91,7 @@ def full_tests(connector_name: str, sample_config: str) -> None: def install_only_test(connector_name: str) -> None: print("Creating source and validating spec is returned successfully...") source = ab.get_source(connector_name) - source._get_spec(force_refresh=True) # noqa: SLF001 + source._get_spec(force_refresh=True) # noqa: SLF001 # Member is private until we have a public API for it. 
def run() -> None: diff --git a/poetry.lock b/poetry.lock index 49c66861..6d346db2 100644 --- a/poetry.lock +++ b/poetry.lock @@ -670,18 +670,18 @@ grpc = ["grpcio (>=1.38.0,<2.0dev)", "grpcio-status (>=1.38.0,<2.0.dev0)"] [[package]] name = "google-cloud-secret-manager" -version = "2.18.2" +version = "2.18.3" description = "Google Cloud Secret Manager API client library" optional = false python-versions = ">=3.7" files = [ - {file = "google-cloud-secret-manager-2.18.2.tar.gz", hash = "sha256:a00d62115a70083e86b1d44ca7ebcae041b36a44cc62ea57de4005731f8d3c88"}, - {file = "google_cloud_secret_manager-2.18.2-py2.py3-none-any.whl", hash = "sha256:bb2f2b23a588f9eeae51df4a028d6198a603231a056f838e6a476b5aecadbbca"}, + {file = "google-cloud-secret-manager-2.18.3.tar.gz", hash = "sha256:1db2f409324536e34f985081d389e3974ca3a3668df7845cad0be03ab8c0fa7d"}, + {file = "google_cloud_secret_manager-2.18.3-py2.py3-none-any.whl", hash = "sha256:4d4af82bddd9099ebdbe79e0c6b68f6c6cabea8323a3c1275bcead8f56310fb7"}, ] [package.dependencies] google-api-core = {version = ">=1.34.1,<2.0.dev0 || >=2.11.dev0,<3.0.0dev", extras = ["grpc"]} -google-auth = ">=2.14.1,<3.0.0dev" +google-auth = ">=2.14.1,<2.24.0 || >2.24.0,<2.25.0 || >2.25.0,<3.0.0dev" grpc-google-iam-v1 = ">=0.12.4,<1.0.0dev" proto-plus = ">=1.22.3,<2.0.0dev" protobuf = ">=3.19.5,<3.20.0 || >3.20.0,<3.20.1 || >3.20.1,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<5.0.0dev" @@ -1639,64 +1639,6 @@ files = [ {file = "psycopg2_binary-2.9.9-cp39-cp39-win_amd64.whl", hash = "sha256:f7ae5d65ccfbebdfa761585228eb4d0df3a8b15cfb53bd953e713e09fbb12957"}, ] -[[package]] -name = "pyarrow" -version = "14.0.2" -description = "Python library for Apache Arrow" -optional = false -python-versions = ">=3.8" -files = [ - {file = "pyarrow-14.0.2-cp310-cp310-macosx_10_14_x86_64.whl", hash = "sha256:ba9fe808596c5dbd08b3aeffe901e5f81095baaa28e7d5118e01354c64f22807"}, - {file = "pyarrow-14.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:22a768987a16bb46220cef490c56c671993fbee8fd0475febac0b3e16b00a10e"}, - {file = "pyarrow-14.0.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2dbba05e98f247f17e64303eb876f4a80fcd32f73c7e9ad975a83834d81f3fda"}, - {file = "pyarrow-14.0.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a898d134d00b1eca04998e9d286e19653f9d0fcb99587310cd10270907452a6b"}, - {file = "pyarrow-14.0.2-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:87e879323f256cb04267bb365add7208f302df942eb943c93a9dfeb8f44840b1"}, - {file = "pyarrow-14.0.2-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:76fc257559404ea5f1306ea9a3ff0541bf996ff3f7b9209fc517b5e83811fa8e"}, - {file = "pyarrow-14.0.2-cp310-cp310-win_amd64.whl", hash = "sha256:b0c4a18e00f3a32398a7f31da47fefcd7a927545b396e1f15d0c85c2f2c778cd"}, - {file = "pyarrow-14.0.2-cp311-cp311-macosx_10_14_x86_64.whl", hash = "sha256:87482af32e5a0c0cce2d12eb3c039dd1d853bd905b04f3f953f147c7a196915b"}, - {file = "pyarrow-14.0.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:059bd8f12a70519e46cd64e1ba40e97eae55e0cbe1695edd95384653d7626b23"}, - {file = "pyarrow-14.0.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3f16111f9ab27e60b391c5f6d197510e3ad6654e73857b4e394861fc79c37200"}, - {file = "pyarrow-14.0.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:06ff1264fe4448e8d02073f5ce45a9f934c0f3db0a04460d0b01ff28befc3696"}, - {file = 
"pyarrow-14.0.2-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:6dd4f4b472ccf4042f1eab77e6c8bce574543f54d2135c7e396f413046397d5a"}, - {file = "pyarrow-14.0.2-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:32356bfb58b36059773f49e4e214996888eeea3a08893e7dbde44753799b2a02"}, - {file = "pyarrow-14.0.2-cp311-cp311-win_amd64.whl", hash = "sha256:52809ee69d4dbf2241c0e4366d949ba035cbcf48409bf404f071f624ed313a2b"}, - {file = "pyarrow-14.0.2-cp312-cp312-macosx_10_14_x86_64.whl", hash = "sha256:c87824a5ac52be210d32906c715f4ed7053d0180c1060ae3ff9b7e560f53f944"}, - {file = "pyarrow-14.0.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a25eb2421a58e861f6ca91f43339d215476f4fe159eca603c55950c14f378cc5"}, - {file = "pyarrow-14.0.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5c1da70d668af5620b8ba0a23f229030a4cd6c5f24a616a146f30d2386fec422"}, - {file = "pyarrow-14.0.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2cc61593c8e66194c7cdfae594503e91b926a228fba40b5cf25cc593563bcd07"}, - {file = "pyarrow-14.0.2-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:78ea56f62fb7c0ae8ecb9afdd7893e3a7dbeb0b04106f5c08dbb23f9c0157591"}, - {file = "pyarrow-14.0.2-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:37c233ddbce0c67a76c0985612fef27c0c92aef9413cf5aa56952f359fcb7379"}, - {file = "pyarrow-14.0.2-cp312-cp312-win_amd64.whl", hash = "sha256:e4b123ad0f6add92de898214d404e488167b87b5dd86e9a434126bc2b7a5578d"}, - {file = "pyarrow-14.0.2-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:e354fba8490de258be7687f341bc04aba181fc8aa1f71e4584f9890d9cb2dec2"}, - {file = "pyarrow-14.0.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:20e003a23a13da963f43e2b432483fdd8c38dc8882cd145f09f21792e1cf22a1"}, - {file = "pyarrow-14.0.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fc0de7575e841f1595ac07e5bc631084fd06ca8b03c0f2ecece733d23cd5102a"}, - {file = "pyarrow-14.0.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:66e986dc859712acb0bd45601229021f3ffcdfc49044b64c6d071aaf4fa49e98"}, - {file = "pyarrow-14.0.2-cp38-cp38-manylinux_2_28_aarch64.whl", hash = "sha256:f7d029f20ef56673a9730766023459ece397a05001f4e4d13805111d7c2108c0"}, - {file = "pyarrow-14.0.2-cp38-cp38-manylinux_2_28_x86_64.whl", hash = "sha256:209bac546942b0d8edc8debda248364f7f668e4aad4741bae58e67d40e5fcf75"}, - {file = "pyarrow-14.0.2-cp38-cp38-win_amd64.whl", hash = "sha256:1e6987c5274fb87d66bb36816afb6f65707546b3c45c44c28e3c4133c010a881"}, - {file = "pyarrow-14.0.2-cp39-cp39-macosx_10_14_x86_64.whl", hash = "sha256:a01d0052d2a294a5f56cc1862933014e696aa08cc7b620e8c0cce5a5d362e976"}, - {file = "pyarrow-14.0.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:a51fee3a7db4d37f8cda3ea96f32530620d43b0489d169b285d774da48ca9785"}, - {file = "pyarrow-14.0.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:64df2bf1ef2ef14cee531e2dfe03dd924017650ffaa6f9513d7a1bb291e59c15"}, - {file = "pyarrow-14.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3c0fa3bfdb0305ffe09810f9d3e2e50a2787e3a07063001dcd7adae0cee3601a"}, - {file = "pyarrow-14.0.2-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:c65bf4fd06584f058420238bc47a316e80dda01ec0dfb3044594128a6c2db794"}, - {file = "pyarrow-14.0.2-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:63ac901baec9369d6aae1cbe6cca11178fb018a8d45068aaf5bb54f94804a866"}, - {file = "pyarrow-14.0.2-cp39-cp39-win_amd64.whl", hash = 
"sha256:75ee0efe7a87a687ae303d63037d08a48ef9ea0127064df18267252cfe2e9541"}, - {file = "pyarrow-14.0.2.tar.gz", hash = "sha256:36cef6ba12b499d864d1def3e990f97949e0b79400d08b7cf74504ffbd3eb025"}, -] - -[package.dependencies] -numpy = ">=1.16.6" - -[[package]] -name = "pyarrow-stubs" -version = "10.0.1.7" -description = "Type annotations for pyarrow" -optional = false -python-versions = ">=3.7,<4.0" -files = [ - {file = "pyarrow_stubs-10.0.1.7-py3-none-any.whl", hash = "sha256:cccc7a46eddeea4e3cb85330eb8972c116a615da6188b8ae1f7a44cb724b21ac"}, -] - [[package]] name = "pyasn1" version = "0.5.1" @@ -2795,4 +2737,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = ">=3.9,<4.0" -content-hash = "a6b886f25692bf3dc4f8503c6d81ef0b7d690fe93432e0bf58812c534b1fe037" +content-hash = "3490632ee893aa38e9b9bd6fc3435f0122cfa4beaf1694902d2558e529e9df3d" diff --git a/pyproject.toml b/pyproject.toml index 0166f599..c25556b8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,6 @@ pendulum = "<=3.0.0" psycopg2-binary = "^2.9.9" # psycopg = {extras = ["binary", "pool"], version = "^3.1.16"} # Psycopg3 is not supported in SQLAlchemy 1.x: -pyarrow = "^14.0.2" pydantic = "<=2.0" python-dotenv = "^1.0.1" python-ulid = "^2.2.0" @@ -49,7 +48,6 @@ faker = "^21.0.0" mypy = "^1.7.1" pandas-stubs = "^2.1.4.231218" pdoc = "^14.3.0" -pyarrow-stubs = "^10.0.1.7" pytest = "^7.4.3" pytest-docker = "^2.0.1" pytest-mypy = "^0.10.3"