Pass _airbyte_emitted_at
or airbyte run identifier to DBT to allow DBT transformation to locate records specific to an Airbyte sync job
#36067
yuhuishi-convect started this conversation in Connector Ideas and Features
Tell us about the problem you're trying to solve
I have a stream that syncs product information from the source in FULL_REFRESH mode and APPENDs the latest records to a destination table. Therefore, the destination table contains snapshots of the product info at different timestamps (i.e., the results of different Airbyte sync jobs).
A downstream DBT transformation is set up to always fetch the latest snapshot of the ingested product info, clean it and load it to a final table that only keeps the one latest snapshot of the cleaned product records.
So when fetching the records, DBT needs to locate the records that correspond to the latest sync job. However, there is no clear identifier that DBT can use as a filter to get records resulting from a specific Airbyte sync job.
_airbyte_emitted_at might be one candidate if we only want the records resulting from the latest sync job (by filtering on _airbyte_emitted_at == max(_airbyte_emitted_at)).
Describe the solution you’d like
I am wondering if it is possible to pass _airbyte_emitted_at or another job identifier (such as the Airbyte job id) as a variable from Airbyte to dbt via CLI arguments, to tell the transformation which snapshot of records it should operate on. If this were possible, it would also make it much friendlier to re-run "backfill" jobs in dbt on historical snapshots of records, since the target snapshot is explicitly passed as an argument.
One example where this is needed is when the latest ingested snapshot is corrupted and I want to fall back to an older snapshot.
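To illustrate the idea, the invocation and a consuming model might look roughly like this (a sketch: the variable name airbyte_sync_ts and the source names are assumptions, since Airbyte does not pass such a variable today):

```sql
-- Hypothetical invocation, with Airbyte passing the sync timestamp:
--   dbt run --vars '{"airbyte_sync_ts": "2022-03-01 12:00:00"}'

-- models/latest_products.sql (names are assumptions)
-- Keep only the rows that belong to the sync job identified by the variable.
select *
from {{ source('airbyte', 'products') }}
where _airbyte_emitted_at = '{{ var("airbyte_sync_ts") }}'
```

Because the snapshot identifier arrives as a dbt var, a backfill is just a re-run with a different value, with no change to the model itself.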
Describe the alternative you’ve considered or used
Right now, I am using a macro in dbt that queries the product destination table for the max value of _airbyte_emitted_at, so that all downstream cleaning operations can use it as a filter to select only the latest snapshot of records. But during backfilling this won't work, since a historical _airbyte_emitted_at or job identifier is needed.
Additional context
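As a concrete illustration of the workaround described above, the macro might be sketched as follows (the macro name and relation names are assumptions):

```sql
-- macros/latest_emitted_at.sql
-- Returns a subquery yielding the max _airbyte_emitted_at in the given relation.
{% macro latest_emitted_at(relation) %}
    (select max(_airbyte_emitted_at) from {{ relation }})
{% endmacro %}

-- Usage in a model: filter to the latest snapshot only.
-- select * from {{ source('airbyte', 'products') }}
-- where _airbyte_emitted_at = {{ latest_emitted_at(source('airbyte', 'products')) }}
```

The limitation is visible here: the macro can only ever resolve to the maximum timestamp, so there is no way to point it at an older snapshot during a backfill.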
I am thinking that Airbyte may want to (or perhaps already does) expose some environment variable to dbt, such as the timestamp when the ingestion was triggered (i.e., _airbyte_emitted_at) or the ingestion job id. Downstream jobs could then use these variables as filters to locate the records corresponding to the target Airbyte job in a destination table (one that uses the APPEND sync mode).
Are you willing to submit a PR?
I would love to help, but this seems like a major change, so it is probably better to leave it to the pros.