From 2a15fbd578f170eeba24f55211541535ad0073e3 Mon Sep 17 00:00:00 2001 From: Stephane Geneix <147216312+stephane-airbyte@users.noreply.github.com> Date: Fri, 9 Aug 2024 14:50:43 -0700 Subject: [PATCH] destination-snowflake: remove contention on state table (#43440) --- .../connectors/destination-snowflake/metadata.yaml | 2 +- .../typing_deduping/SnowflakeDestinationHandler.kt | 10 ++++++++++ .../AbstractSnowflakeTypingDedupingTest.kt | 8 ++++---- docs/integrations/destinations/snowflake.md | 3 ++- 4 files changed, 17 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index 0f9f8886c45e..a3c8e529cbda 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 3.11.6 + dockerImageTag: 3.11.7 dockerRepository: airbyte/destination-snowflake documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake githubIssueLabel: destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.kt b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.kt index b7d152d9cf6c..2b4dd0e939f7 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.kt @@ -500,6 +500,16 @@ class SnowflakeDestinationHandler( return database.queryJsons(sql) } + override fun getDeleteStatesSql(destinationStates: Map): String { + if (Math.random() < 0.01) { + LOGGER.info("actually deleting states") + return super.getDeleteStatesSql(destinationStates) + } else { + LOGGER.info("skipping state deletion") + return "SELECT 1" // We still need to send a valid SQL query. + } + } + companion object { private val LOGGER: Logger = LoggerFactory.getLogger(SnowflakeDestinationHandler::class.java) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt index e97a761bef5b..cbfce24556a3 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt @@ -99,10 +99,10 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() { database!!.execute( String.format( """ - DROP TABLE IF EXISTS "%s"."%s"; - DROP SCHEMA IF EXISTS "%s" CASCADE - - """.trimIndent(), + DROP TABLE IF EXISTS "%s"."%s"; + DROP SCHEMA IF EXISTS "%s" CASCADE + + """.trimIndent(), rawSchema, // Raw table is still lowercase. StreamId.concatenateRawTableName(namespaceOrDefault, streamName), namespaceOrDefault.uppercase(Locale.getDefault()), diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index 45fd49f16ea0..77a2b8b0bb14 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -268,7 +268,8 @@ desired namespace. | Version | Date | Pull Request | Subject | |:----------------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.11.6 | 2024-08-07 | [\#43332](https://github.com/airbytehq/airbyte/pull/43332) | bump Java CDK | +| 3.11.7 | 2024-08-09 | [\#43440](https://github.com/airbytehq/airbyte/pull/43440) | remove contention on state table by deleting rows ony once every 100 updates | +| 3.11.6 | 2024-08-09 | [\#43332](https://github.com/airbytehq/airbyte/pull/43332) | bump Java CDK | | 3.11.5 | 2024-08-07 | [\#43348](https://github.com/airbytehq/airbyte/pull/43348) | SnowflakeSqlGen cleanup to Kotlin string interpolation | | 3.11.4 | 2024-07-18 | [\#41940](https://github.com/airbytehq/airbyte/pull/41940) | Update host regex to allow connecting to LocalStack Snowflake | | 3.11.3 | 2024-07-15 | [\#41968](https://github.com/airbytehq/airbyte/pull/41968) | Don't hang forever on empty stream list; shorten error message on INCOMPLETE stream status |