Skip to content

Commit

Permalink
🐛 bug(source-klaviyo): fix problem with pods running out of memory wh…
Browse files Browse the repository at this point in the history
…en syncing events stream historical data (#46741)
  • Loading branch information
aldogonzalez8 authored Oct 14, 2024
1 parent a222903 commit c17138a
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
definitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:2.0.0@sha256:c44839ba84406116e8ba68722a0f30e8f6e7056c726f447681bb9e9ece8bd916
dockerImageTag: 2.10.9
dockerImageTag: 2.10.10
dockerRepository: airbyte/source-klaviyo
githubIssueLabel: source-klaviyo
icon: klaviyo.svg
Expand Down
26 changes: 20 additions & 6 deletions airbyte-integrations/connectors/source-klaviyo/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.10.9"
version = "2.10.10"
name = "source-klaviyo"
description = "Source implementation for Klaviyo."
authors = [ "Airbyte <[email protected]>",]
Expand All @@ -26,3 +26,4 @@ source-klaviyo = "source_klaviyo.run:run"
pytest = "^6.1"
pytest-mock = "^3.12.0"
requests-mock = "^1.9.3"
freezegun = "*"
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,31 @@ def get_request_params(
field = self.cursor_field.eval(self.config)
value = stream_slice.get(self._partition_field_start.eval(self.config))
return {"filter": f"greater-than({field},{value})", "sort": field}


@dataclass
class KlaviyoCheckpointDatetimeBasedCursor(DatetimeBasedCursor):
    """
    Datetime cursor whose requests are bounded by both ends of the current slice.

    Configure the declarative stream with a step so that it checkpoints once a slice is completed,
    e.g.
      incremental_sync:
        type: CustomIncrementalSync
        ... some configuration
        step: P1M
        cursor_granularity: PT1S
    """

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Build the ``filter`` and ``sort`` query parameters for the given slice.

        Returns an empty mapping when no slice is provided. Otherwise the filter keeps records whose
        cursor value lies between the slice start and the slice end, both bounds included, and the
        results are sorted by the cursor field.
        """
        if stream_slice:
            cursor_name = self.cursor_field.eval(self.config)
            lower_bound = stream_slice.get(self._partition_field_start.eval(self.config))
            upper_bound = stream_slice.get(self._partition_field_end.eval(self.config))
            window_filter = f"greater-or-equal({cursor_name},{lower_bound}),less-or-equal({cursor_name},{upper_bound})"
            return {"filter": window_filter, "sort": cursor_name}

        return {}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,29 @@ definitions:
field_name: "{{ parameters.get('cursor_field', 'updated') }}"
inject_into: request_parameter

base_incremental_checkpoint_stream:
$ref: "#/definitions/base_stream"
retriever: "#/definitions/base_retriever"
incremental_sync:
type: CustomIncrementalSync
class_name: source_klaviyo.components.datetime_based_cursor.KlaviyoCheckpointDatetimeBasedCursor
cursor_field: "{{ parameters.get('cursor_field', 'updated') }}"
start_datetime: "{{ config.get('start_date', '2012-01-01T00:00:00Z') }}"
datetime_format: "%Y-%m-%dT%H:%M:%S%z"
cursor_datetime_formats:
- "%Y-%m-%dT%H:%M:%S.%f%z"
- "%Y-%m-%dT%H:%M:%S%z"
- "%Y-%m-%d %H:%M:%S%z"
start_time_option:
type: RequestOption
field_name: "{{ parameters.get('cursor_field', 'updated') }}"
inject_into: request_parameter
# Syncing historical data from the events endpoint can take days and cause memory issues.
# Checkpoint after each slice, so that the next sync starts from the last successful slice.
# This ensures that even if the sync runs out of memory or fails, it won't start over from the beginning.
step: P1M
cursor_granularity: PT1S

base_semi_incremental_stream:
$ref: "#/definitions/base_stream"
retriever: "#/definitions/semi_incremental_retriever"
Expand Down Expand Up @@ -161,7 +184,7 @@ definitions:
events_stream:
# Docs: https://developers.klaviyo.com/en/reference/get_events
name: "events"
$ref: "#/definitions/base_incremental_stream"
$ref: "#/definitions/base_incremental_checkpoint_stream"
retriever:
$ref: "#/definitions/base_retriever"
requester:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
#


import urllib.parse
from datetime import datetime, timedelta
from typing import Any, List, Mapping, Optional
from unittest import mock
from unittest.mock import patch

import freezegun
import pendulum
import pytest
import requests
from airbyte_cdk import AirbyteTracedException
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from dateutil.relativedelta import relativedelta
from pydantic import BaseModel
from source_klaviyo.availability_strategy import KlaviyoAvailabilityStrategy
from source_klaviyo.source import SourceKlaviyo
Expand All @@ -23,6 +27,19 @@
START_DATE = pendulum.datetime(2020, 10, 10)
CONFIG = {"api_key": API_KEY, "start_date": START_DATE}

EVENTS_STREAM_DEFAULT_START_DATE = "2012-01-01T00:00:00+00:00"
EVENTS_STREAM_CONFIG_START_DATE = "2021-11-08T00:00:00+00:00"
EVENTS_STREAM_STATE_DATE = (datetime.fromisoformat(EVENTS_STREAM_CONFIG_START_DATE) + relativedelta(years=1)).isoformat()
EVENTS_STREAM_TESTING_FREEZE_TIME = "2023-12-12 12:00:00"

def get_months_diff(provided_date: str) -> int:
    """
    Return the number of whole months between the provided ISO date and the test freeze time.
    """
    frozen_at = datetime.strptime(EVENTS_STREAM_TESTING_FREEZE_TIME, "%Y-%m-%d %H:%M:%S")
    # The freeze time is naive, so strip the timezone from the input before comparing them.
    naive_start = datetime.fromisoformat(provided_date).replace(tzinfo=None)
    elapsed = relativedelta(frozen_at, naive_start)
    return 12 * elapsed.years + elapsed.months

def get_stream_by_name(stream_name: str, config: Mapping[str, Any]) -> Stream:
source = SourceKlaviyo()
Expand Down Expand Up @@ -160,6 +177,36 @@ def test_backoff_time_large_retry_after(self):


class TestIncrementalKlaviyoStream:

@staticmethod
def generate_api_urls(start_date_str: str) -> list[tuple[str, dict[str, Any]]]:
    """
    Generate the events API URLs expected for a sync starting at ``start_date_str``.

    Each URL covers one month of data, starting from the input date up to the current moment.

    :param start_date_str: ISO-8601 datetime at which the first window starts.
    :return: ``(url, dummy_record)`` pairs, one per window, in chronological order. The dummy
        record carries the window start as its ``datetime`` value.
    """
    datetime_format = "%Y-%m-%dT%H:%M:%S%z"
    base_url = "https://a.klaviyo.com/api/events"

    window_start = datetime.fromisoformat(start_date_str)
    current_date = datetime.now(window_start.tzinfo)
    urls = []
    while window_start < current_date:
        # Both filter bounds are inclusive, so a window stops one second before the next one
        # begins; the last window is clamped to the current moment.
        window_end = min(window_start + relativedelta(months=1) - timedelta(seconds=1), current_date)
        window_start_str = window_start.strftime(datetime_format)
        window_end_str = window_end.strftime(datetime_format)
        query_params = {
            "fields[metric]": "name,created,updated,integration",
            "include": "metric",
            "filter": f"greater-or-equal(datetime,{window_start_str}),less-or-equal(datetime,{window_end_str})",
            "sort": "datetime",
        }
        encoded_url = f"{base_url}?{urllib.parse.urlencode(query_params)}"
        dummy_record = {"attributes": {"datetime": window_start_str}, "datetime": window_start_str}
        urls.append((encoded_url, dummy_record))
        window_start = window_start + relativedelta(months=1)
    return urls

def test_cursor_field_is_required(self):
with pytest.raises(
expected_exception=TypeError,
Expand Down Expand Up @@ -239,6 +286,51 @@ def test_get_updated_state(self, config_start_date, current_cursor, latest_curso
) == {stream.cursor_field: expected_cursor}


# Freeze on the shared constant: get_months_diff derives the expected counts from the same value.
@freezegun.freeze_time(EVENTS_STREAM_TESTING_FREEZE_TIME)
@pytest.mark.parametrize(
    # expected_amount_of_results: we put 1 record for every request
    ("config_start_date", "stream_state", "expected_amount_of_results"),
    (
        (
            # we pick the state
            EVENTS_STREAM_CONFIG_START_DATE,
            EVENTS_STREAM_STATE_DATE,
            get_months_diff(EVENTS_STREAM_STATE_DATE) + 1,  # adding last request
        ),
        (
            # we pick the config start date
            EVENTS_STREAM_CONFIG_START_DATE,
            None,
            get_months_diff(EVENTS_STREAM_CONFIG_START_DATE) + 1,  # adding last request
        ),
        (
            # neither state nor config start date is given, so the default start date is expected
            "",
            "",
            get_months_diff(EVENTS_STREAM_DEFAULT_START_DATE) + 1,  # adding last request
        ),
    ),
)
def test_read_records_events(self, config_start_date, stream_state, expected_amount_of_results, requests_mock):
    """
    Read the events stream incrementally and check that one request is issued per monthly window.

    Every expected URL is mocked to return a single dummy record, so the records read must match
    the mocked ones both in content and in number.
    """
    if config_start_date:
        test_config = CONFIG | {"start_date": config_start_date}
    else:
        # No start date configured: drop the key entirely from a copy of the shared config.
        test_config = {**CONFIG}
        test_config.pop("start_date")
    stream = get_stream_by_name("events", test_config)
    dummy_records = []

    # The state wins over the config start date, which wins over the default start date.
    initial_date_for_urls = stream_state or config_start_date or EVENTS_STREAM_DEFAULT_START_DATE
    urls = self.generate_api_urls(initial_date_for_urls)
    for url, dummy_record in urls:
        requests_mock.register_uri("GET", url, status_code=200, json={"data": dummy_record})
        dummy_records.append(dummy_record)

    stream.state = {stream.cursor_field: stream_state if stream_state else config_start_date}
    records = get_records(stream=stream, sync_mode=SyncMode.incremental)
    assert records == dummy_records
    assert len(records) == expected_amount_of_results


class TestSemiIncrementalKlaviyoStream:
@pytest.mark.parametrize(
("start_date", "stream_state", "input_records", "expected_records"),
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/klaviyo.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ contain the `predictive_analytics` field and workflows depending on this field w

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------|
| 2.10.10 | 2024-10-14 | [46741](https://github.com/airbytehq/airbyte/pull/46741) | Add checkpointing to events stream to improve large syncs after clear data |
| 2.10.9 | 2024-10-12 | [46787](https://github.com/airbytehq/airbyte/pull/46787) | Update dependencies |
| 2.10.8 | 2024-10-05 | [46503](https://github.com/airbytehq/airbyte/pull/46503) | Update dependencies |
| 2.10.7 | 2024-09-28 | [46174](https://github.com/airbytehq/airbyte/pull/46174) | Update dependencies |
Expand Down

0 comments on commit c17138a

Please sign in to comment.