From 7c0a6c569dd33d37944d45454ae82eaae801966d Mon Sep 17 00:00:00 2001
From: Stephane Geneix <147216312+stephane-airbyte@users.noreply.github.com>
Date: Fri, 3 May 2024 11:20:07 -0700
Subject: [PATCH] convert destination-snowflake to Kotlin CDK (#36910)

Not only bringing snowflake to the latest CDK, but also:
1) Bringing the `SourceOperation` into production code from the test code. There's really no reason those improvements should stay out of production (and they're already present in source-snowflake).
2) Adding `putTimestamp` to the `SourceOperation`, so that snowflake doesn't throw an exception on every call (each of which also spawned a new thread).
3) Making use of the newly added ability to filter orphaned threads on shutdown. We filter out all threads created during calls to `SFStatement.close()`.
4) Not always taking a lock when deleting destinationStates. We now check whether there are any states to delete with a `SELECT` (which takes no table lock) before issuing the `DELETE` (the old behavior was causing test contention, and it's a bad idea in general).
5) Only cleaning up `airbyte_internal._airbyte_destination_state` once per test run.
---
 airbyte-cdk/java/airbyte-cdk/README.md        |   4 +-
 .../io/airbyte/cdk/db/jdbc/JdbcDatabase.kt    |   3 +
 .../integrations/base/IntegrationRunner.kt    |   4 +-
 .../src/main/resources/version.properties     |   2 +-
 .../destination/jdbc/JdbcSqlOperations.kt     |   2 +-
 .../typing_deduping/JdbcDestinationHandler.kt |  59 +++---
 .../destination/LocalAirbyteDestination.kt    |   2 +-
 .../workers/internal/AirbyteDestination.kt    |   2 +-
 .../BaseDestinationV1V2Migrator.kt            |   2 +-
 .../BaseSqlGeneratorIntegrationTest.kt        |  13 +-
 .../destination-snowflake/build.gradle        |   2 +-
 .../destination-snowflake/gradle.properties   |   2 +-
 .../destination-snowflake/metadata.yaml       |   2 +-
 .../snowflake/SnowflakeDatabase.java          |   2 +-
 .../snowflake/SnowflakeDestinationRunner.java |  14 ++
 .../SnowflakeInternalStagingDestination.java  |   5 +-
 .../snowflake/SnowflakeSourceOperations.java} |  30 ++-
 .../snowflake/SnowflakeSqlOperations.java     |   6 +-
 .../SnowflakeSqlStagingOperations.java        |   4 +-
 .../SnowflakeDestinationHandler.java          |  65 ++++--
 .../SnowflakeSqlGenerator.java                |  46 ++--
 .../SnowflakeV1V2Migrator.java                |  14 +-
 .../SnowflakeV2TableMigrator.java             |  20 +-
 ...wflakeInsertDestinationAcceptanceTest.java |   8 +-
 .../snowflake/SnowflakeTestUtils.java         |   2 +-
 .../AbstractSnowflakeTypingDedupingTest.java  |  46 ++--
 ...ngLowercaseDatabaseTypingDedupingTest.java |   3 +-
 .../SnowflakeSqlGeneratorIntegrationTest.java | 196 +++++++++---------
 .../SnowflakeSqlNameTransformerTest.java      |   3 -
 .../snowflake/SnowflakeSqlOperationsTest.java |   2 +-
 docs/integrations/destinations/snowflake.md   |   1 +
 31 files changed, 338 insertions(+), 228 deletions(-)
 rename airbyte-integrations/connectors/destination-snowflake/src/{test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestSourceOperations.java => main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSourceOperations.java} (54%)

diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md
index 6ad23b89c055..87b4cf59e699 100644
--- a/airbyte-cdk/java/airbyte-cdk/README.md
+++ b/airbyte-cdk/java/airbyte-cdk/README.md
@@ -173,7 +173,9 @@ corresponds to that version.
### Java CDK | Version | Date | Pull Request | Subject | -|:--------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 0.31.7 | 2024-05-02 | [\#36910](https://github.com/airbytehq/airbyte/pull/36910) | changes for destination-snowflake | +| 0.31.6 | 2024-05-02 | [\#37746](https://github.com/airbytehq/airbyte/pull/37746) | debuggability improvements. | | 0.31.5 | 2024-04-30 | [\#37758](https://github.com/airbytehq/airbyte/pull/37758) | Set debezium max retries to zero | | 0.31.4 | 2024-04-30 | [\#37754](https://github.com/airbytehq/airbyte/pull/37754) | Add DebeziumEngine notification log | | 0.31.3 | 2024-04-30 | [\#37726](https://github.com/airbytehq/airbyte/pull/37726) | Remove debezium retries | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt index 9f4ca36ca76d..384e85feac35 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt @@ -15,6 +15,8 @@ import java.util.function.Consumer import java.util.function.Function import java.util.stream.Stream import java.util.stream.StreamSupport +import org.slf4j.Logger +import org.slf4j.LoggerFactory /** Database object for interacting with a JDBC connection. */ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSourceOperations<*>?) : @@ -211,6 +213,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource abstract fun executeMetadataQuery(query: Function): T companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(JdbcDatabase::class.java) /** * Map records returned in a result set. It is an "unsafe" stream because the stream must be * manually closed. Otherwise, there will be a database connection leak. diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt index 2a2e50a60fb7..e053cca47888 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt @@ -370,8 +370,8 @@ internal constructor( } @JvmStatic - fun getThreadCreationInfo(thread: Thread): ThreadCreationInfo { - return getMethod.invoke(threadCreationInfo, thread) as ThreadCreationInfo + fun getThreadCreationInfo(thread: Thread): ThreadCreationInfo? { + return getMethod.invoke(threadCreationInfo, thread) as ThreadCreationInfo? 
} /** diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index a39d5358d186..92c66235cb23 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.31.6 +version=0.31.7 diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt index f797de148330..6a1a29f45099 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt @@ -204,7 +204,7 @@ abstract class JdbcSqlOperations : SqlOperations { } } - fun dropTableIfExistsQuery(schemaName: String?, tableName: String?): String { + open fun dropTableIfExistsQuery(schemaName: String?, tableName: String?): String { return String.format("DROP TABLE IF EXISTS %s.%s;\n", schemaName, tableName) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt index 62bd8ce0f4c6..3e642da540ed 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt @@ -356,7 +356,7 @@ abstract class JdbcDestinationHandler( existingTable.columns[JavaBaseConstants.COLUMN_NAME_AB_META]!!.type } - private fun existingSchemaMatchesStreamConfig( + open protected fun existingSchemaMatchesStreamConfig( stream: StreamConfig?, existingTable: TableDefinition ): Boolean { @@ -400,6 +400,29 @@ abstract class JdbcDestinationHandler( return actualColumns == intendedColumns } + protected open fun getDeleteStatesSql( + destinationStates: Map + ): String { + return dslContext + .deleteFrom(table(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME))) + .where( + destinationStates.keys + .stream() + .map { streamId: StreamId -> + field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)) + .eq(streamId.originalName) + .and( + field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)) + .eq(streamId.originalNamespace) + ) + } + .reduce(DSL.falseCondition()) { obj: Condition, arg2: Condition? 
-> + obj.or(arg2) + } + ) + .getSQL(ParamType.INLINED) + } + @Throws(Exception::class) override fun commitDestinationStates(destinationStates: Map) { try { @@ -408,25 +431,7 @@ abstract class JdbcDestinationHandler( } // Delete all state records where the stream name+namespace match one of our states - val deleteStates = - dslContext - .deleteFrom(table(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME))) - .where( - destinationStates.keys - .stream() - .map { streamId: StreamId -> - field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)) - .eq(streamId.originalName) - .and( - field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)) - .eq(streamId.originalNamespace) - ) - } - .reduce(DSL.falseCondition()) { obj: Condition, arg2: Condition? -> - obj.or(arg2) - } - ) - .getSQL(ParamType.INLINED) + var deleteStates = getDeleteStatesSql(destinationStates) // Reinsert all of our states var insertStatesStep = @@ -461,12 +466,17 @@ abstract class JdbcDestinationHandler( } val insertStates = insertStatesStep.getSQL(ParamType.INLINED) - jdbcDatabase.executeWithinTransaction(listOf(deleteStates, insertStates)) + executeWithinTransaction(listOf(deleteStates, insertStates)) } catch (e: Exception) { LOGGER.warn("Failed to commit destination states", e) } } + @Throws(Exception::class) + protected open fun executeWithinTransaction(statements: List) { + jdbcDatabase.executeWithinTransaction(statements) + } + /** * Convert to the TYPE_NAME retrieved from [java.sql.DatabaseMetaData.getColumns] * @@ -479,9 +489,9 @@ abstract class JdbcDestinationHandler( companion object { private val LOGGER: Logger = LoggerFactory.getLogger(JdbcDestinationHandler::class.java) - private const val DESTINATION_STATE_TABLE_NAME = "_airbyte_destination_state" - private const val DESTINATION_STATE_TABLE_COLUMN_NAME = "name" - private const val DESTINATION_STATE_TABLE_COLUMN_NAMESPACE = "namespace" + protected const val DESTINATION_STATE_TABLE_NAME = "_airbyte_destination_state" + protected const val DESTINATION_STATE_TABLE_COLUMN_NAME = "name" + protected const val DESTINATION_STATE_TABLE_COLUMN_NAMESPACE = "namespace" private const val DESTINATION_STATE_TABLE_COLUMN_STATE = "destination_state" private const val DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT = "updated_at" @@ -542,6 +552,7 @@ abstract class JdbcDestinationHandler( return Optional.of(TableDefinition(retrievedColumnDefns)) } + @JvmStatic fun fromIsNullableIsoString(isNullable: String?): Boolean { return "YES".equals(isNullable, ignoreCase = true) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/LocalAirbyteDestination.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/LocalAirbyteDestination.kt index b63e1250ea57..e8b4eefc892b 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/LocalAirbyteDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/LocalAirbyteDestination.kt @@ -61,7 +61,7 @@ class LocalAirbyteDestination(private val dest: Destination) : AirbyteDestinatio return isClosed } - override val exitValue = 0 + override var exitValue = 0 override fun attemptRead(): Optional { return Optional.empty() diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/AirbyteDestination.kt 
b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/AirbyteDestination.kt index 45ded9d865b8..aecc460d131a 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/AirbyteDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/AirbyteDestination.kt @@ -72,7 +72,7 @@ interface AirbyteDestination : CheckedConsumer, AutoC * @return exit code of the destination process * @throws IllegalStateException if the destination process has not exited */ - abstract val exitValue: Int + val exitValue: Int /** * Attempts to read an AirbyteMessage from the Destination. diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.kt index 1882d807eada..52da4e7b1bf4 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.kt @@ -171,7 +171,7 @@ abstract class BaseDestinationV1V2Migrator : Destination * @return whether it exists and is in the correct format */ @Throws(Exception::class) - protected fun doesValidV1RawTableExist(namespace: String?, tableName: String?): Boolean { + protected open fun doesValidV1RawTableExist(namespace: String?, tableName: String?): Boolean { val existingV1RawTable = getTableIfExists(namespace, tableName) return existingV1RawTable.isPresent && doesV1RawTableMatchExpectedSchema(existingV1RawTable.get()) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt index 84050406c793..33ff1ae5e982 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt @@ -80,7 +80,7 @@ abstract class BaseSqlGeneratorIntegrationTest + open protected val finalMetadataColumnNames: Map /** Identical to [BaseTypingDedupingTest.getFinalMetadataColumnNames]. 
*/ get() = HashMap() @@ -728,7 +728,7 @@ abstract class BaseSqlGeneratorIntegrationTest, v2RawRecords: List) { + protected open fun migrationAssertions( + v1RawRecords: List, + v2RawRecords: List + ) { val v2RecordMap = v2RawRecords .stream() @@ -1570,7 +1573,7 @@ abstract class BaseSqlGeneratorIntegrationTest { + open protected fun dumpV1RawTableRecords(streamId: StreamId): List { return dumpRawTableRecords(streamId) } diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index b05e26d234e0..ec37781bb325 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.27.7' + cdkVersionRequired = '0.31.7' features = ['db-destinations', 's3-destinations', 'typing-deduping'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-snowflake/gradle.properties b/airbyte-integrations/connectors/destination-snowflake/gradle.properties index 8d4afe7f29ca..2fa5a4a844ef 100644 --- a/airbyte-integrations/connectors/destination-snowflake/gradle.properties +++ b/airbyte-integrations/connectors/destination-snowflake/gradle.properties @@ -1,4 +1,4 @@ # currently limit the number of parallel threads until further investigation into the issues \ # where Snowflake will fail to login using config credentials -testExecutionConcurrency=4 +testExecutionConcurrency=-1 JunitMethodExecutionTimeout=15 m diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index 2bd24d353aa5..f2db0400ba58 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 3.7.0 + dockerImageTag: 3.7.1 dockerRepository: airbyte/destination-snowflake documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake githubIssueLabel: destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java index 8a2745fa29ea..2c993174e63b 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java @@ -197,7 +197,7 @@ private static String getAccessTokenUsingRefreshToken(final String hostName, } public static JdbcDatabase getDatabase(final DataSource dataSource) { - return new DefaultJdbcDatabase(dataSource); + return new DefaultJdbcDatabase(dataSource, new SnowflakeSourceOperations()); } private static Runnable getRefreshTokenTask(final HikariDataSource dataSource) { diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationRunner.java 
b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationRunner.java index 9d6460dcb668..22ce625434ec 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationRunner.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationRunner.java @@ -7,12 +7,26 @@ import static io.airbyte.integrations.destination.snowflake.SnowflakeDestination.SCHEDULED_EXECUTOR_SERVICE; import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler; +import io.airbyte.cdk.integrations.base.IntegrationRunner; import io.airbyte.cdk.integrations.base.adaptive.AdaptiveDestinationRunner; +import net.snowflake.client.core.SFSession; +import net.snowflake.client.core.SFStatement; import net.snowflake.client.jdbc.SnowflakeSQLException; public class SnowflakeDestinationRunner { public static void main(final String[] args) throws Exception { + IntegrationRunner.addOrphanedThreadFilter((Thread t) -> { + for (StackTraceElement stackTraceElement : IntegrationRunner.getThreadCreationInfo(t).getStack()) { + String stackClassName = stackTraceElement.getClassName(); + String stackMethodName = stackTraceElement.getMethodName(); + if (SFStatement.class.getCanonicalName().equals(stackClassName) && "close".equals(stackMethodName) || + SFSession.class.getCanonicalName().equals(stackClassName) && "callHeartBeatWithQueryTimeout".equals(stackMethodName)) { + return false; + } + } + return true; + }); AirbyteExceptionHandler.addThrowableForDeinterpolation(SnowflakeSQLException.class); AdaptiveDestinationRunner.baseOnEnv() .withOssDestination(() -> new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS)) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java index ff7acac1da01..e1813942c346 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java @@ -10,6 +10,7 @@ import io.airbyte.cdk.db.jdbc.JdbcUtils; import io.airbyte.cdk.integrations.base.Destination; import io.airbyte.cdk.integrations.base.JavaBaseConstants; +import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns; import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag; import io.airbyte.cdk.integrations.destination.NamingConventionTransformer; @@ -132,7 +133,7 @@ public JsonNode toJdbcConfig(final JsonNode config) { } @Override - protected JdbcSqlGenerator getSqlGenerator() { + protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) { throw new UnsupportedOperationException("Snowflake does not yet use the native JDBC DV2 interface"); } @@ -209,7 +210,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN typerDeduper, parsedCatalog, defaultNamespace, - true) + DestinationColumns.V2_WITHOUT_META) 
.setBufferMemoryLimit(Optional.of(getSnowflakeBufferMemoryLimit())) .setOptimalBatchSizeBytes( // The per stream size limit is following recommendations from: diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestSourceOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSourceOperations.java similarity index 54% rename from airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestSourceOperations.java rename to airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSourceOperations.java index c25bcb6709d7..cd549d90a9eb 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestSourceOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSourceOperations.java @@ -8,12 +8,26 @@ import static io.airbyte.cdk.db.jdbc.DateTimeConverter.putJavaSQLTime; import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.cdk.db.DataTypeUtils; import io.airbyte.cdk.db.jdbc.JdbcSourceOperations; import io.airbyte.commons.json.Jsons; import java.sql.ResultSet; import java.sql.SQLException; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; -public class SnowflakeTestSourceOperations extends JdbcSourceOperations { +public class SnowflakeSourceOperations extends JdbcSourceOperations { + + private static final DateTimeFormatter SNOWFLAKE_TIMESTAMPTZ_FORMATTER = new DateTimeFormatterBuilder() + .parseCaseInsensitive() + .append(DateTimeFormatter.ISO_LOCAL_DATE) + .appendLiteral(' ') + .append(DateTimeFormatter.ISO_LOCAL_TIME) + .optionalStart() + .appendLiteral(' ') + .append(DateTimeFormatter.ofPattern("XX")) + .toFormatter(); @Override public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { @@ -45,4 +59,18 @@ protected void putTime(final ObjectNode node, putJavaSQLTime(node, columnName, resultSet, index); } + @Override + protected void putTimestampWithTimezone(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) + throws SQLException { + final String timestampAsString = resultSet.getString(index); + OffsetDateTime timestampWithOffset = OffsetDateTime.parse(timestampAsString, SNOWFLAKE_TIMESTAMPTZ_FORMATTER); + node.put(columnName, timestampWithOffset.format(DataTypeUtils.TIMESTAMPTZ_FORMATTER)); + } + + protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { + // for backward compatibility + var instant = resultSet.getTimestamp(index).toInstant(); + node.put(columnName, DataTypeUtils.toISO8601StringWithMicroseconds(instant)); + } + } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java index bf709f6f8904..da344de04e8b 100644 --- 
a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java @@ -8,7 +8,7 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.base.DestinationConfig; import io.airbyte.cdk.integrations.base.JavaBaseConstants; -import io.airbyte.cdk.integrations.destination.async.partial_messages.PartialAirbyteMessage; +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage; import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations; import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations; import io.airbyte.cdk.integrations.destination.jdbc.SqlOperationsUtils; @@ -37,10 +37,10 @@ public class SnowflakeSqlOperations extends JdbcSqlOperations implements SqlOper @Override public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception { try { - if (!schemaSet.contains(schemaName) && !isSchemaExists(database, schemaName)) { + if (!getSchemaSet().contains(schemaName) && !isSchemaExists(database, schemaName)) { // 1s1t is assuming a lowercase airbyte_internal schema name, so we need to quote it database.execute(String.format("CREATE SCHEMA IF NOT EXISTS \"%s\";", schemaName)); - schemaSet.add(schemaName); + getSchemaSet().add(schemaName); } } catch (final Exception e) { throw checkForKnownConfigExceptions(e).orElseThrow(() -> e); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlStagingOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlStagingOperations.java index 8d4a42e6de85..116d93de95ef 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlStagingOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlStagingOperations.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.snowflake; import io.airbyte.cdk.db.jdbc.JdbcDatabase; +import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns; import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer; import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer; import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator; @@ -18,6 +19,7 @@ public abstract class SnowflakeSqlStagingOperations extends SnowflakeSqlOperatio /** * This method is used in Check connection method to make sure that user has the Write permission */ + @SuppressWarnings("deprecation") protected void attemptWriteToStage(final String outputSchema, final String stageName, final JdbcDatabase database) @@ -25,7 +27,7 @@ protected void attemptWriteToStage(final String outputSchema, final CsvSerializedBuffer csvSerializedBuffer = new CsvSerializedBuffer( new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX), - new StagingDatabaseCsvSheetGenerator(true), + new StagingDatabaseCsvSheetGenerator(DestinationColumns.V2_WITHOUT_META), true); // create a dummy stream\records that will bed used to test uploading diff --git 
a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java index 61b500ffccdf..7db614b51716 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java @@ -42,6 +42,7 @@ import java.util.UUID; import java.util.stream.Collectors; import net.snowflake.client.jdbc.SnowflakeSQLException; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringSubstitutor; import org.jooq.SQLDialect; import org.slf4j.Logger; @@ -70,8 +71,8 @@ public static LinkedHashMap> find final LinkedHashMap> existingTables = new LinkedHashMap<>(); final String paramHolder = String.join(",", Collections.nCopies(streamIds.size(), "?")); // convert list stream to array - final String[] namespaces = streamIds.stream().map(StreamId::finalNamespace).toArray(String[]::new); - final String[] names = streamIds.stream().map(StreamId::finalName).toArray(String[]::new); + final String[] namespaces = streamIds.stream().map(StreamId::getFinalNamespace).toArray(String[]::new); + final String[] names = streamIds.stream().map(StreamId::getFinalName).toArray(String[]::new); final String query = """ SELECT table_schema, table_name, column_name, data_type, is_nullable FROM information_schema.columns @@ -103,8 +104,8 @@ private LinkedHashMap> getFinalTableRowCo final LinkedHashMap> tableRowCounts = new LinkedHashMap<>(); final String paramHolder = String.join(",", Collections.nCopies(streamIds.size(), "?")); // convert list stream to array - final String[] namespaces = streamIds.stream().map(StreamId::finalNamespace).toArray(String[]::new); - final String[] names = streamIds.stream().map(StreamId::finalName).toArray(String[]::new); + final String[] namespaces = streamIds.stream().map(StreamId::getFinalNamespace).toArray(String[]::new); + final String[] names = streamIds.stream().map(StreamId::getFinalName).toArray(String[]::new); final String query = """ SELECT table_schema, table_name, row_count FROM information_schema.tables @@ -133,8 +134,8 @@ private InitialRawTableStatus getInitialRawTableState(final StreamId id, final D } final ResultSet tables = database.getMetaData().getTables( databaseName, - id.rawNamespace(), - id.rawName(), + id.getRawNamespace(), + id.getRawName(), null); if (!tables.next()) { return new InitialRawTableStatus(false, false, Optional.empty()); @@ -227,25 +228,26 @@ public void execute(final Sql sql) throws Exception { } private Set getPks(final StreamConfig stream) { - return stream.primaryKey() != null ? stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet(); + return stream.getPrimaryKey() != null ? 
stream.getPrimaryKey().stream().map(ColumnId::getName).collect(Collectors.toSet()) + : Collections.emptySet(); } private boolean isAirbyteRawIdColumnMatch(final TableDefinition existingTable) { final String abRawIdColumnName = COLUMN_NAME_AB_RAW_ID.toUpperCase(); return existingTable.columns().containsKey(abRawIdColumnName) && - toJdbcTypeName(AirbyteProtocolType.STRING).equals(existingTable.columns().get(abRawIdColumnName).type()); + toJdbcTypeName(AirbyteProtocolType.STRING).equals(existingTable.columns().get(abRawIdColumnName).getType()); } private boolean isAirbyteExtractedAtColumnMatch(final TableDefinition existingTable) { final String abExtractedAtColumnName = COLUMN_NAME_AB_EXTRACTED_AT.toUpperCase(); return existingTable.columns().containsKey(abExtractedAtColumnName) && - toJdbcTypeName(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE).equals(existingTable.columns().get(abExtractedAtColumnName).type()); + toJdbcTypeName(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE).equals(existingTable.columns().get(abExtractedAtColumnName).getType()); } private boolean isAirbyteMetaColumnMatch(TableDefinition existingTable) { final String abMetaColumnName = COLUMN_NAME_AB_META.toUpperCase(); return existingTable.columns().containsKey(abMetaColumnName) && - "VARIANT".equals(existingTable.columns().get(abMetaColumnName).type()); + "VARIANT".equals(existingTable.columns().get(abMetaColumnName).getType()); } protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) { @@ -259,9 +261,9 @@ protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, f // Missing AB meta columns from final table, we need them to do proper T+D so trigger soft-reset return false; } - final LinkedHashMap intendedColumns = stream.columns().entrySet().stream() + final LinkedHashMap intendedColumns = stream.getColumns().entrySet().stream() .collect(LinkedHashMap::new, - (map, column) -> map.put(column.getKey().name(), toJdbcTypeName(column.getValue())), + (map, column) -> map.put(column.getKey().getName(), toJdbcTypeName(column.getValue())), LinkedHashMap::putAll); // Filter out Meta columns since they don't exist in stream config. 
@@ -269,7 +271,7 @@ protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, f .filter(column -> V2_FINAL_TABLE_METADATA_COLUMNS.stream().map(String::toUpperCase) .noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey()))) .collect(LinkedHashMap::new, - (map, column) -> map.put(column.getKey(), column.getValue().type()), + (map, column) -> map.put(column.getKey(), column.getValue().getType()), LinkedHashMap::putAll); // soft-resetting https://github.com/airbytehq/airbyte/pull/31082 @SuppressWarnings("deprecation") @@ -285,13 +287,13 @@ protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, f public List> gatherInitialState(List streamConfigs) throws Exception { final Map destinationStates = super.getAllDestinationStates(); - List streamIds = streamConfigs.stream().map(StreamConfig::id).toList(); + List streamIds = streamConfigs.stream().map(StreamConfig::getId).toList(); final LinkedHashMap> existingTables = findExistingTables(database, databaseName, streamIds); final LinkedHashMap> tableRowCounts = getFinalTableRowCount(streamIds); return streamConfigs.stream().map(streamConfig -> { try { - final String namespace = streamConfig.id().finalNamespace().toUpperCase(); - final String name = streamConfig.id().finalName().toUpperCase(); + final String namespace = streamConfig.getId().getFinalNamespace().toUpperCase(); + final String name = streamConfig.getId().getFinalName().toUpperCase(); boolean isSchemaMismatch = false; boolean isFinalTableEmpty = true; boolean isFinalTablePresent = existingTables.containsKey(namespace) && existingTables.get(namespace).containsKey(name); @@ -301,8 +303,9 @@ public List> gatherInitialState(List( streamConfig, isFinalTablePresent, @@ -353,4 +356,30 @@ private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) { }; } + protected String getDeleteStatesSql(Map destinationStates) { + // only doing the DELETE where there's rows to delete allows us to avoid taking a lock on the table + // when there's nothing to delete + // This is particularly relevant in the context of tests, where many instance of the snowflake + // destination could be run in parallel + String deleteStatesSql = super.getDeleteStatesSql(destinationStates); + StringBuilder sql = new StringBuilder(); + // sql.append("BEGIN\n"); + sql.append(" IF (EXISTS (").append(deleteStatesSql.replace("delete from", "SELECT 1 FROM ")).append(")) THEN\n"); + sql.append(" ").append(deleteStatesSql).append(";\n"); + sql.append(" END IF\n"); + // sql.append("END;\n"); + return sql.toString(); + } + + protected void executeWithinTransaction(List statements) throws SQLException { + StringBuilder sb = new StringBuilder(); + sb.append("BEGIN\n"); + sb.append(" BEGIN TRANSACTION;\n "); + sb.append(StringUtils.join(statements, ";\n ")); + sb.append(";\n COMMIT;\n"); + sb.append("END;"); + LOGGER.info("executing SQL:" + sb); + getJdbcDatabase().execute(sb.toString()); + } + } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java index 28198233a948..c444fc4db639 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java +++ 
b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java @@ -117,13 +117,13 @@ public Sql createSchema(final String schema) { @Override public Sql createTable(final StreamConfig stream, final String suffix, final boolean force) { - final String columnDeclarations = stream.columns().entrySet().stream() + final String columnDeclarations = stream.getColumns().entrySet().stream() .map(column -> "," + column.getKey().name(QUOTE) + " " + toDialectType(column.getValue())) .collect(joining("\n")); final String forceCreateTable = force ? "OR REPLACE" : ""; return Sql.of(new StringSubstitutor(Map.of( - "final_table_id", stream.id().finalTableId(QUOTE, suffix.toUpperCase()), + "final_table_id", stream.getId().finalTableId(QUOTE, suffix.toUpperCase()), "force_create_table", forceCreateTable, "column_declarations", columnDeclarations, "retention_period_days", retentionPeriodDays)).replace( @@ -142,20 +142,20 @@ public Sql updateTable(final StreamConfig stream, final String finalSuffix, final Optional minRawTimestamp, final boolean useExpensiveSaferCasting) { - final String insertNewRecords = insertNewRecords(stream, finalSuffix, stream.columns(), minRawTimestamp, useExpensiveSaferCasting); + final String insertNewRecords = insertNewRecords(stream, finalSuffix, stream.getColumns(), minRawTimestamp, useExpensiveSaferCasting); String dedupFinalTable = ""; String cdcDeletes = ""; - if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) { - dedupFinalTable = dedupFinalTable(stream.id(), finalSuffix, stream.primaryKey(), stream.cursor()); + if (stream.getDestinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) { + dedupFinalTable = dedupFinalTable(stream.getId(), finalSuffix, stream.getPrimaryKey(), stream.getCursor()); cdcDeletes = cdcDeletes(stream, finalSuffix); } - final String commitRawTable = commitRawTable(stream.id()); + final String commitRawTable = commitRawTable(stream.getId()); return transactionally(insertNewRecords, dedupFinalTable, cdcDeletes, commitRawTable); } private String extractAndCast(final ColumnId column, final AirbyteType airbyteType, final boolean useTryCast) { - return cast("\"_airbyte_data\":\"" + escapeJsonIdentifier(column.originalName()) + "\"", airbyteType, useTryCast); + return cast("\"_airbyte_data\":\"" + escapeJsonIdentifier(column.getOriginalName()) + "\"", airbyteType, useTryCast); } private String cast(final String sqlExpression, final AirbyteType airbyteType, final boolean useTryCast) { @@ -259,7 +259,7 @@ String insertNewRecords(final StreamConfig stream, final String extractNewRawRecords = extractNewRawRecords(stream, minRawTimestamp, useTryCast); return new StringSubstitutor(Map.of( - "final_table_id", stream.id().finalTableId(QUOTE, finalSuffix.toUpperCase()), + "final_table_id", stream.getId().finalTableId(QUOTE, finalSuffix.toUpperCase()), "column_list", columnList, "extractNewRawRecords", extractNewRawRecords)).replace( """ @@ -274,13 +274,13 @@ String insertNewRecords(final StreamConfig stream, } private String extractNewRawRecords(final StreamConfig stream, final Optional minRawTimestamp, final boolean useTryCast) { - final String columnCasts = stream.columns().entrySet().stream().map( + final String columnCasts = stream.getColumns().entrySet().stream().map( col -> extractAndCast(col.getKey(), col.getValue(), useTryCast) + " as " + col.getKey().name(QUOTE) + ",") .collect(joining("\n")); - final String columnErrors = 
stream.columns().entrySet().stream().map( + final String columnErrors = stream.getColumns().entrySet().stream().map( col -> new StringSubstitutor(Map.of( - "raw_col_name", escapeJsonIdentifier(col.getKey().originalName()), - "printable_col_name", escapeSingleQuotedString(col.getKey().originalName()), + "raw_col_name", escapeJsonIdentifier(col.getKey().getOriginalName()), + "printable_col_name", escapeSingleQuotedString(col.getKey().getOriginalName()), "col_type", toDialectType(col.getValue()), "json_extract", extractAndCast(col.getKey(), col.getValue(), useTryCast))).replace( // TYPEOF returns "NULL_VALUE" for a JSON null and "NULL" for a SQL null @@ -292,12 +292,12 @@ private String extractNewRawRecords(final StreamConfig stream, final Optional quotedColumnId.name(QUOTE) + ",").collect(joining("\n")); + final String columnList = stream.getColumns().keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n")); final String extractedAtCondition = buildExtractedAtCondition(minRawTimestamp); - if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) { + if (stream.getDestinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) { String cdcConditionalOrIncludeStatement = ""; - if (stream.columns().containsKey(CDC_DELETED_AT_COLUMN)) { + if (stream.getColumns().containsKey(CDC_DELETED_AT_COLUMN)) { cdcConditionalOrIncludeStatement = """ OR ( "_airbyte_loaded_at" IS NOT NULL @@ -306,13 +306,13 @@ AND TYPEOF("_airbyte_data":"_ab_cdc_deleted_at") NOT IN ('NULL', 'NULL_VALUE') """; } - final String pkList = stream.primaryKey().stream().map(columnId -> columnId.name(QUOTE)).collect(joining(",")); - final String cursorOrderClause = stream.cursor() + final String pkList = stream.getPrimaryKey().stream().map(columnId -> columnId.name(QUOTE)).collect(joining(",")); + final String cursorOrderClause = stream.getCursor() .map(cursorId -> cursorId.name(QUOTE) + " DESC NULLS LAST,") .orElse(""); return new StringSubstitutor(Map.of( - "raw_table_id", stream.id().rawTableId(QUOTE), + "raw_table_id", stream.getId().rawTableId(QUOTE), "column_casts", columnCasts, "column_errors", columnErrors, "cdcConditionalOrIncludeStatement", cdcConditionalOrIncludeStatement, @@ -351,7 +351,7 @@ WITH intermediate_data AS ( WHERE row_number = 1"""); } else { return new StringSubstitutor(Map.of( - "raw_table_id", stream.id().rawTableId(QUOTE), + "raw_table_id", stream.getId().rawTableId(QUOTE), "column_casts", columnCasts, "column_errors", columnErrors, "extractedAtCondition", extractedAtCondition, @@ -413,17 +413,17 @@ String dedupFinalTable(final StreamId id, } private String cdcDeletes(final StreamConfig stream, final String finalSuffix) { - if (stream.destinationSyncMode() != DestinationSyncMode.APPEND_DEDUP) { + if (stream.getDestinationSyncMode() != DestinationSyncMode.APPEND_DEDUP) { return ""; } - if (!stream.columns().containsKey(CDC_DELETED_AT_COLUMN)) { + if (!stream.getColumns().containsKey(CDC_DELETED_AT_COLUMN)) { return ""; } // we want to grab IDs for deletion from the raw table (not the final table itself) to hand // out-of-order record insertions after the delete has been registered return new StringSubstitutor(Map.of( - "final_table_id", stream.id().finalTableId(QUOTE, finalSuffix.toUpperCase()))).replace( + "final_table_id", stream.getId().finalTableId(QUOTE, finalSuffix.toUpperCase()))).replace( """ DELETE FROM ${final_table_id} WHERE _AB_CDC_DELETED_AT IS NOT NULL; @@ -455,7 +455,7 @@ public Sql overwriteFinalTable(final StreamId stream, final String 
finalSuffix) public Sql prepareTablesForSoftReset(final StreamConfig stream) { return concat( createTable(stream, SOFT_RESET_SUFFIX.toUpperCase(), true), - clearLoadedAt(stream.id())); + clearLoadedAt(stream.getId())); } @Override diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV1V2Migrator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV1V2Migrator.java index 3226afa58337..8e1523270311 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV1V2Migrator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV1V2Migrator.java @@ -37,7 +37,7 @@ public SnowflakeV1V2Migrator(final NamingConventionTransformer namingConventionT @SneakyThrows @Override - protected boolean doesAirbyteInternalNamespaceExist(final StreamConfig streamConfig) throws Exception { + public boolean doesAirbyteInternalNamespaceExist(final StreamConfig streamConfig) throws Exception { return !database .queryJsons( """ @@ -46,19 +46,19 @@ protected boolean doesAirbyteInternalNamespaceExist(final StreamConfig streamCon WHERE schema_name = ? AND catalog_name = ?; """, - streamConfig.id().rawNamespace(), + streamConfig.getId().getRawNamespace(), databaseName) .isEmpty(); } @Override - protected boolean schemaMatchesExpectation(final TableDefinition existingTable, final Collection columns) { + public boolean schemaMatchesExpectation(final TableDefinition existingTable, final Collection columns) { return CollectionUtils.containsAllIgnoreCase(existingTable.columns().keySet(), columns); } @SneakyThrows @Override - protected Optional getTableIfExists(final String namespace, final String tableName) throws Exception { + public Optional getTableIfExists(final String namespace, final String tableName) throws Exception { // TODO this looks similar to SnowflakeDestinationHandler#findExistingTables, with a twist; // databaseName not upper-cased and rawNamespace and rawTableName as-is (no uppercase). 
// The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC translates @@ -90,12 +90,12 @@ protected Optional getTableIfExists(final String namespace, fin } @Override - protected NamespacedTableName convertToV1RawName(final StreamConfig streamConfig) { + public NamespacedTableName convertToV1RawName(final StreamConfig streamConfig) { // The implicit upper-casing happens for this in the SqlGenerator @SuppressWarnings("deprecation") - String tableName = this.namingConventionTransformer.getRawTableName(streamConfig.id().originalName()); + String tableName = this.namingConventionTransformer.getRawTableName(streamConfig.getId().getOriginalName()); return new NamespacedTableName( - this.namingConventionTransformer.getIdentifier(streamConfig.id().originalNamespace()), + this.namingConventionTransformer.getIdentifier(streamConfig.getId().getOriginalNamespace()), tableName); } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java index eef75f86c7bf..757b4788b7ba 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java @@ -46,24 +46,24 @@ public SnowflakeV2TableMigrator(final JdbcDatabase database, @Override public void migrateIfNecessary(final StreamConfig streamConfig) throws Exception { final StreamId caseSensitiveStreamId = buildStreamId_caseSensitive( - streamConfig.id().originalNamespace(), - streamConfig.id().originalName(), + streamConfig.getId().getOriginalNamespace(), + streamConfig.getId().getOriginalName(), rawNamespace); - final boolean syncModeRequiresMigration = streamConfig.destinationSyncMode() != DestinationSyncMode.OVERWRITE; + final boolean syncModeRequiresMigration = streamConfig.getDestinationSyncMode() != DestinationSyncMode.OVERWRITE; final boolean existingTableCaseSensitiveExists = findExistingTable(caseSensitiveStreamId).isPresent(); - final boolean existingTableUppercaseDoesNotExist = findExistingTable(streamConfig.id()).isEmpty(); + final boolean existingTableUppercaseDoesNotExist = findExistingTable(streamConfig.getId()).isEmpty(); LOGGER.info( "Checking whether upcasing migration is necessary for {}.{}. 
Sync mode requires migration: {}; existing case-sensitive table exists: {}; existing uppercased table does not exist: {}", - streamConfig.id().originalNamespace(), - streamConfig.id().originalName(), + streamConfig.getId().getOriginalNamespace(), + streamConfig.getId().getOriginalName(), syncModeRequiresMigration, existingTableCaseSensitiveExists, existingTableUppercaseDoesNotExist); if (syncModeRequiresMigration && existingTableCaseSensitiveExists && existingTableUppercaseDoesNotExist) { LOGGER.info( "Executing upcasing migration for {}.{}", - streamConfig.id().originalNamespace(), - streamConfig.id().originalName()); + streamConfig.getId().getOriginalNamespace(), + streamConfig.getId().getOriginalName()); TypeAndDedupeTransaction.executeSoftReset(generator, handler, streamConfig); } } @@ -94,8 +94,8 @@ private Optional findExistingTable(final StreamId id) throws SQ // VARIANT as VARCHAR LinkedHashMap> existingTableMap = SnowflakeDestinationHandler.findExistingTables(database, databaseName, List.of(id)); - if (existingTableMap.containsKey(id.finalNamespace()) && existingTableMap.get(id.finalNamespace()).containsKey(id.finalName())) { - return Optional.of(existingTableMap.get(id.finalNamespace()).get(id.finalName())); + if (existingTableMap.containsKey(id.getFinalNamespace()) && existingTableMap.get(id.getFinalNamespace()).containsKey(id.getFinalName())) { + return Optional.of(existingTableMap.get(id.getFinalNamespace()).get(id.getFinalName())); } return Optional.empty(); } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java index cd105b663b79..437b96c84d6a 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java @@ -123,7 +123,7 @@ protected List retrieveRecords(final TestDestinationEnv env, final JsonNode streamSchema) throws Exception { final StreamId streamId = new SnowflakeSqlGenerator(0).buildStreamId(namespace, streamName, JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE); - return retrieveRecordsFromTable(streamId.rawName(), streamId.rawNamespace()) + return retrieveRecordsFromTable(streamId.getRawName(), streamId.getRawNamespace()) .stream() .map(r -> r.get(JavaBaseConstants.COLUMN_NAME_DATA)) .collect(Collectors.toList()); @@ -170,7 +170,7 @@ private List retrieveRecordsFromTable(final String tableName, final St JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT)); } }, - new SnowflakeTestSourceOperations()::rowToJson); + new SnowflakeSourceOperations()::rowToJson); } // for each test we create a new schema in the database. run the test in there and then remove it. 
@@ -190,8 +190,8 @@ protected void setup(final TestDestinationEnv testEnv, final HashSet TES @Override protected void tearDown(final TestDestinationEnv testEnv) throws Exception { - TEST_SCHEMAS.add(config.get("schema").asText()); - for (final String schema : TEST_SCHEMAS) { + getTestSchemas().add(config.get("schema").asText()); + for (final String schema : getTestSchemas()) { // we need to wrap namespaces in quotes, but that means we have to manually upcase them. // thanks, v1 destinations! // this probably doesn't actually work, because v1 destinations are mangling namespaces and names diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestUtils.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestUtils.java index 89039af021ee..48d7cebeeac2 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestUtils.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestUtils.java @@ -85,7 +85,7 @@ public static List dumpTable(final List columns, """ SELECT ${columns} FROM ${table} ORDER BY ${extracted_at} ASC """)), - new SnowflakeTestSourceOperations()::rowToJson); + new SnowflakeSourceOperations()::rowToJson); } private static String quote(final String name) { diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java index 3bac3538f5bc..b49ea1a5cf1a 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java @@ -29,6 +29,7 @@ import io.airbyte.protocol.models.v0.SyncMode; import io.airbyte.workers.exception.TestHarnessException; import java.nio.file.Path; +import java.sql.SQLException; import java.util.List; import java.util.Map; import javax.sql.DataSource; @@ -46,6 +47,19 @@ public abstract class AbstractSnowflakeTypingDedupingTest extends BaseTypingDedu private JdbcDatabase database; private DataSource dataSource; + private static volatile boolean cleanedAirbyteInternalTable = false; + + private static void cleanAirbyteInternalTable(JdbcDatabase database) throws SQLException { + if (!cleanedAirbyteInternalTable) { + synchronized (AbstractSnowflakeTypingDedupingTest.class) { + if (!cleanedAirbyteInternalTable) { + database.execute("DELETE FROM \"airbyte_internal\".\"_airbyte_destination_state\" WHERE \"updated_at\" < current_date() - 7"); + cleanedAirbyteInternalTable = true; + } + } + } + } + protected abstract String getConfigPath(); @Override @@ -54,12 +68,13 @@ protected String getImageName() { } @Override - protected JsonNode generateConfig() { + protected JsonNode generateConfig() throws SQLException { final JsonNode config = 
Jsons.deserialize(IOs.readFile(Path.of(getConfigPath()))); ((ObjectNode) config).put("schema", "typing_deduping_default_schema" + getUniqueSuffix()); databaseName = config.get(JdbcUtils.DATABASE_KEY).asText(); dataSource = SnowflakeDatabase.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS); database = SnowflakeDatabase.getDatabase(dataSource); + cleanAirbyteInternalTable(database); return config; } @@ -77,7 +92,7 @@ protected List dumpRawTableRecords(String streamNamespace, final Strin } @Override - protected List dumpFinalTableRecords(String streamNamespace, final String streamName) throws Exception { + public List dumpFinalTableRecords(String streamNamespace, final String streamName) throws Exception { if (streamNamespace == null) { streamNamespace = getDefaultSchema(); } @@ -99,9 +114,6 @@ protected void teardownStreamAndNamespace(String streamNamespace, final String s // Raw table is still lowercase. StreamId.concatenateRawTableName(streamNamespace, streamName), streamNamespace.toUpperCase())); - database.execute( - String.format("DELETE FROM \"airbyte_internal\".\"_airbyte_destination_state\" WHERE \"name\"='%s' AND \"namespace\"='%s'", streamName, - streamNamespace)); } @Override @@ -115,7 +127,7 @@ protected SqlGenerator getSqlGenerator() { } @Override - protected Map getFinalMetadataColumnNames() { + public Map getFinalMetadataColumnNames() { return FINAL_METADATA_COLUMN_NAMES; } @@ -138,8 +150,8 @@ public void testFinalTableUppercasingMigration_append() throws Exception { .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(new AirbyteStream() - .withNamespace(streamNamespace) - .withName(streamName) + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) .withJsonSchema(SCHEMA)))); // First sync @@ -159,7 +171,7 @@ public void testFinalTableUppercasingMigration_append() throws Exception { // manually drop the lowercased schema, since we no longer have the code to do it automatically // (the raw table is still in lowercase "airbyte_internal"."whatever", so the auto-cleanup code // handles it fine) - database.execute("DROP SCHEMA IF EXISTS \"" + streamNamespace + "\" CASCADE"); + database.execute("DROP SCHEMA IF EXISTS \"" + getStreamNamespace() + "\" CASCADE"); } } @@ -171,8 +183,8 @@ public void testFinalTableUppercasingMigration_overwrite() throws Exception { .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) .withStream(new AirbyteStream() - .withNamespace(streamNamespace) - .withName(streamName) + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) .withJsonSchema(SCHEMA)))); // First sync @@ -192,7 +204,7 @@ public void testFinalTableUppercasingMigration_overwrite() throws Exception { // manually drop the lowercased schema, since we no longer have the code to do it automatically // (the raw table is still in lowercase "airbyte_internal"."whatever", so the auto-cleanup code // handles it fine) - database.execute("DROP SCHEMA IF EXISTS \"" + streamNamespace + "\" CASCADE"); + database.execute("DROP SCHEMA IF EXISTS \"" + getStreamNamespace() + "\" CASCADE"); } } @@ -204,8 +216,8 @@ public void testRemovingPKNonNullIndexes() throws Exception { .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP) .withPrimaryKey(List.of(List.of("id1"), List.of("id2"))) .withStream(new AirbyteStream() - .withNamespace(streamNamespace) - .withName(streamName) + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) .withJsonSchema(SCHEMA)))); // First sync 
@@ -218,7 +230,7 @@ public void testRemovingPKNonNullIndexes() throws Exception { // Second sync runSync(catalog, messages); // does not throw with latest version - assertEquals(1, dumpFinalTableRecords(streamNamespace, streamName).toArray().length); + assertEquals(1, dumpFinalTableRecords(getStreamNamespace(), getStreamName()).toArray().length); } @Test @@ -230,8 +242,8 @@ public void testExtractedAtUtcTimezoneMigration() throws Exception { .withPrimaryKey(List.of(List.of("id1"), List.of("id2"))) .withCursorField(List.of("updated_at")) .withStream(new AirbyteStream() - .withNamespace(streamNamespace) - .withName(streamName) + .withNamespace(getStreamNamespace()) + .withName(getStreamName()) .withJsonSchema(SCHEMA)))); // First sync diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeInternalStagingLowercaseDatabaseTypingDedupingTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeInternalStagingLowercaseDatabaseTypingDedupingTest.java index 4411df398774..3cd2df85be98 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeInternalStagingLowercaseDatabaseTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeInternalStagingLowercaseDatabaseTypingDedupingTest.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.cdk.db.jdbc.JdbcUtils; +import java.sql.SQLException; public class SnowflakeInternalStagingLowercaseDatabaseTypingDedupingTest extends AbstractSnowflakeTypingDedupingTest { @@ -21,7 +22,7 @@ protected String getConfigPath() { * when checking for an existing final table. 
*/ @Override - protected JsonNode generateConfig() { + protected JsonNode generateConfig() throws SQLException { final JsonNode config = super.generateConfig(); ((ObjectNode) config).put(JdbcUtils.DATABASE_KEY, config.get(JdbcUtils.DATABASE_KEY).asText().toLowerCase()); return config; diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java index 53c725094c08..3dee7ff6b779 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java @@ -28,7 +28,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeTransaction; import io.airbyte.integrations.destination.snowflake.OssCloudEnvVarConsts; import io.airbyte.integrations.destination.snowflake.SnowflakeDatabase; -import io.airbyte.integrations.destination.snowflake.SnowflakeTestSourceOperations; +import io.airbyte.integrations.destination.snowflake.SnowflakeSourceOperations; import io.airbyte.integrations.destination.snowflake.SnowflakeTestUtils; import io.airbyte.integrations.destination.snowflake.typing_deduping.migrations.SnowflakeState; import java.nio.file.Path; @@ -74,7 +74,7 @@ protected SnowflakeSqlGenerator getSqlGenerator() { @Override protected SnowflakeDestinationHandler getDestinationHandler() { - return new SnowflakeDestinationHandler(databaseName, database, namespace.toUpperCase()); + return new SnowflakeDestinationHandler(databaseName, database, getNamespace().toUpperCase()); } @Override @@ -111,8 +111,8 @@ protected List dumpFinalTableRecords(final StreamId streamId, final St return SnowflakeTestUtils.dumpFinalTable( database, databaseName, - streamId.finalNamespace(), - streamId.finalName() + suffix.toUpperCase()); + streamId.getFinalNamespace(), + streamId.getFinalName() + suffix.toUpperCase()); } @Override @@ -124,7 +124,7 @@ protected void teardownNamespace(final String namespace) throws SQLException { protected void insertFinalTableRecords(final boolean includeCdcDeletedAt, final StreamId streamId, final String suffix, - final List records) + final List records) throws Exception { final List columnNames = includeCdcDeletedAt ? FINAL_TABLE_COLUMN_NAMES_CDC : FINAL_TABLE_COLUMN_NAMES; final String cdcDeletedAtName = includeCdcDeletedAt ? 
",\"_AB_CDC_DELETED_AT\"" : ""; @@ -205,7 +205,7 @@ private String dollarQuoteWrap(final JsonNode node) { } @Override - protected void insertRawTableRecords(final StreamId streamId, final List records) throws Exception { + protected void insertRawTableRecords(final StreamId streamId, final List records) throws Exception { final String recordsText = records.stream() // For each record, convert it to a string like "(rawId, extractedAt, loadedAt, data)" .map(record -> JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES @@ -248,13 +248,13 @@ protected Map getFinalMetadataColumnNames() { @Override @Test public void testCreateTableIncremental() throws Exception { - final Sql sql = generator.createTable(incrementalDedupStream, "", false); - destinationHandler.execute(sql); + final Sql sql = getGenerator().createTable(getIncrementalDedupStream(), "", false); + getDestinationHandler().execute(sql); // Note that USERS_FINAL is uppercased here. This is intentional, because snowflake upcases unquoted // identifiers. final Optional tableKind = - database.queryJsons(String.format("SHOW TABLES LIKE '%s' IN SCHEMA \"%s\";", "USERS_FINAL", namespace.toUpperCase())) + database.queryJsons(String.format("SHOW TABLES LIKE '%s' IN SCHEMA \"%s\";", "USERS_FINAL", getNamespace().toUpperCase())) .stream().map(record -> record.get("kind").asText()) .findFirst(); final Map columns = database.queryJsons( @@ -267,7 +267,7 @@ public void testCreateTableIncremental() throws Exception { ORDER BY ordinal_position; """, databaseName, - namespace.toUpperCase(), + getNamespace().toUpperCase(), "USERS_FINAL").stream() .collect(toMap( record -> record.get("COLUMN_NAME").asText(), @@ -316,16 +316,16 @@ protected void createV1RawTable(final StreamId v1RawTable) throws Exception { %s TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp() ) data_retention_time_in_days = 0; """, - v1RawTable.rawNamespace(), - v1RawTable.rawNamespace(), - v1RawTable.rawName(), + v1RawTable.getRawNamespace(), + v1RawTable.getRawNamespace(), + v1RawTable.getRawName(), JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)); } @Override - protected void insertV1RawTableRecords(final StreamId streamId, final List records) throws Exception { + protected void insertV1RawTableRecords(final StreamId streamId, final List records) throws Exception { final var recordsText = records .stream() .map(record -> JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS @@ -337,7 +337,7 @@ protected void insertV1RawTableRecords(final StreamId streamId, final List "(%s)".formatted(row)) .collect(joining(",")); final var insert = new StringSubstitutor(Map.of( - "v1_raw_table_id", String.join(".", streamId.rawNamespace(), streamId.rawName()), + "v1_raw_table_id", String.join(".", streamId.getRawNamespace(), streamId.getRawName()), "records", recordsText), // Use different delimiters because we're using dollar quotes in the query. 
"#{", @@ -358,15 +358,15 @@ protected List dumpV1RawTableRecords(final StreamId streamId) throws E JavaBaseConstants.COLUMN_NAME_DATA).collect(joining(",")); return database.bufferedResultSetQuery(connection -> connection.createStatement().executeQuery(new StringSubstitutor(Map.of( "columns", columns, - "table", String.join(".", streamId.rawNamespace(), streamId.rawName()))).replace( + "table", String.join(".", streamId.getRawNamespace(), streamId.getRawName()))).replace( """ SELECT ${columns} FROM ${table} ORDER BY _airbyte_emitted_at ASC """)), - new SnowflakeTestSourceOperations()::rowToJson); + new SnowflakeSourceOperations()::rowToJson); } @Override - protected void migrationAssertions(final List v1RawRecords, final List v2RawRecords) { + protected void migrationAssertions(final List v1RawRecords, final List v2RawRecords) { final var v2RecordMap = v2RawRecords.stream().collect(Collectors.toMap( record -> record.get(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID).asText(), Function.identity())); @@ -386,7 +386,7 @@ record -> record.get(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID).asText(), originalData = originalData.isTextual() ? Jsons.deserializeExact(migratedData.asText()) : originalData; // hacky thing because we only care about the data contents. // diffRawTableRecords makes some assumptions about the structure of the blob. - DIFFER.diffFinalTableRecords(List.of(originalData), List.of(migratedData)); + getDIFFER().diffFinalTableRecords(List.of(originalData), List.of(migratedData)); }); } @@ -403,9 +403,9 @@ public void ignoreOldRawRecords() throws Exception { */ @Test public void ensurePKsAreIndexedUnique() throws Exception { - createRawTable(streamId); + createRawTable(getStreamId()); insertRawTableRecords( - streamId, + getStreamId(), List.of(Jsons.deserialize( """ { @@ -418,14 +418,14 @@ public void ensurePKsAreIndexedUnique() throws Exception { } """))); - final Sql createTable = generator.createTable(incrementalDedupStream, "", false); + final Sql createTable = getGenerator().createTable(getIncrementalDedupStream(), "", false); // should be OK with new tables - destinationHandler.execute(createTable); - List> initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream)); + getDestinationHandler().execute(createTable); + List> initialStates = getDestinationHandler().gatherInitialState(List.of(getIncrementalDedupStream())); assertEquals(1, initialStates.size()); assertFalse(initialStates.getFirst().isSchemaMismatch()); - destinationHandler.execute(Sql.of("DROP TABLE " + streamId.finalTableId(""))); + getDestinationHandler().execute(Sql.of("DROP TABLE " + getStreamId().finalTableId(""))); // Hack the create query to add NOT NULLs to emulate the old behavior List> createTableModified = createTable.transactions().stream().map(transaction -> transaction.stream() @@ -435,17 +435,17 @@ public void ensurePKsAreIndexedUnique() throws Exception { : line) .collect(joining("\r\n"))) .toList()).toList(); - destinationHandler.execute(new Sql(createTableModified)); - initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream)); + getDestinationHandler().execute(new Sql(createTableModified)); + initialStates = getDestinationHandler().gatherInitialState(List.of(getIncrementalDedupStream())); assertEquals(1, initialStates.size()); assertTrue(initialStates.get(0).isSchemaMismatch()); } @Test public void dst_test_oldSyncRunsThroughTransition_thenNewSyncRuns_dedup() throws Exception { - this.createRawTable(this.streamId); - 
this.createFinalTable(this.incrementalDedupStream, ""); - this.insertRawTableRecords(this.streamId, List.of( + this.createRawTable(this.getStreamId()); + this.createFinalTable(this.getIncrementalDedupStream(), ""); + this.insertRawTableRecords(this.getStreamId(), List.of( // 2 records written by a sync running on the old version of snowflake Jsons.deserialize(""" { @@ -494,7 +494,7 @@ public void dst_test_oldSyncRunsThroughTransition_thenNewSyncRuns_dedup() throws } } """))); - this.insertFinalTableRecords(false, this.streamId, "", List.of( + this.insertFinalTableRecords(false, this.getStreamId(), "", List.of( Jsons.deserialize(""" { "_airbyte_raw_id": "pre-dst local tz 3", @@ -517,8 +517,8 @@ public void dst_test_oldSyncRunsThroughTransition_thenNewSyncRuns_dedup() throws """))); // Gather initial state at the start of our updated sync DestinationInitialStatus initialState = - this.destinationHandler.gatherInitialState(List.of(this.incrementalDedupStream)).getFirst(); - this.insertRawTableRecords(this.streamId, List.of( + this.getDestinationHandler().gatherInitialState(List.of(this.getIncrementalDedupStream())).getFirst(); + this.insertRawTableRecords(this.getStreamId(), List.of( // insert raw records with updates Jsons.deserialize(""" { @@ -565,10 +565,10 @@ public void dst_test_oldSyncRunsThroughTransition_thenNewSyncRuns_dedup() throws } """))); - TypeAndDedupeTransaction.executeTypeAndDedupe(this.generator, this.destinationHandler, this.incrementalDedupStream, - initialState.initialRawTableStatus().maxProcessedTimestamp(), ""); + TypeAndDedupeTransaction.executeTypeAndDedupe(this.getGenerator(), this.getDestinationHandler(), this.getIncrementalDedupStream(), + initialState.initialRawTableStatus().getMaxProcessedTimestamp(), ""); - DIFFER.diffFinalTableRecords( + getDIFFER().diffFinalTableRecords( List.of( Jsons.deserialize(""" { @@ -610,14 +610,14 @@ public void dst_test_oldSyncRunsThroughTransition_thenNewSyncRuns_dedup() throws "STRING": "Dave01" } """)), - this.dumpFinalTableRecords(this.streamId, "")); + this.dumpFinalTableRecords(this.getStreamId(), "")); } @Test public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsThroughTransition_dedup() throws Exception { - this.createRawTable(this.streamId); - this.createFinalTable(this.incrementalDedupStream, ""); - this.insertRawTableRecords(this.streamId, List.of( + this.createRawTable(this.getStreamId()); + this.createFinalTable(this.getIncrementalDedupStream(), ""); + this.insertRawTableRecords(this.getStreamId(), List.of( // record written by a sync running on the old version of snowflake Jsons.deserialize(""" { @@ -632,8 +632,8 @@ public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsThroughTransitio """))); // Gather initial state at the start of our updated sync DestinationInitialStatus initialState = - this.destinationHandler.gatherInitialState(List.of(this.incrementalDedupStream)).getFirst(); - this.insertRawTableRecords(this.streamId, List.of( + this.getDestinationHandler().gatherInitialState(List.of(this.getIncrementalDedupStream())).getFirst(); + this.insertRawTableRecords(this.getStreamId(), List.of( // update the record twice // this never really happens, but verify that it works Jsons.deserialize(""" @@ -659,10 +659,10 @@ public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsThroughTransitio } """))); - TypeAndDedupeTransaction.executeTypeAndDedupe(this.generator, this.destinationHandler, this.incrementalDedupStream, - initialState.initialRawTableStatus().maxProcessedTimestamp(), ""); + 
TypeAndDedupeTransaction.executeTypeAndDedupe(this.getGenerator(), this.getDestinationHandler(), this.getIncrementalDedupStream(), + initialState.initialRawTableStatus().getMaxProcessedTimestamp(), ""); - DIFFER.diffFinalTableRecords( + getDIFFER().diffFinalTableRecords( List.of( Jsons.deserialize(""" { @@ -674,14 +674,14 @@ public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsThroughTransitio "STRING": "Alice02" } """)), - this.dumpFinalTableRecords(this.streamId, "")); + this.dumpFinalTableRecords(this.getStreamId(), "")); } @Test public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsBeforeTransition_thenNewSyncRunsThroughTransition_dedup() throws Exception { - this.createRawTable(this.streamId); - this.createFinalTable(this.incrementalDedupStream, ""); - this.insertRawTableRecords(this.streamId, List.of( + this.createRawTable(this.getStreamId()); + this.createFinalTable(this.getIncrementalDedupStream(), ""); + this.insertRawTableRecords(this.getStreamId(), List.of( // records written by a sync running on the old version of snowflake Jsons.deserialize(""" { @@ -708,8 +708,8 @@ public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsBeforeTransition // Gather initial state at the start of our first new sync DestinationInitialStatus initialState = - this.destinationHandler.gatherInitialState(List.of(this.incrementalDedupStream)).getFirst(); - this.insertRawTableRecords(this.streamId, List.of( + this.getDestinationHandler().gatherInitialState(List.of(this.getIncrementalDedupStream())).getFirst(); + this.insertRawTableRecords(this.getStreamId(), List.of( // update the records Jsons.deserialize(""" { @@ -734,10 +734,10 @@ public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsBeforeTransition } """))); - TypeAndDedupeTransaction.executeTypeAndDedupe(this.generator, this.destinationHandler, this.incrementalDedupStream, - initialState.initialRawTableStatus().maxProcessedTimestamp(), ""); + TypeAndDedupeTransaction.executeTypeAndDedupe(this.getGenerator(), this.getDestinationHandler(), this.getIncrementalDedupStream(), + initialState.initialRawTableStatus().getMaxProcessedTimestamp(), ""); - DIFFER.diffFinalTableRecords( + getDIFFER().diffFinalTableRecords( List.of( Jsons.deserialize(""" { @@ -759,12 +759,12 @@ public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsBeforeTransition "STRING": "Bob01" } """)), - this.dumpFinalTableRecords(this.streamId, "")); + this.dumpFinalTableRecords(this.getStreamId(), "")); // Gather initial state at the start of our second new sync DestinationInitialStatus initialState2 = - this.destinationHandler.gatherInitialState(List.of(this.incrementalDedupStream)).getFirst(); - this.insertRawTableRecords(this.streamId, List.of( + this.getDestinationHandler().gatherInitialState(List.of(this.getIncrementalDedupStream())).getFirst(); + this.insertRawTableRecords(this.getStreamId(), List.of( // update the records again Jsons.deserialize(""" { @@ -789,10 +789,10 @@ public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsBeforeTransition } """))); - TypeAndDedupeTransaction.executeTypeAndDedupe(this.generator, this.destinationHandler, this.incrementalDedupStream, - initialState2.initialRawTableStatus().maxProcessedTimestamp(), ""); + TypeAndDedupeTransaction.executeTypeAndDedupe(this.getGenerator(), this.getDestinationHandler(), this.getIncrementalDedupStream(), + initialState2.initialRawTableStatus().getMaxProcessedTimestamp(), ""); - DIFFER.diffFinalTableRecords( + getDIFFER().diffFinalTableRecords( List.of( 
Jsons.deserialize(""" { @@ -814,14 +814,14 @@ public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsBeforeTransition "STRING": "Bob02" } """)), - this.dumpFinalTableRecords(this.streamId, "")); + this.dumpFinalTableRecords(this.getStreamId(), "")); } @Test public void dst_test_oldSyncRunsThroughTransition_thenNewSyncRuns_append() throws Exception { - this.createRawTable(this.streamId); - this.createFinalTable(this.incrementalAppendStream, ""); - this.insertRawTableRecords(this.streamId, List.of( + this.createRawTable(this.getStreamId()); + this.createFinalTable(this.getIncrementalAppendStream(), ""); + this.insertRawTableRecords(this.getStreamId(), List.of( // 2 records written by a sync running on the old version of snowflake Jsons.deserialize(""" { @@ -870,7 +870,7 @@ public void dst_test_oldSyncRunsThroughTransition_thenNewSyncRuns_append() throw } } """))); - this.insertFinalTableRecords(false, this.streamId, "", List.of( + this.insertFinalTableRecords(false, this.getStreamId(), "", List.of( Jsons.deserialize(""" { "_airbyte_raw_id": "pre-dst local tz 3", @@ -893,8 +893,8 @@ public void dst_test_oldSyncRunsThroughTransition_thenNewSyncRuns_append() throw """))); // Gather initial state at the start of our updated sync DestinationInitialStatus initialState = - this.destinationHandler.gatherInitialState(List.of(this.incrementalAppendStream)).getFirst(); - this.insertRawTableRecords(this.streamId, List.of( + this.getDestinationHandler().gatherInitialState(List.of(this.getIncrementalAppendStream())).getFirst(); + this.insertRawTableRecords(this.getStreamId(), List.of( // insert raw records with updates Jsons.deserialize(""" { @@ -941,10 +941,10 @@ public void dst_test_oldSyncRunsThroughTransition_thenNewSyncRuns_append() throw } """))); - TypeAndDedupeTransaction.executeTypeAndDedupe(this.generator, this.destinationHandler, this.incrementalAppendStream, - initialState.initialRawTableStatus().maxProcessedTimestamp(), ""); + TypeAndDedupeTransaction.executeTypeAndDedupe(this.getGenerator(), this.getDestinationHandler(), this.getIncrementalAppendStream(), + initialState.initialRawTableStatus().getMaxProcessedTimestamp(), ""); - DIFFER.diffFinalTableRecords( + getDIFFER().diffFinalTableRecords( List.of( Jsons.deserialize(""" { @@ -1031,14 +1031,14 @@ public void dst_test_oldSyncRunsThroughTransition_thenNewSyncRuns_append() throw "STRING": "Dave01" } """)), - this.dumpFinalTableRecords(this.streamId, "")); + this.dumpFinalTableRecords(this.getStreamId(), "")); } @Test public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsThroughTransition_append() throws Exception { - this.createRawTable(this.streamId); - this.createFinalTable(this.incrementalAppendStream, ""); - this.insertRawTableRecords(this.streamId, List.of( + this.createRawTable(this.getStreamId()); + this.createFinalTable(this.getIncrementalAppendStream(), ""); + this.insertRawTableRecords(this.getStreamId(), List.of( // record written by a sync running on the old version of snowflake Jsons.deserialize(""" { @@ -1053,8 +1053,8 @@ public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsThroughTransitio """))); // Gather initial state at the start of our updated sync DestinationInitialStatus initialState = - this.destinationHandler.gatherInitialState(List.of(this.incrementalAppendStream)).getFirst(); - this.insertRawTableRecords(this.streamId, List.of( + this.getDestinationHandler().gatherInitialState(List.of(this.getIncrementalAppendStream())).getFirst(); + this.insertRawTableRecords(this.getStreamId(), 
List.of( // update the record twice // this never really happens, but verify that it works Jsons.deserialize(""" @@ -1080,10 +1080,10 @@ public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsThroughTransitio } """))); - TypeAndDedupeTransaction.executeTypeAndDedupe(this.generator, this.destinationHandler, this.incrementalAppendStream, - initialState.initialRawTableStatus().maxProcessedTimestamp(), ""); + TypeAndDedupeTransaction.executeTypeAndDedupe(this.getGenerator(), this.getDestinationHandler(), this.getIncrementalAppendStream(), + initialState.initialRawTableStatus().getMaxProcessedTimestamp(), ""); - DIFFER.diffFinalTableRecords( + getDIFFER().diffFinalTableRecords( List.of( Jsons.deserialize(""" { @@ -1117,14 +1117,14 @@ public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsThroughTransitio "STRING": "Alice02" } """)), - this.dumpFinalTableRecords(this.streamId, "")); + this.dumpFinalTableRecords(this.getStreamId(), "")); } @Test public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsBeforeTransition_thenNewSyncRunsThroughTransition_append() throws Exception { - this.createRawTable(this.streamId); - this.createFinalTable(this.incrementalAppendStream, ""); - this.insertRawTableRecords(this.streamId, List.of( + this.createRawTable(this.getStreamId()); + this.createFinalTable(this.getIncrementalAppendStream(), ""); + this.insertRawTableRecords(this.getStreamId(), List.of( // records written by a sync running on the old version of snowflake Jsons.deserialize(""" { @@ -1151,8 +1151,8 @@ public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsBeforeTransition // Gather initial state at the start of our first new sync DestinationInitialStatus initialState = - this.destinationHandler.gatherInitialState(List.of(this.incrementalAppendStream)).getFirst(); - this.insertRawTableRecords(this.streamId, List.of( + this.getDestinationHandler().gatherInitialState(List.of(this.getIncrementalAppendStream())).getFirst(); + this.insertRawTableRecords(this.getStreamId(), List.of( // update the records Jsons.deserialize(""" { @@ -1177,10 +1177,10 @@ public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsBeforeTransition } """))); - TypeAndDedupeTransaction.executeTypeAndDedupe(this.generator, this.destinationHandler, this.incrementalAppendStream, - initialState.initialRawTableStatus().maxProcessedTimestamp(), ""); + TypeAndDedupeTransaction.executeTypeAndDedupe(this.getGenerator(), this.getDestinationHandler(), this.getIncrementalAppendStream(), + initialState.initialRawTableStatus().getMaxProcessedTimestamp(), ""); - DIFFER.diffFinalTableRecords( + getDIFFER().diffFinalTableRecords( List.of( Jsons.deserialize(""" { @@ -1224,12 +1224,12 @@ public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsBeforeTransition "STRING": "Bob01" } """)), - this.dumpFinalTableRecords(this.streamId, "")); + this.dumpFinalTableRecords(this.getStreamId(), "")); // Gather initial state at the start of our second new sync DestinationInitialStatus initialState2 = - this.destinationHandler.gatherInitialState(List.of(this.incrementalAppendStream)).getFirst(); - this.insertRawTableRecords(this.streamId, List.of( + this.getDestinationHandler().gatherInitialState(List.of(this.getIncrementalAppendStream())).getFirst(); + this.insertRawTableRecords(this.getStreamId(), List.of( // update the records again Jsons.deserialize(""" { @@ -1254,10 +1254,10 @@ public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsBeforeTransition } """))); - 
TypeAndDedupeTransaction.executeTypeAndDedupe(this.generator, this.destinationHandler, this.incrementalAppendStream, - initialState2.initialRawTableStatus().maxProcessedTimestamp(), ""); + TypeAndDedupeTransaction.executeTypeAndDedupe(this.getGenerator(), this.getDestinationHandler(), this.getIncrementalAppendStream(), + initialState2.initialRawTableStatus().getMaxProcessedTimestamp(), ""); - DIFFER.diffFinalTableRecords( + getDIFFER().diffFinalTableRecords( List.of( Jsons.deserialize(""" { @@ -1323,7 +1323,13 @@ public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsBeforeTransition "STRING": "Bob02" } """)), - this.dumpFinalTableRecords(this.streamId, "")); + this.dumpFinalTableRecords(this.getStreamId(), "")); + } + + // This is disabled because snowflake doesn't transform long identifiers + @Disabled + public void testLongIdentifierHandling() { + super.testLongIdentifierHandling(); } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlNameTransformerTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlNameTransformerTest.java index 9508a7686888..112f7853c103 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlNameTransformerTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlNameTransformerTest.java @@ -5,7 +5,6 @@ package io.airbyte.integrations.destination.snowflake; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; import java.util.Map; import org.junit.jupiter.api.Test; @@ -20,8 +19,6 @@ class SnowflakeSqlNameTransformerTest { @Test public void testGetIdentifier() { - assertNull(INSTANCE.getIdentifier(null)); - assertNull(INSTANCE.convertStreamName(null)); RAW_TO_NORMALIZED_IDENTIFIERS.forEach((raw, normalized) -> { assertEquals(normalized, INSTANCE.convertStreamName(raw)); assertEquals(normalized, INSTANCE.getIdentifier(raw)); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java index 39756ec68f1b..07eeb0289feb 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java @@ -14,7 +14,7 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.base.DestinationConfig; import io.airbyte.cdk.integrations.base.JavaBaseConstants; -import io.airbyte.cdk.integrations.destination.async.partial_messages.PartialAirbyteMessage; +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; import java.sql.SQLException; diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index eb911490e62f..6252837c3c01 100644 --- a/docs/integrations/destinations/snowflake.md +++ 
b/docs/integrations/destinations/snowflake.md @@ -276,6 +276,7 @@ desired namespace. | Version | Date | Pull Request | Subject | |:----------------|:-----------|:-------------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.7.1 | 2024-04-30 | [\#36910](https://github.com/airbytehq/airbyte/pull/36910) | Bump CDK version | | 3.7.0 | 2024-04-08 | [\#35754](https://github.com/airbytehq/airbyte/pull/35754) | Allow configuring `data_retention_time_in_days`; apply to both raw and final tables. *Note*: Existing tables will not be affected; you must manually alter them. | | 3.6.6 | 2024-03-26 | [\#36466](https://github.com/airbytehq/airbyte/pull/36466) | Correctly handle instances with `QUOTED_IDENTIFIERS_IGNORE_CASE` enabled globally | | 3.6.5 | 2024-03-25 | [\#36461](https://github.com/airbytehq/airbyte/pull/36461) | Internal code change (use published CDK artifact instead of source dependency) |