Source Github: Refactor incremental state handling #39513

Merged 8 commits on Jun 18, 2024
Changes from 6 commits
@@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
-dockerImageTag: 1.7.6
+dockerImageTag: 1.7.7
dockerRepository: airbyte/source-github
documentationUrl: https://docs.airbyte.com/integrations/sources/github
githubIssueLabel: source-github
484 changes: 388 additions & 96 deletions airbyte-integrations/connectors/source-github/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-github/pyproject.toml
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
-version = "1.7.6"
+version = "1.7.7"
name = "source-github"
description = "Source implementation for GitHub."
authors = [ "Airbyte <[email protected]>",]
@@ -17,7 +17,7 @@ include = "source_github"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
-airbyte-cdk = "0.80.0"
+airbyte-cdk = "0.90.0"
sgqlc = "==16.3"

[tool.poetry.scripts]
@@ -13,6 +13,7 @@
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
+from airbyte_cdk.sources.streams.core import CheckpointMixin
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from airbyte_cdk.utils import AirbyteTracedException
@@ -47,6 +48,7 @@ def __init__(self, api_url: str = "https://api.github.com", access_token_type: s

self.access_token_type = access_token_type
self.api_url = api_url
+self.state = {}

@property
def url_base(self) -> str:
@@ -255,7 +257,7 @@ def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str,
return record


-class SemiIncrementalMixin:
+class SemiIncrementalMixin(CheckpointMixin):
"""
Semi incremental streams are also incremental but with one difference, they:
- read all records;
@@ -279,6 +281,14 @@ def __init__(self, start_date: str = "", **kwargs):
self._start_date = start_date
self._starting_point_cache = {}

+@property
+def state(self) -> MutableMapping[str, Any]:
+return self._state
+
+@state.setter
+def state(self, value: MutableMapping[str, Any]):
+self._state = value

@property
def slice_keys(self):
if hasattr(self, "repositories"):
@@ -295,7 +305,7 @@ def state_checkpoint_interval(self) -> Optional[int]:
if self.is_sorted == "asc":
return self.page_size

-def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
+def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
"""
Return the latest state by comparing the cursor value in the latest record with the stream's most recent state
object and returning an updated state object.
@@ -337,6 +347,7 @@ def read_records(
):
cursor_value = self.convert_cursor_value(record[self.cursor_field])
if not start_point or cursor_value > start_point:
+self.state = self._get_updated_state(self.state, record)
Contributor

This works only as long as we don't use self._state in stream_slices, because of the availability strategy. That seems a bit dangerous, but I don't have a better way to do it...

Contributor

This also assumes that the records read during the availability check are older than the first one synced in read_records. That seems like a fair assumption, but the flow is starting to feel very complex.

Contributor Author

Noting here that the call to update state has been shifted to occur AFTER a record is yielded, to ensure state is only updated after a record has been processed.

Contributor

This seemed like a good solution when we discussed it, and I still think it is the better one, but it means that checkpoint_interval will be one record behind, since the logic is:

  • source gets record
  • stream yields record
  • state is emitted based on the checkpoint interval
  • source gets another record
  • stream updates state
  • stream yields record
  • etc...

I don't mind this drawback. Just noting that it can probably still be improved.
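
The ordering described in this thread can be sketched with a plain generator. This is a minimal illustration, not the connector's code: `SketchStream`, `sync`, and the `updated_at` field are hypothetical names, and the consumer snapshotting state after every record stands in for a checkpoint taken at that moment.

```python
from typing import Any, Dict, Iterator, List


class SketchStream:
    """Hypothetical stand-in for a stream; not the connector's class."""

    def __init__(self, records: List[Dict[str, Any]]):
        self._records = records
        self.state: Dict[str, Any] = {}

    def read_records(self) -> Iterator[Dict[str, Any]]:
        for record in self._records:
            yield record
            # This line runs only when the consumer asks for the next record,
            # so the state always trails the record that was just yielded.
            self.state = {"updated_at": record["updated_at"]}


def sync(stream: SketchStream) -> List[Dict[str, Any]]:
    """Hypothetical consumer that snapshots state right after each record."""
    snapshots = []
    for _record in stream.read_records():
        snapshots.append(dict(stream.state))
    return snapshots


stream = SketchStream([{"updated_at": "2024-01-01"}, {"updated_at": "2024-01-02"}])
snapshots = sync(stream)
```

Each snapshot reflects the record before the one just received, which is the one-record lag noted above. The state catches up only once the generator is exhausted.
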

yield record
elif self.is_sorted == "desc" and cursor_value < start_point:
break
@@ -719,7 +730,7 @@ def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str,

return record

-def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
+def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
repository = latest_record["repository"]
branch = latest_record["branch"]
updated_state = latest_record[self.cursor_field]
@@ -1004,7 +1015,7 @@ def request_body_json(
# Reactions streams


-class ReactionStream(GithubStream, ABC):
+class ReactionStream(GithubStream, CheckpointMixin, ABC):

parent_key = "id"
copy_parent_key = "comment_id"
@@ -1023,6 +1034,14 @@ def parent_entity(self):
Specify the class of the parent stream for which receive reactions
"""

+@property
+def state(self) -> MutableMapping[str, Any]:
+return self._state
+
+@state.setter
+def state(self, value: MutableMapping[str, Any]):
+self._state = value

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
parent_path = self._parent_stream.path(stream_slice=stream_slice, **kwargs)
return f"{parent_path}/{stream_slice[self.copy_parent_key]}/reactions"
@@ -1032,7 +1051,7 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
for parent_record in self._parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice):
yield {self.copy_parent_key: parent_record[self.parent_key], "repository": stream_slice["repository"]}

-def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
+def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
repository = latest_record["repository"]
parent_id = str(latest_record[self.copy_parent_key])
updated_state = latest_record[self.cursor_field]
@@ -1065,6 +1084,7 @@ def read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
):
if not starting_point or record[self.cursor_field] > starting_point:
+self.state = self._get_updated_state(self.state, record)
yield record

def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str, Any]) -> MutableMapping[str, Any]:
@@ -1320,6 +1340,7 @@ def read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
):
if not starting_point or record[self.cursor_field] > starting_point:
+self.state = self._get_updated_state(self.state, record)
yield record

def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any]) -> str:
@@ -1333,7 +1354,7 @@ def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapp
return stream_state_value
return self._start_date

-def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
+def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
repository = latest_record["repository"]
project_id = str(latest_record["project_id"])
updated_state = latest_record[self.cursor_field]
@@ -1390,6 +1411,7 @@ def read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
):
if not starting_point or record[self.cursor_field] > starting_point:
+self.state = self._get_updated_state(self.state, record)
yield record

def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any]) -> str:
@@ -1404,7 +1426,7 @@ def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapp
return stream_state_value
return self._start_date

-def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
+def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
repository = latest_record["repository"]
project_id = str(latest_record["project_id"])
column_id = str(latest_record["column_id"])
@@ -1,7 +1,5 @@
{
-"credentials": {
-"personal_access_token": "personal_access_token"
-},
+"credentials": { "personal_access_token": "personal_access_token" },
"repository": "airbytehq/airbyte airbytehq/airbyte-platform",
"start_date": "2000-01-01T00:00:00Z",
"branch": "airbytehq/airbyte/master airbytehq/airbyte-platform/main"
@@ -19,7 +19,7 @@ def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str,
for slice in slices:
records = stream_instance.read_records(sync_mode=SyncMode.incremental, stream_slice=slice, stream_state=stream_state)
for record in records:
-stream_state = stream_instance.get_updated_state(stream_state, record)
+stream_state = stream_instance._get_updated_state(stream_state, record)
res.append(record)
return res
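
With state exposed through the `state` property, a test helper could also read the accumulated state from the stream after consuming its records, rather than calling the private method itself. The following is a minimal sketch under assumptions: `FakeIncrementalStream` is a hypothetical stand-in (not an airbyte-cdk class), the per-repository state shape is assumed for illustration, and state is updated before each yield, as in this diff.

```python
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping


class FakeIncrementalStream:
    """Hypothetical stream mimicking the state property pattern in this diff."""

    cursor_field = "updated_at"

    def __init__(self, records: List[Mapping[str, Any]]):
        self._records = list(records)
        self._state: MutableMapping[str, Any] = {}

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._state

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        self._state = value

    def _get_updated_state(
        self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]
    ) -> Dict[str, Any]:
        # Assumed state shape: keep the largest cursor value seen per repository.
        repository = latest_record["repository"]
        current = current_stream_state.get(repository, {}).get(self.cursor_field, "")
        latest = max(current, latest_record[self.cursor_field])
        return {**current_stream_state, repository: {self.cursor_field: latest}}

    def read_records(self) -> Iterator[Mapping[str, Any]]:
        for record in self._records:
            # State is updated before the record is yielded, as in this diff.
            self.state = self._get_updated_state(self.state, record)
            yield record


def read_incremental(stream: FakeIncrementalStream) -> List[Mapping[str, Any]]:
    """Consume every record; the stream tracks its own state along the way."""
    return list(stream.read_records())


stream = FakeIncrementalStream(
    [
        {"repository": "a/b", "updated_at": "2024-01-02T00:00:00Z"},
        {"repository": "a/b", "updated_at": "2024-01-01T00:00:00Z"},
        {"repository": "c/d", "updated_at": "2023-05-01T00:00:00Z"},
    ]
)
records = read_incremental(stream)
```

After the read, `stream.state` holds the latest cursor per repository, so a test can assert on it directly without threading `stream_state` through the loop.
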
