From f268be0decab8b07d4251e12e220af1d5d580d7b Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Mon, 8 Apr 2024 11:50:18 -0700 Subject: [PATCH] CDK s3-destinations: fixes for s3 connector compilation (#36868) --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../adaptive/AdaptiveDestinationRunner.kt | 5 ++- .../src/main/resources/version.properties | 2 +- .../staging/GeneralStagingFunctions.kt | 42 +++++++------------ .../destination/DestinationAcceptanceTest.kt | 7 +++- .../argproviders/DataArgumentsProvider.kt | 1 + .../destination/s3/BaseS3Destination.kt | 4 +- .../destination/s3/BlobStorageOperations.kt | 6 +-- .../destination/s3/S3BaseChecks.kt | 1 + .../destination/s3/S3ConsumerFactory.kt | 6 +-- .../s3/S3DestinationConfigFactory.kt | 4 +- .../destination/s3/S3StorageOperations.kt | 12 +++--- .../S3AvroParquetDestinationAcceptanceTest.kt | 1 + .../s3/S3BaseCsvDestinationAcceptanceTest.kt | 15 ++++--- .../S3BaseCsvGzipDestinationAcceptanceTest.kt | 3 +- .../S3BaseJsonlDestinationAcceptanceTest.kt | 16 +++---- ...3BaseJsonlGzipDestinationAcceptanceTest.kt | 3 +- .../s3/S3DestinationAcceptanceTest.kt | 17 ++++---- 18 files changed, 72 insertions(+), 74 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 188915fb5aac..c0f18a3adf31 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -144,6 +144,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.29.8 | 2024-04-08 | [\#36868](https://github.com/airbytehq/airbyte/pull/36868) | Destinations: s3-destinations Compilation fixes for connector | | 0.29.7 | 2024-04-08 | [\#36768](https://github.com/airbytehq/airbyte/pull/36768) | Destinations: Make destination state fetch/commit logic more resilient to errors | | 0.29.6 | 2024-04-05 | [\#36577](https://github.com/airbytehq/airbyte/pull/36577) | Do not send system_error trace message for config exceptions. | | 0.29.5 | 2024-04-05 | [\#36620](https://github.com/airbytehq/airbyte/pull/36620) | Missed changes - open for extension for destination-postgres | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/adaptive/AdaptiveDestinationRunner.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/adaptive/AdaptiveDestinationRunner.kt index 6e8e4aadbe95..40c9797bf2af 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/adaptive/AdaptiveDestinationRunner.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/adaptive/AdaptiveDestinationRunner.kt @@ -20,12 +20,13 @@ object AdaptiveDestinationRunner { private const val DEPLOYMENT_MODE_KEY = EnvVariableFeatureFlags.DEPLOYMENT_MODE private const val CLOUD_MODE = "CLOUD" + @JvmStatic fun baseOnEnv(): OssDestinationBuilder { val mode = System.getenv(DEPLOYMENT_MODE_KEY) return OssDestinationBuilder(mode) } - class OssDestinationBuilder(private val deploymentMode: String) { + class OssDestinationBuilder(private val deploymentMode: String?) { fun withOssDestination( ossDestinationSupplier: Supplier ): CloudDestinationBuilder { @@ -34,7 +35,7 @@ object AdaptiveDestinationRunner { } class CloudDestinationBuilder( - private val deploymentMode: String, + private val deploymentMode: String?, private val ossDestinationSupplier: Supplier ) { fun withCloudDestination( diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 3c77625b16f4..22e430d83820 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.29.7 +version=0.29.8 diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.kt index e516d50eef01..2440466fe42d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.kt @@ -38,10 +38,9 @@ object GeneralStagingFunctions { typerDeduper: TyperDeduper ): OnStartFunction { return OnStartFunction { - log.info( - "Preparing raw tables in destination started for {} streams", - writeConfigs.size - ) + log.info { + "Preparing raw tables in destination started for ${writeConfigs.size} streams" + } typerDeduper.prepareSchemasAndRunMigrations() // Create raw tables @@ -53,20 +52,16 @@ object GeneralStagingFunctions { val stageName = stagingOperations.getStageName(schema, dstTableName) val stagingPath = stagingOperations.getStagingPath( - SerialStagingConsumerFactory.Companion.RANDOM_CONNECTION_ID, + RANDOM_CONNECTION_ID, schema, stream, writeConfig.outputTableName, writeConfig.writeDatetime ) - log.info( - "Preparing staging area in destination started for schema {} stream {}: target table: {}, stage: {}", - schema, - stream, - dstTableName, - stagingPath - ) + log.info { + "Preparing staging area in destination started for schema $schema stream $stream: target table: $dstTableName, stage: $stagingPath" + } stagingOperations.createSchemaIfNotExists(database, schema) stagingOperations.createTableIfNotExists(database, schema, dstTableName) @@ -84,16 +79,14 @@ object GeneralStagingFunctions { "Unrecognized sync mode: " + writeConfig.syncMode ) } - log.info( - "Preparing staging area in destination completed for schema {} stream {}", - schema, - stream - ) + log.info { + "Preparing staging area in destination completed for schema $schema stream $stream" + } } typerDeduper.prepareFinalTables() - log.info("Executing finalization of tables.") + log.info { "Executing finalization of tables." } stagingOperations.executeTransaction(database, queryList) } } @@ -167,7 +160,7 @@ object GeneralStagingFunctions { // After moving data from staging area to the target table (airybte_raw) clean up the // staging // area (if user configured) - log.info("Cleaning up destination started for {} streams", writeConfigs.size) + log.info { "Cleaning up destination started for ${writeConfigs.size} streams" } typerDeduper.typeAndDedupe(streamSyncSummaries) for (writeConfig in writeConfigs) { val schemaName = writeConfig.outputSchemaName @@ -182,12 +175,9 @@ object GeneralStagingFunctions { writeConfig.outputTableName, writeConfig.writeDatetime ) - log.info( - "Cleaning stage in destination started for stream {}. schema {}, stage: {}", - writeConfig.streamName, - schemaName, - stagePath - ) + log.info { + "Cleaning stage in destination started for stream ${writeConfig.streamName}. schema $schemaName, stage: $stagePath" + } // TODO: This is another weird manifestation of Redshift vs Snowflake using // either or variables from // stageName/StagingPath. @@ -196,7 +186,7 @@ object GeneralStagingFunctions { } typerDeduper.commitFinalTables() typerDeduper.cleanup() - log.info("Cleaning up destination completed.") + log.info { "Cleaning up destination completed." } } } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt index a852a456f78f..fd727e916ebe 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt @@ -429,6 +429,7 @@ abstract class DestinationAcceptanceTest { val configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog) val messages: List = MoreResources.readResource(messagesFilename) + .trim() .lines() .map { Jsons.deserialize(it, io.airbyte.protocol.models.v0.AirbyteMessage::class.java) @@ -458,6 +459,7 @@ abstract class DestinationAcceptanceTest { val configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog) val messages: List = MoreResources.readResource(messagesFilename) + .trim() .lines() .map { Jsons.deserialize(it, io.airbyte.protocol.models.v0.AirbyteMessage::class.java) @@ -515,6 +517,7 @@ abstract class DestinationAcceptanceTest { getProtocolVersion() ) ) + .trim() .lines() .map { Jsons.deserialize( @@ -712,6 +715,7 @@ abstract class DestinationAcceptanceTest { getProtocolVersion() ) ) + .trim() .lines() .map { Jsons.deserialize(it, AirbyteMessage::class.java) } .toList() @@ -1406,6 +1410,7 @@ abstract class DestinationAcceptanceTest { getProtocolVersion() ) ) + .trim() .lines() .map { Jsons.deserialize(it, AirbyteMessage::class.java) } val config = getConfig() @@ -2328,7 +2333,7 @@ abstract class DestinationAcceptanceTest { private fun readMessagesFromFile( messagesFilename: String ): List { - return MoreResources.readResource(messagesFilename).lines().map { + return MoreResources.readResource(messagesFilename).trim().lines().map { Jsons.deserialize(it, AirbyteMessage::class.java) } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/argproviders/DataArgumentsProvider.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/argproviders/DataArgumentsProvider.kt index 478f7bc1d6af..62ac460c5d28 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/argproviders/DataArgumentsProvider.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/argproviders/DataArgumentsProvider.kt @@ -43,6 +43,7 @@ class DataArgumentsProvider : ArgumentsProvider { } companion object { + @JvmField val EXCHANGE_RATE_CONFIG: CatalogMessageTestConfigPair = CatalogMessageTestConfigPair("exchange_rate_catalog.json", "exchange_rate_messages.txt") val EDGE_CASE_CONFIG: CatalogMessageTestConfigPair = diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt index 123987f851d6..259839bdadc7 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt @@ -29,7 +29,7 @@ protected constructor( override fun check(config: JsonNode): AirbyteConnectionStatus? { try { val destinationConfig = configFactory.getS3DestinationConfig(config, storageProvider()) - val s3Client = destinationConfig!!.getS3Client() + val s3Client = destinationConfig.getS3Client() S3BaseChecks.testIAMUserHasListObjectPermission(s3Client, destinationConfig.bucketName) S3BaseChecks.testSingleUpload( @@ -64,7 +64,7 @@ protected constructor( return S3ConsumerFactory() .create( outputRecordCollector, - S3StorageOperations(nameTransformer, s3Config!!.getS3Client(), s3Config), + S3StorageOperations(nameTransformer, s3Config.getS3Client(), s3Config), nameTransformer, getCreateFunction( s3Config, diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BlobStorageOperations.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BlobStorageOperations.kt index bcd4f2f474dc..9bea16421537 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BlobStorageOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BlobStorageOperations.kt @@ -12,7 +12,7 @@ abstract class BlobStorageOperations protected constructor() { protected val blobDecorators: MutableList = ArrayList() abstract fun getBucketObjectPath( - namespace: String, + namespace: String?, streamName: String, writeDatetime: DateTime, customFormat: String @@ -29,7 +29,7 @@ abstract class BlobStorageOperations protected constructor() { @Throws(Exception::class) abstract fun uploadRecordsToBucket( recordsData: SerializableBuffer, - namespace: String, + namespace: String?, objectPath: String ): String? @@ -46,7 +46,7 @@ abstract class BlobStorageOperations protected constructor() { * @param pathFormat formatted string for the path */ abstract fun cleanUpBucketObject( - namespace: String, + namespace: String?, streamName: String, objectPath: String, pathFormat: String diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseChecks.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseChecks.kt index 3723931d9e1b..d81a147784d5 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseChecks.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseChecks.kt @@ -91,6 +91,7 @@ object S3BaseChecks { * * @param endpoint URL string representing an accessible S3 bucket */ + @JvmStatic fun testCustomEndpointSecured(endpoint: String?): Boolean { // if user does not use a custom endpoint, do not fail return if (endpoint == null || endpoint.length == 0) { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt index c12a5ba57f4d..0052c69c26cd 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt @@ -69,7 +69,7 @@ class S3ConsumerFactory { pathFormat ) storageOperations.cleanUpBucketObject( - namespace!!, + namespace, stream, outputBucketPath, pathFormat @@ -124,7 +124,7 @@ class S3ConsumerFactory { writeConfig!!.addStoredFile( storageOperations.uploadRecordsToBucket( writer, - writeConfig.namespace!!, + writeConfig.namespace, writeConfig.fullOutputPath )!! ) @@ -183,7 +183,7 @@ class S3ConsumerFactory { "Undefined destination sync mode" ) val abStream = stream.stream - val namespace = abStream.namespace + val namespace: String? = abStream.namespace val streamName = abStream.name val bucketPath = s3Config.bucketPath val customOutputFormat = java.lang.String.join("/", bucketPath, s3Config.pathFormat) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfigFactory.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfigFactory.kt index 04fa8a292fa3..b063990dd286 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfigFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfigFactory.kt @@ -6,8 +6,8 @@ package io.airbyte.cdk.integrations.destination.s3 import com.fasterxml.jackson.databind.JsonNode import javax.annotation.Nonnull -class S3DestinationConfigFactory { - fun getS3DestinationConfig( +open class S3DestinationConfigFactory { + open fun getS3DestinationConfig( config: JsonNode, @Nonnull storageProvider: StorageProvider ): S3DestinationConfig { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt index 875250ec3bd8..56bfde2291f2 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt @@ -30,8 +30,6 @@ import java.util.concurrent.ConcurrentMap import java.util.concurrent.atomic.AtomicInteger import java.util.regex.Pattern import org.apache.commons.io.FilenameUtils -import org.apache.commons.lang3.StringUtils -import org.apache.logging.log4j.util.Strings import org.joda.time.DateTime private val logger = KotlinLogging.logger {} @@ -46,13 +44,13 @@ open class S3StorageOperations( private val partCounts: ConcurrentMap = ConcurrentHashMap() override fun getBucketObjectPath( - namespace: String, + namespace: String?, streamName: String, writeDatetime: DateTime, customFormat: String ): String { val namespaceStr: String = - nameTransformer.getNamespace(if (Strings.isNotBlank(namespace)) namespace else "") + nameTransformer.getNamespace(if (!namespace.isNullOrBlank()) namespace else "") val streamNameStr: String = nameTransformer.getIdentifier(streamName) return nameTransformer.applyDefaultCase( customFormat @@ -114,7 +112,7 @@ open class S3StorageOperations( override fun uploadRecordsToBucket( recordsData: SerializableBuffer, - namespace: String, + namespace: String?, objectPath: String ): String { val exceptionsThrown: MutableList = ArrayList() @@ -174,7 +172,7 @@ open class S3StorageOperations( val partId: String = getPartId(objectPath) val fileExtension: String = getExtension(recordsData.filename) val fullObjectKey: String = - if (StringUtils.isNotBlank(s3Config.fileNamePattern)) { + if (!s3Config.fileNamePattern.isNullOrBlank()) { s3FilenameTemplateManager.applyPatternToFilename( S3FilenameTemplateParameterObject.builder() .partId(partId) @@ -291,7 +289,7 @@ open class S3StorageOperations( } override fun cleanUpBucketObject( - namespace: String, + namespace: String?, streamName: String, objectPath: String, pathFormat: String diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt index 4fc931aae890..80b77a392c80 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt @@ -108,6 +108,7 @@ abstract class S3AvroParquetDestinationAcceptanceTest protected constructor(s3Fo @Throws(IOException::class) private fun readMessagesFromFile(messagesFilename: String): List { return MoreResources.readResource(messagesFilename) + .trim() .lines() .map { record -> Jsons.deserialize(record, AirbyteMessage::class.java) } .toList() diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseCsvDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseCsvDestinationAcceptanceTest.kt index 36bfd5312de2..0081b1f2f790 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseCsvDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseCsvDestinationAcceptanceTest.kt @@ -45,12 +45,15 @@ abstract class S3BaseCsvDestinationAcceptanceTest : S3DestinationAcceptanceTest( val fieldTypes = getFieldTypes(streamSchema) val jsonRecords: MutableList = LinkedList() - for (objectSummary in objectSummaries!!) { - s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key).use { `object` -> + for (objectSummary in objectSummaries) { + s3Client!!.getObject(objectSummary.bucketName, objectSummary.key).use { `object` -> getReader(`object`).use { `in` -> val records: Iterable = - CSVFormat.DEFAULT.withQuoteMode(QuoteMode.NON_NUMERIC) - .withFirstRecordAsHeader() + CSVFormat.Builder.create() + .setHeader() + .setSkipHeaderRecord(true) + .setQuoteMode(QuoteMode.NON_NUMERIC) + .build() .parse(`in`) StreamSupport.stream(records.spliterator(), false).forEach { r: CSVRecord -> jsonRecords.add(getJsonNode(r.toMap(), fieldTypes)) @@ -87,7 +90,7 @@ abstract class S3BaseCsvDestinationAcceptanceTest : S3DestinationAcceptanceTest( input: Map, fieldTypes: Map ): JsonNode { - val json: ObjectNode = S3DestinationAcceptanceTest.Companion.MAPPER.createObjectNode() + val json: ObjectNode = MAPPER.createObjectNode() if (input.containsKey(JavaBaseConstants.COLUMN_NAME_DATA)) { return Jsons.deserialize(input[JavaBaseConstants.COLUMN_NAME_DATA]) @@ -100,7 +103,7 @@ abstract class S3BaseCsvDestinationAcceptanceTest : S3DestinationAcceptanceTest( ) { continue } - if (value == null || value == "") { + if (value == "") { continue } val type = fieldTypes[key] diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseCsvGzipDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseCsvGzipDestinationAcceptanceTest.kt index c9042ba5a548..f616e30fbb77 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseCsvGzipDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseCsvGzipDestinationAcceptanceTest.kt @@ -11,14 +11,13 @@ import java.io.IOException import java.io.InputStreamReader import java.io.Reader import java.nio.charset.StandardCharsets -import java.util.Map import java.util.zip.GZIPInputStream abstract class S3BaseCsvGzipDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() { override val formatConfig: JsonNode? get() = // config without compression defaults to GZIP Jsons.jsonNode( - Map.of("format_type", outputFormat, "flattening", Flattening.ROOT_LEVEL.value) + mapOf("format_type" to outputFormat, "flattening" to Flattening.ROOT_LEVEL.value) ) @Throws(IOException::class) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseJsonlDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseJsonlDestinationAcceptanceTest.kt index f89e54c965de..82de354be8a2 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseJsonlDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseJsonlDestinationAcceptanceTest.kt @@ -13,7 +13,6 @@ import java.io.IOException import java.io.InputStreamReader import java.nio.charset.StandardCharsets import java.util.* -import java.util.Map import kotlin.collections.List import kotlin.collections.MutableList @@ -22,13 +21,10 @@ abstract class S3BaseJsonlDestinationAcceptanceTest protected constructor() : override val formatConfig: JsonNode? get() = Jsons.jsonNode( - Map.of( - "format_type", - outputFormat, - "flattening", - Flattening.NO.value, - "compression", - Jsons.jsonNode(Map.of("compression_type", "No Compression")) + mapOf( + "format_type" to outputFormat, + "flattening" to Flattening.NO.value, + "compression" to Jsons.jsonNode(mapOf("compression_type" to "No Compression")) ) ) @@ -42,8 +38,8 @@ abstract class S3BaseJsonlDestinationAcceptanceTest protected constructor() : val objectSummaries = getAllSyncedObjects(streamName, namespace) val jsonRecords: MutableList = LinkedList() - for (objectSummary in objectSummaries!!) { - val `object` = s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key) + for (objectSummary in objectSummaries) { + val `object` = s3Client!!.getObject(objectSummary.bucketName, objectSummary.key) getReader(`object`).use { reader -> var line: String? while ((reader.readLine().also { line = it }) != null) { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseJsonlGzipDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseJsonlGzipDestinationAcceptanceTest.kt index e7f145a486f5..3a48cc532a81 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseJsonlGzipDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseJsonlGzipDestinationAcceptanceTest.kt @@ -11,13 +11,12 @@ import java.io.BufferedReader import java.io.IOException import java.io.InputStreamReader import java.nio.charset.StandardCharsets -import java.util.Map import java.util.zip.GZIPInputStream abstract class S3BaseJsonlGzipDestinationAcceptanceTest : S3BaseJsonlDestinationAcceptanceTest() { override val formatConfig: JsonNode? get() = // config without compression defaults to GZIP - Jsons.jsonNode(Map.of("format_type", outputFormat, "flattening", Flattening.NO.value)) + Jsons.jsonNode(mapOf("format_type" to outputFormat, "flattening" to Flattening.NO.value)) @Throws(IOException::class) override fun getReader(s3Object: S3Object): BufferedReader { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt index 0e245a4473e1..a53671d6cc90 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt @@ -46,9 +46,12 @@ protected constructor(protected val outputFormat: S3Format) : DestinationAccepta protected var s3nameTransformer: NamingConventionTransformer = mock() protected var s3StorageOperations: S3StorageOperations? = null - protected val baseConfigJson: JsonNode + protected open val baseConfigJson: JsonNode get() = Jsons.deserialize(IOs.readFile(Path.of(secretFilePath))) + override val imageName: String + get() = "airbyte/destination-s3:dev" + override fun getDefaultSchema(config: JsonNode): String? { if (config.has("s3_bucket_path")) { return config["s3_bucket_path"].asText() @@ -79,7 +82,7 @@ protected constructor(protected val outputFormat: S3Format) : DestinationAccepta namespaceStr, streamNameStr, DateTime.now(DateTimeZone.UTC), - s3DestinationConfig.pathFormat!! + s3DestinationConfig.pathFormat!!, ) // the child folder contains a non-deterministic epoch timestamp, so use the parent folder val parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1) @@ -96,7 +99,7 @@ protected constructor(protected val outputFormat: S3Format) : DestinationAccepta objectSummaries .stream() .map { o: S3ObjectSummary -> String.format("%s/%s", o.bucketName, o.key) } - .collect(Collectors.toList()) + .collect(Collectors.toList()), ) return objectSummaries } @@ -117,7 +120,7 @@ protected constructor(protected val outputFormat: S3Format) : DestinationAccepta String.format( "%s_test_%s", outputFormat.name.lowercase(), - RandomStringUtils.randomAlphanumeric(5) + RandomStringUtils.randomAlphanumeric(5), ) (configJson as ObjectNode) .put("s3_bucket_path", testBucketPath) @@ -128,7 +131,7 @@ protected constructor(protected val outputFormat: S3Format) : DestinationAccepta LOGGER.info( "Test full path: {}/{}", s3DestinationConfig.bucketName, - s3DestinationConfig.bucketPath + s3DestinationConfig.bucketPath, ) this.s3Client = s3DestinationConfig.getS3Client() @@ -152,11 +155,11 @@ protected constructor(protected val outputFormat: S3Format) : DestinationAccepta LOGGER.info( "Tearing down test bucket path: {}/{}", s3DestinationConfig.bucketName, - s3DestinationConfig.bucketPath + s3DestinationConfig.bucketPath, ) val result = s3Client!!.deleteObjects( - DeleteObjectsRequest(s3DestinationConfig.bucketName).withKeys(keysToDelete) + DeleteObjectsRequest(s3DestinationConfig.bucketName).withKeys(keysToDelete), ) LOGGER.info("Deleted {} file(s).", result.deletedObjects.size) }