Bulk Load CDK: Parquet toolkit and S3V2 Support (#47122)
johnny-schmidt authored Oct 21, 2024
1 parent 7faa1dc commit 7ec23bb
Showing 14 changed files with 185 additions and 29 deletions.
@@ -59,7 +59,15 @@ class AvroRecordToAirbyteValue {
throw UnsupportedOperationException("ObjectTypeWithEmptySchema is not supported")
is ObjectTypeWithoutSchema ->
throw UnsupportedOperationException("ObjectTypeWithoutSchema is not supported")
is StringType -> return StringValue((avroValue as Utf8).toString())
is StringType ->
return StringValue(
when (avroValue) {
is Utf8 -> avroValue.toString() // Avro
is String -> avroValue // Avro via Parquet
else ->
throw IllegalArgumentException("Unsupported string type: $avroValue")
}
)
is TimeType -> throw UnsupportedOperationException("TimeType is not supported")
is TimestampType ->
throw UnsupportedOperationException("TimestampType is not supported")
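
The hunk above is the only behavioral change in this file: Avro's generic reader materializes string fields as org.apache.avro.util.Utf8, while records read back through parquet-avro can surface them as plain java.lang.String. A minimal standalone sketch of the same normalization (the function name is illustrative, not from the CDK):

import org.apache.avro.util.Utf8

// Normalize a string-typed Avro value regardless of which reader produced it.
fun avroStringValue(avroValue: Any): String =
    when (avroValue) {
        is Utf8 -> avroValue.toString() // Avro generic reader
        is String -> avroValue          // Avro record read back via parquet-avro
        else -> throw IllegalArgumentException("Unsupported string type: $avroValue")
    }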
5 changes: 5 additions & 0 deletions airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle
@@ -1,7 +1,12 @@
dependencies {
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')

// TODO: Separate individual format types from the core format spec
// and migrate them to their respective toolkits, so that these
// dependencies can be removed.
api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-avro')
api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-parquet')

api("org.apache.commons:commons-csv:1.10.0")

@@ -55,17 +55,17 @@ interface ObjectStorageFormatSpecificationProvider {
@JsonSubTypes(
JsonSubTypes.Type(value = JsonFormatSpecification::class, name = "JSONL"),
JsonSubTypes.Type(value = CSVFormatSpecification::class, name = "CSV"),
JsonSubTypes.Type(value = AvroFormatSpecification::class, name = "AVRO"),
JsonSubTypes.Type(value = ParquetFormatSpecification::class, name = "PARQUET")
JsonSubTypes.Type(value = AvroFormatSpecification::class, name = "Avro"),
JsonSubTypes.Type(value = ParquetFormatSpecification::class, name = "Parquet")
)
sealed class ObjectStorageFormatSpecification(
@JsonSchemaTitle("Format Type") @JsonProperty("format_type") open val formatType: Type
) {
enum class Type(@get:JsonValue val typeName: String) {
JSONL("JSONL"),
CSV("CSV"),
AVRO("AVRO"),
PARQUET("PARQUET")
AVRO("Avro"),
PARQUET("Parquet")
}
}

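
Note the discriminator rename in this hunk: the @JsonSubTypes names and enum values move from "AVRO"/"PARQUET" to "Avro"/"Parquet". A hedged sketch of what that means for spec parsing, assuming the CDK classes are on the classpath and that ObjectStorageFormatSpecification resolves subtypes from the format_type property:

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper

fun main() {
    val mapper = jacksonObjectMapper()
    // "Parquet" (mixed case) now matches the subtype name; the old
    // upper-case "PARQUET" would fail subtype resolution after this change.
    val spec = mapper.readValue(
        """{"format_type": "Parquet"}""",
        ObjectStorageFormatSpecification::class.java
    )
    check(spec is ParquetFormatSpecification)
}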
@@ -10,6 +10,7 @@ import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration
import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration
import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfiguration
import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfiguration
import io.airbyte.cdk.load.command.object_storage.ParquetFormatConfiguration
import io.airbyte.cdk.load.data.avro.toAirbyteValue
import io.airbyte.cdk.load.data.avro.toAvroSchema
import io.airbyte.cdk.load.data.toAirbyteValue
@@ -19,6 +20,7 @@ import io.airbyte.cdk.load.file.avro.toAvroReader
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
import io.airbyte.cdk.load.file.object_storage.RemoteObject
import io.airbyte.cdk.load.file.parquet.toParquetReader
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.test.util.toOutputRecord
import io.airbyte.cdk.load.util.deserializeToNode
@@ -95,6 +97,15 @@ class ObjectStorageDataDumper(
.toList()
}
}
else -> error("Unsupported format")
is ParquetFormatConfiguration -> {
inputStream
.toParquetReader(stream.schemaWithMeta.toAvroSchema(stream.descriptor))
.use { reader ->
reader
.recordSequence()
.map { it.toAirbyteValue(stream.schemaWithMeta).toOutputRecord() }
.toList()
}
}
}
}
17 changes: 17 additions & 0 deletions airbyte-cdk/bulk/toolkits/load-parquet/build.gradle
@@ -0,0 +1,17 @@
dependencies {
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')
api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-avro')

api ('org.apache.hadoop:hadoop-common:3.4.0') {
exclude group: 'org.apache.zookeeper'
exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-common'
}
api ('org.apache.hadoop:hadoop-mapreduce-client-core:3.4.0') {
exclude group: 'org.apache.zookeeper'
exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-common'
}
api 'org.apache.parquet:parquet-avro:1.14.2'

testFixturesImplementation testFixtures(project(":airbyte-cdk:bulk:core:bulk-cdk-core-load"))
}
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.file.parquet

import java.io.Closeable
import java.io.File
import java.io.InputStream
import kotlin.io.path.outputStream
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.avro.AvroReadSupport
import org.apache.parquet.hadoop.ParquetReader as ApacheParquetReader

class ParquetReader(
private val reader: ApacheParquetReader<GenericRecord>,
private val tmpFile: File
) : Closeable {
private fun read(): GenericRecord? {
return reader.read()
}

fun recordSequence(): Sequence<GenericRecord> = generateSequence { read() }

override fun close() {
reader.close()
tmpFile.delete()
}
}

fun InputStream.toParquetReader(avroSchema: Schema): ParquetReader {

val tmpFile =
kotlin.io.path.createTempFile(
prefix = "${avroSchema.namespace}.${avroSchema.name}",
suffix = ".avro"
)
tmpFile.outputStream().use { outputStream -> this.copyTo(outputStream) }
val reader =
AvroParquetReader.builder<GenericRecord>(
AvroReadSupport(),
Path(tmpFile.toAbsolutePath().toString())
)
.build()

return ParquetReader(reader, tmpFile.toFile())
}
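
A usage sketch for the new reader (not part of the commit); parquetBytes and schema are assumed to come from the caller. Because recordSequence() is lazy, it must be consumed inside use {}, which also guarantees the temp file backing the reader is deleted:

import java.io.ByteArrayInputStream
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord

fun readAllRecords(parquetBytes: ByteArray, schema: Schema): List<GenericRecord> =
    ByteArrayInputStream(parquetBytes)
        .toParquetReader(schema)
        .use { reader -> reader.recordSequence().toList() } // materialize before close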
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.file.parquet

import java.io.Closeable
import java.io.OutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter as ApacheParquetWriter
import org.apache.parquet.io.OutputFile
import org.apache.parquet.io.PositionOutputStream

class ParquetWriter(private val writer: ApacheParquetWriter<GenericRecord>) : Closeable {
fun write(record: GenericRecord) = writer.write(record)
override fun close() = writer.close()
}

fun OutputStream.toParquetWriter(avroSchema: Schema): ParquetWriter {
// Custom OutputFile implementation wrapping the OutputStream
val outputFile =
object : OutputFile {
var position: Long = 0
override fun create(blockSizeHint: Long) =
object : PositionOutputStream() {
override fun write(b: Int) {
position += 1
this@toParquetWriter.write(b)
}
override fun write(bytes: ByteArray, off: Int, len: Int) {
position += len
this@toParquetWriter.write(bytes, off, len)
}
override fun flush() = this@toParquetWriter.flush()
override fun close() = this@toParquetWriter.close()
override fun getPos() = position
}

override fun createOrOverwrite(blockSizeHint: Long) = create(blockSizeHint)
override fun supportsBlockSize() = false
override fun defaultBlockSize() = 0L
}

// Initialize AvroParquetWriter with the custom OutputFile
val writer =
AvroParquetWriter.builder<GenericRecord>(outputFile)
.withSchema(avroSchema)
.withConf(Configuration())
.build()

return ParquetWriter(writer)
}
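
Together with the reader above, the OutputStream-backed writer enables a round trip entirely in memory. A hedged sketch under an illustrative schema (none of these record or field names come from the commit):

import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData

fun main() {
    val schema = SchemaBuilder.record("Example").namespace("io.example")
        .fields().requiredString("id").endRecord()
    val record = GenericData.Record(schema).apply { put("id", "1") }

    // Write one record to an in-memory Parquet file.
    val out = ByteArrayOutputStream()
    out.toParquetWriter(schema).use { it.write(record) }

    // Read it back; toString() covers both Utf8 and String representations.
    val roundTripped = ByteArrayInputStream(out.toByteArray())
        .toParquetReader(schema)
        .use { it.recordSequence().toList() }
    check(roundTripped.single().get("id").toString() == "1")
}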
@@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
dockerRepository: airbyte/destination-s3-v2
githubIssueLabel: destination-s3-v2
icon: s3.svg
@@ -51,4 +51,9 @@ data:
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-S3-V2-PARQUET
fileName: s3_dest_v2_parquet_config.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
metadataSpecVersion: "1.0"
@@ -5,16 +5,12 @@
package io.airbyte.integrations.destination.s3_v2

import io.airbyte.cdk.load.check.DestinationChecker
import io.airbyte.cdk.load.command.object_storage.AvroFormatConfiguration
import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration
import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration
import io.airbyte.cdk.load.file.TimeProvider
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
import io.airbyte.cdk.load.file.s3.S3ClientFactory
import io.airbyte.cdk.load.file.s3.S3Object
import io.airbyte.cdk.load.util.write
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.exceptions.ConfigurationException
import jakarta.inject.Singleton
import java.io.OutputStream
import kotlinx.coroutines.flow.toList
@@ -27,15 +23,6 @@ class S3V2Checker<T : OutputStream>(private val timeProvider: TimeProvider) :

override fun check(config: S3V2Configuration<T>) {
runBlocking {
if (
config.objectStorageFormatConfiguration !is JsonFormatConfiguration &&
config.objectStorageFormatConfiguration !is CSVFormatConfiguration &&
config.objectStorageFormatConfiguration !is AvroFormatConfiguration
) {
throw ConfigurationException(
"Currently only JSON, CSV, and Avro formats are supported"
)
}
val s3Client = S3ClientFactory.make(config)
val pathFactory = ObjectStoragePathFactory.from(config, timeProvider)
val path = pathFactory.getStagingDirectory(mockStream())
@@ -10,6 +10,7 @@ import io.airbyte.cdk.load.command.object_storage.AvroFormatConfiguration
import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration
import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration
import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider
import io.airbyte.cdk.load.command.object_storage.ParquetFormatConfiguration
import io.airbyte.cdk.load.data.DestinationRecordToAirbyteValueWithMeta
import io.airbyte.cdk.load.data.avro.toAvroRecord
import io.airbyte.cdk.load.data.avro.toAvroSchema
@@ -18,6 +19,7 @@ import io.airbyte.cdk.load.data.toCsvRecord
import io.airbyte.cdk.load.data.toJson
import io.airbyte.cdk.load.file.avro.toAvroWriter
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
import io.airbyte.cdk.load.file.parquet.toParquetWriter
import io.airbyte.cdk.load.file.s3.S3Client
import io.airbyte.cdk.load.file.s3.S3Object
import io.airbyte.cdk.load.message.Batch
@@ -55,7 +57,11 @@ class S3V2Writer(
inner class S3V2StreamLoader(override val stream: DestinationStream) : StreamLoader {
private val partNumber = AtomicLong(0L) // TODO: Get from destination state
private val avroSchema =
if (formatConfig.objectStorageFormatConfiguration is AvroFormatConfiguration) {
if (
formatConfig.objectStorageFormatConfiguration.let {
it is AvroFormatConfiguration || it is ParquetFormatConfiguration
}
) {
stream.schemaWithMeta.toAvroSchema(stream.descriptor)
} else {
null
Expand Down Expand Up @@ -98,6 +104,15 @@ class S3V2Writer(
}
}
}
is ParquetFormatConfiguration -> {
outputStream.toParquetWriter(avroSchema!!).use { writer ->
records.forEach {
writer.write(
recordDecorator.decorate(it).toAvroRecord(avroSchema)
)
}
}
}
else -> throw IllegalStateException("Unsupported format")
}
}
@@ -14,6 +14,7 @@ object S3V2TestUtils {
const val CSV_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_csv_config.json"
const val CSV_GZIP_CONFIG_PATH = "secrets/s3_dest_v2_csv_gzip_config.json"
const val AVRO_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_avro_config.json"
const val PARQUET_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_parquet_config.json"
fun getConfig(configPath: String): S3V2Specification =
ValidatedJsonUtils.parseOne(
S3V2Specification::class.java,
@@ -39,3 +39,5 @@ class S3V2WriteTestCsvUncompressed : S3V2WriteTest(S3V2TestUtils.CSV_UNCOMPRESSE
class S3V2WriteTestCsvGzip : S3V2WriteTest(S3V2TestUtils.CSV_GZIP_CONFIG_PATH)

class S3V2WriteTestAvro : S3V2WriteTest(S3V2TestUtils.AVRO_UNCOMPRESSED_CONFIG_PATH)

class S3V2WriteTestParquet : S3V2WriteTest(S3V2TestUtils.PARQUET_UNCOMPRESSED_CONFIG_PATH)
@@ -129,8 +129,8 @@
"properties" : {
"format_type" : {
"type" : "string",
"enum" : [ "AVRO" ],
"default" : "AVRO"
"enum" : [ "Avro" ],
"default" : "Avro"
}
},
"required" : [ "format_type" ]
@@ -141,8 +141,8 @@
"properties" : {
"format_type" : {
"type" : "string",
"enum" : [ "PARQUET" ],
"default" : "PARQUET"
"enum" : [ "Parquet" ],
"default" : "Parquet"
}
},
"required" : [ "format_type" ]
@@ -129,8 +129,8 @@
"properties" : {
"format_type" : {
"type" : "string",
"enum" : [ "AVRO" ],
"default" : "AVRO"
"enum" : [ "Avro" ],
"default" : "Avro"
}
},
"required" : [ "format_type" ]
@@ -141,8 +141,8 @@
"properties" : {
"format_type" : {
"type" : "string",
"enum" : [ "PARQUET" ],
"default" : "PARQUET"
"enum" : [ "Parquet" ],
"default" : "Parquet"
}
},
"required" : [ "format_type" ]
