From c17138a97bb01e467380ca9e149888402bf4b323 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez <168454423+aldogonzalez8@users.noreply.github.com> Date: Mon, 14 Oct 2024 15:33:53 -0600 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20bug(source-klaviyo):=20fix=20pro?= =?UTF-8?q?blem=20with=20pods=20running=20out=20of=20memory=20when=20synin?= =?UTF-8?q?g=20events=20stream=20historical=20data=20(#46741)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../connectors/source-klaviyo/metadata.yaml | 2 +- .../connectors/source-klaviyo/poetry.lock | 26 ++++-- .../connectors/source-klaviyo/pyproject.toml | 3 +- .../components/datetime_based_cursor.py | 28 ++++++ .../source_klaviyo/manifest.yaml | 25 ++++- .../source-klaviyo/unit_tests/test_streams.py | 92 +++++++++++++++++++ docs/integrations/sources/klaviyo.md | 1 + 7 files changed, 168 insertions(+), 9 deletions(-) diff --git a/airbyte-integrations/connectors/source-klaviyo/metadata.yaml b/airbyte-integrations/connectors/source-klaviyo/metadata.yaml index 985e969829b5..b3d15a1e7b48 100644 --- a/airbyte-integrations/connectors/source-klaviyo/metadata.yaml +++ b/airbyte-integrations/connectors/source-klaviyo/metadata.yaml @@ -8,7 +8,7 @@ data: definitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde connectorBuildOptions: baseImage: docker.io/airbyte/python-connector-base:2.0.0@sha256:c44839ba84406116e8ba68722a0f30e8f6e7056c726f447681bb9e9ece8bd916 - dockerImageTag: 2.10.9 + dockerImageTag: 2.10.10 dockerRepository: airbyte/source-klaviyo githubIssueLabel: source-klaviyo icon: klaviyo.svg diff --git a/airbyte-integrations/connectors/source-klaviyo/poetry.lock b/airbyte-integrations/connectors/source-klaviyo/poetry.lock index 99be6f09b85b..48ef7c17efc0 100644 --- a/airbyte-integrations/connectors/source-klaviyo/poetry.lock +++ b/airbyte-integrations/connectors/source-klaviyo/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. [[package]] name = "airbyte-cdk" @@ -69,13 +69,13 @@ files = [ [[package]] name = "anyio" -version = "4.6.0" +version = "4.6.1" description = "High level compatibility layer for multiple asynchronous event loop implementations" optional = false python-versions = ">=3.9" files = [ - {file = "anyio-4.6.0-py3-none-any.whl", hash = "sha256:c7d2e9d63e31599eeb636c8c5c03a7e108d73b345f064f1c19fdc87b79036a9a"}, - {file = "anyio-4.6.0.tar.gz", hash = "sha256:137b4559cbb034c477165047febb6ff83f390fc3b20bf181c1fc0a728cb8beeb"}, + {file = "anyio-4.6.1-py3-none-any.whl", hash = "sha256:0632863c9044798a494a05cab0b159dfad6a3f064094863a45878320eb4e8ed2"}, + {file = "anyio-4.6.1.tar.gz", hash = "sha256:936e6613a08e8f71a300cfffca1c1c0806335607247696ac45f9b32c63bfb9aa"}, ] [package.dependencies] @@ -86,7 +86,7 @@ typing-extensions = {version = ">=4.1", markers = "python_version < \"3.11\""} [package.extras] doc = ["Sphinx (>=7.4,<8.0)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-theme"] -test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.21.0b1)"] +test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "truststore (>=0.9.1)", "uvloop (>=0.21.0b1)"] trio = ["trio (>=0.26.1)"] [[package]] @@ -502,6 +502,20 @@ files = [ [package.extras] test = ["pytest (>=6)"] +[[package]] +name = "freezegun" +version = "1.5.1" +description = "Let your Python tests travel through time" +optional = false +python-versions = ">=3.7" +files = [ + {file = "freezegun-1.5.1-py3-none-any.whl", hash = "sha256:bf111d7138a8abe55ab48a71755673dbaa4ab87f4cff5634a4442dfec34c15f1"}, + {file = "freezegun-1.5.1.tar.gz", hash = "sha256:b29dedfcda6d5e8e083ce71b2b542753ad48cfec44037b3fc79702e2980a89e9"}, +] + +[package.dependencies] +python-dateutil = ">=2.7" + [[package]] name = "genson" version = "1.2.2" @@ -1736,4 +1750,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.10,<3.12" -content-hash = "e70985541524035b3e9405f541b313a20bc645a545bb36bf989a09cfcfde07c1" +content-hash = "1a51c577d6a82450bc128925daa6c163cb96729a9df2835522313a6d7938f1d5" diff --git a/airbyte-integrations/connectors/source-klaviyo/pyproject.toml b/airbyte-integrations/connectors/source-klaviyo/pyproject.toml index d5a076f4c14f..86d50b96e9e3 100644 --- a/airbyte-integrations/connectors/source-klaviyo/pyproject.toml +++ b/airbyte-integrations/connectors/source-klaviyo/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "2.10.9" +version = "2.10.10" name = "source-klaviyo" description = "Source implementation for Klaviyo." authors = [ "Airbyte ",] @@ -26,3 +26,4 @@ source-klaviyo = "source_klaviyo.run:run" pytest = "^6.1" pytest-mock = "^3.12.0" requests-mock = "^1.9.3" +freezegun = "*" diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/components/datetime_based_cursor.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/components/datetime_based_cursor.py index 39207268d9a4..d8580fded7d3 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/components/datetime_based_cursor.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/components/datetime_based_cursor.py @@ -25,3 +25,31 @@ def get_request_params( field = self.cursor_field.eval(self.config) value = stream_slice.get(self._partition_field_start.eval(self.config)) return {"filter": f"greater-than({field},{value})", "sort": field} + + +@dataclass +class KlaviyoCheckpointDatetimeBasedCursor(DatetimeBasedCursor): + """ + You can configure the declarative stream with a step to checkpoint after the slice is completed + e.g. + incremental_sync: + type: CustomIncrementalSync + ... some configuration + step: P1M + cursor_granularity: PT1S + """ + + def get_request_params( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + if not stream_slice: + return {} + + field = self.cursor_field.eval(self.config) + start_value = stream_slice.get(self._partition_field_start.eval(self.config)) + end_value = stream_slice.get(self._partition_field_end.eval(self.config)) + return {"filter": f"greater-or-equal({field},{start_value}),less-or-equal({field},{end_value})", "sort": field} diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/manifest.yaml b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/manifest.yaml index 605a005e0c19..4ea2663b513f 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/manifest.yaml +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/manifest.yaml @@ -118,6 +118,29 @@ definitions: field_name: "{{ parameters.get('cursor_field', 'updated') }}" inject_into: request_parameter + base_incremental_checkpoint_stream: + $ref: "#/definitions/base_stream" + retriever: "#/definitions/base_retriever" + incremental_sync: + type: CustomIncrementalSync + class_name: source_klaviyo.components.datetime_based_cursor.KlaviyoCheckpointDatetimeBasedCursor + cursor_field: "{{ parameters.get('cursor_field', 'updated') }}" + start_datetime: "{{ config.get('start_date', '2012-01-01T00:00:00Z') }}" + datetime_format: "%Y-%m-%dT%H:%M:%S%z" + cursor_datetime_formats: + - "%Y-%m-%dT%H:%M:%S.%f%z" + - "%Y-%m-%dT%H:%M:%S%z" + - "%Y-%m-%d %H:%M:%S%z" + start_time_option: + type: RequestOption + field_name: "{{ parameters.get('cursor_field', 'updated') }}" + inject_into: request_parameter + # Syncing historical data from events endpoint can take days to sync and cause memory issues. + # Checkpoint after each slice, which means the next sync will start from the last successful slice. + # This ensures that even if the sync runs out of memory or fails, it won’t start over from the beginning. + step: P1M + cursor_granularity: PT1S + base_semi_incremental_stream: $ref: "#/definitions/base_stream" retriever: "#/definitions/semi_incremental_retriever" @@ -161,7 +184,7 @@ definitions: events_stream: # Docs: https://developers.klaviyo.com/en/reference/get_events name: "events" - $ref: "#/definitions/base_incremental_stream" + $ref: "#/definitions/base_incremental_checkpoint_stream" retriever: $ref: "#/definitions/base_retriever" requester: diff --git a/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_streams.py index 6dd478c0ced0..196c7e736f27 100644 --- a/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_streams.py @@ -3,16 +3,20 @@ # +import urllib.parse +from datetime import datetime, timedelta from typing import Any, List, Mapping, Optional from unittest import mock from unittest.mock import patch +import freezegun import pendulum import pytest import requests from airbyte_cdk import AirbyteTracedException from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams import Stream +from dateutil.relativedelta import relativedelta from pydantic import BaseModel from source_klaviyo.availability_strategy import KlaviyoAvailabilityStrategy from source_klaviyo.source import SourceKlaviyo @@ -23,6 +27,19 @@ START_DATE = pendulum.datetime(2020, 10, 10) CONFIG = {"api_key": API_KEY, "start_date": START_DATE} +EVENTS_STREAM_DEFAULT_START_DATE = "2012-01-01T00:00:00+00:00" +EVENTS_STREAM_CONFIG_START_DATE = "2021-11-08T00:00:00+00:00" +EVENTS_STREAM_STATE_DATE = (datetime.fromisoformat(EVENTS_STREAM_CONFIG_START_DATE) + relativedelta(years=1)).isoformat() +EVENTS_STREAM_TESTING_FREEZE_TIME = "2023-12-12 12:00:00" + +def get_months_diff(provided_date: str) -> int: + """ + This function returns the difference in months between provided date and freeze time. + """ + provided_date = datetime.fromisoformat(provided_date).replace(tzinfo=None) + freeze_date = datetime.strptime(EVENTS_STREAM_TESTING_FREEZE_TIME, "%Y-%m-%d %H:%M:%S") + difference = relativedelta(freeze_date, provided_date) + return difference.years * 12 + difference.months def get_stream_by_name(stream_name: str, config: Mapping[str, Any]) -> Stream: source = SourceKlaviyo() @@ -160,6 +177,36 @@ def test_backoff_time_large_retry_after(self): class TestIncrementalKlaviyoStream: + + @staticmethod + def generate_api_urls(start_date_str: str) -> list[(str, str)]: + """ + This function generates API URLs. + Each URL will cover one month of data starting from the input date up to the current moment. + """ + start_date = datetime.fromisoformat(start_date_str) + current_date = datetime.now(start_date.tzinfo) + urls = [] + while start_date < current_date: + end_date = start_date + relativedelta(months=1) - timedelta(seconds=1) + if end_date > current_date: + end_date = current_date + start_date_str = start_date.strftime("%Y-%m-%dT%H:%M:%S") + start_date.strftime("%z") + end_date_str = end_date.strftime("%Y-%m-%dT%H:%M:%S") + end_date.strftime("%z") + base_url = 'https://a.klaviyo.com/api/events' + query_params = { + 'fields[metric]': 'name,created,updated,integration', + 'include': 'metric', + 'filter': f'greater-or-equal(datetime,{start_date_str}),less-or-equal(datetime,{end_date_str})', + 'sort': 'datetime' + } + encoded_query = urllib.parse.urlencode(query_params) + encoded_url = f"{base_url}?{encoded_query}" + dummy_record = {"attributes": {"datetime": start_date_str}, "datetime": start_date_str} + urls.append((encoded_url, dummy_record)) + start_date = start_date + relativedelta(months=1) + return urls + def test_cursor_field_is_required(self): with pytest.raises( expected_exception=TypeError, @@ -239,6 +286,51 @@ def test_get_updated_state(self, config_start_date, current_cursor, latest_curso ) == {stream.cursor_field: expected_cursor} + @freezegun.freeze_time("2023-12-12 12:00:00") + @pytest.mark.parametrize( + # expected_amount_of_results: we put 1 record for every request + ("config_start_date", "stream_state", "expected_amount_of_results"), + ( + ( + # we pick the state + EVENTS_STREAM_CONFIG_START_DATE, + EVENTS_STREAM_STATE_DATE, + get_months_diff(EVENTS_STREAM_STATE_DATE) + 1 # adding last request + ), + ( + # we pick the config start date + EVENTS_STREAM_CONFIG_START_DATE, + None, + get_months_diff(EVENTS_STREAM_CONFIG_START_DATE) + 1 # adding last request + ), + ( + "", + "", + get_months_diff(EVENTS_STREAM_DEFAULT_START_DATE) + 1 # adding last request + ), + ), + ) + def test_read_records_events(self, config_start_date, stream_state, expected_amount_of_results, requests_mock): + if config_start_date: + test_config = CONFIG | {"start_date": config_start_date} + else: + test_config = {**CONFIG} + test_config.pop("start_date") + stream = get_stream_by_name("events", test_config) + dummy_records = [] + + initial_date_for_urls = stream_state or config_start_date or EVENTS_STREAM_DEFAULT_START_DATE + urls = self.generate_api_urls(initial_date_for_urls) + for url, dummy_record in urls: + requests_mock.register_uri("GET", url, status_code=200, json={"data": dummy_record}) + dummy_records.append(dummy_record) + + stream.state = {stream.cursor_field: stream_state if stream_state else config_start_date} + records = get_records(stream=stream, sync_mode=SyncMode.incremental) + assert records == dummy_records + assert len(records) == expected_amount_of_results + + class TestSemiIncrementalKlaviyoStream: @pytest.mark.parametrize( ("start_date", "stream_state", "input_records", "expected_records"), diff --git a/docs/integrations/sources/klaviyo.md b/docs/integrations/sources/klaviyo.md index ef82ec2ae91b..a445b571707c 100644 --- a/docs/integrations/sources/klaviyo.md +++ b/docs/integrations/sources/klaviyo.md @@ -95,6 +95,7 @@ contain the `predictive_analytics` field and workflows depending on this field w | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------| +| 2.10.10 | 2024-10-14 | [46741](https://github.com/airbytehq/airbyte/pull/46741) | Add checkpointing to events stream to improve large syncs after clear data | | 2.10.9 | 2024-10-12 | [46787](https://github.com/airbytehq/airbyte/pull/46787) | Update dependencies | | 2.10.8 | 2024-10-05 | [46503](https://github.com/airbytehq/airbyte/pull/46503) | Update dependencies | | 2.10.7 | 2024-09-28 | [46174](https://github.com/airbytehq/airbyte/pull/46174) | Update dependencies |