Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Github: Refactor incremental state handling #39513

Merged
merged 8 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
dockerImageTag: 1.7.6
dockerImageTag: 1.7.7
dockerRepository: airbyte/source-github
documentationUrl: https://docs.airbyte.com/integrations/sources/github
githubIssueLabel: source-github
Expand Down
484 changes: 388 additions & 96 deletions airbyte-integrations/connectors/source-github/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-github/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "1.7.6"
version = "1.7.7"
name = "source-github"
description = "Source implementation for GitHub."
authors = [ "Airbyte <[email protected]>",]
Expand All @@ -17,7 +17,7 @@ include = "source_github"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "0.80.0"
airbyte-cdk = "0.90.0"
sgqlc = "==16.3"

[tool.poetry.scripts]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.core import CheckpointMixin
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from airbyte_cdk.utils import AirbyteTracedException
Expand Down Expand Up @@ -47,6 +48,7 @@ def __init__(self, api_url: str = "https://api.github.com", access_token_type: s

self.access_token_type = access_token_type
self.api_url = api_url
self.state = {}

@property
def url_base(self) -> str:
Expand Down Expand Up @@ -255,7 +257,7 @@ def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str,
return record


class SemiIncrementalMixin:
class SemiIncrementalMixin(CheckpointMixin):
"""
Semi incremental streams are also incremental but with one difference, they:
- read all records;
Expand All @@ -279,6 +281,14 @@ def __init__(self, start_date: str = "", **kwargs):
self._start_date = start_date
self._starting_point_cache = {}

@property
def state(self) -> MutableMapping[str, Any]:
return self._state

@state.setter
def state(self, value: MutableMapping[str, Any]):
self._state = value

@property
def slice_keys(self):
if hasattr(self, "repositories"):
Expand All @@ -295,7 +305,7 @@ def state_checkpoint_interval(self) -> Optional[int]:
if self.is_sorted == "asc":
return self.page_size

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
"""
Return the latest state by comparing the cursor value in the latest record with the stream's most recent state
object and returning an updated state object.
Expand Down Expand Up @@ -338,6 +348,7 @@ def read_records(
cursor_value = self.convert_cursor_value(record[self.cursor_field])
if not start_point or cursor_value > start_point:
yield record
self.state = self._get_updated_state(self.state, record)
elif self.is_sorted == "desc" and cursor_value < start_point:
break

Expand Down Expand Up @@ -719,7 +730,7 @@ def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str,

return record

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
repository = latest_record["repository"]
branch = latest_record["branch"]
updated_state = latest_record[self.cursor_field]
Expand Down Expand Up @@ -1004,7 +1015,7 @@ def request_body_json(
# Reactions streams


class ReactionStream(GithubStream, ABC):
class ReactionStream(GithubStream, CheckpointMixin, ABC):

parent_key = "id"
copy_parent_key = "comment_id"
Expand All @@ -1023,6 +1034,14 @@ def parent_entity(self):
Specify the class of the parent stream for which receive reactions
"""

@property
def state(self) -> MutableMapping[str, Any]:
return self._state

@state.setter
def state(self, value: MutableMapping[str, Any]):
self._state = value

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
parent_path = self._parent_stream.path(stream_slice=stream_slice, **kwargs)
return f"{parent_path}/{stream_slice[self.copy_parent_key]}/reactions"
Expand All @@ -1032,7 +1051,7 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
for parent_record in self._parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice):
yield {self.copy_parent_key: parent_record[self.parent_key], "repository": stream_slice["repository"]}

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
repository = latest_record["repository"]
parent_id = str(latest_record[self.copy_parent_key])
updated_state = latest_record[self.cursor_field]
Expand Down Expand Up @@ -1066,6 +1085,7 @@ def read_records(
):
if not starting_point or record[self.cursor_field] > starting_point:
yield record
self.state = self._get_updated_state(self.state, record)

def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str, Any]) -> MutableMapping[str, Any]:
record = super().transform(record, stream_slice)
Expand Down Expand Up @@ -1321,6 +1341,7 @@ def read_records(
):
if not starting_point or record[self.cursor_field] > starting_point:
yield record
self.state = self._get_updated_state(self.state, record)

def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any]) -> str:
if stream_state:
Expand All @@ -1333,7 +1354,7 @@ def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapp
return stream_state_value
return self._start_date

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
repository = latest_record["repository"]
project_id = str(latest_record["project_id"])
updated_state = latest_record[self.cursor_field]
Expand Down Expand Up @@ -1391,6 +1412,7 @@ def read_records(
):
if not starting_point or record[self.cursor_field] > starting_point:
yield record
self.state = self._get_updated_state(self.state, record)

def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any]) -> str:
if stream_state:
Expand All @@ -1404,7 +1426,7 @@ def get_starting_point(self, stream_state: Mapping[str, Any], stream_slice: Mapp
return stream_state_value
return self._start_date

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
repository = latest_record["repository"]
project_id = str(latest_record["project_id"])
column_id = str(latest_record["column_id"])
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
{
"credentials": {
"personal_access_token": "personal_access_token"
},
"credentials": { "personal_access_token": "personal_access_token" },
"repository": "airbytehq/airbyte airbytehq/airbyte-platform",
"start_date": "2000-01-01T00:00:00Z",
"branch": "airbytehq/airbyte/master airbytehq/airbyte-platform/main"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str,
for slice in slices:
records = stream_instance.read_records(sync_mode=SyncMode.incremental, stream_slice=slice, stream_state=stream_state)
for record in records:
stream_state = stream_instance.get_updated_state(stream_state, record)
stream_state = stream_instance._get_updated_state(stream_state, record)
res.append(record)
return res

Expand Down
Loading
Loading