Bulk Load CDK: Parquet block config and codecs; S3V2 Usage (#47129)
johnny-schmidt authored Oct 21, 2024
1 parent 66b1af7 commit f14a06d
Showing 11 changed files with 222 additions and 42 deletions.
@@ -92,6 +92,7 @@ T : ScopedTask {
log.warn { "Sync task $innerTask was cancelled." }
throw e
} catch (e: Exception) {
+ log.error { "Caught exception in sync task $innerTask: $e" }
handleSyncFailure(e)
}
}
@@ -127,6 +128,7 @@ T : ScopedTask {
log.warn { "Stream task $innerTask was cancelled." }
throw e
} catch (e: Exception) {
+ log.error { "Caught exception in stream task $innerTask: $e" }
handleStreamFailure(stream, e)
}
}

@@ -17,14 +17,17 @@ import org.apache.avro.file.CodecFactory
property = "codec"
)
@JsonSubTypes(
- JsonSubTypes.Type(value = AvroFormatNoCompressionCodec::class, name = "no compression"),
- JsonSubTypes.Type(value = AvroFormatDeflateCodec::class, name = "Deflate"),
- JsonSubTypes.Type(value = AvroFormatBzip2Codec::class, name = "bzip2"),
- JsonSubTypes.Type(value = AvroFormatXzCodec::class, name = "xz"),
- JsonSubTypes.Type(value = AvroFormatZstandardCodec::class, name = "zstandard"),
- JsonSubTypes.Type(value = AvroFormatSnappyCodec::class, name = "snappy")
+ JsonSubTypes.Type(
+     value = AvroFormatNoCompressionCodecSpecification::class,
+     name = "no compression"
+ ),
+ JsonSubTypes.Type(value = AvroFormatDeflateCodecSpecification::class, name = "Deflate"),
+ JsonSubTypes.Type(value = AvroFormatBzip2CodecSpecification::class, name = "bzip2"),
+ JsonSubTypes.Type(value = AvroFormatXzCodecSpecification::class, name = "xz"),
+ JsonSubTypes.Type(value = AvroFormatZstandardCodecSpecification::class, name = "zstandard"),
+ JsonSubTypes.Type(value = AvroFormatSnappyCodecSpecification::class, name = "snappy")
)
- sealed class AvroFormatCompressionCodec(
+ sealed class AvroFormatCompressionCodecSpecification(
@JsonSchemaTitle("Compression Codec") @JsonProperty("codec") open val type: Type
) {
enum class Type(@get:JsonValue val typeName: String) {
@@ -40,46 +43,47 @@ sealed class AvroFormatCompressionCodec(
AvroCompressionConfiguration(
compressionCodec =
when (this) {
- is AvroFormatNoCompressionCodec -> CodecFactory.nullCodec()
- is AvroFormatDeflateCodec -> CodecFactory.deflateCodec(compressionLevel)
- is AvroFormatBzip2Codec -> CodecFactory.bzip2Codec()
- is AvroFormatXzCodec -> CodecFactory.xzCodec(compressionLevel)
- is AvroFormatZstandardCodec ->
+ is AvroFormatNoCompressionCodecSpecification -> CodecFactory.nullCodec()
+ is AvroFormatDeflateCodecSpecification ->
+     CodecFactory.deflateCodec(compressionLevel)
+ is AvroFormatBzip2CodecSpecification -> CodecFactory.bzip2Codec()
+ is AvroFormatXzCodecSpecification -> CodecFactory.xzCodec(compressionLevel)
+ is AvroFormatZstandardCodecSpecification ->
      CodecFactory.zstandardCodec(compressionLevel, includeChecksum)
- is AvroFormatSnappyCodec -> CodecFactory.snappyCodec()
+ is AvroFormatSnappyCodecSpecification -> CodecFactory.snappyCodec()
}
)
}

- data class AvroFormatNoCompressionCodec(
+ data class AvroFormatNoCompressionCodecSpecification(
@JsonSchemaTitle("Compression Codec")
@JsonProperty("codec")
override val type: Type = Type.NO_COMPRESSION,
- ) : AvroFormatCompressionCodec(type)
+ ) : AvroFormatCompressionCodecSpecification(type)

- data class AvroFormatDeflateCodec(
+ data class AvroFormatDeflateCodecSpecification(
@JsonSchemaTitle("Compression Codec")
@JsonProperty("codec")
override val type: Type = Type.DEFLATE,
@JsonSchemaTitle("Deflate Level")
@JsonProperty("compression_level")
val compressionLevel: Int = 0
- ) : AvroFormatCompressionCodec(type)
+ ) : AvroFormatCompressionCodecSpecification(type)

- data class AvroFormatBzip2Codec(
+ data class AvroFormatBzip2CodecSpecification(
@JsonSchemaTitle("Compression Codec")
@JsonProperty("codec")
override val type: Type = Type.BZIP2,
- ) : AvroFormatCompressionCodec(type)
+ ) : AvroFormatCompressionCodecSpecification(type)

- data class AvroFormatXzCodec(
+ data class AvroFormatXzCodecSpecification(
@JsonSchemaTitle("Compression Codec") @JsonProperty("codec") override val type: Type = Type.XZ,
@JsonSchemaTitle("Compression Level")
@JsonProperty("compression_level")
val compressionLevel: Int = 6
- ) : AvroFormatCompressionCodec(type)
+ ) : AvroFormatCompressionCodecSpecification(type)

- data class AvroFormatZstandardCodec(
+ data class AvroFormatZstandardCodecSpecification(
@JsonSchemaTitle("Compression Codec")
@JsonProperty("codec")
override val type: Type = Type.ZSTANDARD,
@@ -89,13 +93,13 @@ data class AvroFormatZstandardCodec(
@JsonSchemaTitle("Include Checksum")
@JsonProperty("include_checksum")
val includeChecksum: Boolean = false
- ) : AvroFormatCompressionCodec(type)
+ ) : AvroFormatCompressionCodecSpecification(type)

- data class AvroFormatSnappyCodec(
+ data class AvroFormatSnappyCodecSpecification(
@JsonSchemaTitle("Compression Codec")
@JsonProperty("codec")
override val type: Type = Type.SNAPPY,
- ) : AvroFormatCompressionCodec(type)
+ ) : AvroFormatCompressionCodecSpecification(type)

data class AvroCompressionConfiguration(val compressionCodec: CodecFactory)
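
Aside (illustration only, not part of this commit): the "codec" field drives Jackson's polymorphic deserialization through the @JsonSubTypes mapping above, and toCompressionConfiguration() resolves the chosen spec to an Avro CodecFactory. A minimal sketch, assuming jackson-module-kotlin is on the classpath:

    import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper

    fun main() {
        val mapper = jacksonObjectMapper()
        // Jackson reads the "codec" discriminator and instantiates the matching subtype.
        val spec = mapper.readValue(
            """{"codec": "zstandard", "compression_level": 5, "include_checksum": true}""",
            AvroFormatCompressionCodecSpecification::class.java
        )
        // Resolves to CodecFactory.zstandardCodec(5, true).
        val avroConfig = spec.toCompressionConfiguration()
        println(avroConfig.compressionCodec)
    }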


@@ -12,8 +12,10 @@ import com.fasterxml.jackson.annotation.JsonValue
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import io.airbyte.cdk.load.command.avro.AvroCompressionConfiguration
import io.airbyte.cdk.load.command.avro.AvroCompressionConfigurationProvider
- import io.airbyte.cdk.load.command.avro.AvroFormatCompressionCodec
- import io.airbyte.cdk.load.command.avro.AvroFormatNoCompressionCodec
+ import io.airbyte.cdk.load.command.avro.AvroFormatCompressionCodecSpecification
+ import io.airbyte.cdk.load.command.avro.AvroFormatNoCompressionCodecSpecification
+ import io.airbyte.cdk.load.file.parquet.ParquetWriterConfiguration
+ import io.airbyte.cdk.load.file.parquet.ParquetWriterConfigurationProvider

/**
* Mix-in to provide file format configuration.
@@ -43,7 +45,21 @@ interface ObjectStorageFormatSpecificationProvider {
.compressionCodec
.toCompressionConfiguration()
)
- is ParquetFormatSpecification -> ParquetFormatConfiguration()
+ is ParquetFormatSpecification -> {
+     (format as ParquetFormatSpecification).let {
+         ParquetFormatConfiguration(
+             parquetWriterConfiguration =
+                 ParquetWriterConfiguration(
+                     compressionCodecName = it.compressionCodec!!.name,
+                     blockSizeMb = it.blockSizeMb!!,
+                     maxPaddingSizeMb = it.maxPaddingSizeMb!!,
+                     pageSizeKb = it.pageSizeKb!!,
+                     dictionaryPageSizeKb = it.dictionaryPageSizeKb!!,
+                     dictionaryEncoding = it.dictionaryEncoding!!
+                 )
+         )
+     }
+ }
}
}

@@ -112,14 +128,66 @@ class AvroFormatSpecification(
"The compression algorithm used to compress data. Default to no compression."
)
@JsonProperty("compression_codec")
- val compressionCodec: AvroFormatCompressionCodec = AvroFormatNoCompressionCodec()
+ val compressionCodec: AvroFormatCompressionCodecSpecification =
+     AvroFormatNoCompressionCodecSpecification()
}

/** Parquet */
@JsonSchemaTitle("Parquet: Columnar Storage")
class ParquetFormatSpecification(
@JsonProperty("format_type") override val formatType: Type = Type.PARQUET
) : ObjectStorageFormatSpecification(formatType)
@JsonSchemaTitle("Format Type")
@JsonProperty("format_type")
override val formatType: Type = Type.PARQUET
) : ObjectStorageFormatSpecification(formatType) {
enum class ParquetFormatCompressionCodec {
UNCOMPRESSED,
SNAPPY,
GZIP,
LZO,
BROTLI, // TODO: Broken locally in both this and s3 v1; Validate whether it works in prod
LZ4,
ZSTD
}

@JsonSchemaTitle("Compression Codec")
@JsonPropertyDescription("The compression algorithm used to compress data pages.")
@JsonProperty("compression_codec")
val compressionCodec: ParquetFormatCompressionCodec? =
ParquetFormatCompressionCodec.UNCOMPRESSED

@JsonSchemaTitle("Block Size (Row Group Size) (MB)")
@JsonPropertyDescription(
"This is the size of a row group being buffered in memory. It limits the memory usage when writing. Larger values will improve the IO when reading, but consume more memory when writing. Default: 128 MB."
)
@JsonProperty("block_size_mb")
val blockSizeMb: Int? = 128

@JsonSchemaTitle("Max Padding Size (MB)")
@JsonPropertyDescription(
"Maximum size allowed as padding to align row groups. This is also the minimum size of a row group. Default: 8 MB."
)
@JsonProperty("max_padding_size_mb")
val maxPaddingSizeMb: Int? = 8

@JsonSchemaTitle("Page Size (KB)")
@JsonPropertyDescription(
"The page size is for compression. A block is composed of pages. A page is the smallest unit that must be read fully to access a single record. If this value is too small, the compression will deteriorate. Default: 1024 KB."
)
@JsonProperty("page_size_kb")
val pageSizeKb: Int? = 1024

@JsonSchemaTitle("Dictionary Page Size (KB)")
@JsonPropertyDescription(
"There is one dictionary page per column per row group when dictionary encoding is used. The dictionary page size works like the page size but for dictionary. Default: 1024 KB."
)
@JsonProperty("dictionary_page_size_kb")
val dictionaryPageSizeKb: Int? = 1024

@JsonSchemaTitle("Dictionary Encoding")
@JsonPropertyDescription("Default: true.")
@JsonProperty("dictionary_encoding")
val dictionaryEncoding: Boolean? = true
}

/** Configuration */
interface OutputFormatConfigurationProvider {
@@ -141,8 +209,10 @@ data class AvroFormatConfiguration(
override val avroCompressionConfiguration: AvroCompressionConfiguration,
) : ObjectStorageFormatConfiguration, AvroCompressionConfigurationProvider

- data class ParquetFormatConfiguration(override val extension: String = "parquet") :
-     ObjectStorageFormatConfiguration
+ data class ParquetFormatConfiguration(
+     override val extension: String = "parquet",
+     override val parquetWriterConfiguration: ParquetWriterConfiguration
+ ) : ObjectStorageFormatConfiguration, ParquetWriterConfigurationProvider

interface ObjectStorageFormatConfigurationProvider {
val objectStorageFormatConfiguration: ObjectStorageFormatConfiguration
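
Aside (illustration only, not part of this commit): the !! assertions in the Parquet branch above lean on the Jackson defaults declared on ParquetFormatSpecification, so a spec with no user overrides maps to the writer configuration below.

    // A spec built with no overrides carries the declared Jackson defaults.
    val spec = ParquetFormatSpecification()
    val writerConfig = ParquetWriterConfiguration(
        compressionCodecName = spec.compressionCodec!!.name, // "UNCOMPRESSED"
        blockSizeMb = spec.blockSizeMb!!,                    // 128
        maxPaddingSizeMb = spec.maxPaddingSizeMb!!,          // 8
        pageSizeKb = spec.pageSizeKb!!,                      // 1024
        dictionaryPageSizeKb = spec.dictionaryPageSizeKb!!,  // 1024
        dictionaryEncoding = spec.dictionaryEncoding!!       // true
    )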

2 changes: 2 additions & 0 deletions airbyte-cdk/bulk/toolkits/load-parquet/build.gradle

@@ -3,6 +3,8 @@ dependencies {
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')
api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-avro')

implementation("org.lz4:lz4-java:1.8.0")
runtimeOnly 'com.hadoop.gplcompression:hadoop-lzo:0.4.20'
api ('org.apache.hadoop:hadoop-common:3.4.0') {
exclude group: 'org.apache.zookeeper'
exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-common'

@@ -11,6 +11,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter as ApacheParquetWriter
+ import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.io.OutputFile
import org.apache.parquet.io.PositionOutputStream

@@ -19,7 +20,10 @@ class ParquetWriter(private val writer: ApacheParquetWriter<GenericRecord>) : Closeable {
override fun close() = writer.close()
}

- fun OutputStream.toParquetWriter(avroSchema: Schema): ParquetWriter {
+ fun OutputStream.toParquetWriter(
+     avroSchema: Schema,
+     config: ParquetWriterConfiguration
+ ): ParquetWriter {
// Custom OutputFile implementation wrapping the OutputStream
val outputFile =
object : OutputFile {
@@ -49,7 +53,29 @@ fun OutputStream.toParquetWriter(avroSchema: Schema): ParquetWriter {
AvroParquetWriter.builder<GenericRecord>(outputFile)
.withSchema(avroSchema)
.withConf(Configuration())
+ .withCompressionCodec(config.compressionCodec)
+ .withRowGroupSize(config.blockSizeMb * 1024 * 1024L)
+ .withPageSize(config.pageSizeKb * 1024)
+ .withDictionaryPageSize(config.dictionaryPageSizeKb * 1024)
+ .withDictionaryEncoding(config.dictionaryEncoding)
+ .withMaxPaddingSize(config.maxPaddingSizeMb * 1024 * 1024)
.build()

return ParquetWriter(writer)
}

+ data class ParquetWriterConfiguration(
+     val compressionCodecName: String,
+     val blockSizeMb: Int,
+     val maxPaddingSizeMb: Int,
+     val pageSizeKb: Int,
+     val dictionaryPageSizeKb: Int,
+     val dictionaryEncoding: Boolean
+ ) {
+     val compressionCodec
+         get() = CompressionCodecName.valueOf(compressionCodecName)
+ }
+
+ interface ParquetWriterConfigurationProvider {
+     val parquetWriterConfiguration: ParquetWriterConfiguration
+ }
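
Aside (illustration only, not part of this commit; schema and records are placeholders): the MB/KB settings are converted to bytes at the builder, with the row-group size widened to Long. A usage sketch:

    import java.io.ByteArrayOutputStream
    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord

    fun writeParquet(avroSchema: Schema, records: List<GenericRecord>): ByteArray {
        val config = ParquetWriterConfiguration(
            compressionCodecName = "SNAPPY", // must name a CompressionCodecName constant
            blockSizeMb = 128,
            maxPaddingSizeMb = 8,
            pageSizeKb = 1024,
            dictionaryPageSizeKb = 1024,
            dictionaryEncoding = true
        )
        val out = ByteArrayOutputStream()
        // use {} closes the writer, which flushes buffered pages and writes the Parquet footer.
        out.toParquetWriter(avroSchema, config).use { writer ->
            records.forEach { writer.write(it) }
        }
        return out.toByteArray()
    }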

@@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
- dockerImageTag: 0.1.9
+ dockerImageTag: 0.1.10
dockerRepository: airbyte/destination-s3-v2
githubIssueLabel: destination-s3-v2
icon: s3.svg
@@ -61,4 +61,9 @@ data:
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
+ - name: SECRET_DESTINATION-S3-V2-PARQUET-SNAPPY
+   fileName: s3_dest_v2_parquet_snappy_config.json
+   secretStore:
+     type: GSM
+     alias: airbyte-connector-testing-secret-store
metadataSpecVersion: "1.0"

@@ -111,13 +111,18 @@ class S3V2Writer(
}
}
is ParquetFormatConfiguration -> {
- outputStream.toParquetWriter(avroSchema!!).use { writer ->
-     records.forEach {
-         writer.write(
-             recordDecorator.decorate(it).toAvroRecord(avroSchema)
-         )
+ outputStream
+     .toParquetWriter(
+         avroSchema!!,
+         formatConfig.parquetWriterConfiguration
+     )
+     .use { writer ->
+         records.forEach {
+             writer.write(
+                 recordDecorator.decorate(it).toAvroRecord(avroSchema)
+             )
}
}
}
}
else -> throw IllegalStateException("Unsupported format")
}

@@ -16,6 +16,7 @@ object S3V2TestUtils {
const val AVRO_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_avro_config.json"
const val AVRO_BZIP2_CONFIG_PATH = "secrets/s3_dest_v2_avro_bzip2_config.json"
const val PARQUET_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_parquet_config.json"
+ const val PARQUET_SNAPPY_CONFIG_PATH = "secrets/s3_dest_v2_parquet_snappy_config.json"
fun getConfig(configPath: String): S3V2Specification =
ValidatedJsonUtils.parseOne(
S3V2Specification::class.java,

@@ -42,4 +42,7 @@ class S3V2WriteTestAvroUncompressed : S3V2WriteTest(S3V2TestUtils.AVRO_UNCOMPRESSED_CONFIG_PATH)

class S3V2WriteTestAvroBzip2 : S3V2WriteTest(S3V2TestUtils.AVRO_BZIP2_CONFIG_PATH)

- class S3V2WriteTestParquet : S3V2WriteTest(S3V2TestUtils.PARQUET_UNCOMPRESSED_CONFIG_PATH)
+ class S3V2WriteTestParquetUncompressed :
+     S3V2WriteTest(S3V2TestUtils.PARQUET_UNCOMPRESSED_CONFIG_PATH)
+
+ class S3V2WriteTestParquetSnappy : S3V2WriteTest(S3V2TestUtils.PARQUET_SNAPPY_CONFIG_PATH)

@@ -237,6 +237,37 @@
"type" : "string",
"enum" : [ "Parquet" ],
"default" : "Parquet"
},
"compression_codec" : {
"type" : "string",
"enum" : [ "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO", "BROTLI", "LZ4", "ZSTD" ],
"description" : "The compression algorithm used to compress data pages.",
"title" : "Compression Codec"
},
"block_size_mb" : {
"type" : "integer",
"description" : "This is the size of a row group being buffered in memory. It limits the memory usage when writing. Larger values will improve the IO when reading, but consume more memory when writing. Default: 128 MB.",
"title" : "Block Size (Row Group Size) (MB)"
},
"max_padding_size_mb" : {
"type" : "integer",
"description" : "Maximum size allowed as padding to align row groups. This is also the minimum size of a row group. Default: 8 MB.",
"title" : "Max Padding Size (MB)"
},
"page_size_kb" : {
"type" : "integer",
"description" : "The page size is for compression. A block is composed of pages. A page is the smallest unit that must be read fully to access a single record. If this value is too small, the compression will deteriorate. Default: 1024 KB.",
"title" : "Page Size (KB)"
},
"dictionary_page_size_kb" : {
"type" : "integer",
"description" : "There is one dictionary page per column per row group when dictionary encoding is used. The dictionary page size works like the page size but for dictionary. Default: 1024 KB.",
"title" : "Dictionary Page Size (KB)"
},
"dictionary_encoding" : {
"type" : "boolean",
"description" : "Default: true.",
"title" : "Dictionary Encoding"
}
},
"required" : [ "format_type" ]