Skip to content

Commit

Permalink
Bulk Load CDK: Avro Compression Codec, S3V2Usage (#47128)
Browse files (browse the repository at this point in the history)
  • Loading branch information
johnny-schmidt authored Oct 21, 2024
1 parent 6e2271a commit 66b1af7
Show file tree
Hide file tree
Showing 11 changed files with 371 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ interface SpillToDiskTask : StreamLevel, InternalScope
* TODO: Allow for the record batch size to be supplied per-stream. (Needed?)
*/
class DefaultSpillToDiskTask(
private val tmpFileProvider: SpillFileProvider,
private val spillFileProvider: SpillFileProvider,
private val queue: QueueReader<Reserved<DestinationRecordWrapped>>,
private val flushStrategy: FlushStrategy,
override val stream: DestinationStream,
Expand All @@ -53,7 +53,7 @@ class DefaultSpillToDiskTask(
)

override suspend fun execute() {
val tmpFile = tmpFileProvider.createTempFile()
val tmpFile = spillFileProvider.createTempFile()
val result =
tmpFile.outputStream().use { outputStream ->
queue
Expand Down Expand Up @@ -103,7 +103,7 @@ interface SpillToDiskTaskFactory {

@Singleton
class DefaultSpillToDiskTaskFactory(
private val tmpFileProvider: SpillFileProvider,
private val spillFileProvider: SpillFileProvider,
private val queueSupplier:
MessageQueueSupplier<DestinationStream.Descriptor, Reserved<DestinationRecordWrapped>>,
private val flushStrategy: FlushStrategy,
Expand All @@ -113,7 +113,7 @@ class DefaultSpillToDiskTaskFactory(
stream: DestinationStream
): SpillToDiskTask {
return DefaultSpillToDiskTask(
tmpFileProvider,
spillFileProvider,
queueSupplier.get(stream.descriptor),
flushStrategy,
stream,
Expand Down
1 change: 1 addition & 0 deletions airbyte-cdk/bulk/toolkits/load-avro/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ dependencies {
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')

implementation("org.tukaani:xz:1.9")
api("org.apache.avro:avro:1.11.0")

testFixturesImplementation testFixtures(project(":airbyte-cdk:bulk:core:bulk-cdk-core-load"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.command.avro

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.annotation.JsonValue
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import org.apache.avro.file.CodecFactory

/**
 * Polymorphic specification of the compression codec applied to Avro output.
 *
 * Jackson selects the concrete subclass from the existing `codec` property, using the names
 * registered in [JsonSubTypes]; those names are the same strings carried by [Type.typeName].
 *
 * @property type discriminator identifying which codec this specification describes.
 */
@JsonTypeInfo(
    use = JsonTypeInfo.Id.NAME,
    include = JsonTypeInfo.As.EXISTING_PROPERTY,
    property = "codec"
)
@JsonSubTypes(
    JsonSubTypes.Type(value = AvroFormatNoCompressionCodec::class, name = "no compression"),
    JsonSubTypes.Type(value = AvroFormatDeflateCodec::class, name = "Deflate"),
    JsonSubTypes.Type(value = AvroFormatBzip2Codec::class, name = "bzip2"),
    JsonSubTypes.Type(value = AvroFormatXzCodec::class, name = "xz"),
    JsonSubTypes.Type(value = AvroFormatZstandardCodec::class, name = "zstandard"),
    JsonSubTypes.Type(value = AvroFormatSnappyCodec::class, name = "snappy")
)
sealed class AvroFormatCompressionCodec(
    @JsonSchemaTitle("Compression Codec")
    @JsonProperty("codec")
    open val type: Type
) {
    /** Codec discriminator; [typeName] is the value used when (de)serializing via [JsonValue]. */
    enum class Type(@get:JsonValue val typeName: String) {
        NO_COMPRESSION("no compression"),
        DEFLATE("Deflate"),
        BZIP2("bzip2"),
        XZ("xz"),
        ZSTANDARD("zstandard"),
        SNAPPY("snappy")
    }

    /**
     * Maps this specification onto the matching Avro [CodecFactory] and wraps it in an
     * [AvroCompressionConfiguration].
     *
     * The `when` is exhaustive over the sealed hierarchy (no `else`), so adding a new subclass
     * without a branch here is a compile error.
     */
    fun toCompressionConfiguration(): AvroCompressionConfiguration {
        val codecFactory: CodecFactory =
            when (this) {
                is AvroFormatNoCompressionCodec -> CodecFactory.nullCodec()
                is AvroFormatDeflateCodec -> CodecFactory.deflateCodec(compressionLevel)
                is AvroFormatBzip2Codec -> CodecFactory.bzip2Codec()
                is AvroFormatXzCodec -> CodecFactory.xzCodec(compressionLevel)
                is AvroFormatZstandardCodec ->
                    CodecFactory.zstandardCodec(compressionLevel, includeChecksum)
                is AvroFormatSnappyCodec -> CodecFactory.snappyCodec()
            }
        return AvroCompressionConfiguration(compressionCodec = codecFactory)
    }
}

/**
 * Codec specification selecting no compression.
 *
 * @property type discriminator, defaulting to [AvroFormatCompressionCodec.Type.NO_COMPRESSION].
 */
data class AvroFormatNoCompressionCodec(
    @JsonSchemaTitle("Compression Codec") @JsonProperty("codec")
    override val type: Type = Type.NO_COMPRESSION,
) : AvroFormatCompressionCodec(type)

/**
 * Codec specification selecting Deflate.
 *
 * @property type discriminator, defaulting to [AvroFormatCompressionCodec.Type.DEFLATE].
 * @property compressionLevel level handed to `CodecFactory.deflateCodec`; defaults to 0.
 */
data class AvroFormatDeflateCodec(
    @JsonSchemaTitle("Compression Codec") @JsonProperty("codec")
    override val type: Type = Type.DEFLATE,
    @JsonSchemaTitle("Deflate Level") @JsonProperty("compression_level")
    val compressionLevel: Int = 0,
) : AvroFormatCompressionCodec(type)

/**
 * Codec specification selecting bzip2.
 *
 * @property type discriminator, defaulting to [AvroFormatCompressionCodec.Type.BZIP2].
 */
data class AvroFormatBzip2Codec(
    @JsonSchemaTitle("Compression Codec") @JsonProperty("codec")
    override val type: Type = Type.BZIP2,
) : AvroFormatCompressionCodec(type)

/**
 * Codec specification selecting xz.
 *
 * @property type discriminator, defaulting to [AvroFormatCompressionCodec.Type.XZ].
 * @property compressionLevel level handed to `CodecFactory.xzCodec`; defaults to 6.
 */
data class AvroFormatXzCodec(
    @JsonSchemaTitle("Compression Codec")
    @JsonProperty("codec")
    override val type: Type = Type.XZ,
    @JsonSchemaTitle("Compression Level") @JsonProperty("compression_level")
    val compressionLevel: Int = 6,
) : AvroFormatCompressionCodec(type)

/**
 * Codec specification selecting zstandard.
 *
 * @property type discriminator, defaulting to [AvroFormatCompressionCodec.Type.ZSTANDARD].
 * @property compressionLevel first argument handed to `CodecFactory.zstandardCodec`; defaults
 * to 3.
 * @property includeChecksum second argument handed to `CodecFactory.zstandardCodec`; defaults
 * to `false`.
 */
data class AvroFormatZstandardCodec(
    @JsonSchemaTitle("Compression Codec") @JsonProperty("codec")
    override val type: Type = Type.ZSTANDARD,
    @JsonSchemaTitle("Compression Level") @JsonProperty("compression_level")
    val compressionLevel: Int = 3,
    @JsonSchemaTitle("Include Checksum") @JsonProperty("include_checksum")
    val includeChecksum: Boolean = false,
) : AvroFormatCompressionCodec(type)

/**
 * Codec specification selecting snappy.
 *
 * @property type discriminator, defaulting to [AvroFormatCompressionCodec.Type.SNAPPY].
 */
data class AvroFormatSnappyCodec(
    @JsonSchemaTitle("Compression Codec") @JsonProperty("codec")
    override val type: Type = Type.SNAPPY,
) : AvroFormatCompressionCodec(type)

/**
 * Compression settings for Avro output.
 *
 * @property compressionCodec the Avro [CodecFactory] to apply when writing.
 */
data class AvroCompressionConfiguration(
    val compressionCodec: CodecFactory,
)

/** Mix-in for types that expose an [AvroCompressionConfiguration]. */
interface AvroCompressionConfigurationProvider {
    /** The Avro compression settings carried by the implementing type. */
    val avroCompressionConfiguration: AvroCompressionConfiguration
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.load.file.avro

import io.airbyte.cdk.load.command.avro.AvroCompressionConfiguration
import java.io.Closeable
import java.io.OutputStream
import org.apache.avro.Schema
Expand All @@ -27,9 +28,13 @@ class AvroWriter(
}
}

fun OutputStream.toAvroWriter(avroSchema: Schema): AvroWriter {
fun OutputStream.toAvroWriter(
avroSchema: Schema,
config: AvroCompressionConfiguration
): AvroWriter {
val datumWriter = GenericDatumWriter<GenericRecord>(avroSchema)
val dataFileWriter = DataFileWriter(datumWriter)
dataFileWriter.setCodec(config.compressionCodec)
dataFileWriter.create(avroSchema, this)
return AvroWriter(dataFileWriter)
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.annotation.JsonValue
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import io.airbyte.cdk.load.command.avro.AvroCompressionConfiguration
import io.airbyte.cdk.load.command.avro.AvroCompressionConfigurationProvider
import io.airbyte.cdk.load.command.avro.AvroFormatCompressionCodec
import io.airbyte.cdk.load.command.avro.AvroFormatNoCompressionCodec

/**
* Mix-in to provide file format configuration.
Expand All @@ -32,7 +36,13 @@ interface ObjectStorageFormatSpecificationProvider {
return when (format) {
is JsonFormatSpecification -> JsonFormatConfiguration()
is CSVFormatSpecification -> CSVFormatConfiguration()
is AvroFormatSpecification -> AvroFormatConfiguration()
is AvroFormatSpecification ->
AvroFormatConfiguration(
avroCompressionConfiguration =
(format as AvroFormatSpecification)
.compressionCodec
.toCompressionConfiguration()
)
is ParquetFormatSpecification -> ParquetFormatConfiguration()
}
}
Expand All @@ -59,7 +69,7 @@ interface ObjectStorageFormatSpecificationProvider {
JsonSubTypes.Type(value = ParquetFormatSpecification::class, name = "Parquet")
)
sealed class ObjectStorageFormatSpecification(
@JsonSchemaTitle("Format Type") @JsonProperty("format_type") open val formatType: Type
@JsonSchemaTitle("Format Type") open val formatType: Type
) {
enum class Type(@get:JsonValue val typeName: String) {
JSONL("JSONL"),
Expand All @@ -69,30 +79,49 @@ sealed class ObjectStorageFormatSpecification(
}
}

/** JSONL */
@JsonSchemaTitle("JSON Lines: Newline-delimited JSON")
class JsonFormatSpecification(
@JsonProperty("format_type") override val formatType: Type = Type.JSONL
@JsonSchemaTitle("Format Type")
@JsonProperty("format_type")
override val formatType: Type = Type.JSONL
) : ObjectStorageFormatSpecification(formatType), ObjectStorageCompressionSpecificationProvider {
override val compression: ObjectStorageCompressionSpecification = NoCompressionSpecification()
}

/** CSV */
@JsonSchemaTitle("CSV: Comma-Separated Values")
class CSVFormatSpecification(
@JsonProperty("format_type") override val formatType: Type = Type.CSV
@JsonSchemaTitle("Format Type")
@JsonProperty("format_type")
override val formatType: Type = Type.CSV
) : ObjectStorageFormatSpecification(formatType), ObjectStorageCompressionSpecificationProvider {
override val compression: ObjectStorageCompressionSpecification = NoCompressionSpecification()
}

/** AVRO */
@JsonSchemaTitle("Avro: Apache Avro")
class AvroFormatSpecification(
@JsonProperty("format_type") override val formatType: Type = Type.AVRO
) : ObjectStorageFormatSpecification(formatType)
@JsonSchemaTitle("Format Type")
@JsonProperty("format_type")
override val formatType: Type = Type.AVRO
) : ObjectStorageFormatSpecification(formatType) {

@JsonSchemaTitle("Compression Codec")
@JsonPropertyDescription(
"The compression algorithm used to compress data. Default to no compression."
)
@JsonProperty("compression_codec")
val compressionCodec: AvroFormatCompressionCodec = AvroFormatNoCompressionCodec()
}

/**
 * Parquet
 *
 * Format specification selecting Parquet output.
 *
 * The `formatType` parameter carries both `@JsonSchemaTitle("Format Type")` and
 * `@JsonProperty("format_type")`, matching the JSONL, CSV and Avro specifications, so the
 * generated schema titles the discriminator the same way for every format.
 *
 * @property formatType discriminator, defaulting to `Type.PARQUET`.
 */
@JsonSchemaTitle("Parquet: Columnar Storage")
class ParquetFormatSpecification(
    @JsonSchemaTitle("Format Type")
    @JsonProperty("format_type")
    override val formatType: Type = Type.PARQUET
) : ObjectStorageFormatSpecification(formatType)

/**
 * Configuration
 *
 * Mix-in for types that expose an [ObjectStorageFormatConfiguration] through [outputFormat].
 */
interface OutputFormatConfigurationProvider {
    /** The output format configuration carried by the implementing type. */
    val outputFormat: ObjectStorageFormatConfiguration
}
Expand All @@ -107,8 +136,10 @@ data class JsonFormatConfiguration(override val extension: String = "jsonl") :
/**
 * Format configuration for CSV output.
 *
 * @property extension file extension for this format; defaults to `"csv"`.
 */
data class CSVFormatConfiguration(
    override val extension: String = "csv",
) : ObjectStorageFormatConfiguration

data class AvroFormatConfiguration(override val extension: String = "avro") :
ObjectStorageFormatConfiguration
data class AvroFormatConfiguration(
override val extension: String = "avro",
override val avroCompressionConfiguration: AvroCompressionConfiguration,
) : ObjectStorageFormatConfiguration, AvroCompressionConfigurationProvider

/**
 * Format configuration for Parquet output.
 *
 * @property extension file extension for this format; defaults to `"parquet"`.
 */
data class ParquetFormatConfiguration(
    override val extension: String = "parquet",
) : ObjectStorageFormatConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
dockerImageTag: 0.1.8
dockerImageTag: 0.1.9
dockerRepository: airbyte/destination-s3-v2
githubIssueLabel: destination-s3-v2
icon: s3.svg
Expand Down Expand Up @@ -51,6 +51,11 @@ data:
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-S3-V2-AVRO-BZIP2
fileName: s3_dest_v2_avro_bzip2_config.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-S3-V2-PARQUET
fileName: s3_dest_v2_parquet_config.json
secretStore:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class S3V2Writer(
private val s3Client: S3Client,
private val pathFactory: ObjectStoragePathFactory,
private val recordDecorator: DestinationRecordToAirbyteValueWithMeta,
private val formatConfig: ObjectStorageFormatConfigurationProvider
private val formatConfigProvider: ObjectStorageFormatConfigurationProvider
) : DestinationWriter {
sealed interface S3V2Batch : Batch
data class StagedObject(
Expand All @@ -49,6 +49,8 @@ class S3V2Writer(
val s3Object: S3Object,
) : S3V2Batch

private val formatConfig = formatConfigProvider.objectStorageFormatConfiguration

override fun createStreamLoader(stream: DestinationStream): StreamLoader {
return S3V2StreamLoader(stream)
}
Expand All @@ -58,9 +60,8 @@ class S3V2Writer(
private val partNumber = AtomicLong(0L) // TODO: Get from destination state
private val avroSchema =
if (
formatConfig.objectStorageFormatConfiguration.let {
it is AvroFormatConfiguration || it is ParquetFormatConfiguration
}
formatConfig is AvroFormatConfiguration ||
formatConfig is ParquetFormatConfiguration
) {
stream.schemaWithMeta.toAvroSchema(stream.descriptor)
} else {
Expand All @@ -75,7 +76,7 @@ class S3V2Writer(
val key = pathFactory.getPathToFile(stream, partNumber, isStaging = true).toString()
val s3Object =
s3Client.streamingUpload(key) { outputStream ->
when (formatConfig.objectStorageFormatConfiguration) {
when (formatConfig) {
is JsonFormatConfiguration -> {
records.forEach {
val serialized =
Expand All @@ -96,13 +97,18 @@ class S3V2Writer(
}
}
is AvroFormatConfiguration -> {
outputStream.toAvroWriter(avroSchema!!).use { writer ->
records.forEach {
writer.write(
recordDecorator.decorate(it).toAvroRecord(avroSchema)
)
outputStream
.toAvroWriter(
avroSchema!!,
formatConfig.avroCompressionConfiguration
)
.use { writer ->
records.forEach {
writer.write(
recordDecorator.decorate(it).toAvroRecord(avroSchema)
)
}
}
}
}
is ParquetFormatConfiguration -> {
outputStream.toParquetWriter(avroSchema!!).use { writer ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ object S3V2TestUtils {
const val CSV_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_csv_config.json"
const val CSV_GZIP_CONFIG_PATH = "secrets/s3_dest_v2_csv_gzip_config.json"
const val AVRO_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_avro_config.json"
const val AVRO_BZIP2_CONFIG_PATH = "secrets/s3_dest_v2_avro_bzip2_config.json"
const val PARQUET_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_parquet_config.json"
fun getConfig(configPath: String): S3V2Specification =
ValidatedJsonUtils.parseOne(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class S3V2WriteTestCsvUncompressed : S3V2WriteTest(S3V2TestUtils.CSV_UNCOMPRESSE

/** [S3V2WriteTest] specialization constructed with [S3V2TestUtils.CSV_GZIP_CONFIG_PATH]. */
class S3V2WriteTestCsvGzip : S3V2WriteTest(S3V2TestUtils.CSV_GZIP_CONFIG_PATH)

class S3V2WriteTestAvro : S3V2WriteTest(S3V2TestUtils.AVRO_UNCOMPRESSED_CONFIG_PATH)
class S3V2WriteTestAvroUncompressed : S3V2WriteTest(S3V2TestUtils.AVRO_UNCOMPRESSED_CONFIG_PATH)

class S3V2WriteTestAvroBzip2 : S3V2WriteTest(S3V2TestUtils.AVRO_BZIP2_CONFIG_PATH)

/** [S3V2WriteTest] specialization constructed with [S3V2TestUtils.PARQUET_UNCOMPRESSED_CONFIG_PATH]. */
class S3V2WriteTestParquet : S3V2WriteTest(S3V2TestUtils.PARQUET_UNCOMPRESSED_CONFIG_PATH)
Loading

0 comments on commit 66b1af7

Please sign in to comment.