Bulk Load CDK: Root-Level Flattening & S3V2 Usage #48377
couple small comments, otherwise lgtm
    )
)
if (flatten) {
    (schema as ObjectType).properties.forEach { (name, field) -> properties[name] = field }
maybe we should throw if a property clashes with an airbyte field? (honestly not sure, I've never figured out what the expected behavior is for people chaining source -> airbyte -> destination -> airbyte -> destination)
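If throwing turns out to be the desired behavior, the check could look roughly like the sketch below. It is only an illustration: `mergeFlattened` is a hypothetical helper, and plain string maps stand in for the CDK's schema types.

```kotlin
// Hypothetical sketch: maps of field name -> type name stand in for the
// CDK's schema types. Not the actual implementation in this PR.
fun mergeFlattened(
    metaFields: Map<String, String>,
    clientFields: Map<String, String>,
): Map<String, String> {
    // Client property names that collide with Airbyte meta field names.
    val clashes = clientFields.keys.intersect(metaFields.keys)
    require(clashes.isEmpty()) {
        "Client properties clash with Airbyte fields: $clashes"
    }
    // No collisions, so the merge cannot overwrite a meta field.
    return metaFields + clientFields
}
```

Without a check like this, a plain merge would let a client field silently overwrite a meta field of the same name.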
@@ -60,6 +61,19 @@ class AirbyteValueWithMetaToOutputRecord {
    }
}

fun AirbyteValue.maybeUnflatten(wasFlattened: Boolean): ObjectValue {
    this as ObjectValue
nit: remove this? (unless it's for the sake of doing a smart cast, in which case add a comment? or maybe define this as fun ObjectValue.maybeUnflatten)
is DateValue -> value.value
is IntValue -> value.value.toString()
is TimeValue -> value.value
is UnknownValue -> ""
I think we should preserve the original json for unknown value?
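As a rough illustration of that suggestion, and assuming the unknown value carries its original JSON text (the types below are hypothetical stand-ins, not the CDK's classes), the branch could return that text instead of an empty string:

```kotlin
// Hypothetical stand-ins for the CDK's value types; the real UnknownValue
// may expose its payload differently.
sealed interface CellValue
data class IntCell(val value: Long) : CellValue
data class UnknownCell(val originalJson: String) : CellValue

fun toCsvCell(value: CellValue): String =
    when (value) {
        is IntCell -> value.value.toString()
        // Keep the original JSON text rather than writing "".
        is UnknownCell -> value.originalJson
    }
```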
import io.airbyte.cdk.load.data.UnknownType
import io.airbyte.cdk.load.data.json.toAirbyteValue
import io.airbyte.cdk.load.util.deserializeToNode
import org.apache.commons.csv.CSVRecord

class CsvRowToAirbyteValue {
    fun convert(row: CSVRecord, schema: AirbyteType): AirbyteValue {
        print("converting row: $row")
print("converting row: $row")
What
Adds support for root-level flattening to ObjectStorage + S3V2 usage (configurable for JSON/CSV, required for Avro/Parquet).
Root-level flattening means that the client fields are written to the top level of the resulting record instead of to an _airbyte_data subfield. So given the client data
{"id": 1, "name": "Bob"}
the flattened record is
{"_airbyte_raw_id": "<uuid>", "_airbyte_generation_id": 10, ..., "id": 1, "name": "Bob" }
and the non-flattened record is
{"_airbyte_raw_id": "<uuid>", "_airbyte_generation_id": 10, ..., "_airbyte_data": { "id": 1, "name": "Bob" } }
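The two layouts can be sketched with plain maps. This is a minimal illustration only: `toOutputRecord` and `DATA_FIELD` are hypothetical names, and the CDK works on its own record and schema types rather than `Map`.

```kotlin
// Hypothetical sketch: plain maps stand in for the CDK's record types.
const val DATA_FIELD = "_airbyte_data"

/** Builds an output record from Airbyte meta fields and client data. */
fun toOutputRecord(
    meta: Map<String, Any?>,
    data: Map<String, Any?>,
    flatten: Boolean,
): Map<String, Any?> {
    val out = LinkedHashMap<String, Any?>(meta)
    if (flatten) {
        // Root-level flattening: client fields sit next to the meta fields.
        out.putAll(data)
    } else {
        // Non-flattened layout: client fields are nested under _airbyte_data.
        out[DATA_FIELD] = data
    }
    return out
}
```

Called with the meta fields and the client data `{"id": 1, "name": "Bob"}`, `flatten = true` yields the first shape above and `flatten = false` the second.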
Specifically schema -> schema with meta and record -> record data with meta