From 3187b1f48c03d59cb37c4e63c5bce68700ffd2df Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 27 Jun 2024 07:52:09 -0700 Subject: [PATCH] Destination redshift: Use new interfaces (#39008) --- .../destination-redshift/build.gradle | 2 +- .../destination-redshift/metadata.yaml | 2 +- .../redshift/RedshiftDestination.kt | 332 ++++++++++++------ .../RedshiftStagingStorageOperation.kt | 229 ++++++++++++ .../RedshiftS3StagingSqlOperations.kt | 220 ------------ .../operations/RedshiftSqlOperations.kt | 236 ------------- .../typing_deduping/RedshiftDV2Migration.kt | 41 +++ .../RedshiftDestinationHandler.kt | 68 +++- .../typing_deduping/RedshiftSqlGenerator.kt | 22 +- .../destination/redshift/util/RedshiftUtil.kt | 7 - .../redshift/RedshiftConnectionHandler.kt | 24 -- .../redshift/RedshiftConnectionTest.kt | 5 + ...3StagingInsertDestinationAcceptanceTest.kt | 298 ---------------- .../redshift/RedshiftTestDataComparator.kt | 86 ----- ...shiftS3StagingDestinationAcceptanceTest.kt | 18 - ...edshiftStagingDestinationAcceptanceTest.kt | 19 - ...shRedshiftDestinationBaseAcceptanceTest.kt | 297 ---------------- .../RedshiftSqlGeneratorIntegrationTest.kt | 5 +- .../RedshiftSqlGeneratorTest.kt | 2 +- docs/integrations/destinations/redshift.md | 1 + 20 files changed, 574 insertions(+), 1340 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/operation/RedshiftStagingStorageOperation.kt delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.kt delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.kt create mode 100644 airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDV2Migration.kt delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftConnectionHandler.kt delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.kt delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftTestDataComparator.kt delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/SshKeyRedshiftS3StagingDestinationAcceptanceTest.kt delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.kt delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.kt diff --git a/airbyte-integrations/connectors/destination-redshift/build.gradle b/airbyte-integrations/connectors/destination-redshift/build.gradle index 251d71c4b9f1..975ce629bfcd 100644 --- a/airbyte-integrations/connectors/destination-redshift/build.gradle +++ b/airbyte-integrations/connectors/destination-redshift/build.gradle @@ -4,7 +4,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = 
'0.35.15' + cdkVersionRequired = '0.35.16' features = ['db-destinations', 's3-destinations', 'typing-deduping'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-redshift/metadata.yaml b/airbyte-integrations/connectors/destination-redshift/metadata.yaml index 3642ba3fbd11..67233c3c4f0f 100644 --- a/airbyte-integrations/connectors/destination-redshift/metadata.yaml +++ b/airbyte-integrations/connectors/destination-redshift/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc - dockerImageTag: 3.1.0 + dockerImageTag: 3.1.1 dockerRepository: airbyte/destination-redshift documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift githubIssueLabel: destination-redshift diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftDestination.kt b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftDestination.kt index 9aadbb493f00..52f803456194 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftDestination.kt +++ b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftDestination.kt @@ -14,6 +14,7 @@ import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase import io.airbyte.cdk.db.jdbc.JdbcDatabase import io.airbyte.cdk.db.jdbc.JdbcSourceOperations import io.airbyte.cdk.db.jdbc.JdbcUtils +import io.airbyte.cdk.integrations.BaseConnector import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility.emitConfigErrorTrace import io.airbyte.cdk.integrations.base.Destination @@ -24,36 +25,41 @@ import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag.getRawNamespaceOve import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination import io.airbyte.cdk.integrations.destination.NamingConventionTransformer +import io.airbyte.cdk.integrations.destination.StreamSyncSummary +import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer +import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager +import io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer -import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage +import io.airbyte.cdk.integrations.destination.async.state.FlushFailure +import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination.Companion.DISABLE_TYPE_DEDUPE +import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination.Companion.RAW_SCHEMA_OVERRIDE import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler -import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator -import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator import io.airbyte.cdk.integrations.destination.s3.AesCbcEnvelopeEncryption import io.airbyte.cdk.integrations.destination.s3.EncryptionConfig import 
io.airbyte.cdk.integrations.destination.s3.EncryptionConfig.Companion.fromJson +import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat import io.airbyte.cdk.integrations.destination.s3.NoEncryption import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks.attemptS3WriteAndDelete import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations -import io.airbyte.cdk.integrations.destination.staging.StagingConsumerFactory.Companion.builder -import io.airbyte.commons.exceptions.ConnectionErrorException +import io.airbyte.cdk.integrations.destination.staging.operation.StagingStreamOperations import io.airbyte.commons.json.Jsons.deserialize -import io.airbyte.commons.json.Jsons.emptyObject import io.airbyte.commons.json.Jsons.jsonNode import io.airbyte.commons.resources.MoreResources.readResource +import io.airbyte.integrations.base.destination.operation.DefaultFlush +import io.airbyte.integrations.base.destination.operation.DefaultSyncOperation import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser -import io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper -import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler -import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations -import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus +import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog -import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator -import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper +import io.airbyte.integrations.base.destination.typing_deduping.Sql +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration import io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants -import io.airbyte.integrations.destination.redshift.operations.RedshiftS3StagingSqlOperations -import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations +import io.airbyte.integrations.destination.redshift.operation.RedshiftStagingStorageOperation +import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDV2Migration import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftRawTableAirbyteMetaMigration import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator @@ -62,24 +68,25 @@ import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSupe import io.airbyte.integrations.destination.redshift.util.RedshiftUtil import io.airbyte.protocol.models.v0.AirbyteConnectionStatus import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.ConnectorSpecification +import io.airbyte.protocol.models.v0.DestinationSyncMode +import java.sql.SQLException import java.time.Duration +import java.util.Objects import java.util.Optional +import java.util.UUID +import 
java.util.concurrent.Executors
 import java.util.function.Consumer
 import javax.sql.DataSource
 import org.apache.commons.lang3.NotImplementedException
 import org.apache.commons.lang3.StringUtils
+import org.jetbrains.annotations.VisibleForTesting
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
-class RedshiftDestination :
-    AbstractJdbcDestination(
-        DRIVER_CLASS,
-        RedshiftSQLNameTransformer(),
-        RedshiftSqlOperations()
-    ),
-    Destination {
+class RedshiftDestination : BaseConnector(), Destination {
     private fun isEphemeralKeysAndPurgingStagingData(
         config: JsonNode,
         encryptionConfig: EncryptionConfig
@@ -114,36 +121,126 @@ class RedshiftDestination :
                 "You cannot use ephemeral keys and disable purging your staging data. This would produce S3 objects that you cannot decrypt."
             )
         }
-        attemptS3WriteAndDelete(
-            S3StorageOperations(RedshiftSQLNameTransformer(), s3Config.getS3Client(), s3Config),
-            s3Config,
-            s3Config.bucketPath
-        )
+        attemptS3WriteAndDelete(getS3StorageOperations(s3Config), s3Config, s3Config.bucketPath)
-        val nameTransformer = namingResolver
-        val redshiftS3StagingSqlOperations =
-            RedshiftS3StagingSqlOperations(
-                nameTransformer,
-                s3Config.getS3Client(),
-                s3Config,
-                encryptionConfig
-            )
         val dataSource = getDataSource(config)
         try {
             val database: JdbcDatabase = DefaultJdbcDatabase(dataSource)
-            val outputSchema =
-                super.namingResolver.getIdentifier(config[JdbcUtils.SCHEMA_KEY].asText())
-            attemptTableOperations(
-                outputSchema,
-                database,
-                nameTransformer,
-                redshiftS3StagingSqlOperations,
-                false
+            val outputSchema = namingResolver.getIdentifier(config[JdbcUtils.SCHEMA_KEY].asText())
+            val rawTableSchemaName: String =
+                if (getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent) {
+                    getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get()
+                } else {
+                    JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE
+                }
+            val finalTableName =
+                namingResolver.getIdentifier(
+                    "_airbyte_connection_test_" +
+                        UUID.randomUUID().toString().replace("-".toRegex(), "")
+                )
+
+            val sqlGenerator = getSqlGenerator(config)
+            val streamId =
+                sqlGenerator.buildStreamId(outputSchema, finalTableName, rawTableSchemaName)
+            val streamConfig =
+                StreamConfig(
+                    id = streamId,
+                    destinationSyncMode = DestinationSyncMode.APPEND,
+                    primaryKey = listOf(),
+                    cursor = Optional.empty(),
+                    columns = linkedMapOf(),
+                    generationId = 0,
+                    minimumGenerationId = 0,
+                    syncId = 0
+                )
+
+            val databaseName = getDatabaseName(config)
+            val destinationHandler =
+                RedshiftDestinationHandler(databaseName, database, rawTableSchemaName)
+            val storageOperation =
+                RedshiftStagingStorageOperation(
+                    s3Config,
+                    keepStagingFiles = false,
+                    getS3StorageOperations(s3Config),
+                    sqlGenerator,
+                    destinationHandler,
+                )
+
+            // We simulate a mini-sync to ensure the raw table code path is exercised, with T+D disabled.
+            destinationHandler.createNamespaces(setOf(rawTableSchemaName, outputSchema))
+            val streamOperation: StagingStreamOperations =
+                StagingStreamOperations(
+                    storageOperation,
+                    // None of the fields in destination initial status matter
+                    // for a dummy sync with type-dedupe disabled. We only look at these
+                    // when we perform final table related setup operations.
+                    // We just need the streamId to perform the calls in streamOperation.
+ DestinationInitialStatus( + streamConfig = streamConfig, + isFinalTablePresent = false, + initialRawTableStatus = + InitialRawTableStatus( + rawTableExists = false, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.empty(), + ), + isSchemaMismatch = true, + isFinalTableEmpty = true, + destinationState = + RedshiftState(needsSoftReset = false, isAirbyteMetaPresentInRaw = true), + ), + FileUploadFormat.CSV, + destinationColumns, + disableTypeDedupe = true, + ) + streamOperation.writeRecords( + streamConfig, + listOf( + // Dummy message + PartialAirbyteMessage() + .withSerialized("""{"testKey": "testValue"}""") + .withRecord( + PartialAirbyteRecordMessage() + .withEmittedAt(System.currentTimeMillis()) + .withMeta( + AirbyteRecordMessageMeta(), + ), + ) + ) + .stream() ) - RedshiftUtil.checkSvvTableAccess(database) + streamOperation.finalizeTable( + streamConfig, + StreamSyncSummary(recordsWritten = Optional.of(1)), + ) + + // And now that we have a table, simulate the next sync startup. + destinationHandler.gatherInitialState(listOf(streamConfig)) + // (not bothering to verify the return value, maybe we should?) + + // clean up the raw table, this is intentionally not part of actual sync code + // because we avoid dropping original tables directly. + destinationHandler.execute( + Sql.of( + "DROP TABLE IF EXISTS \"${streamId.rawNamespace}\".\"${streamId.rawName}\";", + ), + ) + return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED) - } catch (e: ConnectionErrorException) { - val message = getErrorMessage(e.stateCode, e.errorCode, e.exceptionMessage, e) + } catch (e: SQLException) { + // copied from AbstractJdbcDestination's attemptTableOperations + val stateCode: String = e.sqlState + val errorCode: Int + val exceptionMessage: String? + if (Objects.isNull(e.cause) || e.cause !is SQLException) { + errorCode = e.errorCode + exceptionMessage = e.message + } else { + val cause = e.cause as SQLException + errorCode = cause.errorCode + exceptionMessage = cause.message + } + val message = getErrorMessage(stateCode, errorCode, exceptionMessage, e) emitConfigErrorTrace(e, message) return AirbyteConnectionStatus() .withStatus(AirbyteConnectionStatus.Status.FAILED) @@ -154,9 +251,9 @@ class RedshiftDestination : .withStatus(AirbyteConnectionStatus.Status.FAILED) .withMessage( """ - Could not connect with provided configuration. - ${e.message} - """.trimIndent() + Could not connect with provided configuration. 
+ ${e.message} + """.trimIndent() ) } finally { try { @@ -169,7 +266,8 @@ class RedshiftDestination : override val isV2Destination: Boolean = true - override fun getDataSource(config: JsonNode): DataSource { + @VisibleForTesting + fun getDataSource(config: JsonNode): DataSource { val jdbcConfig: JsonNode = getJdbcConfig(config) return create( jdbcConfig[JdbcUtils.USERNAME_KEY].asText(), @@ -177,12 +275,12 @@ class RedshiftDestination : else null, DRIVER_CLASS, jdbcConfig[JdbcUtils.JDBC_URL_KEY].asText(), - getDefaultConnectionProperties(config), + getDefaultConnectionProperties(), Duration.ofMinutes(2) ) } - override fun getDatabase(dataSource: DataSource): JdbcDatabase { + private fun getDatabase(dataSource: DataSource): JdbcDatabase { return DefaultJdbcDatabase(dataSource) } @@ -190,10 +288,10 @@ class RedshiftDestination : return DefaultJdbcDatabase(dataSource, sourceOperations) } - override val namingResolver: NamingConventionTransformer + private val namingResolver: NamingConventionTransformer get() = RedshiftSQLNameTransformer() - override fun getDefaultConnectionProperties(config: JsonNode): Map { + private fun getDefaultConnectionProperties(): Map { // The following properties can be overriden through jdbcUrlParameters in the config. val connectionOptions: MutableMap = HashMap() // Redshift properties @@ -213,16 +311,11 @@ class RedshiftDestination : return connectionOptions } - // this is a no op since we override getDatabase. - override fun toJdbcConfig(config: JsonNode): JsonNode { - return emptyObject() - } - - override fun getSqlGenerator(config: JsonNode): JdbcSqlGenerator { + private fun getSqlGenerator(config: JsonNode): RedshiftSqlGenerator { return RedshiftSqlGenerator(namingResolver, config) } - override fun getDestinationHandler( + private fun getDestinationHandler( databaseName: String, database: JdbcDatabase, rawTableSchema: String @@ -230,19 +323,24 @@ class RedshiftDestination : return RedshiftDestinationHandler(databaseName, database, rawTableSchema) } - override fun getMigrations( + private fun getMigrations( database: JdbcDatabase, databaseName: String, - sqlGenerator: SqlGenerator, - destinationHandler: DestinationHandler + sqlGenerator: RedshiftSqlGenerator ): List> { - return listOf>( - RedshiftRawTableAirbyteMetaMigration(database, databaseName) + return listOf( + RedshiftDV2Migration( + namingResolver, + database, + databaseName, + sqlGenerator, + ), + RedshiftRawTableAirbyteMetaMigration(database, databaseName), ) } @SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE") - override fun getDataTransformer( + private fun getDataTransformer( parsedCatalog: ParsedCatalog?, defaultNamespace: String? 
): StreamAwareDataTransformer { @@ -267,12 +365,11 @@ class RedshiftDestination : catalog: ConfiguredAirbyteCatalog, outputRecordCollector: Consumer ): SerializedAirbyteMessageConsumer { - val encryptionConfig = - if (config.has(RedshiftDestinationConstants.UPLOADING_METHOD)) - fromJson( - config[RedshiftDestinationConstants.UPLOADING_METHOD][JdbcUtils.ENCRYPTION_KEY] - ) - else NoEncryption() + if (config.has(RedshiftDestinationConstants.UPLOADING_METHOD)) + fromJson( + config[RedshiftDestinationConstants.UPLOADING_METHOD][JdbcUtils.ENCRYPTION_KEY] + ) + else NoEncryption() val s3Options = RedshiftUtil.findS3Options(config) val s3Config: S3DestinationConfig = S3DestinationConfig.getS3DestinationConfig(s3Options) @@ -285,7 +382,6 @@ class RedshiftDestination : val sqlGenerator = RedshiftSqlGenerator(namingResolver, config) val parsedCatalog: ParsedCatalog - val typerDeduper: TyperDeduper val database = getDatabase(getDataSource(config)) val databaseName = config[JdbcUtils.DATABASE_KEY].asText() val catalogParser: CatalogParser @@ -300,56 +396,57 @@ class RedshiftDestination : val redshiftDestinationHandler = RedshiftDestinationHandler(databaseName, database, rawNamespace) parsedCatalog = catalogParser.parseCatalog(catalog) - val migrator = JdbcV1V2Migrator(namingResolver, database, databaseName) - val v2TableMigrator = NoopV2TableMigrator() val disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config[DISABLE_TYPE_DEDUPE].asBoolean(false) val redshiftMigrations: List> = - getMigrations(database, databaseName, sqlGenerator, redshiftDestinationHandler) - typerDeduper = - if (disableTypeDedupe) { - NoOpTyperDeduperWithV1V2Migrations( - sqlGenerator, - redshiftDestinationHandler, - parsedCatalog, - migrator, - v2TableMigrator, - redshiftMigrations - ) - } else { - DefaultTyperDeduper( - sqlGenerator, - redshiftDestinationHandler, - parsedCatalog, - migrator, - v2TableMigrator, - redshiftMigrations - ) - } + getMigrations(database, databaseName, sqlGenerator) - return builder( - outputRecordCollector, - database, - RedshiftS3StagingSqlOperations( - namingResolver, - s3Config.getS3Client(), - s3Config, - encryptionConfig - ), - namingResolver, - config, - catalog, + val s3StorageOperations = getS3StorageOperations(s3Config) + + val redshiftStagingStorageOperation = + RedshiftStagingStorageOperation( + s3Config, isPurgeStagingData(s3Options), - typerDeduper, + s3StorageOperations, + sqlGenerator, + redshiftDestinationHandler, + ) + val syncOperation = + DefaultSyncOperation( parsedCatalog, + redshiftDestinationHandler, defaultNamespace, - JavaBaseConstants.DestinationColumns.V2_WITH_META + { initialStatus, disableTD -> + StagingStreamOperations( + redshiftStagingStorageOperation, + initialStatus, + FileUploadFormat.CSV, + destinationColumns, + disableTD + ) + }, + redshiftMigrations, + disableTypeDedupe, ) - .setDataTransformer(getDataTransformer(parsedCatalog, defaultNamespace)) - .build() - .createAsync() + return AsyncStreamConsumer( + outputRecordCollector, + onStart = {}, + onClose = { _, streamSyncSummaries -> + syncOperation.finalizeStreams(streamSyncSummaries) + }, + onFlush = DefaultFlush(OPTIMAL_FLUSH_BATCH_SIZE, syncOperation), + catalog, + BufferManager(bufferMemoryLimit), + Optional.ofNullable(defaultNamespace), + FlushFailure(), + Executors.newFixedThreadPool(5), + AirbyteMessageDeserializer(getDataTransformer(parsedCatalog, defaultNamespace)), + ) } + private fun getS3StorageOperations(s3Config: S3DestinationConfig) = + S3StorageOperations(namingResolver, 
s3Config.getS3Client(), s3Config) + private fun isPurgeStagingData(config: JsonNode?): Boolean { return !config!!.has("purge_staging_data") || config["purge_staging_data"].asBoolean() } @@ -366,6 +463,15 @@ class RedshiftDestination : "com.amazon.redshift.ssl.NonValidatingFactory" ) + private val destinationColumns = JavaBaseConstants.DestinationColumns.V2_WITH_META + + private const val OPTIMAL_FLUSH_BATCH_SIZE: Long = 50 * 1024 * 1024 + private val bufferMemoryLimit: Long = (Runtime.getRuntime().maxMemory() * 0.5).toLong() + + private fun getDatabaseName(config: JsonNode): String { + return config[JdbcUtils.DATABASE_KEY].asText() + } + private fun sshWrappedDestination(): Destination { return SshWrappedDestination( RedshiftDestination(), diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/operation/RedshiftStagingStorageOperation.kt b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/operation/RedshiftStagingStorageOperation.kt new file mode 100644 index 000000000000..fd1d80f78ff9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/operation/RedshiftStagingStorageOperation.kt @@ -0,0 +1,229 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.redshift.operation + +import com.fasterxml.jackson.databind.ObjectMapper +import io.airbyte.cdk.integrations.base.JavaBaseConstants +import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer +import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig +import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations +import io.airbyte.integrations.base.destination.operation.StorageOperation +import io.airbyte.integrations.base.destination.typing_deduping.Sql +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig +import io.airbyte.integrations.base.destination.typing_deduping.StreamId +import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil +import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer +import io.airbyte.integrations.destination.redshift.manifest.Entry +import io.airbyte.integrations.destination.redshift.manifest.Manifest +import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler +import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator +import io.airbyte.protocol.models.v0.DestinationSyncMode +import io.github.oshai.kotlinlogging.KotlinLogging +import java.time.Instant +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.util.Optional +import java.util.UUID +import java.util.stream.Collectors + +private val log = KotlinLogging.logger {} + +class RedshiftStagingStorageOperation( + private val s3Config: S3DestinationConfig, + private val keepStagingFiles: Boolean, + private val s3StorageOperations: S3StorageOperations, + private val sqlGenerator: RedshiftSqlGenerator, + private val destinationHandler: RedshiftDestinationHandler, +) : StorageOperation { + private val connectionId: UUID = UUID.randomUUID() + private val writeDatetime: ZonedDateTime = Instant.now().atZone(ZoneOffset.UTC) + private val objectMapper = ObjectMapper() + + override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) { + // create raw table + 
destinationHandler.execute(Sql.of(createRawTableQuery(streamId)))
+        if (destinationSyncMode == DestinationSyncMode.OVERWRITE) {
+            destinationHandler.execute(Sql.of(truncateRawTableQuery(streamId)))
+        }
+        // create bucket for staging files
+        s3StorageOperations.createBucketIfNotExists()
+    }
+
+    override fun writeToStage(streamId: StreamId, data: SerializableBuffer) {
+        val objectPath: String = getStagingPath(streamId)
+        log.info {
+            "Uploading records for ${streamId.rawNamespace}.${streamId.rawName} to path $objectPath"
+        }
+        val filename =
+            s3StorageOperations.uploadRecordsToBucket(data, streamId.rawNamespace, objectPath)
+
+        log.info {
+            "Starting copy to target table ${streamId.rawName} in destination from stage: $objectPath/$filename."
+        }
+        val manifestContents = createManifest(listOf(filename), objectPath)
+        val manifestPath = putManifest(manifestContents, objectPath)
+        executeCopy(manifestPath, destinationHandler, streamId.rawNamespace, streamId.rawName)
+        log.info {
+            "Copy to target table ${streamId.rawNamespace}.${streamId.rawName} in destination complete."
+        }
+    }
+
+    override fun cleanupStage(streamId: StreamId) {
+        if (keepStagingFiles) return
+        val stagingRootPath = getStagingPath(streamId)
+        log.info { "Cleaning up staging path at $stagingRootPath" }
+        s3StorageOperations.dropBucketObject(stagingRootPath)
+    }
+
+    override fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean) {
+        destinationHandler.execute(sqlGenerator.createTable(streamConfig, suffix, replace))
+    }
+
+    override fun softResetFinalTable(streamConfig: StreamConfig) {
+        TyperDeduperUtil.executeSoftReset(
+            sqlGenerator = sqlGenerator,
+            destinationHandler = destinationHandler,
+            streamConfig,
+        )
+    }
+
+    override fun overwriteFinalTable(streamConfig: StreamConfig, tmpTableSuffix: String) {
+        if (tmpTableSuffix.isNotBlank()) {
+            log.info {
+                "Overwriting table ${streamConfig.id.finalTableId(RedshiftSqlGenerator.QUOTE)} with ${
+                    streamConfig.id.finalTableId(
+                        RedshiftSqlGenerator.QUOTE,
+                        tmpTableSuffix,
+                    )
+                }"
+            }
+            destinationHandler.execute(
+                sqlGenerator.overwriteFinalTable(streamConfig.id, tmpTableSuffix)
+            )
+        }
+    }
+
+    override fun typeAndDedupe(
+        streamConfig: StreamConfig,
+        maxProcessedTimestamp: Optional,
+        finalTableSuffix: String
+    ) {
+        TyperDeduperUtil.executeTypeAndDedupe(
+            sqlGenerator = sqlGenerator,
+            destinationHandler = destinationHandler,
+            streamConfig,
+            maxProcessedTimestamp,
+            finalTableSuffix,
+        )
+    }
+
+    private fun getStagingPath(streamId: StreamId): String {
+        // S3DestinationConfig.getS3DestinationConfig always sets a nonnull bucket path
+        // TODO mark bucketPath as non-nullable
+        val prefix =
+            if (s3Config.bucketPath!!.isEmpty()) ""
+            else s3Config.bucketPath + (if (s3Config.bucketPath!!.endsWith("/")) "" else "/")
+        return nameTransformer.applyDefaultCase(
+            String.format(
+                "%s%s/%s_%02d_%02d_%02d_%s/",
+                prefix,
+                nameTransformer.applyDefaultCase(
+                    // I have no idea why we're doing this.
+                    // streamId.rawName already has been passed through the name transformer.
+ nameTransformer.convertStreamName(streamId.rawName) + ), + writeDatetime.year, + writeDatetime.monthValue, + writeDatetime.dayOfMonth, + writeDatetime.hour, + connectionId + ) + ) + } + + private fun createManifest(stagedFiles: List, stagingPath: String): String { + if (stagedFiles.isEmpty()) { + throw IllegalArgumentException("Cannot create manifest for empty list of files") + } + + val s3FileEntries = + stagedFiles + .stream() + .map { file: String -> + Entry(getManifestPath(s3Config.bucketName!!, file, stagingPath)) + } + .collect(Collectors.toList()) + val manifest = Manifest(s3FileEntries) + + return objectMapper.writeValueAsString(manifest) + } + + private fun putManifest(manifestContents: String, stagingPath: String): String { + val manifestFilePath = stagingPath + String.format("%s.manifest", UUID.randomUUID()) + s3StorageOperations.uploadManifest(manifestFilePath, manifestContents) + return manifestFilePath + } + + private fun executeCopy( + manifestPath: String, + destinationHandler: RedshiftDestinationHandler, + schemaName: String, + tableName: String, + ) { + val accessKeyId = + s3Config.s3CredentialConfig!!.s3CredentialsProvider.credentials.awsAccessKeyId + val secretAccessKey = + s3Config.s3CredentialConfig!!.s3CredentialsProvider.credentials.awsSecretKey + + val copyQuery = + """ + COPY $schemaName.$tableName FROM '${getFullS3Path(s3Config.bucketName!!, manifestPath)}' + CREDENTIALS 'aws_access_key_id=$accessKeyId;aws_secret_access_key=$secretAccessKey' + CSV GZIP + REGION '${s3Config.bucketRegion}' TIMEFORMAT 'auto' + STATUPDATE OFF + MANIFEST; + """.trimIndent() + + // Disable statement logging. The statement contains a plaintext S3 secret+access key. + destinationHandler.execute(Sql.of(copyQuery), logStatements = false) + } + + companion object { + private val nameTransformer = RedshiftSQLNameTransformer() + + private fun createRawTableQuery(streamId: StreamId): String { + return """ + CREATE TABLE IF NOT EXISTS "${streamId.rawNamespace}"."${streamId.rawName}" ( + ${JavaBaseConstants.COLUMN_NAME_AB_RAW_ID} VARCHAR(36), + ${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT} TIMESTAMPTZ DEFAULT GETDATE(), + ${JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT} TIMESTAMPTZ, + ${JavaBaseConstants.COLUMN_NAME_DATA} SUPER NOT NULL, + ${JavaBaseConstants.COLUMN_NAME_AB_META} SUPER NULL + ) + """.trimIndent() + } + + private fun truncateRawTableQuery(streamId: StreamId): String { + return String.format( + """TRUNCATE TABLE "%s"."%s";""", + streamId.rawNamespace, + streamId.rawName + ) + } + + private fun getFullS3Path(s3BucketName: String, s3StagingFile: String): String { + return java.lang.String.join("/", "s3:/", s3BucketName, s3StagingFile) + } + + private fun getManifestPath( + s3BucketName: String, + s3StagingFile: String, + stagingPath: String, + ): String { + return "s3://$s3BucketName/$stagingPath$s3StagingFile" + } + } +} diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.kt b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.kt deleted file mode 100644 index e1c09ad7c570..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.kt +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
- */ -package io.airbyte.integrations.destination.redshift.operations - -import com.amazonaws.services.s3.AmazonS3 -import com.fasterxml.jackson.databind.ObjectMapper -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings -import io.airbyte.cdk.db.jdbc.JdbcDatabase -import io.airbyte.cdk.integrations.destination.NamingConventionTransformer -import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer -import io.airbyte.cdk.integrations.destination.s3.AesCbcEnvelopeEncryption -import io.airbyte.cdk.integrations.destination.s3.AesCbcEnvelopeEncryptionBlobDecorator -import io.airbyte.cdk.integrations.destination.s3.EncryptionConfig -import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig -import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations -import io.airbyte.cdk.integrations.destination.s3.credential.S3AccessKeyCredentialConfig -import io.airbyte.cdk.integrations.destination.staging.StagingOperations -import io.airbyte.commons.lang.Exceptions.toRuntime -import io.airbyte.integrations.destination.redshift.manifest.Entry -import io.airbyte.integrations.destination.redshift.manifest.Manifest -import java.time.Instant -import java.time.ZoneOffset -import java.util.* -import java.util.stream.Collectors -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -@SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE") -class RedshiftS3StagingSqlOperations( - private val nameTransformer: NamingConventionTransformer, - s3Client: AmazonS3?, - private val s3Config: S3DestinationConfig, - encryptionConfig: EncryptionConfig -) : RedshiftSqlOperations(), StagingOperations { - private val s3StorageOperations = S3StorageOperations(nameTransformer, s3Client!!, s3Config) - private val objectMapper = ObjectMapper() - private val keyEncryptingKey: ByteArray? - - init { - if (encryptionConfig is AesCbcEnvelopeEncryption) { - s3StorageOperations.addBlobDecorator( - AesCbcEnvelopeEncryptionBlobDecorator(encryptionConfig.key) - ) - this.keyEncryptingKey = encryptionConfig.key - } else { - this.keyEncryptingKey = null - } - } - - override fun getStagingPath( - connectionId: UUID?, - namespace: String?, - streamName: String?, - outputTableName: String?, - writeDatetime: Instant? - ): String? { - val bucketPath = s3Config.bucketPath - val prefix = - if (bucketPath!!.isEmpty()) "" - else bucketPath + (if (bucketPath.endsWith("/")) "" else "/") - val zdt = writeDatetime!!.atZone(ZoneOffset.UTC) - return nameTransformer.applyDefaultCase( - String.format( - "%s%s/%s_%02d_%02d_%02d_%s/", - prefix, - nameTransformer.applyDefaultCase( - nameTransformer.convertStreamName(outputTableName!!) - ), - zdt.year, - zdt.monthValue, - zdt.dayOfMonth, - zdt.hour, - connectionId - ) - ) - } - - override fun getStageName(namespace: String?, streamName: String?): String? { - return "garbage-unused" - } - - @Throws(Exception::class) - override fun createStageIfNotExists(database: JdbcDatabase?, stageName: String?) { - s3StorageOperations.createBucketIfNotExists() - } - - @Throws(Exception::class) - override fun uploadRecordsToStage( - database: JdbcDatabase?, - recordsData: SerializableBuffer?, - schemaName: String?, - stageName: String?, - stagingPath: String? - ): String { - return s3StorageOperations.uploadRecordsToBucket(recordsData!!, schemaName, stagingPath!!) 
- } - - private fun putManifest(manifestContents: String, stagingPath: String?): String { - val manifestFilePath = stagingPath + String.format("%s.manifest", UUID.randomUUID()) - s3StorageOperations.uploadManifest(manifestFilePath, manifestContents) - return manifestFilePath - } - - @Throws(Exception::class) - override fun copyIntoTableFromStage( - database: JdbcDatabase?, - stageName: String?, - stagingPath: String?, - stagedFiles: List?, - tableName: String?, - schemaName: String? - ) { - LOGGER.info( - "Starting copy to target table from stage: {} in destination from stage: {}, schema: {}, .", - tableName, - stagingPath, - schemaName - ) - val possibleManifest = Optional.ofNullable(createManifest(stagedFiles, stagingPath)) - toRuntime { - possibleManifest - .stream() - .map { manifestContent: String -> putManifest(manifestContent, stagingPath) } - .forEach { manifestPath: String -> - executeCopy(manifestPath, database, schemaName, tableName) - } - } - LOGGER.info("Copy to target table {}.{} in destination complete.", schemaName, tableName) - } - - /** Generates the COPY data from staging files into target table */ - private fun executeCopy( - manifestPath: String, - db: JdbcDatabase?, - schemaName: String?, - tableName: String? - ) { - val credentialConfig = s3Config.s3CredentialConfig as S3AccessKeyCredentialConfig? - val encryptionClause = - if (keyEncryptingKey == null) { - "" - } else { - // TODO This is broken (it's using Snowflake SQL syntax). - // Leaving it here since the rest of the plumbing seems reasonable, - // but we should fix this eventually. - String.format( - " encryption = (type = 'aws_cse' master_key = '%s')", - BASE64_ENCODER.encodeToString(keyEncryptingKey) - ) - } - - val copyQuery = - String.format( - """ - COPY %s.%s FROM '%s' - CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s' - %s - CSV GZIP - REGION '%s' TIMEFORMAT 'auto' - STATUPDATE OFF - MANIFEST; - """.trimIndent(), - schemaName, - tableName, - getFullS3Path(s3Config.bucketName, manifestPath), - credentialConfig!!.accessKeyId, - credentialConfig.secretAccessKey, - encryptionClause, - s3Config.bucketRegion - ) - - toRuntime { db!!.execute(copyQuery) } - } - - private fun createManifest(stagedFiles: List?, stagingPath: String?): String? { - if (stagedFiles!!.isEmpty()) { - return null - } - - val s3FileEntries = - stagedFiles - .stream() - .map { file: String? -> - Entry(getManifestPath(s3Config.bucketName, file!!, stagingPath)) - } - .collect(Collectors.toList()) - val manifest = Manifest(s3FileEntries) - - return toRuntime { objectMapper.writeValueAsString(manifest) } - } - - @Throws(Exception::class) - override fun dropStageIfExists( - database: JdbcDatabase?, - stageName: String?, - stagingPath: String? - ) { - // stageName is unused here but used in Snowflake. This interface needs to be fixed. - s3StorageOperations.dropBucketObject(stagingPath!!) - } - - companion object { - private val BASE64_ENCODER: Base64.Encoder = Base64.getEncoder() - private val LOGGER: Logger = - LoggerFactory.getLogger(RedshiftS3StagingSqlOperations::class.java) - - private fun getFullS3Path(s3BucketName: String?, s3StagingFile: String): String { - return java.lang.String.join("/", "s3:/", s3BucketName, s3StagingFile) - } - - private fun getManifestPath( - s3BucketName: String?, - s3StagingFile: String, - stagingPath: String? 
- ): String { - return "s3://$s3BucketName/$stagingPath$s3StagingFile" - } - } -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.kt b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.kt deleted file mode 100644 index c2cc31b3e0e2..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.kt +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ -package io.airbyte.integrations.destination.redshift.operations - -import com.google.common.collect.Iterables -import io.airbyte.cdk.db.jdbc.JdbcDatabase -import io.airbyte.cdk.integrations.base.JavaBaseConstants -import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage -import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations -import io.airbyte.cdk.integrations.destination.jdbc.SqlOperationsUtils.insertRawRecordsInSingleQuery -import io.airbyte.commons.json.Jsons.serialize -import io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants -import java.nio.charset.StandardCharsets -import java.sql.Connection -import java.sql.SQLException -import java.time.Instant -import java.time.OffsetDateTime -import java.time.ZoneOffset -import java.util.* -import org.jooq.DSLContext -import org.jooq.Record -import org.jooq.SQLDialect -import org.jooq.conf.ParamType -import org.jooq.conf.Settings -import org.jooq.conf.StatementType -import org.jooq.impl.DSL -import org.jooq.impl.SQLDataType -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -open class RedshiftSqlOperations : JdbcSqlOperations() { - private val dslContext: DSLContext - get() = DSL.using(SQLDialect.POSTGRES) - - override fun createTableQueryV1(schemaName: String?, tableName: String?): String { - return String.format( - """ - CREATE TABLE IF NOT EXISTS %s.%s ( - %s VARCHAR PRIMARY KEY, - %s SUPER, - %s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP) - - """.trimIndent(), - schemaName, - tableName, - JavaBaseConstants.COLUMN_NAME_AB_ID, - JavaBaseConstants.COLUMN_NAME_DATA, - JavaBaseConstants.COLUMN_NAME_EMITTED_AT - ) - } - - override fun createTableQueryV2(schemaName: String?, tableName: String?): String { - val dsl = dslContext - return dsl.createTableIfNotExists(DSL.name(schemaName, tableName)) - .column( - JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, - SQLDataType.VARCHAR(36).nullable(false) - ) - .column( - JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, - SQLDataType.TIMESTAMPWITHTIMEZONE.defaultValue( - DSL.function("GETDATE", SQLDataType.TIMESTAMPWITHTIMEZONE) - ) - ) - .column(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, SQLDataType.TIMESTAMPWITHTIMEZONE) - .column( - JavaBaseConstants.COLUMN_NAME_DATA, - RedshiftDestinationConstants.SUPER_TYPE.nullable(false) - ) - .column( - JavaBaseConstants.COLUMN_NAME_AB_META, - RedshiftDestinationConstants.SUPER_TYPE.nullable(true) - ) - .getSQL() - } - - @Throws(SQLException::class) - public override fun insertRecordsInternal( - database: JdbcDatabase, - records: List, - schemaName: String?, - tmpTableName: String? - ) { - LOGGER.info("actual size of batch: {}", records.size) - - // query syntax: - // INSERT INTO public.users (ab_id, data, emitted_at) VALUES - // (?, ?::jsonb, ?), - // ... 
- val insertQueryComponent = - String.format( - "INSERT INTO %s.%s (%s, %s, %s) VALUES\n", - schemaName, - tmpTableName, - JavaBaseConstants.COLUMN_NAME_AB_ID, - JavaBaseConstants.COLUMN_NAME_DATA, - JavaBaseConstants.COLUMN_NAME_EMITTED_AT - ) - val recordQueryComponent = "(?, JSON_PARSE(?), ?),\n" - insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, database, records) - } - - override fun insertRecordsInternalV2( - database: JdbcDatabase, - records: List, - schemaName: String?, - tableName: String? - ) { - try { - database.execute { connection: Connection -> - LOGGER.info("Total records received to insert: {}", records.size) - // This comment was copied from DV1 code - // (SqlOperationsUtils.insertRawRecordsInSingleQuery): - // > We also partition the query to run on 10k records at a time, since some DBs set - // a max limit on - // > how many records can be inserted at once - // > TODO(sherif) this should use a smarter, destination-aware partitioning scheme - // instead of 10k by - // > default - for (batch in Iterables.partition(records, 10000)) { - val create = - DSL.using( - connection, - SQLDialect.POSTGRES, // Force inlined params. - // jooq normally tries to intelligently use bind params when possible. - // This would cause queries with many params to use inline params, - // but small queries would use bind params. - // In turn, that would force us to intelligently escape string values, - // since we need to escape inlined strings - // but need to not escape bound strings. - // Instead, we force jooq to always inline params, - // and always call escapeStringLiteral() on the string values. - Settings().withStatementType(StatementType.STATIC_STATEMENT) - ) - // JOOQ adds some overhead here. Building the InsertValuesStep object takes - // about 139ms for 5K - // records. - // That's a nontrivial execution speed loss when the actual statement execution - // takes 500ms. - // Hopefully we're executing these statements infrequently enough in a sync that - // it doesn't matter. - // But this is a potential optimization if we need to eke out a little more - // performance on standard - // inserts. - // ... which presumably we won't, because standard inserts is so inherently - // slow. - // See - // https://github.com/airbytehq/airbyte/blob/f73827eb43f62ee30093451c434ad5815053f32d/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java#L39 - // and - // https://github.com/airbytehq/airbyte/blob/f73827eb43f62ee30093451c434ad5815053f32d/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/SqlOperationsUtils.java#L62 - // for how DV1 did this in pure JDBC. 
- var insert = - create.insertInto< - Record?, String?, String?, String?, OffsetDateTime?, OffsetDateTime?>( - DSL.table(DSL.name(schemaName, tableName)), - DSL.field( - JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, - SQLDataType.VARCHAR(36) - ), - DSL.field( - JavaBaseConstants.COLUMN_NAME_DATA, - RedshiftDestinationConstants.SUPER_TYPE - ), - DSL.field( - JavaBaseConstants.COLUMN_NAME_AB_META, - RedshiftDestinationConstants.SUPER_TYPE - ), - DSL.field( - JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, - SQLDataType.TIMESTAMPWITHTIMEZONE - ), - DSL.field( - JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, - SQLDataType.TIMESTAMPWITHTIMEZONE - ) - ) - for (record in batch) { - insert = - insert.values( - DSL.`val`(UUID.randomUUID().toString()), - DSL.function( - "JSON_PARSE", - String::class.java, - DSL.`val`(escapeStringLiteral(record.serialized)) - ), - DSL.function( - "JSON_PARSE", - String::class.java, - DSL.`val`(serialize(record.record!!.meta)) - ), - DSL.`val`( - Instant.ofEpochMilli(record.record!!.emittedAt) - .atOffset(ZoneOffset.UTC) - ), - DSL.`val`(null as OffsetDateTime?) - ) - } - val insertSQL = insert.getSQL(ParamType.INLINED) - LOGGER.info( - "Prepared batch size: {}, Schema: {}, Table: {}, SQL statement size {} MB", - batch.size, - schemaName, - tableName, - (insertSQL.toByteArray(StandardCharsets.UTF_8).size) / (1024 * 1024L) - ) - val startTime = System.currentTimeMillis() - // Intentionally not using Jooq's insert.execute() as it was hiding the actual - // RedshiftException - // and also leaking the insert record values in the exception message. - connection.createStatement().execute(insertSQL) - LOGGER.info( - "Executed batch size: {}, Schema: {}, Table: {} in {} ms", - batch.size, - schemaName, - tableName, - (System.currentTimeMillis() - startTime) - ) - } - } - } catch (e: Exception) { - LOGGER.error("Error while inserting records", e) - throw RuntimeException(e) - } - } - - companion object { - private val LOGGER: Logger = LoggerFactory.getLogger(RedshiftSqlOperations::class.java) - const val REDSHIFT_VARCHAR_MAX_BYTE_SIZE: Int = 65535 - - @JvmStatic - fun escapeStringLiteral(str: String?): String? { - return str?.replace("\\", "\\\\") - } - } -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDV2Migration.kt b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDV2Migration.kt new file mode 100644 index 000000000000..1c1c48806b11 --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDV2Migration.kt @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.redshift.typing_deduping + +import io.airbyte.cdk.db.jdbc.JdbcDatabase +import io.airbyte.cdk.integrations.destination.NamingConventionTransformer +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator +import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig +import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration +import io.github.oshai.kotlinlogging.KotlinLogging + +private val logger = KotlinLogging.logger {} + +class RedshiftDV2Migration( + namingConventionTransformer: NamingConventionTransformer, + database: JdbcDatabase, + databaseName: String, + private val sqlGenerator: RedshiftSqlGenerator, +) : Migration { + private val legacyV1V2migrator = + JdbcV1V2Migrator(namingConventionTransformer, database, databaseName) + override fun migrateIfNecessary( + destinationHandler: DestinationHandler, + stream: StreamConfig, + state: DestinationInitialStatus + ): Migration.MigrationResult { + logger.info { "Initializing DV2 Migration check" } + legacyV1V2migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream) + return Migration.MigrationResult( + RedshiftState( + needsSoftReset = false, + isAirbyteMetaPresentInRaw = false, + ), + true, + ) + } +} diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.kt b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.kt index 977d5ca67277..9c584b20599e 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.kt +++ b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.kt @@ -3,6 +3,7 @@ */ package io.airbyte.integrations.destination.redshift.typing_deduping +import com.amazon.redshift.util.RedshiftException import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.db.jdbc.JdbcDatabase import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler @@ -33,21 +34,69 @@ class RedshiftDestinationHandler( SQLDialect.DEFAULT ) { override fun createNamespaces(schemas: Set) { - TODO("Not yet implemented") + // SHOW SCHEMAS will fail with a "schema ... does not exist" error + // if any schema is deleted while the SHOW SCHEMAS query runs. + // Run in a retry loop to mitigate this. + // This is mostly useful for tests, where we create+drop many schemas. + // Use up to 10 attempts since this is a fairly basic operation. 
+ val maxAttempts = 10 + for (i in 1..maxAttempts) { + try { + // plain SHOW SCHEMAS doesn't work, we have to specify the database name explicitly + val existingSchemas = + jdbcDatabase.queryJsons("""SHOW SCHEMAS FROM DATABASE "$catalogName";""").map { + it["schema_name"].asText() + } + schemas.forEach { + if (!existingSchemas.contains(it)) { + log.info { "Schema $it does not exist, proceeding to create it" } + jdbcDatabase.execute("""CREATE SCHEMA IF NOT EXISTS "$it";""") + } + } + break + } catch (e: RedshiftException) { + if (e.message == null) { + // No message, assume this is some different error and fail fast + throw e + } + + // Can't smart cast, so use !! and temp var + val message: String = e.message!! + val isConcurrentSchemaDeletionError = + message.startsWith("ERROR: schema") && message.endsWith("does not exist") + if (!isConcurrentSchemaDeletionError) { + // The error is not + // `ERROR: schema "sql_generator_test_akqywgsxqs" does not exist` + // so just fail fast + throw e + } + + // Swallow the exception and go the next loop iteration. + log.info { + "Encountered possibly transient nonexistent schema error during a SHOW SCHEMAS query. Retrying ($i/$maxAttempts attempts)" + } + } + } } @Throws(Exception::class) override fun execute(sql: Sql) { + execute(sql, logStatements = true) + } + + fun execute(sql: Sql, logStatements: Boolean) { val transactions = sql.transactions val queryId = UUID.randomUUID() for (transaction in transactions) { val transactionId = UUID.randomUUID() - log.info( - "Executing sql {}-{}: {}", - queryId, - transactionId, - java.lang.String.join("\n", transaction) - ) + if (logStatements) { + log.info( + "Executing sql {}-{}: {}", + queryId, + transactionId, + java.lang.String.join("\n", transaction) + ) + } val startTime = System.currentTimeMillis() try { @@ -59,7 +108,10 @@ class RedshiftDestinationHandler( // see https://github.com/airbytehq/airbyte/issues/33900 modifiedStatements.add("SET enable_case_sensitive_identifier to TRUE;\n") modifiedStatements.addAll(transaction) - jdbcDatabase.executeWithinTransaction(modifiedStatements) + jdbcDatabase.executeWithinTransaction( + modifiedStatements, + logStatements = logStatements + ) } catch (e: SQLException) { log.error("Sql {}-{} failed", queryId, transactionId, e) // This is a big hammer for something that should be much more targetted, only when diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.kt b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.kt index f1214d66c3a2..d4b77e2174e2 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.kt +++ b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.kt @@ -269,25 +269,26 @@ open class RedshiftSqlGenerator( * Return ROW_NUMBER() OVER (PARTITION BY primaryKeys ORDER BY cursor DESC NULLS LAST, * _airbyte_extracted_at DESC) * - * @param primaryKeys - * @param cursor + * @param primaryKey + * @param cursorField * @return */ - override fun getRowNumber(primaryKeys: List, cursor: Optional): Field { + override fun getRowNumber( + primaryKey: List, + cursorField: Optional + ): Field { // literally identical to postgres's getRowNumber implementation, 
changes here probably // should // be reflected there val primaryKeyFields = - if (primaryKeys != null) - primaryKeys - .stream() - .map { columnId: ColumnId -> DSL.field(DSL.quotedName(columnId.name)) } - .collect(Collectors.toList()) - else ArrayList() + primaryKey + .stream() + .map { columnId: ColumnId -> DSL.field(DSL.quotedName(columnId.name)) } + .collect(Collectors.toList()) val orderedFields: MutableList> = ArrayList() // We can still use Jooq's field to get the quoted name with raw sql templating. // jooq's .desc returns SortField instead of Field and NULLS LAST doesn't work with it - cursor.ifPresent { columnId: ColumnId -> + cursorField.ifPresent { columnId: ColumnId -> orderedFields.add( DSL.field("{0} desc NULLS LAST", DSL.field(DSL.quotedName(columnId.name))) ) @@ -331,6 +332,7 @@ open class RedshiftSqlGenerator( companion object { const val CASE_STATEMENT_SQL_TEMPLATE: String = "CASE WHEN {0} THEN {1} ELSE {2} END " const val CASE_STATEMENT_NO_ELSE_SQL_TEMPLATE: String = "CASE WHEN {0} THEN {1} END " + const val QUOTE: String = "\"" private const val AIRBYTE_META_COLUMN_CHANGES_KEY = "changes" diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/util/RedshiftUtil.kt b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/util/RedshiftUtil.kt index 78774449a96d..28fd571404b4 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/util/RedshiftUtil.kt +++ b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/util/RedshiftUtil.kt @@ -4,7 +4,6 @@ package io.airbyte.integrations.destination.redshift.util import com.fasterxml.jackson.databind.JsonNode -import io.airbyte.cdk.db.jdbc.JdbcDatabase import io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants import io.github.oshai.kotlinlogging.KotlinLogging @@ -37,10 +36,4 @@ object RedshiftUtil { private fun isNullOrEmpty(jsonNode: JsonNode?): Boolean { return null == jsonNode || "" == jsonNode.asText() } - - @Throws(Exception::class) - fun checkSvvTableAccess(database: JdbcDatabase) { - log.info("checking SVV_TABLE_INFO permissions") - database.queryJsons("SELECT 1 FROM SVV_TABLE_INFO LIMIT 1;") - } } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftConnectionHandler.kt b/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftConnectionHandler.kt deleted file mode 100644 index 143f87b3eac9..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftConnectionHandler.kt +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ -package io.airbyte.integrations.destination.redshift - -import java.sql.Connection -import java.sql.SQLException - -object RedshiftConnectionHandler { - /** - * For to close a connection. Aimed to be use in test only. 
- * - * @param connection The connection to close - */ - fun close(connection: Connection) { - try { - connection.autoCommit = false - connection.commit() - connection.close() - } catch (e: SQLException) { - throw RuntimeException(e) - } - } -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftConnectionTest.kt b/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftConnectionTest.kt index 05e71c4cbd6a..72ece537e22e 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftConnectionTest.kt +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftConnectionTest.kt @@ -4,6 +4,7 @@ package io.airbyte.integrations.destination.redshift import com.fasterxml.jackson.databind.node.ObjectNode +import io.airbyte.cdk.integrations.base.DestinationConfig import io.airbyte.commons.io.IOs.readFile import io.airbyte.commons.json.Jsons.deserialize import io.airbyte.protocol.models.v0.AirbyteConnectionStatus @@ -20,6 +21,7 @@ class RedshiftConnectionTest { @Throws(Exception::class) fun testCheckIncorrectPasswordFailure() { (config as ObjectNode).put("password", "fake") + DestinationConfig.initialize(config) status = destination.check(config) Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, status!!.status) Assertions.assertTrue(status!!.message.contains("State code: 28000;")) @@ -29,6 +31,7 @@ class RedshiftConnectionTest { @Throws(Exception::class) fun testCheckIncorrectUsernameFailure() { (config as ObjectNode).put("username", "") + DestinationConfig.initialize(config) status = destination.check(config) Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, status!!.status) Assertions.assertTrue(status!!.message.contains("State code: 28000;")) @@ -38,6 +41,7 @@ class RedshiftConnectionTest { @Throws(Exception::class) fun testCheckIncorrectHostFailure() { (config as ObjectNode).put("host", "localhost2") + DestinationConfig.initialize(config) status = destination.check(config) Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, status!!.status) Assertions.assertTrue(status!!.message.contains("State code: 08001;")) @@ -47,6 +51,7 @@ class RedshiftConnectionTest { @Throws(Exception::class) fun testCheckIncorrectDataBaseFailure() { (config as ObjectNode).put("database", "wrongdatabase") + DestinationConfig.initialize(config) status = destination.check(config) Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, status!!.status) Assertions.assertTrue(status!!.message.contains("State code: 3D000;")) diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.kt deleted file mode 100644 index ec6d402d7a83..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.kt +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
- */ -package io.airbyte.integrations.destination.redshift - -import com.amazon.redshift.util.RedshiftTimestamp -import com.fasterxml.jackson.databind.JsonNode -import com.fasterxml.jackson.databind.node.ObjectNode -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings -import io.airbyte.cdk.db.Database -import io.airbyte.cdk.db.factory.ConnectionFactory.create -import io.airbyte.cdk.db.factory.DatabaseDriver -import io.airbyte.cdk.db.jdbc.JdbcUtils.DATABASE_KEY -import io.airbyte.cdk.db.jdbc.JdbcUtils.HOST_KEY -import io.airbyte.cdk.db.jdbc.JdbcUtils.PASSWORD_KEY -import io.airbyte.cdk.db.jdbc.JdbcUtils.PORT_KEY -import io.airbyte.cdk.db.jdbc.JdbcUtils.USERNAME_KEY -import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA -import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_EMITTED_AT -import io.airbyte.cdk.integrations.standardtest.destination.JdbcDestinationAcceptanceTest -import io.airbyte.cdk.integrations.standardtest.destination.TestingNamespaces.generate -import io.airbyte.cdk.integrations.standardtest.destination.TestingNamespaces.isOlderThan2Days -import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator -import io.airbyte.commons.io.IOs.readFile -import io.airbyte.commons.json.Jsons -import io.airbyte.commons.json.Jsons.deserialize -import io.airbyte.commons.string.Strings.addRandomSuffix -import io.airbyte.integrations.destination.redshift.RedshiftDestination.Companion.SSL_JDBC_PARAMETERS -import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations -import java.nio.file.Path -import java.sql.Connection -import java.sql.SQLException -import java.time.ZoneOffset -import java.time.ZonedDateTime -import java.time.format.DateTimeFormatterBuilder -import java.time.temporal.ChronoField -import java.util.Optional -import java.util.stream.Collectors -import org.jooq.DSLContext -import org.jooq.Record -import org.jooq.impl.DSL -import org.junit.jupiter.api.Disabled -import org.junit.jupiter.api.parallel.Execution -import org.junit.jupiter.api.parallel.ExecutionMode -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -/** - * Integration test testing [RedshiftDestination]. The default Redshift integration test credentials - * contain S3 credentials - this automatically causes COPY to be selected. - */ -// these tests are not yet thread-safe, unlike the DV2 tests. -@Execution(ExecutionMode.SAME_THREAD) -@Disabled -class RedshiftS3StagingInsertDestinationAcceptanceTest : JdbcDestinationAcceptanceTest() { - // config from which to create / delete schemas. - private lateinit var baseConfig: JsonNode - - // config which refers to the schema that the test is being run in. - // override the getter name, because the base class declares a getConfig method, which clashes. - // Eventually we should just replace the super method with a native kotlin `abstract val`. - @get:JvmName("getConfig_") - protected lateinit var config: JsonNode - @SuppressFBWarnings( - "NP_NONNULL_RETURN_VIOLATION", - "spotbugs doesn't like lateinit on non-private vars" - ) - get - private val namingResolver = RedshiftSQLNameTransformer() - private val USER_WITHOUT_CREDS = addRandomSuffix("test_user", "_", 5) - - protected var database: Database? = null - private set - private lateinit var connection: Connection - protected var testDestinationEnv: TestDestinationEnv? 
= null - - override val imageName: String - get() = "airbyte/destination-redshift:dev" - - override fun getConfig(): JsonNode { - return config - } - - val staticConfig: JsonNode - get() = deserialize(readFile(Path.of("secrets/config_staging.json"))) - - override fun getFailCheckConfig(): JsonNode? { - val invalidConfig: JsonNode = Jsons.clone(config) - (invalidConfig as ObjectNode).put("password", "wrong password") - return invalidConfig - } - - override fun getTestDataComparator(): TestDataComparator { - return RedshiftTestDataComparator() - } - - override fun supportBasicDataTypeTest(): Boolean { - return true - } - - override fun supportArrayDataTypeTest(): Boolean { - return true - } - - override fun supportObjectDataTypeTest(): Boolean { - return true - } - - override fun supportIncrementalSchemaChanges(): Boolean { - return true - } - - override fun supportsInDestinationNormalization(): Boolean { - return true - } - - @Throws(Exception::class) - override fun retrieveRecords( - env: TestDestinationEnv?, - streamName: String, - namespace: String, - streamSchema: JsonNode - ): List { - return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) - .stream() - .map { j: JsonNode -> j.get(COLUMN_NAME_DATA) } - .collect(Collectors.toList()) - } - - override fun implementsNamespaces(): Boolean { - return true - } - - @Throws(Exception::class) - override fun retrieveNormalizedRecords( - testEnv: TestDestinationEnv?, - @SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE") - streamName: String?, - namespace: String? - ): List { - var tableName = namingResolver.getIdentifier(streamName!!) - if (!tableName.startsWith("\"")) { - // Currently, Normalization always quote tables identifiers - tableName = "\"" + tableName + "\"" - } - return retrieveRecordsFromTable(tableName, namespace) - } - - @Throws(SQLException::class) - private fun retrieveRecordsFromTable(tableName: String, schemaName: String?): List { - return database!!.query> { ctx: DSLContext -> - ctx.fetch( - String.format( - "SELECT * FROM %s.%s ORDER BY %s ASC;", - schemaName, - tableName, - COLUMN_NAME_EMITTED_AT - ) - ) - .stream() - .map { record: Record -> - getJsonFromRecord(record) { value: Any -> - if (value is RedshiftTimestamp) { - // We can't just use rts.toInstant().toString(), because that will - // mangle historical - // dates (e.g. 1504-02-28...) because toInstant() just converts to epoch - // millis, - // which works _very badly_ for for very old dates. - // Instead, convert to a string and then parse that string. - // We can't just rts.toString(), because that loses the timezone... - // so instead we use getPostgresqlString and parse that >.> - // Thanks, redshift. - return@getJsonFromRecord Optional.of( - ZonedDateTime.parse( - value.postgresqlString, - DateTimeFormatterBuilder() - .appendPattern("yyyy-MM-dd HH:mm:ss") - .optionalStart() - .appendFraction(ChronoField.MILLI_OF_SECOND, 0, 9, true) - .optionalEnd() - .appendPattern("X") - .toFormatter() - ) - .withZoneSameInstant(ZoneOffset.UTC) - .toString() - ) - } else { - return@getJsonFromRecord Optional.empty() - } - } - } - .collect(Collectors.toList()) - }!! - } - - // for each test we create a new schema in the database. run the test in there and then remove - // it. 
- @Throws(Exception::class) - override fun setup(testEnv: TestDestinationEnv, TEST_SCHEMAS: HashSet) { - val schemaName = generate() - val createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName) - baseConfig = staticConfig - database = createDatabase() - removeOldNamespaces() - database!!.query { ctx: DSLContext -> ctx.execute(createSchemaQuery) } - val createUser = - String.format( - "create user %s with password '%s' SESSION TIMEOUT 60;", - USER_WITHOUT_CREDS, - baseConfig!!["password"].asText() - ) - database!!.query { ctx: DSLContext -> ctx.execute(createUser) } - val configForSchema: JsonNode = Jsons.clone(baseConfig) - (configForSchema as ObjectNode).put("schema", schemaName) - TEST_SCHEMAS.add(schemaName) - config = configForSchema - testDestinationEnv = testEnv - } - - private fun removeOldNamespaces() { - val schemas: List - try { - schemas = - database!! - .query { ctx: DSLContext -> - ctx.fetch("SELECT schema_name FROM information_schema.schemata;") - }!! - .stream() - .map { record: Record -> record["schema_name"].toString() } - .toList() - } catch (e: SQLException) { - // if we can't fetch the schemas, just return. - return - } - - var schemasDeletedCount = 0 - for (schema in schemas) { - if (isOlderThan2Days(schema)) { - try { - database!!.query { ctx: DSLContext -> - ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schema)) - } - schemasDeletedCount++ - } catch (e: SQLException) { - LOGGER.error("Failed to delete old dataset: {}", schema, e) - } - } - } - LOGGER.info("Deleted {} old schemas.", schemasDeletedCount) - } - - @Throws(Exception::class) - override fun tearDown(testEnv: TestDestinationEnv) { - println("TEARING_DOWN_SCHEMAS: $testSchemas") - database!!.query { ctx: DSLContext -> - ctx.execute( - String.format("DROP SCHEMA IF EXISTS %s CASCADE", config!!["schema"].asText()) - ) - } - for (schema in testSchemas) { - database!!.query { ctx: DSLContext -> - ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schema)) - } - } - database!!.query { ctx: DSLContext -> - ctx.execute(String.format("drop user if exists %s;", USER_WITHOUT_CREDS)) - } - RedshiftConnectionHandler.close(connection) - } - - protected fun createDatabase(): Database { - connection = - create( - baseConfig.get(USERNAME_KEY).asText(), - baseConfig.get(PASSWORD_KEY).asText(), - // ConnectionFactory.create() excepts a Map - // but SSL_JDBC_PARAMETERS is a Map - // so copy it to a new map :( - HashMap(SSL_JDBC_PARAMETERS), - String.format( - DatabaseDriver.REDSHIFT.urlFormatString, - baseConfig.get(HOST_KEY).asText(), - baseConfig.get(PORT_KEY).asInt(), - baseConfig.get(DATABASE_KEY).asText() - ) - ) - - return Database(DSL.using(connection)) - } - - override val maxRecordValueLimit: Int - get() = RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE - - companion object { - private val LOGGER: Logger = - LoggerFactory.getLogger(RedshiftS3StagingInsertDestinationAcceptanceTest::class.java) - } -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftTestDataComparator.kt b/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftTestDataComparator.kt deleted file mode 100644 index 132ba199c123..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftTestDataComparator.kt +++ /dev/null @@ -1,86 +0,0 @@ -/* - * 
Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ -package io.airbyte.integrations.destination.redshift - -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings -import io.airbyte.cdk.integrations.standardtest.destination.comparator.AdvancedTestDataComparator -import java.time.LocalDate -import java.time.ZoneOffset -import java.time.ZonedDateTime -import java.time.format.DateTimeFormatter -import java.time.format.DateTimeParseException -import java.util.Locale -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -class RedshiftTestDataComparator : AdvancedTestDataComparator() { - private val namingResolver = RedshiftSQLNameTransformer() - - @SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE") - override fun resolveIdentifier(identifier: String?): List { - val result: MutableList = ArrayList() - val resolved = namingResolver.getIdentifier(identifier!!) - result.add(identifier) - result.add(resolved) - if (!resolved.startsWith("\"")) { - result.add(resolved.lowercase(Locale.getDefault())) - result.add(resolved.uppercase(Locale.getDefault())) - } - return result - } - - override fun compareDateTimeWithTzValues( - airbyteMessageValue: String, - destinationValue: String - ): Boolean { - try { - val airbyteDate = - ZonedDateTime.parse(airbyteMessageValue, airbyteDateTimeWithTzFormatter) - .withZoneSameInstant(ZoneOffset.UTC) - - val destinationDate = - ZonedDateTime.parse(destinationValue).withZoneSameInstant(ZoneOffset.UTC) - return airbyteDate == destinationDate - } catch (e: DateTimeParseException) { - LOGGER.warn( - "Fail to convert values to ZonedDateTime. Try to compare as text. Airbyte value({}), Destination value ({}). Exception: {}", - airbyteMessageValue, - destinationValue, - e - ) - return compareTextValues(airbyteMessageValue, destinationValue) - } - } - - override fun compareDateTimeValues(expectedValue: String, actualValue: String): Boolean { - val destinationDate = parseLocalDateTime(actualValue) - val expectedDate = - LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT)) - return expectedDate == destinationDate - } - - private fun parseLocalDateTime(dateTimeValue: String?): LocalDate? { - return if (dateTimeValue != null) { - LocalDate.parse(dateTimeValue, DateTimeFormatter.ofPattern(getFormat(dateTimeValue))) - } else { - null - } - } - - private fun getFormat(dateTimeValue: String): String { - return if (dateTimeValue.contains("T")) { - // MySql stores array of objects as a jsonb type, i.e. 
array of string for all cases - AIRBYTE_DATETIME_FORMAT - } else { - // MySql stores datetime as datetime type after normalization - AIRBYTE_DATETIME_PARSED_FORMAT - } - } - - companion object { - private val LOGGER: Logger = LoggerFactory.getLogger(RedshiftTestDataComparator::class.java) - - protected const val REDSHIFT_DATETIME_WITH_TZ_FORMAT: String = "yyyy-MM-dd HH:mm:ssX" - } -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/SshKeyRedshiftS3StagingDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/SshKeyRedshiftS3StagingDestinationAcceptanceTest.kt deleted file mode 100644 index ce7e71740b08..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/SshKeyRedshiftS3StagingDestinationAcceptanceTest.kt +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ -package io.airbyte.integrations.destination.redshift - -import io.airbyte.cdk.integrations.base.ssh.SshTunnel -import org.junit.jupiter.api.Disabled - -/* - * SshKeyRedshiftInsertDestinationAcceptanceTest runs basic Redshift Destination Tests using the SQL - * Insert mechanism for upload of data and "key" authentication for the SSH bastion configuration. - */ -@Disabled -class SshKeyRedshiftS3StagingDestinationAcceptanceTest : - SshRedshiftDestinationBaseAcceptanceTest() { - override val tunnelMethod: SshTunnel.TunnelMethod - get() = SshTunnel.TunnelMethod.SSH_KEY_AUTH -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.kt deleted file mode 100644 index 48b9891da640..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.kt +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ -package io.airbyte.integrations.destination.redshift - -import io.airbyte.cdk.integrations.base.ssh.SshTunnel -import org.junit.jupiter.api.Disabled - -/** - * SshPasswordRedshiftStagingDestinationAcceptanceTest runs basic Redshift Destination Tests using - * the S3 Staging mechanism for upload of data and "password" authentication for the SSH bastion - * configuration. 
- */ -@Disabled -class SshPasswordRedshiftStagingDestinationAcceptanceTest : - SshRedshiftDestinationBaseAcceptanceTest() { - override val tunnelMethod: SshTunnel.TunnelMethod - get() = SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.kt b/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.kt deleted file mode 100644 index a15d511c6c25..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.kt +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ -package io.airbyte.integrations.destination.redshift - -import com.fasterxml.jackson.core.type.TypeReference -import com.fasterxml.jackson.databind.JsonNode -import com.fasterxml.jackson.databind.node.ObjectNode -import com.google.common.collect.ImmutableMap -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings -import io.airbyte.cdk.db.ContextQueryFunction -import io.airbyte.cdk.db.Database -import io.airbyte.cdk.db.factory.ConnectionFactory.create -import io.airbyte.cdk.db.factory.DatabaseDriver -import io.airbyte.cdk.db.jdbc.JdbcUtils.DATABASE_KEY -import io.airbyte.cdk.db.jdbc.JdbcUtils.HOST_KEY -import io.airbyte.cdk.db.jdbc.JdbcUtils.HOST_LIST_KEY -import io.airbyte.cdk.db.jdbc.JdbcUtils.PASSWORD_KEY -import io.airbyte.cdk.db.jdbc.JdbcUtils.PORT_KEY -import io.airbyte.cdk.db.jdbc.JdbcUtils.PORT_LIST_KEY -import io.airbyte.cdk.db.jdbc.JdbcUtils.USERNAME_KEY -import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA -import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_EMITTED_AT -import io.airbyte.cdk.integrations.base.ssh.SshTunnel -import io.airbyte.cdk.integrations.base.ssh.SshTunnel.Companion.sshWrap -import io.airbyte.cdk.integrations.standardtest.destination.JdbcDestinationAcceptanceTest -import io.airbyte.cdk.integrations.standardtest.destination.TestingNamespaces.generate -import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator -import io.airbyte.commons.functional.CheckedConsumer -import io.airbyte.commons.functional.CheckedFunction -import io.airbyte.commons.io.IOs.readFile -import io.airbyte.commons.jackson.MoreMappers.initMapper -import io.airbyte.commons.json.Jsons -import io.airbyte.commons.json.Jsons.deserialize -import io.airbyte.commons.json.Jsons.jsonNode -import io.airbyte.commons.string.Strings.addRandomSuffix -import io.airbyte.integrations.destination.redshift.RedshiftConnectionHandler.close -import io.airbyte.integrations.destination.redshift.RedshiftDestination.Companion.SSL_JDBC_PARAMETERS -import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations -import java.io.IOException -import java.nio.file.Path -import java.sql.Connection -import java.util.function.Function -import java.util.stream.Collectors -import org.jooq.DSLContext -import org.jooq.Record -import org.jooq.impl.DSL - -abstract class SshRedshiftDestinationBaseAcceptanceTest : JdbcDestinationAcceptanceTest() { - protected var schemaName: String? = null - - // config from which to create / delete schemas. - protected var baseConfig: JsonNode? 
= null - - // config which refers to the schema that the test is being run in. - protected var config: JsonNode? - // override the getter name, because the base class declares a getConfig method, which - // clashes. - // Eventually we should just replace the super method with a native kotlin `abstract val`. - @JvmName("getConfig_") get() = null - set(value) = TODO() - - private var database: Database? = null - - private var connection: Connection? = null - - private val namingResolver = RedshiftSQLNameTransformer() - private val USER_WITHOUT_CREDS = addRandomSuffix("test_user", "_", 5) - - abstract val tunnelMethod: SshTunnel.TunnelMethod - - override val imageName: String - get() = "airbyte/destination-redshift:dev" - - @Throws(Exception::class) - override fun getConfig(): JsonNode { - val configAsMap = deserializeToObjectMap(config) - val configMapBuilder = ImmutableMap.Builder().putAll(configAsMap) - return getTunnelConfig(tunnelMethod, configMapBuilder) - } - - protected fun getTunnelConfig( - tunnelMethod: SshTunnel.TunnelMethod, - builderWithSchema: ImmutableMap.Builder - ): JsonNode { - val sshBastionHost = config!!["ssh_bastion_host"] - val sshBastionPort = config!!["ssh_bastion_port"] - val sshBastionUser = config!!["ssh_bastion_user"] - val sshBastionPassword = config!!["ssh_bastion_password"] - val sshBastionKey = config!!["ssh_bastion_key"] - - val tunnelUserPassword = - if (tunnelMethod == SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH) - sshBastionPassword.asText() - else "" - val sshKey = - if (tunnelMethod == SshTunnel.TunnelMethod.SSH_KEY_AUTH) sshBastionKey.asText() else "" - - return jsonNode( - builderWithSchema - .put( - "tunnel_method", - jsonNode( - ImmutableMap.builder() - .put("tunnel_host", sshBastionHost) - .put("tunnel_method", tunnelMethod.toString()) - .put("tunnel_port", sshBastionPort.intValue()) - .put("tunnel_user", sshBastionUser) - .put("tunnel_user_password", tunnelUserPassword) - .put("ssh_key", sshKey) - .build() - ) - ) - .build() - ) - } - - @get:Throws(IOException::class) - val staticConfig: JsonNode - get() { - val configPath = Path.of("secrets/config_staging.json") - val configAsString = readFile(configPath) - return deserialize(configAsString) - } - - override fun getFailCheckConfig(): JsonNode? { - val invalidConfig: JsonNode = Jsons.clone(config!!) - (invalidConfig as ObjectNode).put("password", "wrong password") - return invalidConfig - } - - override fun implementsNamespaces(): Boolean { - return true - } - - @Throws(Exception::class) - override fun retrieveNormalizedRecords( - env: TestDestinationEnv?, - @SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE") - streamName: String?, - namespace: String? - ): List { - val tableName = namingResolver.getIdentifier(streamName!!) 
- return retrieveRecordsFromTable(tableName, namespace) - } - - @Throws(Exception::class) - override fun retrieveRecords( - testEnv: TestDestinationEnv?, - streamName: String, - namespace: String, - streamSchema: JsonNode - ): List { - return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) - .stream() - .map(Function { j: JsonNode -> j.get(COLUMN_NAME_DATA) }) - .collect(Collectors.toList()) - } - - @Throws(Exception::class) - private fun retrieveRecordsFromTable(tableName: String, schemaName: String?): List { - return sshWrap>( - getConfig(), - HOST_LIST_KEY, - PORT_LIST_KEY, - CheckedFunction { config: JsonNode -> - database!!.query> { ctx: DSLContext -> - ctx.fetch( - String.format( - "SELECT * FROM %s.%s ORDER BY %s ASC;", - schemaName, - tableName, - COLUMN_NAME_EMITTED_AT - ) - ) - .stream() - .map { record: Record -> this.getJsonFromRecord(record) } - .collect(Collectors.toList()) - }!! - } - ) - } - - override fun getTestDataComparator(): TestDataComparator { - return RedshiftTestDataComparator() - } - - private fun createDatabaseFromConfig(config: JsonNode?): Database { - connection = - create( - config!!.get(USERNAME_KEY).asText(), - config.get(PASSWORD_KEY).asText(), - // we have a map - // but we need a map - // so just copy the map - HashMap(SSL_JDBC_PARAMETERS), - String.format( - DatabaseDriver.REDSHIFT.urlFormatString, - config.get(HOST_KEY).asText(), - config.get(PORT_KEY).asInt(), - config.get(DATABASE_KEY).asText() - ) - ) - - return Database(DSL.using(connection)) - } - - override val maxRecordValueLimit: Int - get() = RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE - - @Throws(Exception::class) - override fun setup(testEnv: TestDestinationEnv, TEST_SCHEMAS: HashSet) { - baseConfig = staticConfig - val configForSchema: JsonNode = Jsons.clone(baseConfig!!) - schemaName = generate() - TEST_SCHEMAS.add(schemaName!!) - (configForSchema as ObjectNode).put("schema", schemaName) - config = configForSchema - database = createDatabaseFromConfig(config) - - // create the schema - sshWrap( - getConfig(), - HOST_LIST_KEY, - PORT_LIST_KEY, - CheckedConsumer { config: JsonNode? -> - database!!.query( - ContextQueryFunction { ctx: DSLContext -> - ctx.fetch(String.format("CREATE SCHEMA %s;", schemaName)) - } - ) - } - ) - - // create the user - sshWrap( - getConfig(), - HOST_LIST_KEY, - PORT_LIST_KEY, - CheckedConsumer { config: JsonNode? -> - database!!.query( - ContextQueryFunction { ctx: DSLContext -> - ctx.fetch( - String.format( - "CREATE USER %s WITH PASSWORD '%s' SESSION TIMEOUT 60;", - USER_WITHOUT_CREDS, - baseConfig!!["password"].asText() - ) - ) - } - ) - } - ) - } - - @Throws(Exception::class) - override fun tearDown(testEnv: TestDestinationEnv) { - // blow away the test schema at the end. - sshWrap( - getConfig(), - HOST_LIST_KEY, - PORT_LIST_KEY, - CheckedConsumer { config: JsonNode? -> - database!!.query( - ContextQueryFunction { ctx: DSLContext -> - ctx.fetch(String.format("DROP SCHEMA IF EXISTS %s CASCADE;", schemaName)) - } - ) - } - ) - - // blow away the user at the end. - sshWrap( - getConfig(), - HOST_LIST_KEY, - PORT_LIST_KEY, - CheckedConsumer { config: JsonNode? -> - database!!.query( - ContextQueryFunction { ctx: DSLContext -> - ctx.fetch(String.format("DROP USER IF EXISTS %s;", USER_WITHOUT_CREDS)) - } - ) - } - ) - close(connection!!) 
- } - - companion object { - fun deserializeToObjectMap(json: JsonNode?): Map { - val objectMapper = initMapper() - return objectMapper.convertValue>( - json, - object : TypeReference?>() {} - ) - } - } -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.kt b/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.kt index a6d1fa3be15e..8d949fb4b011 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.kt +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.kt @@ -21,7 +21,6 @@ import io.airbyte.commons.json.Jsons.deserializeExact import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler import io.airbyte.integrations.destination.redshift.RedshiftDestination import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer -import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations.Companion.escapeStringLiteral import java.nio.file.Files import java.nio.file.Path import java.sql.ResultSet @@ -273,5 +272,9 @@ class RedshiftSqlGeneratorIntegrationTest : JdbcSqlGeneratorIntegrationTest obj.trim { it <= ' ' } } - .filter { line: String -> !line.isEmpty() } + .filter { line: String -> line.isNotEmpty() } .toList() Assertions.assertEquals(expectedSqlLines.size, generatedSqlLines.size) for (i in expectedSqlLines.indices) { diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index 5b9c33edc265..637c1017b8e3 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -244,6 +244,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.1.1 | 2024-06-26 | [39008](https://github.com/airbytehq/airbyte/pull/39008) | Internal code changes | | 3.1.0 | 2024-06-26 | [39141](https://github.com/airbytehq/airbyte/pull/39141) | Remove nonfunctional "encrypted staging" option | | 3.0.0 | 2024-06-04 | [38886](https://github.com/airbytehq/airbyte/pull/38886) | Remove standard inserts mode | | 2.6.4 | 2024-05-31 | [38825](https://github.com/airbytehq/airbyte/pull/38825) | Adopt CDK 0.35.15 |