
fix(airbyte-cdk): Fix yielding parent records in SubstreamPartitionRouter #46918

Merged — 15 commits merged into master on Oct 18, 2024

Conversation

@tolik0 (Contributor) commented Oct 15, 2024

What

This fix addresses two key issues related to the SubstreamPartitionRouter:

  1. Parent records were being buffered instead of being yielded sequentially as they were read. This led to incorrect intermediate states and the potential loss of child records if the sync was interrupted.
  2. itertools.tee buffered the results of stream_slices. When the iterator was consumed a second time, it returned the stored results rather than reprocessing them, bypassing the necessary cursor observation logic in simple_retriever.

Resolves: https://github.com/airbytehq/airbyte-internal-issues/issues/10095

How

  • The logic in SubstreamPartitionRouter has been simplified to ensure parent records are yielded sequentially, regardless of the incremental_dependency flag.
  • The _get_checkpoint_reader method is overridden for DeclarativeStream to avoid using itertools.tee, ensuring all parent records are correctly processed and observed.
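As a rough illustration of the first point (hypothetical function names, not the actual SubstreamPartitionRouter API), the difference between buffering parent records and yielding them sequentially looks like this:

```python
from typing import Any, Dict, Iterable, Iterator

# Hypothetical sketch, not the real CDK code.

def stream_slices_buffered(parent_records: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    # The old behavior: materializing all parent records first means no
    # intermediate state can be checkpointed while they are being read,
    # and an interrupted sync loses all progress.
    buffered = list(parent_records)
    for record in buffered:
        yield {"parent_id": record["id"]}

def stream_slices_sequential(parent_records: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    # The fixed behavior: each parent record yields a slice as soon as it
    # is read, so the cursor can observe progress between slices.
    for record in parent_records:
        yield {"parent_id": record["id"]}
```

Both produce the same slices on a successful run; the difference only shows when a sync is interrupted mid-stream.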

Review guide

  1. substream_partition_router.py
  2. declarative_stream.py
  3. test_substream_partition_router.py: Fixes the issue where output messages refer to the same state object, ensuring correct values for intermediate states by copying messages.

User Impact

This change ensures that users no longer lose child records if the sync is interrupted, and the state is properly updated after each parent record is processed. It also prevents the issue where only the parent stream was read initially, ensuring sequential processing of both parent and child streams.

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌


@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label Oct 15, 2024
@tolik0 tolik0 self-assigned this Oct 15, 2024
@tolik0 tolik0 marked this pull request as ready for review October 16, 2024 13:50
@maxi297 (Contributor) left a comment:

My main concern is to understand why the fix on the itertools.tee only needs to be applied to DeclarativeStream.

Regardless, this is solid work and I can't imagine the amount of debugging it needed. I have a request that we can outscope or just do partially: would there have been information that would have been useful to have as logs in order to debug? If so, should we add debug logs here and there to ease upcoming investigations?

checkpoint_mode = self._checkpoint_mode

if cursor is not None:
    self.has_multiple_slices = True
Contributor:
This seems like a cursor/stream_slicer concept and not a stream one. Would it make sense to move that to the cursor side? I'm asking because I fear that this is not true for cases like IncrementalSingleSlice, i.e. it is not because we have a cursor that we have multiple slices. That being said, I understand that for the source-monday case I just referenced it wouldn't change anything, since self.supports_incremental == True means self.has_multiple_slices is not considered. To be more precise, the case I fear is that we would have a cursor without a cursor field, something like a NoopCursor.

Contributor:
I think the reason it's here is because this has_multiple_slices field is utilized for both low-code and regular python sources and it was already being stored on the Stream class.

Are you proposing that we have a property method on DeclarativeStream, or something that calls the method/property on DatetimeBasedCursor + PerPartitionRouter + SubstreamPartitionRouter? If that's the case, I don't have strong opinions either way, but it seems fine.

Contributor:
Thinking about it today, I would assume it would be on the StreamSlicer interface.

I think what I'm struggling with when I see the name has_multiple_slices is that it basically means could_produce_multiple_slices, as it is used to:

  • Define if a stream is resumable
  • Assign a SubstreamResumableFullRefreshCursor instead of a ResumableFullRefreshCursor in HttpStream

It feels like we wouldn't have to consume slices for classification if the StreamSlicer explicitly declared it. This pushes the complexity to the StreamSlicer object (which is simpler to implement, because each stream slicer should know that, but is a bit more challenging in terms of migration). If we put a default value of "false", my understanding is that the worst case is that a stream won't be flagged as resumable and therefore won't leverage RFR, which is not the worst.

Given that we already have a working solution for that, I'm inclined not to change it. However, I think this is still an interesting solution to keep in the back of our minds in case we face more issues with this. I don't have an example for the NoopCursor I was mentioning, so I'm fine with ignoring that concern as well.

Contributor (Author):
I've updated the method to only bypass classification for the PerPartitionCursor, as the issue is specific to streams with parent streams.

The problem occurs because the classification logic, when used with itertools.tee, creates a copy of the stream slices. When stream_slices is called the second time, the parent records generated during the classification phase are lost. This happens because itertools.tee only buffers the results, preventing the simple_retriever from observing and updating the cursor again.

Moving this logic to the cursor would require a major CDK release, as all cursors would need to implement this method.
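The itertools.tee behavior described above can be demonstrated in isolation (illustrative sketch, not CDK code): tee buffers items from the source generator, so a second iteration replays the buffer instead of re-running the generator body, and any side effects inside the generator happen only once.

```python
import itertools

observed = []

def stream_slices():
    # The append below stands in for cursor observation logic that runs
    # as a side effect of generating each slice.
    for i in range(3):
        observed.append(i)
        yield i

first, second = itertools.tee(stream_slices())

assert list(first) == [0, 1, 2]   # the generator body actually runs here
assert list(second) == [0, 1, 2]  # replayed from tee's internal buffer
assert observed == [0, 1, 2]      # side effects did NOT run a second time
```

This is why a second pass over the tee'd stream_slices never re-triggers cursor observation.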

Contributor:
Ok, so basically, the state is set before read is called but the first slice is not generated again so we are losing the progress on the state.

My question is mostly: can this also be an issue for non-declarative cursors? If so, do we have a plan for these?

Contributor (Author):
Yes, this issue can affect any stream where stream_slices updates internal values. I've created an issue for further investigation.

Contributor:
Thanks for filing the issue @tolik0. I definitely agree this can affect Python sources. I think the reason we don't see this happen with Python sources often, if ever, is that the base implementation of HttpSubStream for Python sources was never really coded to use an incremental parent state, and it was basically a full refresh every time.

Using parent incremental state was a low-code feature we never back-ported. I pray that we never have to unless we find a good reason to. @maxi297, this is also something we'll need to factor into our design of the substream cursor in the concurrent CDK.

return list(source.read(logger, config, catalog, state))
result = []
for m in source.read(logger, config, catalog, state):
    result.append(copy.deepcopy(m))
Contributor:
Why do we need a deepcopy here? I assume it is because we pull the state and the cursor modifies the state but it seems a bit dangerous that an Airbyte message is mutable...

Contributor (Author):
I believe it's because the same stream_state dictionary is referenced. It doesn't affect the sync since we don't buffer the output.

Contributor:
After checking the code, it seems like we return a new dict on every call. Hence, my hypothesis of "we re-use the same state" seems wrong. Do you think it is worth investigating more? It seems like a bad smell to me that we need to add that and I think there might be value in confirming if this is a change in behavior or it is just because of the new assertions we added and the behavior is the same as before.

Contributor (Author):
Yes, this change is needed because of the update I made to the test with intermediate states. I’ll investigate further.

Contributor:
What is the outcome on this?

@tolik0 (Contributor, Author) commented Oct 17, 2024:
The issue was caused by the _parent_state in SubstreamPartitionRouter: 62fd7aa
Currently, we can avoid using copy when assigning the parent state. However, using copy provides additional safety and prevents similar issues in the future if the parent stream returns the same object instead of creating a new one.
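A minimal illustration of the underlying problem (plain dicts standing in for the actual state and message objects): when every emitted message holds a reference to the same mutable state dict, later mutations retroactively change earlier messages, and copying on assignment preserves each intermediate value.

```python
import copy

# Without a copy: every message references the one shared dict.
state = {"parent_cursor": "2024-01-01"}
shared_messages = []
for new_value in ["2024-02-01", "2024-03-01"]:
    state["parent_cursor"] = new_value     # mutates the shared dict in place
    shared_messages.append({"state": state})

# Both messages now show the FINAL value; the intermediate state is lost.
assert shared_messages[0]["state"]["parent_cursor"] == "2024-03-01"

# With a copy at emission time: each message keeps its own snapshot.
state = {"parent_cursor": "2024-01-01"}
messages = []
for new_value in ["2024-02-01", "2024-03-01"]:
    state["parent_cursor"] = new_value
    messages.append({"state": copy.deepcopy(state)})

assert messages[0]["state"]["parent_cursor"] == "2024-02-01"
assert messages[1]["state"]["parent_cursor"] == "2024-03-01"
```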

@brianjlai (Contributor) left a comment:
Added my comments as part of the discussion. I don't have any specific changes to request beyond us talking through the best option for the missing final state message.


    return self._cursor.get_stream_state()
else:
    return None
return self._cursor.get_stream_state()
Contributor:
Wanted to discuss the issue you brought up @tolik0 around missing the last stream state on the PR itself so context isn't lost.

So if I'm understanding the issue: because we are emitting parent slices without waiting, by the time we update self._parent_state to its final value at the end of SubstreamPartitionRouter.stream_slices(), all records and slices were already processed. However, we have not emitted the final parent state, and the checkpoint reader is marked as `_finished_sync = True`, so we don't emit the final state?

If so, are the options:

  1. Keep it how you have it now and accept that we emit at least one extra state message per stream?
  2. Modify the internal PerPartitionCursor implementation to somehow emit the final state. I haven't thought about how to do this, but is this your thinking?
  3. Add a conditional branch so that if this is a PerPartitionCursor, we return self._cursor.get_stream_state()

I think I prefer option 1 over option 3, because bleeding low-code implementation details is something I'd like to avoid if possible. Option 2 feels like the best option if it doesn't prove too difficult, but if it proves very hard, option 1 is not too bad.

Contributor (Author):
Yes, you’re correct. We’re not emitting the final state because we only get the final state after stream_slices() finishes. I don’t see how we can implement the second option, as we don’t have access to the stream, and we can’t emit the state directly from PerPartitionCursor. I wasn’t sure why we had this workaround, so I was considering options 1 and 3. However, I agree that option 1 is much better.

Contributor:
okay cool we are on the same page. My only worry is that we might have some unit/mock server tests break because we might be checking for duplicates or an exact number of state message counts. But updating those seems fine and how you have it (basically option 1) looks fine to me.

Thanks for confirming

@tolik0 (Contributor, Author) commented Oct 17, 2024

@maxi297 Regarding the additional logs: I’m not sure they’re necessary. During debugging, I added prints exactly where I needed them, and even then, it was challenging to navigate through them. For example, I logged the observation and closing of the cursor, and while the logs were correct and in the right order, the result was still wrong. I believe adding debug logs before every internal value change could overwhelm the logs and make them harder to use effectively.

@maxi297 (Contributor) left a comment:
Approving assuming the CI passes! Thanks for your diligent work on this @tolik0 . This is great!

@brianjlai (Contributor) left a comment:
nice work @tolik0 this had so many tricky edge cases and bugs you found and fixed!



@bazarnov (Collaborator) commented Oct 18, 2024

/approve-regression-tests

✅ Approving regression tests


@tolik0 tolik0 merged commit 569ed5c into master Oct 18, 2024
37 checks passed
@tolik0 tolik0 deleted the tolik0/airbyte-cdk/fix-substream-partition-router branch October 18, 2024 13:19