Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 low-code: Fix incremental substreams #35471

Merged
merged 74 commits into from
Mar 6, 2024

Conversation

girarda
Copy link
Contributor

@girarda girarda commented Feb 21, 2024

What

How

  • Rename PerPartitionStreamSlice to StreamSlice and move it to types.py
    • Also update the partition and cursor_slice methods to recursively look up the nested PerPartitionStreamSlice when needed
  • Update all stream slicers so they always return a PerPartitionStreamSlice instead of any mappings
  • Update substream partition router so it only considers the partition part of the parent stream slice when setting the parent_slice field
    • This is the bulk of the change.
    • Technically a breaking change: SubstreamPartitionRouter expects the parent streams to be declarative to ensure they return PerPartitionStreamSlice
  • Fix mypy issues

Recommended reading order

  1. airbyte-cdk/python/airbyte_cdk/sources/declarative/types.py
  2. airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py
  3. airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py
  4. airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py
  5. airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py
  6. airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/list_partition_router.py

Files with only cosmetic changes:

  1. airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py
  2. airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py
  3. airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py
  4. airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/jinja.py
  5. airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/cursor.py

🚨 User Impact 🚨

  • SubstreamPartitionRouter require the parent streams to be DeclarativeStreams
  • Incremental substreams will work in incremental mode instead of always reading all the data and appending a new state to the blob

Pre-merge Actions

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • Community member? Grant edit access to maintainers (instructions)
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Connector version is set to 0.0.1
    • Dockerfile has version 0.0.1
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • docs/integrations/<source or destination>/<name>.md including changelog with an entry for the initial version. See changelog example
    • docs/integrations/README.md

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Unit & integration tests added

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
Connector Generator
  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • If adding a new generator, add it to the list of scaffold modules being tested
  • The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:generateScaffolds then checking in your changes
  • Documentation which references the generator is updated as needed
Updating the Python CDK

Airbyter

Before merging:

  • Pull Request description explains what problem it is solving
  • Code change is unit tested
  • Build and my-py check pass
  • Smoke test the change on at least one affected connector
    • On Github: Run this workflow, passing --use-local-cdk --name=source-<connector> as options
    • Locally: airbyte-ci connectors --use-local-cdk --name=source-<connector> test
  • PR is reviewed and approved

After merging:

  • Publish the CDK
    • The CDK does not follow proper semantic versioning. Choose minor if this the change has significant user impact or is a breaking change. Choose patch otherwise.
    • Write a thoughtful changelog message so we know what was updated.
  • Merge the platform PR that was auto-created for updating the Connector Builder's CDK version
    • This step is optional if the change does not affect the connector builder or declarative connectors.

Copy link

vercel bot commented Feb 21, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Ignored Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview Mar 6, 2024 2:43am

@octavia-squidington-iii octavia-squidington-iii added area/connectors Connector related issues CDK Connector Development Kit labels Feb 21, 2024
Copy link
Contributor

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • PR name follows PR naming conventions
  • Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
  • Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • You've updated the connector's metadata.yaml file any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
  • Secrets in the connector's spec are annotated with airbyte_secret
  • All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
  • If set, you've ensured the icon is present in the platform-internal repo. (Docs)

If the checklist is complete, but the CI check is failing,

  1. Check for hidden checklists in your PR description

  2. Toggle the github label checklist-action-run on/off to re-run the checklist CI.

…hq/airbyte into alex/explore_substream_state_bug
Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to block as overall changes look good, but I had some comments around the main part of the changes in substream_partition_router.py that I wanted a little more clarity on

@@ -28,7 +28,7 @@ def get_request_params(
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
) -> Mapping[str, Any]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This in theory could have an affect if we were relying on manipulating the contents of this return value. Do you by chance have context why this specific return was originally mutable?

parent_field = parent_stream_config.parent_key.eval(self.config)
stream_state_field = parent_stream_config.partition_field.eval(self.config)
parent_field = parent_stream_config.parent_key.eval(self.config) # type: ignore # parent_key is always casted to an interpolated string
partition_field = parent_stream_config.partition_field.eval(self.config) # type: ignore # partition_field is always casted to an interpolated string
for parent_stream_slice in parent_stream.stream_slices(
sync_mode=SyncMode.full_refresh, cursor_field=None, stream_state=None
):
empty_parent_slice = True
parent_slice = parent_stream_slice
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this assignment looks redundant. We don't manipulate either of these variables

for parent_stream_slice in parent_stream.stream_slices(
sync_mode=SyncMode.full_refresh, cursor_field=None, stream_state=None
):
empty_parent_slice = True
parent_slice = parent_stream_slice
if parent_slice:
parent_partition = {k: v for k, v in parent_slice.items() if k in parent_slice.partition.keys()}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two things:

  1. The if parent_slice: check is to protect against None stream slices that could come from the parent stream right?
  2. How come we do this complicated comprehension statement to create the dictionary if the parent slice item key is in the partition keys? Is there a reason why we can't just call parent_slice.partition().items()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. That's right. I don't really expect this to happen for declarative stream, but it is technically possible for a stream's stream_slice method to return None...
  2. Good catch. Simplified to your suggestion!

@octavia-squidington-iii octavia-squidington-iii removed the area/connectors Connector related issues label Mar 5, 2024
Copy link
Contributor

@natikgadzhi natikgadzhi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL a few things 👀 thank you for cleaning this up.

if self.end_datetime and not isinstance(self.end_datetime, MinMaxDatetime):
self.end_datetime = MinMaxDatetime(self.end_datetime, parameters)
self._start_datetime = (
MinMaxDatetime(self.start_datetime, parameters) if not isinstance(self.start_datetime, MinMaxDatetime) else self.start_datetime
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know how exactly MinMaxDatetime works, but could we make it's constructor always return an object of MinMaxTime and accept MinMaxtime too so that we can always wrap start_datetime into it, cutting on one if statement?

Sure, that's one more function call, but perhaps more readable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea. Added a factory method for MinMaxDatetime and simplified this call

self._end_datetime = (
None
if not self.end_datetime
else MinMaxDatetime(self.end_datetime, parameters)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, nested ifs are harder to read — perhaps if MinMaxtime is always called on that, we cut out one if, and it's easier to read instantly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea. Added a factory method for MinMaxDatetime and simplified this call

@@ -109,17 +116,20 @@ def set_initial_state(self, stream_state: StreamState) -> None:

:param stream_state: The state of the stream as returned by get_stream_state
"""
self._cursor = stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None
self._cursor = stream_state.get(self._cursor_field.eval(self.config)) if stream_state else None

def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why narrow it?

@@ -179,16 +123,16 @@ def _get_state_for_partition(self, partition: Mapping[str, Any]) -> Optional[Str
return None

@staticmethod
def _is_new_state(stream_state):
def _is_new_state(stream_state: Mapping[str, Any]) -> bool:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not be strict on type here? We don't have a special StreamState type as that mapping yet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's correct. We have types for StreamSlice and Record, but StreamState is still TBD

@girarda girarda merged commit f55abc1 into master Mar 6, 2024
27 checks passed
@girarda girarda deleted the alex/explore_substream_state_bug branch March 6, 2024 02:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CDK Connector Development Kit connectors/source/chargebee
Projects
None yet
5 participants