
[connector builder] BUG: Incremental substream state is appended rather than replaced (and start_time is not updated) #33854

Closed
trevor-petach opened this issue Jan 1, 2024 · 4 comments · Fixed by #35471

Comments


trevor-petach commented Jan 1, 2024

Problem

Topic

Connector builder

Relevant information

I built a connector using the low-code connector framework (attached; uploaded as .txt since GitHub won't accept .yml).

I tried to create an incremental substream. My substream is https://api.alchemer.com/v5/survey/<survey_id>/surveyresponse, where I iterate through the surveys to pull the survey responses for each survey. Each response has a submitted_at field. The API allows filtering based on this field. So, for each survey, I would like to pull only the responses that were submitted since the last sync (and I will iterate through all surveys on each sync).

However, the connector deployed from the connector builder .yml attached extracts and loads all survey responses for each survey on every sync.

The state below is the value after running a daily sync for several days.

It appears that Airbyte is not updating the substream's state but appending to it on each sync, and thus keeps using the user-supplied start_time in the API request rather than setting start_time to the cursor of the last record from the previous sync.

[
  {
    "streamDescriptor": {
      "name": "survey"
    },
    "streamState": {
      "modified_on": "2024-01-01+06:26:48"
    }
  },
  {
    "streamDescriptor": {
      "name": "surveyresponse"
    },
    "streamState": {
      "states": [
        {
          "cursor": {
            "date_submitted": "2023-12-29+06:13:29"
          },
          "partition": {
            "parent_id": "7628838",
            "parent_slice": {
              "end_time": "2023-12-29+06:13:29",
              "start_time": "2023-12-01+00:00:00"
            }
          }
        },
        {
          "cursor": {
            "date_submitted": "2023-12-30+06:16:31"
          },
          "partition": {
            "parent_id": "7628838",
            "parent_slice": {
              "end_time": "2023-12-30+06:16:31",
              "start_time": "2023-12-01+00:00:00"
            }
          }
        },
        {
          "cursor": {
            "date_submitted": "2023-12-31+06:17:51"
          },
          "partition": {
            "parent_id": "7628838",
            "parent_slice": {
              "end_time": "2023-12-31+06:17:51",
              "start_time": "2023-12-01+00:00:00"
            }
          }
        },
        {
          "cursor": {
            "date_submitted": "2024-01-01+06:26:50"
          },
          "partition": {
            "parent_id": "7628838",
            "parent_slice": {
              "end_time": "2024-01-01+06:26:49",
              "start_time": "2023-12-01+00:00:00"
            }
          }
        }
      ]
    }
  }
]

alchemer_testing_copy.txt

Implementation Notes

Note: Alchemer's API is only accessible to enterprise customers.

The working hypothesis for why this is happening is as follows:

  • This problem occurs for all incremental substreams that rely on a parent stream that is also incremental.
  • Currently, a PerPartitionCursor that uses a SubstreamPartitionRouter manages state per parent record by creating a key made up of a hash of the parent record's id (parent_id) and the slice it belongs to (parent_slice).
  • This doesn't work properly for incremental parent streams, because the first sync yields a final slice whose end time is around when that sync originally started.
  • On the subsequent incremental sync (hypothetically one day later), when the connector's SubstreamPartitionRouter creates the new set of datetime ranges, the last slice will have an end time of the current time.
  • The state from the first sync is accessed during subsequent syncs using the partition key (the combination of parent_id and parent_slice). Because the end time of the stored slice and that of the currently running sync will never match, every lookup misses: the sync ends up doing a full refresh for each parent record, and a new state entry is appended instead of overwriting the old one because the partition keys don't match.
  • I've written a few tests to confirm that this is happening.
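The key-mismatch described above can be reproduced with a minimal sketch. The `partition_key` helper below is hypothetical, a stand-in for however PerPartitionCursor actually derives its per-partition key; the point is only that any key covering both parent_id and a time-based parent_slice changes from sync to sync:

```python
import json

def partition_key(parent_id: str, parent_slice: dict) -> str:
    # Hypothetical stand-in for how state is keyed per partition:
    # the key covers both the parent record id and the slice it came from.
    return json.dumps({"parent_id": parent_id, "parent_slice": parent_slice},
                      sort_keys=True)

# First sync: the parent's final slice ends around when that sync started.
key_sync_1 = partition_key("7628838", {"start_time": "2023-12-01+00:00:00",
                                       "end_time": "2023-12-29+06:13:29"})

# Next day's sync: same parent record, but the final slice's end_time is now
# "now", so the computed key no longer matches the stored one.
key_sync_2 = partition_key("7628838", {"start_time": "2023-12-01+00:00:00",
                                       "end_time": "2023-12-30+06:16:31"})

assert key_sync_1 != key_sync_2  # lookup misses -> full refresh + appended state
```

Since the stored key is never found, the previous cursor is never read back, which matches both symptoms in the state dump above: the appended entries and the unchanged start_time.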

Options Discussed:

Option 1: Make SubstreamPartitionRouter emit partition and cursor values in separate fields

  • The SubstreamPartitionRouter currently gets the parent stream’s partitions or cursor values using stream_slices(). It returns these to PerPartitionCursor as an array of objects, one per record, where each object has a parent_id set to the record ID and a parent_slice set to the partition/cursor value.
  • This approach still forces us to have conditional branches during

Option 2: SubstreamPartitionRouter should not emit parent_slice if parent stream is incremental

  • This approach fixes the issue because the SubstreamPartitionRouter that the PerPartitionCursor depends on will only take the parent_key and parent value into consideration when reading from the incoming state. It also retains the existing behavior: partition routers that don’t use time-based cursors aren’t affected.
  • The biggest drawback to this approach is that it raises the overall complexity of an already complicated module by adding more branching conditional logic. And from the user’s perspective, it's slightly confusing behavior for an incremental parent stream’s parent_slice to no longer be available.
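Option 2 can be sketched in a few lines. This is an illustration of the idea, not the CDK's actual implementation; `build_partition` and its arguments are hypothetical:

```python
def build_partition(parent_record: dict, parent_slice: dict,
                    parent_is_incremental: bool) -> dict:
    # Hypothetical sketch of Option 2: when the parent stream is incremental,
    # drop the (time-based, ever-shifting) parent_slice from the emitted
    # partition, so state lookups key only on the stable parent_id.
    partition = {"parent_id": parent_record["id"]}
    if not parent_is_incremental:
        partition["parent_slice"] = parent_slice
    return partition

# Incremental parent: the datetime slice is excluded, so the key is stable
# across syncs even though the slice's end_time keeps moving forward.
p = build_partition({"id": "7628838"},
                    {"start_time": "2023-12-01+00:00:00",
                     "end_time": "2023-12-30+06:16:31"},
                    parent_is_incremental=True)
assert p == {"parent_id": "7628838"}
```

The `if not parent_is_incremental` branch is exactly the extra conditional logic the drawback bullet warns about.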

We should spike this proactively to verify that an incremental substream works with each of the following parent stream configurations:

  • Incremental parent stream
  • List partition parent stream
  • List partition grandparent stream, incremental parent stream (PerPartitionCursor parent) <- brain explodes
  • List partition grandparent stream, list partition parent stream
  • Incremental grandparent stream, incremental parent stream

Acceptance Criteria

  • We are most likely leaning towards option 1, and the acceptance criteria are as described
  • Conduct a spike to verify option 1 is viable based on the criteria mentioned above
  • This scenario needs to be unit tested at two levels. Although the fix is at the SubstreamPartitionRouter level, due to the complexity we should, at minimum, also test this at the level of a PerPartitionCursor that uses a SubstreamPartitionRouter
  • If it doesn't pose too much of a challenge, getting this working end-to-end with a working manifest would be good validation. The closest example I can find is source-greenhouse, which supports an incremental parent stream, but it currently uses custom components, so the test manifest might unfortunately need to be rewritten
trevor-petach renamed the issue three times on Jan 2, 2024, settling on: [connector builder] BUG: Incremental substream state is appended rather than replaced (and start_time is not updated)
@brianjlai (Contributor) commented:

grooming notes:

  • another option: in the PerPartitionCursor, we check the map to see if there is a key made up of both parent_id + parent_slice, and if it does not exist, we fall back to a key made up of just parent_id. This retains existing functionality while still finding the right parent value.
  • within the SubstreamPartitionRouter.stream_slices(), we should emit a new section for the time window. It would look roughly like:
{
  "parent_id": "1234",
  "parent_slice": {
    "partition_key_1": "5678" // Not relevant to the above example but used if the parent was a partition router
  },
  "cursor": {
    "end_time": "2023-12-29+06:13:29", // this object gets populated with the ranges if parent was a incremental cursor
    "start_time": "2023-12-01+00:00:00"
  }
}
  • do we have low-code connectors using 3 levels of parent substreams?
  • we also need to consider the case of multiple parent substreams, since the above case could happen there too
  • we should probably spike this, since our discussion of the why and the how-to-fix got very complicated: when we query stream slices, the resulting slice object should have separate fields for the partition and the time window.
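The fallback lookup from the first grooming bullet can be sketched as follows. The helper names and the shape of the state map are assumptions for illustration, not the CDK's real API:

```python
import json

def _key(d: dict) -> str:
    # Stable string key for a (possibly nested) dict.
    return json.dumps(d, sort_keys=True)

def lookup_cursor(state_map: dict, parent_id: str, parent_slice: dict):
    # Hypothetical sketch of the fallback lookup described above: first try
    # the full (parent_id + parent_slice) key, then fall back to parent_id
    # alone, so incremental parents whose slice boundaries shift between
    # syncs still find their stored cursor.
    full = state_map.get(_key({"parent_id": parent_id,
                               "parent_slice": parent_slice}))
    if full is not None:
        return full
    return state_map.get(_key({"parent_id": parent_id}))

# State written by a previous sync, keyed on parent_id only:
state = {_key({"parent_id": "7628838"}):
         {"date_submitted": "2023-12-31+06:17:51"}}

# Today's slice has a new end_time, but the fallback still finds the cursor.
cursor = lookup_cursor(state, "7628838",
                       {"start_time": "2023-12-01+00:00:00",
                        "end_time": "2024-01-01+06:26:49"})
assert cursor == {"date_submitted": "2023-12-31+06:17:51"}
```

The two-step lookup keeps existing (non-incremental) partition keys working unchanged while making the incremental-parent case resolve on the stable parent_id.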

@brianjlai (Contributor) commented:

We've had a lot of discussion around how best to tackle this because of how it impacts the Python CDK and the low-code CDK interface methods, specifically the stream_slices() method on the Stream interface. I have a few other high-priority tickets on my hands that are going to take priority, but we want to keep this at the top of our product backlog because it is something we really should address.

Temporarily moving this out of the current sprint; the plan is to pick it back up once a few other pieces of work are wrapped up. I'm hoping we can find a way to introduce this change without the interface change, to fix the issue at hand even if the ideal fix comes later or in a follow-up.


pnilan commented Feb 8, 2024

@brianjlai +1 for this issue. Ran into it for the attached_item stream of Source Chargebee during the certification process.

@girarda girarda assigned girarda and unassigned brianjlai Feb 15, 2024

girarda commented Feb 22, 2024

End-of-sprint update: PR is in progress; should be wrapped up soon.
