fix(airbyte-cdk): Fix yielding parent records in SubstreamPartitionRouter #46918
Conversation
My main concern is to understand why the fix on the `itertools.tee` only needs to be applied to `DeclarativeStream`.

Regardless, this is solid work and I can't imagine the amount of debugging it needed. I have a request that we can outscope or just do partially: would there have been information that would have been useful to have as logs in order to debug? If so, should we add debug logs here and there to ease upcoming investigations?
```diff
 checkpoint_mode = self._checkpoint_mode

+if cursor is not None:
+    self.has_multiple_slices = True
```
This seems like a cursor/stream_slicer concept and not a stream one. Would it make sense to move that to the cursor side? I'm asking because I fear that this is not true for cases like `IncrementalSingleSlice`, i.e. it is not because we have a cursor that we have multiple slices. That being said, I understand that for the source-monday case I just referenced it wouldn't change anything, as `self.supports_incremental == True` so `self.has_multiple_slices` is not considered. But to be more precise, the case I fear is that we would have a cursor without a cursor field, something like a `NoopCursor`.
I think the reason it's here is that the `has_multiple_slices` field is used by both low-code and regular Python sources, and it was already being stored on the `Stream` class.

Are you proposing that we have a property method on `DeclarativeStream`, or something that calls the method/property on `DatetimeBasedCursor` + `PerPartitionRouter` + `SubstreamPartitionRouter`? If that's the case, I don't have strong opinions either way; it seems fine.
Thinking about it today, I would assume it would be on the `StreamSlicer` interface.

What I'm struggling with when I see the name `has_multiple_slices` is that it is basically `could_produce_multiple_slices`, or something like that, as it is used to:
- Define if a stream is resumable
- Assign a `SubstreamResumableFullRefreshCursor` cursor instead of a `ResumableFullRefreshCursor` in `HttpStream`

It feels like we wouldn't have to consume slices for classification if we had the `StreamSlicer` explicitly mention it. This means that it pushes the complexity to the `StreamSlicer` object to know (which is simpler to implement because each stream slicer should know that, but is a bit more challenging in terms of migration...). If we put a default value of "false", my understanding is that the worst case is that a stream won't be flagged as resumable and therefore won't leverage RFR, which is not the worst.

Given that we already have a working solution for that, I'm inclined not to change it. However, I think this is still an interesting solution to keep in the back of our mind in case we face more issues with this. I don't have an example for the `NoopCursor` I was mentioning, so I'm fine with ignoring this concern as well.
I've updated the method to only bypass classification for the `PerPartitionCursor`, as the issue is specific to streams with parent streams.

The problem occurs because the classification logic, when used with `itertools.tee`, creates a copy of the stream slices. When `stream_slices` is called the second time, the parent records generated during the classification phase are lost. This happens because `itertools.tee` only buffers the results, preventing the `simple_retriever` from observing and updating the cursor again.
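As a minimal sketch (not CDK code, just an illustration of the `itertools.tee` behavior described here): `tee` buffers yielded items, so consuming the second iterator replays the buffer instead of re-running the generator body, and any side effects in the generator never happen again.

```python
import itertools

# Shared state mimicking a router that observes parent records as a side effect.
state = {"parent_cursor": None}

def stream_slices():
    # The generator updates state on each yield, like observing a cursor.
    for record in ["r1", "r2"]:
        state["parent_cursor"] = record
        yield {"parent": record}

first, second = itertools.tee(stream_slices())
list(first)                            # classification pass runs the generator body
assert state["parent_cursor"] == "r2"  # side effects happened

state["parent_cursor"] = None          # reset to expose the problem
list(second)                           # replays buffered items only
assert state["parent_cursor"] is None  # the side effect never re-runs
```

This matches the symptom described: the second consumer sees the slices, but the observation logic that should run alongside slice generation is skipped.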
Moving this logic to the cursor would require a major CDK release, as all cursors would need to implement this method.
Ok, so basically, the state is set before `read` is called, but the first slice is not generated again, so we are losing the progress on the state.
My question is mostly: can this also be an issue for non-declarative cursors? If so, do we have a plan for these?
Yes, this issue can affect any stream where `stream_slices` updates internal values. I've created an issue for further investigation.
Thanks for filing the issue @tolik0. I definitely agree this can affect Python sources, although I think the likely reason we don't see it happen with Python sources often, if ever, is that the base implementation of `HttpSubStream` for Python sources was never really coded to use an incremental parent state; it was basically a full refresh every time.

Using parent incremental state was a low-code feature we never back-ported. I pray that we never have to, unless we find a good reason to. @maxi297 this is also something we'll need to factor into our design of the substream cursor in the concurrent CDK.
```diff
-return list(source.read(logger, config, catalog, state))
+result = []
+for m in source.read(logger, config, catalog, state):
+    result.append(copy.deepcopy(m))
```
Why do we need a deepcopy here? I assume it is because we pull the state and the cursor modifies the state but it seems a bit dangerous that an Airbyte message is mutable...
I believe it's because the same `stream_state` dictionary is referenced. It doesn't affect the sync since we don't buffer the output.
After checking the code, it seems like we return a new dict on every call. Hence, my hypothesis of "we re-use the same state" seems wrong. Do you think it is worth investigating more? It seems like a bad smell to me that we need to add that and I think there might be value in confirming if this is a change in behavior or it is just because of the new assertions we added and the behavior is the same as before.
Yes, this change is needed because of the update I made to the test with intermediate states. I’ll investigate further.
What is the outcome on this?
The issue was caused by the `_parent_state` in `SubstreamPartitionRouter`: 62fd7aa

Currently, we can avoid using copy when assigning the parent state. However, using copy provides additional safety and prevents similar issues in the future if the parent stream returns the same object instead of creating a new one.
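A small illustrative sketch (not CDK code) of the aliasing hazard being discussed: if the router keeps a plain reference to the parent's state dict, anything that captured that reference retroactively changes when the dict is mutated in place, whereas a `deepcopy` freezes the value at assignment time.

```python
import copy

parent_state = {"updated_at": "2024-01-01"}

shared = parent_state                    # no copy: both names alias one dict
snapshot = copy.deepcopy(parent_state)   # copy: frozen at assignment time

parent_state["updated_at"] = "2024-02-01"  # parent mutates its state in place

assert shared["updated_at"] == "2024-02-01"    # aliased value drifted
assert snapshot["updated_at"] == "2024-01-01"  # copied value is stable
```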
Added my comments as part of the discussion, I don't have any specific changes to request beyond us just talking through what the best option for the missing final state message is
```diff
-    return self._cursor.get_stream_state()
-else:
-    return None
+return self._cursor.get_stream_state()
```
Wanted to discuss the issue you brought up @tolik0 around missing the last stream state on the PR itself so context isn't lost.

So if I'm understanding the issue: because we are emitting parent slices without waiting, by the time we update `self._parent_state` to its final value at the end of `SubstreamPartitionRouter.stream_slices()`, all records and slices were already processed. However, we have not emitted the final parent state, and the checkpoint reader is marked as `_finished_sync = True`, so we don't emit the final state?

If so, are the options:
1. How you have it now, and accept that we emit at least one extra state message per stream?
2. Modify the internal `PerPartitionCursor` implementation to somehow emit the final state. I haven't thought about how to do this, but is this your thinking?
3. Add a conditional branch so that if this is a `PerPartitionCursor`, then we `return self._cursor.get_stream_state()`.

I think I prefer option 1 over option 3 because bleeding low-code implementation details is something I'd like to avoid if possible. Option 2 feels like the best option if this doesn't prove to be too difficult. But if it proves very hard, I feel like option 1 is not too bad.
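The timing problem described here can be sketched in a few lines (hypothetical names, not the actual checkpoint reader): the final parent state only becomes visible after the slice generator is exhausted, so a consumer that checkpoints per slice never sees it.

```python
def slices_with_final_state(state):
    # Hypothetical router: the parent state reaches its final value only
    # after the last slice has been yielded, when the generator body ends.
    yield "slice-1"
    yield "slice-2"
    state["parent"] = "final"

state = {}
emitted = []
for s in slices_with_final_state(state):
    emitted.append(dict(state))  # checkpoint a snapshot after each slice

# The loop exits when the generator is exhausted, so the final value was
# assigned but never checkpointed inside the loop.
assert emitted == [{}, {}]
assert state["parent"] == "final"
```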
Yes, you’re correct. We’re not emitting the final state because we only get the final state after `stream_slices()` finishes. I don’t see how we can implement the second option, as we don’t have access to the stream, and we can’t emit the state directly from `PerPartitionCursor`. I wasn’t sure why we had this workaround, so I was considering options 1 and 3. However, I agree that option 1 is much better.
okay cool we are on the same page. My only worry is that we might have some unit/mock server tests break because we might be checking for duplicates or an exact number of state message counts. But updating those seems fine and how you have it (basically option 1) looks fine to me.
Thanks for confirming
@maxi297 Regarding the additional logs: I’m not sure they’re necessary. During debugging, I added prints exactly where I needed them, and even then, it was challenging to navigate through them. For example, I logged the observation and closing of the cursor, and while the logs were correct and in the right order, the result was still wrong. I believe adding debug logs before every internal value change could overwhelm the logs and make them harder to use effectively.
Approving assuming the CI passes! Thanks for your diligent work on this @tolik0 . This is great!
nice work @tolik0 this had so many tricky edge cases and bugs you found and fixed!
/approve-regression-tests
What

This fix addresses two key issues related to the `SubstreamPartitionRouter`:

- `itertools.tee` buffering the results of `stream_slices`: when the iterator was called a second time, it returned stored results rather than reprocessing them, bypassing the necessary cursor observation logic in `simple_retriever`.

Resolves: https://github.com/airbytehq/airbyte-internal-issues/issues/10095
How

- `SubstreamPartitionRouter` has been simplified to ensure parent records are yielded sequentially, regardless of the `incremental_dependency` flag.
- Updated the `_get_checkpoint_reader` method for `DeclarativeStream` to avoid using `itertools.tee`, ensuring all parent records are correctly processed and observed.

Review guide

- `substream_partition_router.py`
- `declarative_stream.py`
- `test_substream_partition_router.py`: fixes the issue where output messages refer to the same state object, ensuring correct values for intermediate states by copying messages.

User Impact
This change ensures that users no longer lose child records if the sync is interrupted, and the state is properly updated after each parent record is processed. It also prevents the issue where only the parent stream was read initially, ensuring sequential processing of both parent and child streams.
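The sequential parent-record processing described above can be sketched roughly as follows (hypothetical names; the real `SubstreamPartitionRouter` also manages cursors and incremental parent state): each parent record's slice is yielded before the tracked state advances past it, so an interrupted sync resumes from the last fully processed parent.

```python
def stream_slices(parent_records, parent_state):
    # Hypothetical simplification of a substream router: yield one slice per
    # parent record, advancing the tracked state only after the slice has
    # been handed to the consumer.
    for record in parent_records:
        yield {"parent_id": record["id"]}
        parent_state["last_processed"] = record["id"]

state = {}
slices = stream_slices([{"id": 1}, {"id": 2}], state)

next(slices)   # consumer starts processing parent 1; state not yet advanced
next(slices)   # requesting parent 2 marks parent 1 as processed
assert state["last_processed"] == 1
```

If the sync were interrupted between the two `next` calls, the state would still point at the last parent whose children were fully read, which is the resumability property the fix is after.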
Can this PR be safely reverted and rolled back?