Skip to content

Commit

Permalink
destination-s3: add file transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Oct 18, 2024
1 parent 36566f7 commit 45dcb2a
Show file tree
Hide file tree
Showing 22 changed files with 621 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class BufferDequeue(

// otherwise pull records until we hit the memory limit.
val newSize: Long = (memoryItem.size) + bytesRead.get()
if (newSize <= optimalBytesToRead) {
if (newSize <= optimalBytesToRead || output.isEmpty()) {
memoryItem.size.let { bytesRead.addAndGet(it) }
queue.poll()?.item?.let { output.add(it) }
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination.async.model

import com.fasterxml.jackson.annotation.JsonProperty

class AirbyteRecordMessageFile {
constructor(
fileUrl: String? = null,
bytes: Long? = null,
fileRelativePath: String? = null,
modified: Long? = null,
sourceFileUrl: String? = null
) {
this.fileUrl = fileUrl
this.bytes = bytes
this.fileRelativePath = fileRelativePath
this.modified = modified
this.sourceFileUrl = sourceFileUrl
}
constructor() :
this(
fileUrl = null,
bytes = null,
fileRelativePath = null,
modified = null,
sourceFileUrl = null
)

@get:JsonProperty("file_url")
@set:JsonProperty("file_url")
@JsonProperty("file_url")
var fileUrl: String? = null

@get:JsonProperty("bytes")
@set:JsonProperty("bytes")
@JsonProperty("bytes")
var bytes: Long? = null

@get:JsonProperty("file_relative_path")
@set:JsonProperty("file_relative_path")
@JsonProperty("file_relative_path")
var fileRelativePath: String? = null

@get:JsonProperty("modified")
@set:JsonProperty("modified")
@JsonProperty("modified")
var modified: Long? = null

@get:JsonProperty("source_file_url")
@set:JsonProperty("source_file_url")
@JsonProperty("source_file_url")
var sourceFileUrl: String? = null
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.cdk.integrations.destination.async.model

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.annotation.JsonPropertyDescription
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
import io.airbyte.protocol.models.v0.StreamDescriptor
Expand Down Expand Up @@ -33,14 +32,18 @@ class PartialAirbyteRecordMessage {
@get:JsonProperty("emitted_at")
@set:JsonProperty("emitted_at")
@JsonProperty("emitted_at")
@JsonPropertyDescription("when the data was emitted from the source. epoch in millisecond.")
var emittedAt: Long = 0

@get:JsonProperty("meta")
@set:JsonProperty("meta")
@JsonProperty("meta")
var meta: AirbyteRecordMessageMeta? = null

@get:JsonProperty("file")
@set:JsonProperty("file")
@JsonProperty("file")
var file: AirbyteRecordMessageFile? = null

fun withNamespace(namespace: String?): PartialAirbyteRecordMessage {
this.namespace = namespace
return this
Expand All @@ -66,6 +69,11 @@ class PartialAirbyteRecordMessage {
return this
}

fun withFile(file: AirbyteRecordMessageFile): PartialAirbyteRecordMessage {
this.file = file
return this
}

override fun equals(other: Any?): Boolean {
if (this === other) {
return true
Expand All @@ -77,7 +85,8 @@ class PartialAirbyteRecordMessage {
return namespace == that.namespace &&
stream == that.stream &&
emittedAt == that.emittedAt &&
meta == that.meta
meta == that.meta &&
file == that.file
}

override fun hashCode(): Int {
Expand All @@ -98,6 +107,9 @@ class PartialAirbyteRecordMessage {
", meta='" +
meta +
'\'' +
", file='" +
file +
'\'' +
'}'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,9 @@ class AsyncStreamConsumerTest {
val throwable =
assertThrows(RuntimeException::class.java) { consumer.accept(retyped, retyped.length) }
// Ensure that the offending data has been scrubbed from the error message
assertFalse(throwable.message!!.contains(offender))
assertFalse(
throwable.message!!.contains(offender),
"message should not contain the offender. Was ${throwable.message}"
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import io.airbyte.configoss.WorkerDestinationConfig
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateStats
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.workers.exception.TestHarnessException
import io.airbyte.workers.helper.ConnectorConfigUpdater
import io.airbyte.workers.internal.AirbyteDestination
import io.airbyte.workers.internal.DefaultAirbyteDestination
Expand Down Expand Up @@ -215,7 +216,11 @@ abstract class BaseDestinationAcceptanceTest(
}
}

destination.close()
try {
destination.close()
} catch (e: TestHarnessException) {
throw TestHarnessException(e.message, e, destinationOutput)
}

return destinationOutput
}
Expand Down Expand Up @@ -258,6 +263,7 @@ abstract class BaseDestinationAcceptanceTest(
workspaceRoot,
workspaceRoot.toString(),
localRoot.toString(),
fileTransferMountSource,
"host",
getConnectorEnv()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ abstract class AbstractSourceConnectorTest {
workspaceRoot,
workspaceRoot.toString(),
localRoot.toString(),
fileTransferMountSource = null,
"host",
envMap
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.airbyte.commons.features

import io.github.oshai.kotlinlogging.KotlinLogging
import java.nio.file.Path
import java.util.function.Function

private val log = KotlinLogging.logger {}
Expand Down Expand Up @@ -46,6 +47,12 @@ class EnvVariableFeatureFlags : FeatureFlags {
return getEnvOrDefault(DEPLOYMENT_MODE, "") { arg: String -> arg }
}

override fun airbyteStagingDirectory(): Path? {
return getEnvOrDefault(AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME, null) { arg: String ->
Path.of(arg)
}
}

// TODO: refactor in order to use the same method than the ones in EnvConfigs.java
fun <T> getEnvOrDefault(key: String?, defaultValue: T, parser: Function<String, T>): T {
val value = System.getenv(key)
Expand Down Expand Up @@ -73,5 +80,7 @@ class EnvVariableFeatureFlags : FeatureFlags {
const val STRICT_COMPARISON_NORMALIZATION_TAG: String =
"STRICT_COMPARISON_NORMALIZATION_TAG"
const val DEPLOYMENT_MODE: String = "DEPLOYMENT_MODE"
val DEFAULT_AIRBYTE_STAGING_DIRECTORY: Path = Path.of("/staging/files")
const val AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME: String = "AIRBYTE_STAGING_DIRECTORY"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
*/
package io.airbyte.commons.features

import java.nio.file.Path

/**
* Interface that describe which features are activated in airbyte. Currently, the only
* implementation relies on env. Ideally it should be on some DB.
Expand Down Expand Up @@ -51,4 +53,6 @@ interface FeatureFlags {
* @return empty string for the default deployment mode, "CLOUD" for cloud deployment mode.
*/
fun deploymentMode(): String?

fun airbyteStagingDirectory(): Path?
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
*/
package io.airbyte.commons.features

import java.nio.file.Path

open class FeatureFlagsWrapper(private val wrapped: FeatureFlags) : FeatureFlags {
override fun autoDetectSchema(): Boolean {
return wrapped.autoDetectSchema()
Expand Down Expand Up @@ -36,6 +38,10 @@ open class FeatureFlagsWrapper(private val wrapped: FeatureFlags) : FeatureFlags
return wrapped.deploymentMode()
}

override fun airbyteStagingDirectory(): Path? {
return wrapped.airbyteStagingDirectory()
}

companion object {
/** Overrides the [FeatureFlags.deploymentMode] method in the feature flags. */
@JvmStatic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,23 @@
*/
package io.airbyte.workers.exception

import io.airbyte.protocol.models.v0.AirbyteMessage

class TestHarnessException : Exception {
constructor(message: String?) : super(message)
val outputMessages: List<AirbyteMessage>?
constructor(message: String?) : super(message) {
outputMessages = null
}

constructor(message: String?, cause: Throwable?) : super(message, cause) {
outputMessages = null
}

constructor(message: String?, cause: Throwable?) : super(message, cause)
constructor(
message: String?,
cause: Throwable?,
outputMessages: List<AirbyteMessage>
) : super(message, cause) {
this.outputMessages = outputMessages
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.google.common.annotations.VisibleForTesting
import com.google.common.base.Joiner
import com.google.common.base.Strings
import com.google.common.collect.Lists
import io.airbyte.commons.features.EnvVariableFeatureFlags
import io.airbyte.commons.io.IOs
import io.airbyte.commons.io.LineGobbler
import io.airbyte.commons.map.MoreMaps
Expand All @@ -30,6 +31,7 @@ class DockerProcessFactory(
private val workspaceRoot: Path,
private val workspaceMountSource: String?,
private val localMountSource: String?,
private val fileTransferMountSource: Path?,
private val networkName: String?,
private val envMap: Map<String, String>
) : ProcessFactory {
Expand Down Expand Up @@ -125,6 +127,17 @@ class DockerProcessFactory(
cmd.add(String.format("%s:%s", localMountSource, LOCAL_MOUNT_DESTINATION))
}

if (fileTransferMountSource != null) {
cmd.add("-v")
cmd.add(
"$fileTransferMountSource:${EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY}"
)
cmd.add("-e")
cmd.add(
"${EnvVariableFeatureFlags.AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME}=${EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY}"
)
}

val allEnvMap = MoreMaps.merge(jobMetadata, envMap, additionalEnvironmentVariables)
for ((key, value) in allEnvMap) {
cmd.add("-e")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ protected constructor(
s3Config,
catalog,
memoryRatio,
nThreads
nThreads,
featureFlags
)
}

Expand Down
Loading

0 comments on commit 45dcb2a

Please sign in to comment.