diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskExceptionHandler.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskExceptionHandler.kt
index 4d8d94a71cf2..f2063ae47308 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskExceptionHandler.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskExceptionHandler.kt
@@ -92,6 +92,7 @@ T : ScopedTask {
             log.warn { "Sync task $innerTask was cancelled." }
             throw e
         } catch (e: Exception) {
+            log.error { "Caught exception in sync task $innerTask: $e" }
             handleSyncFailure(e)
         }
     }
@@ -127,6 +128,7 @@ T : ScopedTask {
             log.warn { "Stream task $innerTask was cancelled." }
             throw e
         } catch (e: Exception) {
+            log.error { "Caught exception in stream task $innerTask: $e" }
             handleStreamFailure(stream, e)
         }
     }
diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/command/avro/AvroFormatCompressionCodec.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/command/avro/AvroFormatCompressionCodecSpecification.kt
similarity index 58%
rename from airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/command/avro/AvroFormatCompressionCodec.kt
rename to airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/command/avro/AvroFormatCompressionCodecSpecification.kt
index 333c417f26dc..4f9eec20e97e 100644
--- a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/command/avro/AvroFormatCompressionCodec.kt
+++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/command/avro/AvroFormatCompressionCodecSpecification.kt
@@ -17,14 +17,17 @@ import org.apache.avro.file.CodecFactory
     property = "codec"
 )
 @JsonSubTypes(
-    JsonSubTypes.Type(value = AvroFormatNoCompressionCodec::class, name = "no compression"),
-    JsonSubTypes.Type(value = AvroFormatDeflateCodec::class, name = "Deflate"),
-    JsonSubTypes.Type(value = AvroFormatBzip2Codec::class, name = "bzip2"),
-    JsonSubTypes.Type(value = AvroFormatXzCodec::class, name = "xz"),
-    JsonSubTypes.Type(value = AvroFormatZstandardCodec::class, name = "zstandard"),
-    JsonSubTypes.Type(value = AvroFormatSnappyCodec::class, name = "snappy")
+    JsonSubTypes.Type(
+        value = AvroFormatNoCompressionCodecSpecification::class,
+        name = "no compression"
+    ),
+    JsonSubTypes.Type(value = AvroFormatDeflateCodecSpecification::class, name = "Deflate"),
+    JsonSubTypes.Type(value = AvroFormatBzip2CodecSpecification::class, name = "bzip2"),
+    JsonSubTypes.Type(value = AvroFormatXzCodecSpecification::class, name = "xz"),
+    JsonSubTypes.Type(value = AvroFormatZstandardCodecSpecification::class, name = "zstandard"),
+    JsonSubTypes.Type(value = AvroFormatSnappyCodecSpecification::class, name = "snappy")
 )
-sealed class AvroFormatCompressionCodec(
+sealed class AvroFormatCompressionCodecSpecification(
     @JsonSchemaTitle("Compression Codec") @JsonProperty("codec") open val type: Type
 ) {
     enum class Type(@get:JsonValue val typeName: String) {
@@ -40,46 +43,47 @@ sealed class AvroFormatCompressionCodec(
         AvroCompressionConfiguration(
             compressionCodec =
                 when (this) {
-                    is AvroFormatNoCompressionCodec -> CodecFactory.nullCodec()
-                    is AvroFormatDeflateCodec -> CodecFactory.deflateCodec(compressionLevel)
-                    is AvroFormatBzip2Codec -> CodecFactory.bzip2Codec()
-                    is AvroFormatXzCodec -> CodecFactory.xzCodec(compressionLevel)
-                    is AvroFormatZstandardCodec ->
+                    is AvroFormatNoCompressionCodecSpecification -> CodecFactory.nullCodec()
+                    is AvroFormatDeflateCodecSpecification ->
+                        CodecFactory.deflateCodec(compressionLevel)
+                    is AvroFormatBzip2CodecSpecification -> CodecFactory.bzip2Codec()
+                    is AvroFormatXzCodecSpecification -> CodecFactory.xzCodec(compressionLevel)
+                    is AvroFormatZstandardCodecSpecification ->
                         CodecFactory.zstandardCodec(compressionLevel, includeChecksum)
-                    is AvroFormatSnappyCodec -> CodecFactory.snappyCodec()
+                    is AvroFormatSnappyCodecSpecification -> CodecFactory.snappyCodec()
                 }
         )
 }
 
-data class AvroFormatNoCompressionCodec(
+data class AvroFormatNoCompressionCodecSpecification(
     @JsonSchemaTitle("Compression Codec")
     @JsonProperty("codec")
     override val type: Type = Type.NO_COMPRESSION,
-) : AvroFormatCompressionCodec(type)
+) : AvroFormatCompressionCodecSpecification(type)
 
-data class AvroFormatDeflateCodec(
+data class AvroFormatDeflateCodecSpecification(
     @JsonSchemaTitle("Compression Codec")
     @JsonProperty("codec")
     override val type: Type = Type.DEFLATE,
     @JsonSchemaTitle("Deflate Level")
     @JsonProperty("compression_level")
     val compressionLevel: Int = 0
-) : AvroFormatCompressionCodec(type)
+) : AvroFormatCompressionCodecSpecification(type)
 
-data class AvroFormatBzip2Codec(
+data class AvroFormatBzip2CodecSpecification(
     @JsonSchemaTitle("Compression Codec")
     @JsonProperty("codec")
     override val type: Type = Type.BZIP2,
-) : AvroFormatCompressionCodec(type)
+) : AvroFormatCompressionCodecSpecification(type)
 
-data class AvroFormatXzCodec(
+data class AvroFormatXzCodecSpecification(
     @JsonSchemaTitle("Compression Codec")
     @JsonProperty("codec")
     override val type: Type = Type.XZ,
     @JsonSchemaTitle("Compression Level")
     @JsonProperty("compression_level")
     val compressionLevel: Int = 6
-) : AvroFormatCompressionCodec(type)
+) : AvroFormatCompressionCodecSpecification(type)
 
-data class AvroFormatZstandardCodec(
+data class AvroFormatZstandardCodecSpecification(
     @JsonSchemaTitle("Compression Codec")
     @JsonProperty("codec")
     override val type: Type = Type.ZSTANDARD,
@@ -89,13 +93,13 @@ data class AvroFormatZstandardCodec(
     @JsonSchemaTitle("Include Checksum")
     @JsonProperty("include_checksum")
     val includeChecksum: Boolean = false
-) : AvroFormatCompressionCodec(type)
+) : AvroFormatCompressionCodecSpecification(type)
 
-data class AvroFormatSnappyCodec(
+data class AvroFormatSnappyCodecSpecification(
     @JsonSchemaTitle("Compression Codec")
     @JsonProperty("codec")
     override val type: Type = Type.SNAPPY,
-) : AvroFormatCompressionCodec(type)
+) : AvroFormatCompressionCodecSpecification(type)
 
 data class AvroCompressionConfiguration(val compressionCodec: CodecFactory)
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageFormatSpecification.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageFormatSpecification.kt
index 8b66064c829d..d5b3c35ed859 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageFormatSpecification.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/command/object_storage/ObjectStorageFormatSpecification.kt
@@ -12,8 +12,10 @@ import com.fasterxml.jackson.annotation.JsonValue
 import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
 import io.airbyte.cdk.load.command.avro.AvroCompressionConfiguration
 import io.airbyte.cdk.load.command.avro.AvroCompressionConfigurationProvider
-import io.airbyte.cdk.load.command.avro.AvroFormatCompressionCodec
-import io.airbyte.cdk.load.command.avro.AvroFormatNoCompressionCodec
+import io.airbyte.cdk.load.command.avro.AvroFormatCompressionCodecSpecification
+import io.airbyte.cdk.load.command.avro.AvroFormatNoCompressionCodecSpecification
+import io.airbyte.cdk.load.file.parquet.ParquetWriterConfiguration
+import io.airbyte.cdk.load.file.parquet.ParquetWriterConfigurationProvider
 
 /**
  * Mix-in to provide file format configuration.
@@ -43,7 +45,21 @@ interface ObjectStorageFormatSpecificationProvider {
                         .compressionCodec
                         .toCompressionConfiguration()
                 )
-            is ParquetFormatSpecification -> ParquetFormatConfiguration()
+            is ParquetFormatSpecification -> {
+                (format as ParquetFormatSpecification).let {
+                    ParquetFormatConfiguration(
+                        parquetWriterConfiguration =
+                            ParquetWriterConfiguration(
+                                compressionCodecName = it.compressionCodec!!.name,
+                                blockSizeMb = it.blockSizeMb!!,
+                                maxPaddingSizeMb = it.maxPaddingSizeMb!!,
+                                pageSizeKb = it.pageSizeKb!!,
+                                dictionaryPageSizeKb = it.dictionaryPageSizeKb!!,
+                                dictionaryEncoding = it.dictionaryEncoding!!
+                            )
+                    )
+                }
+            }
         }
     }
@@ -112,14 +128,66 @@ class AvroFormatSpecification(
         "The compression algorithm used to compress data. Default to no compression."
     )
     @JsonProperty("compression_codec")
-    val compressionCodec: AvroFormatCompressionCodec = AvroFormatNoCompressionCodec()
+    val compressionCodec: AvroFormatCompressionCodecSpecification =
+        AvroFormatNoCompressionCodecSpecification()
 }
 
 /** Parquet */
 @JsonSchemaTitle("Parquet: Columnar Storage")
 class ParquetFormatSpecification(
-    @JsonProperty("format_type") override val formatType: Type = Type.PARQUET
-) : ObjectStorageFormatSpecification(formatType)
+    @JsonSchemaTitle("Format Type")
+    @JsonProperty("format_type")
+    override val formatType: Type = Type.PARQUET
+) : ObjectStorageFormatSpecification(formatType) {
+    enum class ParquetFormatCompressionCodec {
+        UNCOMPRESSED,
+        SNAPPY,
+        GZIP,
+        LZO,
+        BROTLI, // TODO: Broken locally in both this and s3 v1; Validate whether it works in prod
+        LZ4,
+        ZSTD
+    }
+
+    @JsonSchemaTitle("Compression Codec")
+    @JsonPropertyDescription("The compression algorithm used to compress data pages.")
+    @JsonProperty("compression_codec")
+    val compressionCodec: ParquetFormatCompressionCodec? =
+        ParquetFormatCompressionCodec.UNCOMPRESSED
+
+    @JsonSchemaTitle("Block Size (Row Group Size) (MB)")
+    @JsonPropertyDescription(
+        "This is the size of a row group being buffered in memory. It limits the memory usage when writing. Larger values will improve the IO when reading, but consume more memory when writing. Default: 128 MB."
+    )
+    @JsonProperty("block_size_mb")
+    val blockSizeMb: Int? = 128
+
+    @JsonSchemaTitle("Max Padding Size (MB)")
+    @JsonPropertyDescription(
+        "Maximum size allowed as padding to align row groups. This is also the minimum size of a row group. Default: 8 MB."
+    )
+    @JsonProperty("max_padding_size_mb")
+    val maxPaddingSizeMb: Int? = 8
+
+    @JsonSchemaTitle("Page Size (KB)")
+    @JsonPropertyDescription(
+        "The page size is for compression. A block is composed of pages. A page is the smallest unit that must be read fully to access a single record. If this value is too small, the compression will deteriorate. Default: 1024 KB."
+    )
+    @JsonProperty("page_size_kb")
+    val pageSizeKb: Int? = 1024
+
+    @JsonSchemaTitle("Dictionary Page Size (KB)")
+    @JsonPropertyDescription(
+        "There is one dictionary page per column per row group when dictionary encoding is used. The dictionary page size works like the page size but for dictionaries. Default: 1024 KB."
+    )
+    @JsonProperty("dictionary_page_size_kb")
+    val dictionaryPageSizeKb: Int? = 1024
+
+    @JsonSchemaTitle("Dictionary Encoding")
+    @JsonPropertyDescription("Default: true.")
+    @JsonProperty("dictionary_encoding")
+    val dictionaryEncoding: Boolean? = true
+}
 
 /** Configuration */
 interface OutputFormatConfigurationProvider {
@@ -141,8 +209,10 @@ data class AvroFormatConfiguration(
     override val avroCompressionConfiguration: AvroCompressionConfiguration,
 ) : ObjectStorageFormatConfiguration, AvroCompressionConfigurationProvider
 
-data class ParquetFormatConfiguration(override val extension: String = "parquet") :
-    ObjectStorageFormatConfiguration
+data class ParquetFormatConfiguration(
+    override val extension: String = "parquet",
+    override val parquetWriterConfiguration: ParquetWriterConfiguration
+) : ObjectStorageFormatConfiguration, ParquetWriterConfigurationProvider
 
 interface ObjectStorageFormatConfigurationProvider {
     val objectStorageFormatConfiguration: ObjectStorageFormatConfiguration
diff --git a/airbyte-cdk/bulk/toolkits/load-parquet/build.gradle b/airbyte-cdk/bulk/toolkits/load-parquet/build.gradle
index 51fdebd3e384..9fc2bab743e4 100644
--- a/airbyte-cdk/bulk/toolkits/load-parquet/build.gradle
+++ b/airbyte-cdk/bulk/toolkits/load-parquet/build.gradle
@@ -3,6 +3,8 @@ dependencies {
     implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')
 
     api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-avro')
+    implementation("org.lz4:lz4-java:1.8.0")
+    runtimeOnly 'com.hadoop.gplcompression:hadoop-lzo:0.4.20'
     api ('org.apache.hadoop:hadoop-common:3.4.0') {
         exclude group: 'org.apache.zookeeper'
         exclude group: 'org.apache.hadoop', module: 'hadoop-yarn-common'
diff --git a/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/file/parquet/ParquetWriter.kt b/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/file/parquet/ParquetWriter.kt
index 4bd915ed5a70..a64fd51e137d 100644
--- a/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/file/parquet/ParquetWriter.kt
+++ b/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/file/parquet/ParquetWriter.kt
@@ -11,6 +11,7 @@ import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.avro.AvroParquetWriter
 import org.apache.parquet.hadoop.ParquetWriter as ApacheParquetWriter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.io.OutputFile
 import org.apache.parquet.io.PositionOutputStream
 
@@ -19,7 +20,10 @@ class ParquetWriter(private val writer: ApacheParquetWriter<GenericRecord>) : Cl
     override fun close() = writer.close()
 }
 
-fun OutputStream.toParquetWriter(avroSchema: Schema): ParquetWriter {
+fun OutputStream.toParquetWriter(
+    avroSchema: Schema,
+    config: ParquetWriterConfiguration
+): ParquetWriter {
     // Custom OutputFile implementation wrapping the OutputStream
     val outputFile =
         object : OutputFile {
@@ -49,7 +53,29 @@ fun OutputStream.toParquetWriter(avroSchema: Schema): ParquetWriter {
         AvroParquetWriter.builder<GenericRecord>(outputFile)
             .withSchema(avroSchema)
             .withConf(Configuration())
+            .withCompressionCodec(config.compressionCodec)
+            .withRowGroupSize(config.blockSizeMb * 1024 * 1024L)
+            .withPageSize(config.pageSizeKb * 1024)
+            .withDictionaryPageSize(config.dictionaryPageSizeKb * 1024)
+            .withDictionaryEncoding(config.dictionaryEncoding)
+            .withMaxPaddingSize(config.maxPaddingSizeMb * 1024 * 1024)
             .build()
 
     return ParquetWriter(writer)
 }
+
+data class ParquetWriterConfiguration(
+    val compressionCodecName: String,
+    val blockSizeMb: Int,
+    val maxPaddingSizeMb: Int,
+    val pageSizeKb: Int,
+    val dictionaryPageSizeKb: Int,
+    val dictionaryEncoding: Boolean
+) {
+    val compressionCodec
+        get() = CompressionCodecName.valueOf(compressionCodecName)
+}
+
+interface ParquetWriterConfigurationProvider {
+    val parquetWriterConfiguration: ParquetWriterConfiguration
+}
diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
index 36df9f9a0552..652f35c16806 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
+++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
@@ -2,7 +2,7 @@ data:
   connectorSubtype: file
   connectorType: destination
   definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
-  dockerImageTag: 0.1.9
+  dockerImageTag: 0.1.10
   dockerRepository: airbyte/destination-s3-v2
   githubIssueLabel: destination-s3-v2
   icon: s3.svg
@@ -61,4 +61,9 @@ data:
       secretStore:
         type: GSM
        alias: airbyte-connector-testing-secret-store
+    - name: SECRET_DESTINATION-S3-V2-PARQUET-SNAPPY
+      fileName: s3_dest_v2_parquet_snappy_config.json
+      secretStore:
+        type: GSM
+        alias: airbyte-connector-testing-secret-store
 metadataSpecVersion: "1.0"
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt
index 9b3293f1c77d..d8799ca3547c 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt
@@ -111,13 +111,18 @@ class S3V2Writer(
                     }
                 }
                 is ParquetFormatConfiguration -> {
-                    outputStream.toParquetWriter(avroSchema!!).use { writer ->
-                        records.forEach {
-                            writer.write(
-                                recordDecorator.decorate(it).toAvroRecord(avroSchema)
-                            )
+                    outputStream
+                        .toParquetWriter(
+                            avroSchema!!,
+                            formatConfig.parquetWriterConfiguration
+                        )
+                        .use { writer ->
+                            records.forEach {
+                                writer.write(
+                                    recordDecorator.decorate(it).toAvroRecord(avroSchema)
+                                )
+                            }
                         }
-                    }
                 }
                 else -> throw IllegalStateException("Unsupported format")
             }
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt
index 3338fde3812f..a84b662c03f8 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt
@@ -16,6 +16,7 @@ object S3V2TestUtils {
     const val AVRO_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_avro_config.json"
     const val AVRO_BZIP2_CONFIG_PATH = "secrets/s3_dest_v2_avro_bzip2_config.json"
     const val PARQUET_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_parquet_config.json"
+    const val PARQUET_SNAPPY_CONFIG_PATH = "secrets/s3_dest_v2_parquet_snappy_config.json"
     fun getConfig(configPath: String): S3V2Specification =
         ValidatedJsonUtils.parseOne(
             S3V2Specification::class.java,
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt
index 38d5b60c9e0c..da57e9391676 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt
@@ -42,4 +42,7 @@ class S3V2WriteTestAvroUncompressed : S3V2WriteTest(S3V2TestUtils.AVRO_UNCOMPRES
 
 class S3V2WriteTestAvroBzip2 : S3V2WriteTest(S3V2TestUtils.AVRO_BZIP2_CONFIG_PATH)
 
-class S3V2WriteTestParquet : S3V2WriteTest(S3V2TestUtils.PARQUET_UNCOMPRESSED_CONFIG_PATH)
+class S3V2WriteTestParquetUncompressed :
+    S3V2WriteTest(S3V2TestUtils.PARQUET_UNCOMPRESSED_CONFIG_PATH)
+
+class S3V2WriteTestParquetSnappy : S3V2WriteTest(S3V2TestUtils.PARQUET_SNAPPY_CONFIG_PATH)
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json
index 2f7ba3e116d6..fa1d33f36519 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-cloud.json
@@ -237,6 +237,37 @@
             "type" : "string",
             "enum" : [ "Parquet" ],
             "default" : "Parquet"
+          },
+          "compression_codec" : {
+            "type" : "string",
+            "enum" : [ "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO", "BROTLI", "LZ4", "ZSTD" ],
+            "description" : "The compression algorithm used to compress data pages.",
+            "title" : "Compression Codec"
+          },
+          "block_size_mb" : {
+            "type" : "integer",
+            "description" : "This is the size of a row group being buffered in memory. It limits the memory usage when writing. Larger values will improve the IO when reading, but consume more memory when writing. Default: 128 MB.",
+            "title" : "Block Size (Row Group Size) (MB)"
+          },
+          "max_padding_size_mb" : {
+            "type" : "integer",
+            "description" : "Maximum size allowed as padding to align row groups. This is also the minimum size of a row group. Default: 8 MB.",
+            "title" : "Max Padding Size (MB)"
+          },
+          "page_size_kb" : {
+            "type" : "integer",
+            "description" : "The page size is for compression. A block is composed of pages. A page is the smallest unit that must be read fully to access a single record. If this value is too small, the compression will deteriorate. Default: 1024 KB.",
+            "title" : "Page Size (KB)"
+          },
+          "dictionary_page_size_kb" : {
+            "type" : "integer",
+            "description" : "There is one dictionary page per column per row group when dictionary encoding is used. The dictionary page size works like the page size but for dictionaries. Default: 1024 KB.",
+            "title" : "Dictionary Page Size (KB)"
+          },
+          "dictionary_encoding" : {
+            "type" : "boolean",
+            "description" : "Default: true.",
+            "title" : "Dictionary Encoding"
           }
         },
         "required" : [ "format_type" ]
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json
index 2f7ba3e116d6..fa1d33f36519 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/resources/expected-spec-oss.json
@@ -237,6 +237,37 @@
             "type" : "string",
             "enum" : [ "Parquet" ],
             "default" : "Parquet"
+          },
+          "compression_codec" : {
+            "type" : "string",
+            "enum" : [ "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO", "BROTLI", "LZ4", "ZSTD" ],
+            "description" : "The compression algorithm used to compress data pages.",
+            "title" : "Compression Codec"
+          },
+          "block_size_mb" : {
+            "type" : "integer",
+            "description" : "This is the size of a row group being buffered in memory. It limits the memory usage when writing. Larger values will improve the IO when reading, but consume more memory when writing. Default: 128 MB.",
+            "title" : "Block Size (Row Group Size) (MB)"
+          },
+          "max_padding_size_mb" : {
+            "type" : "integer",
+            "description" : "Maximum size allowed as padding to align row groups. This is also the minimum size of a row group. Default: 8 MB.",
+            "title" : "Max Padding Size (MB)"
+          },
+          "page_size_kb" : {
+            "type" : "integer",
+            "description" : "The page size is for compression. A block is composed of pages. A page is the smallest unit that must be read fully to access a single record. If this value is too small, the compression will deteriorate. Default: 1024 KB.",
+            "title" : "Page Size (KB)"
+          },
+          "dictionary_page_size_kb" : {
+            "type" : "integer",
+            "description" : "There is one dictionary page per column per row group when dictionary encoding is used. The dictionary page size works like the page size but for dictionaries. Default: 1024 KB.",
+            "title" : "Dictionary Page Size (KB)"
+          },
+          "dictionary_encoding" : {
+            "type" : "boolean",
+            "description" : "Default: true.",
+            "title" : "Dictionary Encoding"
           }
         },
         "required" : [ "format_type" ]
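
Usage sketch (not part of the patch): how the new `ParquetWriterConfiguration` plugs into `toParquetWriter`, using the spec defaults with SNAPPY as the only override. The one-field Avro schema and record are illustrative; SNAPPY assumes the snappy codec is on the classpath, and `writer.write` is the wrapper method exercised by `S3V2Writer` above.

```kotlin
import io.airbyte.cdk.load.file.parquet.ParquetWriterConfiguration
import io.airbyte.cdk.load.file.parquet.toParquetWriter
import java.io.ByteArrayOutputStream
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData

fun main() {
    // Illustrative one-field schema; any Avro record schema works here.
    val schema = SchemaBuilder.record("example").fields().requiredString("id").endRecord()

    // Defaults from ParquetFormatSpecification, overriding only the codec.
    val config =
        ParquetWriterConfiguration(
            compressionCodecName = "SNAPPY", // resolved lazily via CompressionCodecName.valueOf
            blockSizeMb = 128,
            maxPaddingSizeMb = 8,
            pageSizeKb = 1024,
            dictionaryPageSizeKb = 1024,
            dictionaryEncoding = true
        )

    ByteArrayOutputStream().use { stream ->
        // The extension wraps the stream in a Parquet OutputFile and applies the config.
        stream.toParquetWriter(schema, config).use { writer ->
            writer.write(GenericData.Record(schema).apply { put("id", "1") })
        }
        println("Wrote ${stream.size()} bytes of SNAPPY-compressed Parquet")
    }
}
```

Note that because `compressionCodec` is a lazy `CompressionCodecName.valueOf` lookup, a codec name that doesn't match an enum constant only fails when the writer is built, not when the configuration is constructed.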