-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Source Github: Refactor incremental state handling #39513
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
@@ -337,6 +347,7 @@ def read_records( | |||
): | |||
cursor_value = self.convert_cursor_value(record[self.cursor_field]) | |||
if not start_point or cursor_value > start_point: | |||
self.state = self._get_updated_state(self.state, record) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This works under the condition that we don't use self._state
in stream_slices
because of the availability strategy. This seems a bit dangerous but I don't have a better way to do that...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This also assumes that the records read during the availability check are older than the first one synced in the read_records. This also seems like a fair assumption but the flow is starting to get very complex it feels
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noting here that the call to update state has been shifted to occur AFTER a record is yielded, to ensure state is only updated after a record has been processed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seemed like a good solution when we discussed and I still think this is the better solution but it means that checkpoint_interval
will be one record being as the logic is:
- source gets record
- stream yields record
- emit state based on checkpoint interval
- source gets another record
- stream update state
- stream yields record
- etc...
I don't mind this drawback. Just noting that it can probably still be improved
…irbyte into christo/github/state
What
Refactors incremental streams in source github to use the
Checkpoint
mixin class for handling state. Theget_updated_state
method is converted to a private method that is invoked by the state setter during the call to read records.User Impact
No impact. Link to regression test results
Can this PR be safely reverted and rolled back?