Skip to content

Commit

Permalink
CDK s3-destinations: fixes for s3 connector compilation (#36868)
Browse files Browse the repository at this point in the history
  • Loading branch information
gisripa authored Apr 8, 2024
1 parent a9238d9 commit f268be0
Show file tree
Hide file tree
Showing 18 changed files with 72 additions and 74 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.29.8 | 2024-04-08 | [\#36868](https://github.com/airbytehq/airbyte/pull/36868) | Destinations: compilation fixes for the s3-destinations connector |
| 0.29.7 | 2024-04-08 | [\#36768](https://github.com/airbytehq/airbyte/pull/36768) | Destinations: Make destination state fetch/commit logic more resilient to errors |
| 0.29.6 | 2024-04-05 | [\#36577](https://github.com/airbytehq/airbyte/pull/36577) | Do not send system_error trace message for config exceptions. |
| 0.29.5 | 2024-04-05 | [\#36620](https://github.com/airbytehq/airbyte/pull/36620) | Missed changes - open for extension for destination-postgres |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ object AdaptiveDestinationRunner {
private const val DEPLOYMENT_MODE_KEY = EnvVariableFeatureFlags.DEPLOYMENT_MODE
private const val CLOUD_MODE = "CLOUD"

@JvmStatic
fun baseOnEnv(): OssDestinationBuilder {
val mode = System.getenv(DEPLOYMENT_MODE_KEY)
return OssDestinationBuilder(mode)
}

class OssDestinationBuilder(private val deploymentMode: String) {
class OssDestinationBuilder(private val deploymentMode: String?) {
fun <OT : Destination> withOssDestination(
ossDestinationSupplier: Supplier<OT>
): CloudDestinationBuilder<OT> {
Expand All @@ -34,7 +35,7 @@ object AdaptiveDestinationRunner {
}

class CloudDestinationBuilder<OT : Destination>(
private val deploymentMode: String,
private val deploymentMode: String?,
private val ossDestinationSupplier: Supplier<OT>
) {
fun <CT : Destination> withCloudDestination(
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.29.7
version=0.29.8
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@ object GeneralStagingFunctions {
typerDeduper: TyperDeduper
): OnStartFunction {
return OnStartFunction {
log.info(
"Preparing raw tables in destination started for {} streams",
writeConfigs.size
)
log.info {
"Preparing raw tables in destination started for ${writeConfigs.size} streams"
}
typerDeduper.prepareSchemasAndRunMigrations()

// Create raw tables
Expand All @@ -53,20 +52,16 @@ object GeneralStagingFunctions {
val stageName = stagingOperations.getStageName(schema, dstTableName)
val stagingPath =
stagingOperations.getStagingPath(
SerialStagingConsumerFactory.Companion.RANDOM_CONNECTION_ID,
RANDOM_CONNECTION_ID,
schema,
stream,
writeConfig.outputTableName,
writeConfig.writeDatetime
)

log.info(
"Preparing staging area in destination started for schema {} stream {}: target table: {}, stage: {}",
schema,
stream,
dstTableName,
stagingPath
)
log.info {
"Preparing staging area in destination started for schema $schema stream $stream: target table: $dstTableName, stage: $stagingPath"
}

stagingOperations.createSchemaIfNotExists(database, schema)
stagingOperations.createTableIfNotExists(database, schema, dstTableName)
Expand All @@ -84,16 +79,14 @@ object GeneralStagingFunctions {
"Unrecognized sync mode: " + writeConfig.syncMode
)
}
log.info(
"Preparing staging area in destination completed for schema {} stream {}",
schema,
stream
)
log.info {
"Preparing staging area in destination completed for schema $schema stream $stream"
}
}

typerDeduper.prepareFinalTables()

log.info("Executing finalization of tables.")
log.info { "Executing finalization of tables." }
stagingOperations.executeTransaction(database, queryList)
}
}
Expand Down Expand Up @@ -167,7 +160,7 @@ object GeneralStagingFunctions {
// After moving data from staging area to the target table (airbyte_raw) clean up the
// staging
// area (if user configured)
log.info("Cleaning up destination started for {} streams", writeConfigs.size)
log.info { "Cleaning up destination started for ${writeConfigs.size} streams" }
typerDeduper.typeAndDedupe(streamSyncSummaries)
for (writeConfig in writeConfigs) {
val schemaName = writeConfig.outputSchemaName
Expand All @@ -182,12 +175,9 @@ object GeneralStagingFunctions {
writeConfig.outputTableName,
writeConfig.writeDatetime
)
log.info(
"Cleaning stage in destination started for stream {}. schema {}, stage: {}",
writeConfig.streamName,
schemaName,
stagePath
)
log.info {
"Cleaning stage in destination started for stream ${writeConfig.streamName}. schema $schemaName, stage: $stagePath"
}
// TODO: This is another weird manifestation of Redshift vs Snowflake using
// either or variables from
// stageName/StagingPath.
Expand All @@ -196,7 +186,7 @@ object GeneralStagingFunctions {
}
typerDeduper.commitFinalTables()
typerDeduper.cleanup()
log.info("Cleaning up destination completed.")
log.info { "Cleaning up destination completed." }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ abstract class DestinationAcceptanceTest {
val configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog)
val messages: List<io.airbyte.protocol.models.v0.AirbyteMessage> =
MoreResources.readResource(messagesFilename)
.trim()
.lines()
.map {
Jsons.deserialize(it, io.airbyte.protocol.models.v0.AirbyteMessage::class.java)
Expand Down Expand Up @@ -458,6 +459,7 @@ abstract class DestinationAcceptanceTest {
val configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog)
val messages: List<io.airbyte.protocol.models.v0.AirbyteMessage> =
MoreResources.readResource(messagesFilename)
.trim()
.lines()
.map {
Jsons.deserialize(it, io.airbyte.protocol.models.v0.AirbyteMessage::class.java)
Expand Down Expand Up @@ -515,6 +517,7 @@ abstract class DestinationAcceptanceTest {
getProtocolVersion()
)
)
.trim()
.lines()
.map {
Jsons.deserialize<io.airbyte.protocol.models.v0.AirbyteMessage>(
Expand Down Expand Up @@ -712,6 +715,7 @@ abstract class DestinationAcceptanceTest {
getProtocolVersion()
)
)
.trim()
.lines()
.map { Jsons.deserialize(it, AirbyteMessage::class.java) }
.toList()
Expand Down Expand Up @@ -1406,6 +1410,7 @@ abstract class DestinationAcceptanceTest {
getProtocolVersion()
)
)
.trim()
.lines()
.map { Jsons.deserialize(it, AirbyteMessage::class.java) }
val config = getConfig()
Expand Down Expand Up @@ -2328,7 +2333,7 @@ abstract class DestinationAcceptanceTest {
private fun readMessagesFromFile(
messagesFilename: String
): List<io.airbyte.protocol.models.v0.AirbyteMessage> {
return MoreResources.readResource(messagesFilename).lines().map {
return MoreResources.readResource(messagesFilename).trim().lines().map {
Jsons.deserialize(it, AirbyteMessage::class.java)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class DataArgumentsProvider : ArgumentsProvider {
}

companion object {
@JvmField
val EXCHANGE_RATE_CONFIG: CatalogMessageTestConfigPair =
CatalogMessageTestConfigPair("exchange_rate_catalog.json", "exchange_rate_messages.txt")
val EDGE_CASE_CONFIG: CatalogMessageTestConfigPair =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ protected constructor(
override fun check(config: JsonNode): AirbyteConnectionStatus? {
try {
val destinationConfig = configFactory.getS3DestinationConfig(config, storageProvider())
val s3Client = destinationConfig!!.getS3Client()
val s3Client = destinationConfig.getS3Client()

S3BaseChecks.testIAMUserHasListObjectPermission(s3Client, destinationConfig.bucketName)
S3BaseChecks.testSingleUpload(
Expand Down Expand Up @@ -64,7 +64,7 @@ protected constructor(
return S3ConsumerFactory()
.create(
outputRecordCollector,
S3StorageOperations(nameTransformer, s3Config!!.getS3Client(), s3Config),
S3StorageOperations(nameTransformer, s3Config.getS3Client(), s3Config),
nameTransformer,
getCreateFunction(
s3Config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ abstract class BlobStorageOperations protected constructor() {
protected val blobDecorators: MutableList<BlobDecorator> = ArrayList()

abstract fun getBucketObjectPath(
namespace: String,
namespace: String?,
streamName: String,
writeDatetime: DateTime,
customFormat: String
Expand All @@ -29,7 +29,7 @@ abstract class BlobStorageOperations protected constructor() {
@Throws(Exception::class)
abstract fun uploadRecordsToBucket(
recordsData: SerializableBuffer,
namespace: String,
namespace: String?,
objectPath: String
): String?

Expand All @@ -46,7 +46,7 @@ abstract class BlobStorageOperations protected constructor() {
* @param pathFormat formatted string for the path
*/
abstract fun cleanUpBucketObject(
namespace: String,
namespace: String?,
streamName: String,
objectPath: String,
pathFormat: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ object S3BaseChecks {
*
* @param endpoint URL string representing an accessible S3 bucket
*/
@JvmStatic
fun testCustomEndpointSecured(endpoint: String?): Boolean {
// if user does not use a custom endpoint, do not fail
return if (endpoint == null || endpoint.length == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class S3ConsumerFactory {
pathFormat
)
storageOperations.cleanUpBucketObject(
namespace!!,
namespace,
stream,
outputBucketPath,
pathFormat
Expand Down Expand Up @@ -124,7 +124,7 @@ class S3ConsumerFactory {
writeConfig!!.addStoredFile(
storageOperations.uploadRecordsToBucket(
writer,
writeConfig.namespace!!,
writeConfig.namespace,
writeConfig.fullOutputPath
)!!
)
Expand Down Expand Up @@ -183,7 +183,7 @@ class S3ConsumerFactory {
"Undefined destination sync mode"
)
val abStream = stream.stream
val namespace = abStream.namespace
val namespace: String? = abStream.namespace
val streamName = abStream.name
val bucketPath = s3Config.bucketPath
val customOutputFormat = java.lang.String.join("/", bucketPath, s3Config.pathFormat)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package io.airbyte.cdk.integrations.destination.s3
import com.fasterxml.jackson.databind.JsonNode
import javax.annotation.Nonnull

class S3DestinationConfigFactory {
fun getS3DestinationConfig(
open class S3DestinationConfigFactory {
open fun getS3DestinationConfig(
config: JsonNode,
@Nonnull storageProvider: StorageProvider
): S3DestinationConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern
import org.apache.commons.io.FilenameUtils
import org.apache.commons.lang3.StringUtils
import org.apache.logging.log4j.util.Strings
import org.joda.time.DateTime

private val logger = KotlinLogging.logger {}
Expand All @@ -46,13 +44,13 @@ open class S3StorageOperations(
private val partCounts: ConcurrentMap<String, AtomicInteger> = ConcurrentHashMap()

override fun getBucketObjectPath(
namespace: String,
namespace: String?,
streamName: String,
writeDatetime: DateTime,
customFormat: String
): String {
val namespaceStr: String =
nameTransformer.getNamespace(if (Strings.isNotBlank(namespace)) namespace else "")
nameTransformer.getNamespace(if (!namespace.isNullOrBlank()) namespace else "")
val streamNameStr: String = nameTransformer.getIdentifier(streamName)
return nameTransformer.applyDefaultCase(
customFormat
Expand Down Expand Up @@ -114,7 +112,7 @@ open class S3StorageOperations(

override fun uploadRecordsToBucket(
recordsData: SerializableBuffer,
namespace: String,
namespace: String?,
objectPath: String
): String {
val exceptionsThrown: MutableList<Exception> = ArrayList()
Expand Down Expand Up @@ -174,7 +172,7 @@ open class S3StorageOperations(
val partId: String = getPartId(objectPath)
val fileExtension: String = getExtension(recordsData.filename)
val fullObjectKey: String =
if (StringUtils.isNotBlank(s3Config.fileNamePattern)) {
if (!s3Config.fileNamePattern.isNullOrBlank()) {
s3FilenameTemplateManager.applyPatternToFilename(
S3FilenameTemplateParameterObject.builder()
.partId(partId)
Expand Down Expand Up @@ -291,7 +289,7 @@ open class S3StorageOperations(
}

override fun cleanUpBucketObject(
namespace: String,
namespace: String?,
streamName: String,
objectPath: String,
pathFormat: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ abstract class S3AvroParquetDestinationAcceptanceTest protected constructor(s3Fo
@Throws(IOException::class)
private fun readMessagesFromFile(messagesFilename: String): List<AirbyteMessage> {
return MoreResources.readResource(messagesFilename)
.trim()
.lines()
.map { record -> Jsons.deserialize(record, AirbyteMessage::class.java) }
.toList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,15 @@ abstract class S3BaseCsvDestinationAcceptanceTest : S3DestinationAcceptanceTest(
val fieldTypes = getFieldTypes(streamSchema)
val jsonRecords: MutableList<JsonNode> = LinkedList()

for (objectSummary in objectSummaries!!) {
s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key).use { `object` ->
for (objectSummary in objectSummaries) {
s3Client!!.getObject(objectSummary.bucketName, objectSummary.key).use { `object` ->
getReader(`object`).use { `in` ->
val records: Iterable<CSVRecord> =
CSVFormat.DEFAULT.withQuoteMode(QuoteMode.NON_NUMERIC)
.withFirstRecordAsHeader()
CSVFormat.Builder.create()
.setHeader()
.setSkipHeaderRecord(true)
.setQuoteMode(QuoteMode.NON_NUMERIC)
.build()
.parse(`in`)
StreamSupport.stream(records.spliterator(), false).forEach { r: CSVRecord ->
jsonRecords.add(getJsonNode(r.toMap(), fieldTypes))
Expand Down Expand Up @@ -87,7 +90,7 @@ abstract class S3BaseCsvDestinationAcceptanceTest : S3DestinationAcceptanceTest(
input: Map<String, String>,
fieldTypes: Map<String, String>
): JsonNode {
val json: ObjectNode = S3DestinationAcceptanceTest.Companion.MAPPER.createObjectNode()
val json: ObjectNode = MAPPER.createObjectNode()

if (input.containsKey(JavaBaseConstants.COLUMN_NAME_DATA)) {
return Jsons.deserialize(input[JavaBaseConstants.COLUMN_NAME_DATA])
Expand All @@ -100,7 +103,7 @@ abstract class S3BaseCsvDestinationAcceptanceTest : S3DestinationAcceptanceTest(
) {
continue
}
if (value == null || value == "") {
if (value == "") {
continue
}
val type = fieldTypes[key]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ import java.io.IOException
import java.io.InputStreamReader
import java.io.Reader
import java.nio.charset.StandardCharsets
import java.util.Map
import java.util.zip.GZIPInputStream

abstract class S3BaseCsvGzipDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() {
override val formatConfig: JsonNode?
get() = // config without compression defaults to GZIP
Jsons.jsonNode(
Map.of("format_type", outputFormat, "flattening", Flattening.ROOT_LEVEL.value)
mapOf("format_type" to outputFormat, "flattening" to Flattening.ROOT_LEVEL.value)
)

@Throws(IOException::class)
Expand Down
Loading

0 comments on commit f268be0

Please sign in to comment.