From e5a89ceb6fda7cc17375baae60cf007684e69e55 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Tue, 15 Oct 2024 09:02:54 -0700 Subject: [PATCH] Bulk Load CDK: S3 client and minimal S3V2 usage (#46897) --- .../cdk/load/file/ObjectStorageClient.kt | 11 ++++ .../airbyte/cdk/load/command/s3/S3Client.kt | 60 +++++++++++++++++++ .../connectors/destination-s3-v2/build.gradle | 3 +- .../destination-s3-v2/metadata.yaml | 2 +- .../src/main/kotlin/S3V2Checker.kt | 6 +- 5 files changed, 77 insertions(+), 5 deletions(-) create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/file/ObjectStorageClient.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/command/s3/S3Client.kt diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/file/ObjectStorageClient.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/file/ObjectStorageClient.kt new file mode 100644 index 000000000000..88cae47cb351 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/file/ObjectStorageClient.kt @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.file + +import io.airbyte.cdk.load.message.RemoteObject + +interface ObjectStorageClient { + suspend fun list(prefix: String): List<RemoteObject> +} diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/command/s3/S3Client.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/command/s3/S3Client.kt new file mode 100644 index 000000000000..75da3a1d6f8d --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/command/s3/S3Client.kt @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */ + +package io.airbyte.cdk.load.command.s3 + +import aws.sdk.kotlin.runtime.auth.credentials.StaticCredentialsProvider +import aws.sdk.kotlin.services.s3.model.ListObjectsRequest +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings +import io.airbyte.cdk.load.command.aws.AWSAccessKeyConfigurationProvider +import io.airbyte.cdk.load.file.ObjectStorageClient +import io.airbyte.cdk.load.message.RemoteObject +import io.micronaut.context.annotation.Factory +import io.micronaut.context.annotation.Secondary +import jakarta.inject.Singleton + +class S3Object( + override val key: String, +) : RemoteObject() + +@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation") +class S3Client( + private val client: aws.sdk.kotlin.services.s3.S3Client, + private val bucketConfig: S3BucketConfiguration +) : ObjectStorageClient { + override suspend fun list(prefix: String): List<S3Object> { + val request = ListObjectsRequest { + bucket = bucketConfig.s3BucketName + this.prefix = prefix + } + return client.listObjects(request).contents?.mapNotNull { + it.key?.let { key -> S3Object(key) } + } + ?: emptyList() + } +} + +@Factory +class S3ClientFactory( + private val keyConfig: AWSAccessKeyConfigurationProvider, + private val bucketConfig: S3BucketConfigurationProvider +) { + + @Singleton + @Secondary + fun make(): S3Client { + val credentials = StaticCredentialsProvider { + accessKeyId = keyConfig.awsAccessKeyConfiguration.accessKeyId + secretAccessKey = keyConfig.awsAccessKeyConfiguration.secretAccessKey + } + + val client = + aws.sdk.kotlin.services.s3.S3Client { + region = bucketConfig.s3BucketConfiguration.s3BucketRegion.name + credentialsProvider = credentials + } + + return S3Client(client, bucketConfig.s3BucketConfiguration) + } +} diff --git a/airbyte-integrations/connectors/destination-s3-v2/build.gradle b/airbyte-integrations/connectors/destination-s3-v2/build.gradle index c77044ab3e47..4e89e3aa0c11 100644 ---
a/airbyte-integrations/connectors/destination-s3-v2/build.gradle +++ b/airbyte-integrations/connectors/destination-s3-v2/build.gradle @@ -11,8 +11,7 @@ airbyteBulkConnector { application { mainClass = 'io.airbyte.integrations.destination.s3_v2.S3V2Destination' - - //applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'] + applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'] // Uncomment and replace to run locally //applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0', '--add-opens', 'java.base/sun.nio.ch=ALL-UNNAMED', '--add-opens', 'java.base/sun.security.action=ALL-UNNAMED', '--add-opens', 'java.base/java.lang=ALL-UNNAMED'] diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml index d35faf5c6764..2167cc86d646 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: d6116991-e809-4c7c-ae09-c64712df5b66 - dockerImageTag: 0.1.1 + dockerImageTag: 0.1.2 dockerRepository: airbyte/destination-s3-v2 githubIssueLabel: destination-s3-v2 icon: s3.svg diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt index 7a5d42b6b9ad..2cae8a7d1f38 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Checker.kt @@ -5,11 +5,13 @@ package io.airbyte.integrations.destination.s3_v2 import io.airbyte.cdk.load.check.DestinationChecker +import io.airbyte.cdk.load.command.s3.S3Client import jakarta.inject.Singleton +import kotlinx.coroutines.runBlocking @Singleton -class S3V2Checker : 
DestinationChecker<S3V2Configuration> { +class S3V2Checker(private val s3Client: S3Client) : DestinationChecker<S3V2Configuration> { override fun check(config: S3V2Configuration) { - // TODO: validate that the configuration works + runBlocking { s3Client.list("") } } }