From 45dcb2a68964ed10bc81dc4877f58f739156a3df Mon Sep 17 00:00:00 2001 From: Stephane Geneix Date: Fri, 18 Oct 2024 12:41:41 -0700 Subject: [PATCH] destination-s3: add file transfer --- .../async/buffers/BufferDequeue.kt | 2 +- .../async/model/AirbyteRecordMessageFile.kt | 56 +++++++ .../model/PartialAirbyteRecordMessage.kt | 18 ++- .../async/AsyncStreamConsumerTest.kt | 5 +- .../BaseDestinationAcceptanceTest.kt | 8 +- .../source/AbstractSourceConnectorTest.kt | 1 + .../features/EnvVariableFeatureFlags.kt | 9 ++ .../airbyte/commons/features/FeatureFlags.kt | 4 + .../commons/features/FeatureFlagsWrapper.kt | 6 + .../workers/exception/TestHarnessException.kt | 19 ++- .../workers/process/DockerProcessFactory.kt | 13 ++ .../destination/s3/BaseS3Destination.kt | 3 +- .../destination/s3/S3ConsumerFactory.kt | 110 ++++++++++--- .../s3/S3DestinationFlushFunction.kt | 12 +- .../destination/s3/S3StorageOperations.kt | 28 +++- .../s3/S3BaseDestinationAcceptanceTest.kt | 126 +++++++++++++++ .../s3/S3DestinationAcceptanceTest.kt | 77 +++++++++ .../typing_deduping/BaseTypingDedupingTest.kt | 1 + .../connectors/destination-s3/metadata.yaml | 1 + .../s3/S3CsvDestinationAcceptanceTest.kt | 6 + .../s3/S3FileTransferDestinationTest.kt | 153 ++++++++++++++++++ .../S3JsonlGzipDestinationAcceptanceTest.kt | 6 + 22 files changed, 621 insertions(+), 43 deletions(-) create mode 100644 airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/AirbyteRecordMessageFile.kt create mode 100644 airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseDestinationAcceptanceTest.kt create mode 100644 airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3FileTransferDestinationTest.kt diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt index 31f80de484d5..60ab054a2fb6 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt @@ -63,7 +63,7 @@ class BufferDequeue( // otherwise pull records until we hit the memory limit. val newSize: Long = (memoryItem.size) + bytesRead.get() - if (newSize <= optimalBytesToRead) { + if (newSize <= optimalBytesToRead || output.isEmpty()) { memoryItem.size.let { bytesRead.addAndGet(it) } queue.poll()?.item?.let { output.add(it) } } else { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/AirbyteRecordMessageFile.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/AirbyteRecordMessageFile.kt new file mode 100644 index 000000000000..54df03f239d0 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/AirbyteRecordMessageFile.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.integrations.destination.async.model + +import com.fasterxml.jackson.annotation.JsonProperty + +class AirbyteRecordMessageFile { + constructor( + fileUrl: String? = null, + bytes: Long? = null, + fileRelativePath: String? = null, + modified: Long? = null, + sourceFileUrl: String? = null + ) { + this.fileUrl = fileUrl + this.bytes = bytes + this.fileRelativePath = fileRelativePath + this.modified = modified + this.sourceFileUrl = sourceFileUrl + } + constructor() : + this( + fileUrl = null, + bytes = null, + fileRelativePath = null, + modified = null, + sourceFileUrl = null + ) + + @get:JsonProperty("file_url") + @set:JsonProperty("file_url") + @JsonProperty("file_url") + var fileUrl: String? = null + + @get:JsonProperty("bytes") + @set:JsonProperty("bytes") + @JsonProperty("bytes") + var bytes: Long? = null + + @get:JsonProperty("file_relative_path") + @set:JsonProperty("file_relative_path") + @JsonProperty("file_relative_path") + var fileRelativePath: String? = null + + @get:JsonProperty("modified") + @set:JsonProperty("modified") + @JsonProperty("modified") + var modified: Long? = null + + @get:JsonProperty("source_file_url") + @set:JsonProperty("source_file_url") + @JsonProperty("source_file_url") + var sourceFileUrl: String? = null +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteRecordMessage.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteRecordMessage.kt index fd26f6ad5747..d01c70c15cf6 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteRecordMessage.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteRecordMessage.kt @@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.destination.async.model import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.annotation.JsonPropertyDescription import com.fasterxml.jackson.databind.JsonNode import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta import io.airbyte.protocol.models.v0.StreamDescriptor @@ -33,7 +32,6 @@ class PartialAirbyteRecordMessage { @get:JsonProperty("emitted_at") @set:JsonProperty("emitted_at") @JsonProperty("emitted_at") - @JsonPropertyDescription("when the data was emitted from the source. epoch in millisecond.") var emittedAt: Long = 0 @get:JsonProperty("meta") @@ -41,6 +39,11 @@ class PartialAirbyteRecordMessage { @JsonProperty("meta") var meta: AirbyteRecordMessageMeta? = null + @get:JsonProperty("file") + @set:JsonProperty("file") + @JsonProperty("file") + var file: AirbyteRecordMessageFile? = null + fun withNamespace(namespace: String?): PartialAirbyteRecordMessage { this.namespace = namespace return this @@ -66,6 +69,11 @@ class PartialAirbyteRecordMessage { return this } + fun withFile(file: AirbyteRecordMessageFile): PartialAirbyteRecordMessage { + this.file = file + return this + } + override fun equals(other: Any?): Boolean { if (this === other) { return true @@ -77,7 +85,8 @@ class PartialAirbyteRecordMessage { return namespace == that.namespace && stream == that.stream && emittedAt == that.emittedAt && - meta == that.meta + meta == that.meta && + file == that.file } override fun hashCode(): Int { @@ -98,6 +107,9 @@ class PartialAirbyteRecordMessage { ", meta='" + meta + '\'' + + ", file='" + + file + + '\'' + '}' } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt index e8634f5f8707..bd706681ed5a 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt @@ -765,6 +765,9 @@ class AsyncStreamConsumerTest { val throwable = assertThrows(RuntimeException::class.java) { consumer.accept(retyped, retyped.length) } // Ensure that the offending data has been scrubbed from the error message - assertFalse(throwable.message!!.contains(offender)) + assertFalse( + throwable.message!!.contains(offender), + "message should not contain the offender. Was ${throwable.message}" + ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt index 6fdd1d4836bf..5be46433d1a8 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt @@ -14,6 +14,7 @@ import io.airbyte.configoss.WorkerDestinationConfig import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteStateStats import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog +import io.airbyte.workers.exception.TestHarnessException import io.airbyte.workers.helper.ConnectorConfigUpdater import io.airbyte.workers.internal.AirbyteDestination import io.airbyte.workers.internal.DefaultAirbyteDestination @@ -215,7 +216,11 @@ abstract class BaseDestinationAcceptanceTest( } } - destination.close() + try { + destination.close() + } catch (e: TestHarnessException) { + throw TestHarnessException(e.message, e, destinationOutput) + } return destinationOutput } @@ -258,6 +263,7 @@ abstract class BaseDestinationAcceptanceTest( workspaceRoot, workspaceRoot.toString(), localRoot.toString(), + fileTransferMountSource, "host", getConnectorEnv() ) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt index 4761398a496c..a71aca115d2e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt @@ -117,6 +117,7 @@ abstract class AbstractSourceConnectorTest { workspaceRoot, workspaceRoot.toString(), localRoot.toString(), + fileTransferMountSource = null, "host", envMap ) diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt index 5106ad19f598..8777b9b5f78a 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt @@ -4,6 +4,7 @@ package io.airbyte.commons.features import io.github.oshai.kotlinlogging.KotlinLogging +import java.nio.file.Path import java.util.function.Function private val log = KotlinLogging.logger {} @@ -46,6 +47,12 @@ class EnvVariableFeatureFlags : FeatureFlags { return getEnvOrDefault(DEPLOYMENT_MODE, "") { arg: String -> arg } } + override fun airbyteStagingDirectory(): Path? { + return getEnvOrDefault(AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME, null) { arg: String -> + Path.of(arg) + } + } + // TODO: refactor in order to use the same method than the ones in EnvConfigs.java fun getEnvOrDefault(key: String?, defaultValue: T, parser: Function): T { val value = System.getenv(key) @@ -73,5 +80,7 @@ class EnvVariableFeatureFlags : FeatureFlags { const val STRICT_COMPARISON_NORMALIZATION_TAG: String = "STRICT_COMPARISON_NORMALIZATION_TAG" const val DEPLOYMENT_MODE: String = "DEPLOYMENT_MODE" + val DEFAULT_AIRBYTE_STAGING_DIRECTORY: Path = Path.of("/staging/files") + const val AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME: String = "AIRBYTE_STAGING_DIRECTORY" } } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt index a8626b46ec64..8e98639b26a0 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt @@ -3,6 +3,8 @@ */ package io.airbyte.commons.features +import java.nio.file.Path + /** * Interface that describe which features are activated in airbyte. Currently, the only * implementation relies on env. Ideally it should be on some DB. @@ -51,4 +53,6 @@ interface FeatureFlags { * @return empty string for the default deployment mode, "CLOUD" for cloud deployment mode. */ fun deploymentMode(): String? + + fun airbyteStagingDirectory(): Path? } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt index 056c6730332c..b83be21bf2e7 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt @@ -3,6 +3,8 @@ */ package io.airbyte.commons.features +import java.nio.file.Path + open class FeatureFlagsWrapper(private val wrapped: FeatureFlags) : FeatureFlags { override fun autoDetectSchema(): Boolean { return wrapped.autoDetectSchema() @@ -36,6 +38,10 @@ open class FeatureFlagsWrapper(private val wrapped: FeatureFlags) : FeatureFlags return wrapped.deploymentMode() } + override fun airbyteStagingDirectory(): Path? { + return wrapped.airbyteStagingDirectory() + } + companion object { /** Overrides the [FeatureFlags.deploymentMode] method in the feature flags. */ @JvmStatic diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/exception/TestHarnessException.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/exception/TestHarnessException.kt index 15adcfdd2a06..b7f1edd1c0a6 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/exception/TestHarnessException.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/exception/TestHarnessException.kt @@ -3,8 +3,23 @@ */ package io.airbyte.workers.exception +import io.airbyte.protocol.models.v0.AirbyteMessage + class TestHarnessException : Exception { - constructor(message: String?) : super(message) + val outputMessages: List? + constructor(message: String?) : super(message) { + outputMessages = null + } + + constructor(message: String?, cause: Throwable?) : super(message, cause) { + outputMessages = null + } - constructor(message: String?, cause: Throwable?) : super(message, cause) + constructor( + message: String?, + cause: Throwable?, + outputMessages: List + ) : super(message, cause) { + this.outputMessages = outputMessages + } } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt index e58319655d9d..e1fbc87b37c7 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt @@ -7,6 +7,7 @@ import com.google.common.annotations.VisibleForTesting import com.google.common.base.Joiner import com.google.common.base.Strings import com.google.common.collect.Lists +import io.airbyte.commons.features.EnvVariableFeatureFlags import io.airbyte.commons.io.IOs import io.airbyte.commons.io.LineGobbler import io.airbyte.commons.map.MoreMaps @@ -30,6 +31,7 @@ class DockerProcessFactory( private val workspaceRoot: Path, private val workspaceMountSource: String?, private val localMountSource: String?, + private val fileTransferMountSource: Path?, private val networkName: String?, private val envMap: Map ) : ProcessFactory { @@ -125,6 +127,17 @@ class DockerProcessFactory( cmd.add(String.format("%s:%s", localMountSource, LOCAL_MOUNT_DESTINATION)) } + if (fileTransferMountSource != null) { + cmd.add("-v") + cmd.add( + "$fileTransferMountSource:${EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY}" + ) + cmd.add("-e") + cmd.add( + "${EnvVariableFeatureFlags.AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME}=${EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY}" + ) + } + val allEnvMap = MoreMaps.merge(jobMetadata, envMap, additionalEnvironmentVariables) for ((key, value) in allEnvMap) { cmd.add("-e") diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt index 358abe22d7c0..888e0d7f8141 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt @@ -78,7 +78,8 @@ protected constructor( s3Config, catalog, memoryRatio, - nThreads + nThreads, + featureFlags ) } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt index fdf25cf0c6f7..242dad6d68b0 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt @@ -10,6 +10,8 @@ import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer import io.airbyte.cdk.integrations.destination.StreamSyncSummary import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager +import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction @@ -21,12 +23,15 @@ import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer import io.airbyte.cdk.integrations.destination.record_buffer.SerializedBufferingStrategy import io.airbyte.cdk.integrations.destination.s3.SerializedBufferFactory.Companion.getCreateFunction import io.airbyte.commons.exceptions.ConfigErrorException +import io.airbyte.commons.features.FeatureFlags import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.* import io.github.oshai.kotlinlogging.KotlinLogging +import java.io.File import java.util.concurrent.Executors import java.util.function.Consumer import java.util.function.Function +import java.util.stream.Stream import org.joda.time.DateTime import org.joda.time.DateTimeZone @@ -188,7 +193,8 @@ class S3ConsumerFactory { s3Config: S3DestinationConfig, catalog: ConfiguredAirbyteCatalog, memoryRatio: Double, - nThreads: Int + nThreads: Int, + featureFlags: FeatureFlags ): SerializedAirbyteMessageConsumer { val writeConfigs = createWriteConfigs(storageOps, s3Config, catalog) // Buffer creation function: yields a file buffer that converts @@ -203,15 +209,6 @@ class S3ConsumerFactory { descriptor to Pair(stream.generationId, stream.syncId) } - val createFunction = - getCreateFunction( - s3Config, - Function { fileExtension: String -> - FileBuffer(fileExtension) - }, - useV2FieldNames = true - ) - // Parquet has significantly higher overhead. This small adjustment // results in a ~5x performance improvement. val adjustedMemoryRatio = @@ -221,25 +218,52 @@ class S3ConsumerFactory { memoryRatio } + // This needs to be called before the creation of the flush function because it updates + // writeConfigs! + val onStartFunction = onStartFunction(storageOps, writeConfigs) + + val streamDescriptorToWriteConfig = + writeConfigs.associateBy { + StreamDescriptor().withNamespace(it.namespace).withName(it.streamName) + } + val flushFunction = + if (featureFlags.airbyteStagingDirectory() != null) { + FileTransferDestinationFlushFunction( + streamDescriptorToWriteConfig, + storageOps, + featureFlags + ) + } else { + val createFunction = + getCreateFunction( + s3Config, + Function { fileExtension: String -> + FileBuffer(fileExtension) + }, + useV2FieldNames = true + ) + S3DestinationFlushFunction( + // Ensure the file buffer is always larger than the memory buffer, + // as the file buffer will be flushed at the end of the memory flush. + optimalBatchSizeBytes = + (FileBuffer.MAX_PER_STREAM_BUFFER_SIZE_BYTES * 0.9).toLong(), + { + // Yield a new BufferingStrategy every time we flush (for thread-safety). + SerializedBufferingStrategy( + createFunction, + catalog, + flushBufferFunction(storageOps, writeConfigs, catalog) + ) + }, + generationAndSyncIds + ) + } + return AsyncStreamConsumer( outputRecordCollector, - onStartFunction(storageOps, writeConfigs), + onStartFunction, onCloseFunction(storageOps, writeConfigs), - S3DestinationFlushFunction( - // Ensure the file buffer is always larger than the memory buffer, - // as the file buffer will be flushed at the end of the memory flush. - optimalBatchSizeBytes = - (FileBuffer.MAX_PER_STREAM_BUFFER_SIZE_BYTES * 0.9).toLong(), - { - // Yield a new BufferingStrategy every time we flush (for thread-safety). - SerializedBufferingStrategy( - createFunction, - catalog, - flushBufferFunction(storageOps, writeConfigs, catalog) - ) - }, - generationAndSyncIds - ), + flushFunction, catalog, // S3 has no concept of default namespace // In the "namespace from destination case", the namespace @@ -252,6 +276,40 @@ class S3ConsumerFactory { ) } + private class FileTransferDestinationFlushFunction( + val streamDescriptorToWriteConfig: Map, + val storageOps: S3StorageOperations, + val featureFlags: FeatureFlags + ) : DestinationFlushFunction { + override fun flush( + streamDescriptor: StreamDescriptor, + stream: Stream + ) { + val records = stream.toList() + val writeConfig = streamDescriptorToWriteConfig.getValue(streamDescriptor) + if (records.isEmpty()) { + return + } + if (records.size > 1) { + throw RuntimeException( + "the destinationFlushFunction for RAW_FILES should be called with only 1 record" + ) + } + val absolutePath = records[0].record!!.file!!.fileUrl!! + val relativePath = records[0].record!!.file!!.fileRelativePath!! + val fullObjectKey = writeConfig.fullOutputPath + relativePath + val dataFile = File(absolutePath) + storageOps.loadDataIntoBucket( + fullObjectKey = fullObjectKey, + fileName = dataFile.name, + fileContent = dataFile.inputStream(), + generationId = writeConfig.generationId + ) + } + + override val optimalBatchSizeBytes: Long = 1L + } + private fun isAppendSync(writeConfig: WriteConfig): Boolean { // This is an additional safety check, that this really is OVERWRITE // mode, this avoids bad things happening like deleting all objects diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt index cb89081ddd0b..de15e206baac 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt @@ -27,13 +27,16 @@ class S3DestinationFlushFunction( strategyProvider().use { strategy -> for (partialMessage in stream) { val partialRecord = partialMessage.record!! - val data = + if (partialRecord.file != null) { + throw RuntimeException(FILE_RECORD_ERROR_MESSAGE) + } /** * This should always be null, but if something changes upstream to trigger a clone * of the record, then `null` becomes `JsonNull` and `data == null` goes from `true` * to `false` */ - if (partialRecord.data == null || partialRecord.data!!.isNull) { + val data = + if (partialRecord.data == null || partialRecord.data!!.isNull) { Jsons.deserialize(partialMessage.serialized) } else { partialRecord.data @@ -53,4 +56,9 @@ class S3DestinationFlushFunction( strategy.flushSingleStream(nameAndNamespace) } } + + companion object { + val FILE_RECORD_ERROR_MESSAGE = + "received a message of RECORD type with a populated `file` attribute. This should only happen in file transfer mode" + } } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt index 894e53789973..f6f6cab368f9 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt @@ -22,8 +22,7 @@ import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFact import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil import io.airbyte.commons.exceptions.ConfigErrorException import io.github.oshai.kotlinlogging.KotlinLogging -import java.io.IOException -import java.io.OutputStream +import java.io.* import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap @@ -194,15 +193,32 @@ open class S3StorageOperations( } while (objectNameByPrefix.getValue(objectPath).contains(fullObjectKey)) return fullObjectKey } + @Throws(IOException::class) private fun loadDataIntoBucket( objectPath: String, recordsData: SerializableBuffer, generationId: Long + ): String { + val fullObjectKey: String = getFileName(objectPath, recordsData) + return loadDataIntoBucket( + fullObjectKey, + recordsData.filename, + recordsData.inputStream!!, + generationId + ) + } + + @Throws(IOException::class) + public fun loadDataIntoBucket( + fullObjectKey: String, + fileName: String, + fileContent: InputStream, + generationId: Long ): String { val partSize: Long = DEFAULT_PART_SIZE.toLong() val bucket: String? = s3Config.bucketName - val fullObjectKey: String = getFileName(objectPath, recordsData) + val metadata: MutableMap = HashMap() for (blobDecorator: BlobDecorator in blobDecorators) { blobDecorator.updateMetadata(metadata, getMetadataMapping()) @@ -232,13 +248,13 @@ open class S3StorageOperations( try { rawOutputStream.use { outputStream -> - recordsData.inputStream!!.use { dataStream -> + fileContent.use { dataStream -> dataStream.transferTo(outputStream) succeeded = true } } } catch (e: Exception) { - logger.error(e) { "Failed to load data into storage $objectPath" } + logger.error(e) { "Failed to load data into storage $fullObjectKey" } throw RuntimeException(e) } finally { if (!succeeded) { @@ -253,7 +269,7 @@ open class S3StorageOperations( } val newFilename: String = getFilename(fullObjectKey) logger.info { - "Uploaded buffer file to storage: ${recordsData.filename} -> $fullObjectKey (filename: $newFilename)" + "Uploaded buffer file to storage: $fileName -> $fullObjectKey (filename: $newFilename)" } return newFilename } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseDestinationAcceptanceTest.kt new file mode 100644 index 000000000000..947b7195fd10 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseDestinationAcceptanceTest.kt @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.integrations.destination.s3 + +import com.amazonaws.services.s3.AmazonS3 +import com.amazonaws.services.s3.model.S3ObjectSummary +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ObjectNode +import io.airbyte.cdk.integrations.destination.NamingConventionTransformer +import io.airbyte.cdk.integrations.destination.s3.util.S3NameTransformer +import io.airbyte.cdk.integrations.standardtest.destination.BaseDestinationAcceptanceTest +import io.airbyte.cdk.integrations.standardtest.destination.DestinationAcceptanceTest +import io.airbyte.commons.io.IOs +import io.airbyte.commons.json.Jsons +import io.github.oshai.kotlinlogging.KotlinLogging +import java.nio.file.Path +import java.util.Comparator +import java.util.HashSet +import org.apache.commons.lang3.RandomStringUtils +import org.joda.time.DateTime +import org.joda.time.DateTimeZone +import org.mockito.Mockito + +private val LOGGER = KotlinLogging.logger {} + +abstract class S3BaseDestinationAcceptanceTest() : + BaseDestinationAcceptanceTest( + verifyIndividualStateAndCounts = true, + ) { + protected val secretFilePath: String = "secrets/config.json" + override val imageName: String + get() = "airbyte/destination-s3:dev" + protected var configJson: JsonNode? = null + + override fun getConfig(): JsonNode = configJson!! + protected open val baseConfigJson: JsonNode + get() = Jsons.deserialize(IOs.readFile(Path.of(secretFilePath))) + protected abstract val formatConfig: JsonNode? + get + protected var s3DestinationConfig: S3DestinationConfig = Mockito.mock() + protected var s3Client: AmazonS3? = null + protected var s3nameTransformer: NamingConventionTransformer = Mockito.mock() + protected var s3StorageOperations: S3StorageOperations? = null + + fun storageProvider(): StorageProvider { + return StorageProvider.AWS_S3 + } + + /** + * This method does the following: + * * Construct the S3 destination config. + * * Construct the S3 client. + */ + override fun setup( + testEnv: DestinationAcceptanceTest.TestDestinationEnv, + TEST_SCHEMAS: HashSet + ) { + val baseConfigJson = baseConfigJson + // Set a random s3 bucket path for each integration test + val configJson = Jsons.clone(baseConfigJson) + val testBucketPath = + String.format( + "test_%s", + RandomStringUtils.insecure().nextAlphanumeric(5), + ) + (configJson as ObjectNode) + .put("s3_bucket_path", testBucketPath) + .set("format", formatConfig) + this.configJson = configJson + this.s3DestinationConfig = + S3DestinationConfig.getS3DestinationConfig( + configJson, + storageProvider(), + getConnectorEnv() + ) + LOGGER.info { + "${"Test full path: {}/{}"} ${s3DestinationConfig.bucketName} ${s3DestinationConfig.bucketPath}" + } + + this.s3Client = s3DestinationConfig.getS3Client() + this.s3nameTransformer = S3NameTransformer() + this.s3StorageOperations = + S3StorageOperations(s3nameTransformer, s3Client!!, s3DestinationConfig) + } + + fun getDefaultSchema(): String { + if (configJson!!.has("s3_bucket_path")) { + return configJson!!["s3_bucket_path"].asText() + } + throw RuntimeException() + } + + /** Helper method to retrieve all synced objects inside the configured bucket path. */ + protected fun getAllSyncedObjects( + streamName: String, + ): List { + val namespaceStr = s3nameTransformer.getNamespace(getDefaultSchema()) + val streamNameStr = s3nameTransformer.getIdentifier(streamName) + val outputPrefix = + s3StorageOperations!!.getBucketObjectPath( + namespaceStr, + streamNameStr, + DateTime.now(DateTimeZone.UTC), + s3DestinationConfig.pathFormat!!, + ) + // the child folder contains a non-deterministic epoch timestamp, so use the parent folder + val parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1) + val objectSummaries = + s3Client!! + .listObjects(s3DestinationConfig.bucketName, parentFolder) + .objectSummaries + .filter { o: S3ObjectSummary -> o.key.contains("$streamNameStr/") } + .sortedWith(Comparator.comparingLong { o: S3ObjectSummary -> o.lastModified.time }) + + LOGGER.info { + "${"All objects: {}"} ${ + objectSummaries.map { o: S3ObjectSummary -> + String.format("%s/%s", o.bucketName, o.key) + } + }" + } + return objectSummaries + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt index 8095464a4d62..ff99342ce11c 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt @@ -12,11 +12,13 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory import com.fasterxml.jackson.databind.node.ObjectNode import com.google.common.collect.ImmutableMap import io.airbyte.cdk.integrations.destination.NamingConventionTransformer +import io.airbyte.cdk.integrations.destination.async.model.AirbyteRecordMessageFile import io.airbyte.cdk.integrations.destination.s3.util.S3NameTransformer import io.airbyte.cdk.integrations.standardtest.destination.DestinationAcceptanceTest import io.airbyte.cdk.integrations.standardtest.destination.argproviders.DataArgumentsProvider import io.airbyte.cdk.integrations.standardtest.destination.comparator.AdvancedTestDataComparator import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator +import io.airbyte.commons.features.EnvVariableFeatureFlags import io.airbyte.commons.io.IOs import io.airbyte.commons.jackson.MoreMappers import io.airbyte.commons.json.Jsons @@ -27,9 +29,11 @@ import io.github.oshai.kotlinlogging.KotlinLogging import java.nio.file.Path import java.time.Instant import java.util.* +import kotlin.test.assertContains import org.apache.commons.lang3.RandomStringUtils import org.joda.time.DateTime import org.joda.time.DateTimeZone +import org.junit.Assert.fail import org.junit.jupiter.api.Assumptions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows @@ -734,6 +738,79 @@ protected constructor( ) } + @Test + open fun testFakeFileTransfer() { + val streamSchema = JsonNodeFactory.instance.objectNode() + streamSchema.set("properties", JsonNodeFactory.instance.objectNode()) + val streamName = "str" + RandomStringUtils.randomAlphanumeric(5) + val catalog = + ConfiguredAirbyteCatalog() + .withStreams( + java.util.List.of( + ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP) + .withGenerationId(0) + .withMinimumGenerationId(0) + .withSyncId(0) + .withStream( + AirbyteStream().withName(streamName).withJsonSchema(streamSchema) + ), + ), + ) + + val recordMessage = + AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord( + AirbyteRecordMessage() + .withStream(streamName) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(ObjectMapper().readTree("{}")) + .withAdditionalProperty( + "file", + AirbyteRecordMessageFile( + fileUrl = + "${EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY}/fakeFile", + bytes = 182776, + fileRelativePath = "fakeFile", + modified = 123456L, + sourceFileUrl = + "//sftp-testing-for-file-transfer/sftp-folder/simpsons_locations.csv", + ) + ) + ) + val streamCompleteMessage = + AirbyteMessage() + .withType(AirbyteMessage.Type.TRACE) + .withTrace( + AirbyteTraceMessage() + .withStreamStatus( + AirbyteStreamStatusTraceMessage() + .withStatus( + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE + ) + .withStreamDescriptor(StreamDescriptor().withName(streamName)) + ) + ) + try { + val destinationOutput = + runSync( + config = getConfig(), + messages = listOf(recordMessage, streamCompleteMessage), + catalog = catalog, + runNormalization = false, + imageName = imageName, + ) + fail("sync should have failed. Instead got output $destinationOutput") + } catch (e: TestHarnessException) { + assertContains( + e.outputMessages!![0].trace.error.internalMessage, + S3DestinationFlushFunction.FILE_RECORD_ERROR_MESSAGE + ) + } + } + companion object { @JvmStatic protected val MAPPER: ObjectMapper = MoreMappers.initMapper() diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt index 1c7c08350d01..dcfdda420636 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt @@ -1525,6 +1525,7 @@ abstract class BaseTypingDedupingTest { workspaceRoot, workspaceRoot.toString(), localRoot.toString(), + fileTransferMountSource = null, "host", emptyMap() ) diff --git a/airbyte-integrations/connectors/destination-s3/metadata.yaml b/airbyte-integrations/connectors/destination-s3/metadata.yaml index 727d7c75c210..f7a1f0585845 100644 --- a/airbyte-integrations/connectors/destination-s3/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3/metadata.yaml @@ -34,6 +34,7 @@ data: ql: 300 supportLevel: certified supportsRefreshes: true + supportsFileTransfer: true connectorTestSuitesOptions: - suite: unitTests - suite: integrationTests diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.kt index 76ff3e210a9f..007f0fdea538 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3CsvDestinationAcceptanceTest.kt @@ -6,6 +6,7 @@ package io.airbyte.integrations.destination.s3 import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.integrations.destination.s3.S3BaseCsvDestinationAcceptanceTest import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion +import org.junit.jupiter.api.Test class S3CsvDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() { override fun getProtocolVersion(): ProtocolVersion { @@ -14,4 +15,9 @@ class S3CsvDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() { override val baseConfigJson: JsonNode get() = S3DestinationTestUtils.baseConfigJsonFilePath + + @Test + override fun testAirbyteTimeTypes() { + super.testAirbyteTimeTypes() + } } diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3FileTransferDestinationTest.kt b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3FileTransferDestinationTest.kt new file mode 100644 index 000000000000..767da2f42339 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3FileTransferDestinationTest.kt @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3 + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.node.JsonNodeFactory +import io.airbyte.cdk.integrations.destination.async.model.AirbyteRecordMessageFile +import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat +import io.airbyte.cdk.integrations.destination.s3.S3BaseDestinationAcceptanceTest +import io.airbyte.cdk.integrations.destination.s3.util.Flattening +import io.airbyte.commons.features.EnvVariableFeatureFlags +import io.airbyte.commons.json.Jsons +import io.airbyte.protocol.models.v0.* +import io.github.oshai.kotlinlogging.KotlinLogging +import java.nio.file.Path +import java.time.Instant +import kotlin.io.path.createDirectories +import kotlin.io.path.createFile +import kotlin.io.path.writeText +import kotlin.random.Random +import kotlin.test.assertEquals +import org.apache.commons.lang3.RandomStringUtils +import org.junit.jupiter.api.Test + +private val LOGGER = KotlinLogging.logger {} + +class S3FileTransferDestinationTest : S3BaseDestinationAcceptanceTest() { + override val supportsFileTransfer = true + override val formatConfig: JsonNode + get() = + Jsons.jsonNode( + java.util.Map.of( + "format_type", + FileUploadFormat.CSV, + "flattening", + Flattening.ROOT_LEVEL.value, + "compression", + Jsons.jsonNode(java.util.Map.of("compression_type", "No Compression")) + ) + ) + + private fun getStreamCompleteMessage(streamName: String): AirbyteMessage { + return AirbyteMessage() + .withType(AirbyteMessage.Type.TRACE) + .withTrace( + AirbyteTraceMessage() + .withStreamStatus( + AirbyteStreamStatusTraceMessage() + .withStatus( + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE + ) + .withStreamDescriptor(StreamDescriptor().withName(streamName)) + ) + ) + } + + private fun createFakeFile(): Path { + val depth = Random.nextInt(10) + val dirPath = + (0..depth).joinToString("/") { + "dir" + RandomStringUtils.insecure().nextAlphanumeric(5) + } + val fileName = "fakeFile" + RandomStringUtils.insecure().nextAlphanumeric(5) + val filePath = "$dirPath/$fileName" + val fileSize = 1_024 * 1_024 + LOGGER.info { "SGX creating director $dirPath inside $fileTransferMountSource" } + + fileTransferMountSource!!.resolve(dirPath).createDirectories() + LOGGER.info { "SGX creating file $filePath inside $fileTransferMountSource" } + val absoluteFilePath = + fileTransferMountSource!! + .resolve(filePath) + .createFile() + .writeText(RandomStringUtils.insecure().nextAlphanumeric(fileSize)) + return Path.of(filePath) + } + + private fun configureCatalog(streamName: String): ConfiguredAirbyteCatalog { + val streamSchema = JsonNodeFactory.instance.objectNode() + streamSchema.set("properties", JsonNodeFactory.instance.objectNode()) + return ConfiguredAirbyteCatalog() + .withStreams( + java.util.List.of( + ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP) + .withGenerationId(0) + .withMinimumGenerationId(0) + .withSyncId(0) + .withStream( + AirbyteStream().withName(streamName).withJsonSchema(streamSchema) + ), + ), + ) + } + + private fun createMessageForFile(streamName: String, relativeFilePath: Path): AirbyteMessage { + val absoluteFilePath = + EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY.resolve(relativeFilePath) + return AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord( + AirbyteRecordMessage() + .withStream(streamName) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(ObjectMapper().readTree("{}")) + .withAdditionalProperty( + "file", + AirbyteRecordMessageFile( + fileUrl = absoluteFilePath.toString(), + bytes = absoluteFilePath.toFile().length(), + fileRelativePath = "$relativeFilePath", + modified = 123456L, + sourceFileUrl = "//sftp-testing-for-file-transfer/$relativeFilePath", + ) + ) + ) + } + + @Test + fun testFakeFileTransfer() { + LOGGER.info { + "${EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY} is mounted from $fileTransferMountSource" + } + val streamName = "str" + RandomStringUtils.insecure().nextAlphanumeric(5) + val filePath = createFakeFile() + val catalog = configureCatalog(streamName) + val recordMessage = createMessageForFile(streamName, filePath) + + runSyncAndVerifyStateOutput( + getConfig(), + listOf(recordMessage, getStreamCompleteMessage(streamName)), + catalog, + false + ) + val allObjectsInStore = getAllSyncedObjects(streamName) + /* assertEquals(listOf(filePath.toString()), allObjectsInStore.map { it.key })*/ + val file = fileTransferMountSource!!.resolve(filePath).toFile() + val objectInStore = allObjectsInStore[0] + assertEquals(file.length(), objectInStore.size) + /*assertEquals( + file.readBytes(), + s3Client!! + .getObject(objectInStore.bucketName, objectInStore.key) + .objectContent + .readBytes() + )*/ + LOGGER.info { "SGX getAllSyncedObjects = ${getAllSyncedObjects(streamName)}" } + } +} diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3JsonlGzipDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3JsonlGzipDestinationAcceptanceTest.kt index cca447664d72..85b9fb44500b 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3JsonlGzipDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3JsonlGzipDestinationAcceptanceTest.kt @@ -6,6 +6,7 @@ package io.airbyte.integrations.destination.s3 import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.integrations.destination.s3.S3BaseJsonlGzipDestinationAcceptanceTest import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion +import org.junit.jupiter.api.Test class S3JsonlGzipDestinationAcceptanceTest : S3BaseJsonlGzipDestinationAcceptanceTest() { override fun getProtocolVersion(): ProtocolVersion { @@ -14,4 +15,9 @@ class S3JsonlGzipDestinationAcceptanceTest : S3BaseJsonlGzipDestinationAcceptanc override val baseConfigJson: JsonNode get() = S3DestinationTestUtils.baseConfigJsonFilePath + + @Test + override fun testFakeFileTransfer() { + super.testFakeFileTransfer() + } }