From 04789acbd10fc4d46fafd18fcff3ce67109499b5 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 6 Mar 2024 10:00:54 -0800 Subject: [PATCH] Destination Snowflake: Write extracted_at in UTC (#35308) Signed-off-by: Gireesh Sreepathi Co-authored-by: Gireesh Sreepathi --- .../destination-snowflake/build.gradle | 3 +- .../destination-snowflake/metadata.yaml | 2 +- .../SnowflakeInternalStagingDestination.java | 25 +- .../SnowflakeDestinationHandler.java | 107 ++- .../SnowflakeSqlGenerator.java | 40 +- .../migrations/SnowflakeState.kt | 19 + .../AbstractSnowflakeTypingDedupingTest.java | 33 +- .../SnowflakeSqlGeneratorIntegrationTest.java | 905 +++++++++++++++++- ...at_sync1_expectedrecords_dedup_final.jsonl | 5 + ...tracted_at_sync1_expectedrecords_raw.jsonl | 6 + ...orchange_expectedrecords_dedup_final.jsonl | 6 +- ...rsorchange_expectedrecords_dedup_raw.jsonl | 8 +- .../sync1_expectedrecords_dedup_final.jsonl | 8 +- .../sync1_expectedrecords_dedup_final2.jsonl | 2 +- ...sync1_expectedrecords_nondedup_final.jsonl | 10 +- .../dat/sync1_expectedrecords_raw.jsonl | 10 +- .../dat/sync1_expectedrecords_raw2.jsonl | 2 +- ...ectedrecords_incremental_dedup_final.jsonl | 4 +- ...xpectedrecords_incremental_dedup_raw.jsonl | 14 +- ...ctedrecords_fullrefresh_append_final.jsonl | 16 +- ...drecords_fullrefresh_overwrite_final.jsonl | 6 +- ...tedrecords_fullrefresh_overwrite_raw.jsonl | 6 +- ...ectedrecords_incremental_dedup_final.jsonl | 6 +- ...ctedrecords_incremental_dedup_final2.jsonl | 2 +- ...ds_incremental_dedup_final_mixed_tzs.jsonl | 4 + .../dat/sync2_expectedrecords_raw.jsonl | 16 +- .../dat/sync2_expectedrecords_raw2.jsonl | 4 +- .../sync2_expectedrecords_raw_mixed_tzs.jsonl | 10 + docs/integrations/destinations/snowflake.md | 1 + 29 files changed, 1165 insertions(+), 115 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/migrations/SnowflakeState.kt create mode 100644 airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/ltz_extracted_at_sync1_expectedrecords_dedup_final.jsonl create mode 100644 airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/ltz_extracted_at_sync1_expectedrecords_raw.jsonl create mode 100644 airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final_mixed_tzs.jsonl create mode 100644 airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw_mixed_tzs.jsonl diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index b84e054c0609..4cc747506746 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -1,9 +1,10 @@ plugins { id 'airbyte-java-connector' + id 'org.jetbrains.kotlin.jvm' version '1.9.22' } airbyteJavaConnector { - cdkVersionRequired = '0.23.2' + cdkVersionRequired = '0.23.11' features = ['db-destinations', 's3-destinations', 'typing-deduping'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index d39c5a8c9669..fc0c46bd82cc 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 3.5.14 + dockerImageTag: 3.6.0 dockerRepository: airbyte/destination-snowflake documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake githubIssueLabel: destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java index 253212ecf628..29eb9175e988 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java @@ -9,6 +9,7 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcUtils; import io.airbyte.cdk.integrations.base.Destination; +import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag; import io.airbyte.cdk.integrations.destination.NamingConventionTransformer; @@ -23,15 +24,18 @@ import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve; import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration; import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler; import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator; import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV1V2Migrator; import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV2TableMigrator; +import io.airbyte.integrations.destination.snowflake.typing_deduping.migrations.SnowflakeState; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -131,7 +135,7 @@ protected JdbcSqlGenerator getSqlGenerator() { } @Override - protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) { + protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database, String rawTableSchema) { throw new UnsupportedOperationException("Snowflake does not yet use the native JDBC DV2 interface"); } @@ -151,22 +155,33 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN final TyperDeduper typerDeduper; final JdbcDatabase database = getDatabase(getDataSource(config)); final String databaseName = config.get(JdbcUtils.DATABASE_KEY).asText(); - final SnowflakeDestinationHandler snowflakeDestinationHandler = new SnowflakeDestinationHandler(databaseName, database); + final String rawTableSchemaName; final CatalogParser catalogParser; if (TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent()) { - catalogParser = new CatalogParser(sqlGenerator, TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get()); + rawTableSchemaName = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get(); + catalogParser = new CatalogParser(sqlGenerator, rawTableSchemaName); } else { + rawTableSchemaName = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE; catalogParser = new CatalogParser(sqlGenerator); } + final SnowflakeDestinationHandler snowflakeDestinationHandler = new SnowflakeDestinationHandler(databaseName, database, rawTableSchemaName); parsedCatalog = catalogParser.parseCatalog(catalog); final SnowflakeV1V2Migrator migrator = new SnowflakeV1V2Migrator(getNamingResolver(), database, databaseName); final SnowflakeV2TableMigrator v2TableMigrator = new SnowflakeV2TableMigrator(database, databaseName, sqlGenerator, snowflakeDestinationHandler); final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false); + final List> migrations = List.of(); if (disableTypeDedupe) { - typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator); + typerDeduper = + new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator, migrations); } else { typerDeduper = - new DefaultTyperDeduper(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator); + new DefaultTyperDeduper<>( + sqlGenerator, + snowflakeDestinationHandler, + parsedCatalog, + migrator, + v2TableMigrator, + migrations); } return StagingConsumerFactory.builder( diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java index 5bfeb5d6b25e..61b500ffccdf 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java @@ -14,19 +14,22 @@ import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition; import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; +import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; import io.airbyte.integrations.base.destination.typing_deduping.Array; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; -import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState; -import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStateImpl; -import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus; +import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus; import io.airbyte.integrations.base.destination.typing_deduping.Sql; import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.base.destination.typing_deduping.Struct; import io.airbyte.integrations.base.destination.typing_deduping.Union; import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; +import io.airbyte.integrations.destination.snowflake.typing_deduping.migrations.SnowflakeState; +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.v0.DestinationSyncMode; import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; @@ -40,10 +43,11 @@ import java.util.stream.Collectors; import net.snowflake.client.jdbc.SnowflakeSQLException; import org.apache.commons.text.StringSubstitutor; +import org.jooq.SQLDialect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SnowflakeDestinationHandler extends JdbcDestinationHandler { +public class SnowflakeDestinationHandler extends JdbcDestinationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDestinationHandler.class); public static final String EXCEPTION_COMMON_PREFIX = "JavaScript execution error: Uncaught Execution of multiple statements failed on statement"; @@ -51,9 +55,11 @@ public class SnowflakeDestinationHandler extends JdbcDestinationHandler { private final String databaseName; private final JdbcDatabase database; - public SnowflakeDestinationHandler(final String databaseName, final JdbcDatabase database) { - super(databaseName, database); - this.databaseName = databaseName; + public SnowflakeDestinationHandler(final String databaseName, final JdbcDatabase database, final String rawTableSchema) { + // Postgres is close enough to Snowflake SQL for our purposes. + super(databaseName, database, rawTableSchema, SQLDialect.POSTGRES); + // We don't quote the database name in any queries, so just upcase it. + this.databaseName = databaseName.toUpperCase(); this.database = database; } @@ -107,7 +113,7 @@ AND table_schema IN (%s) AND table_name IN (%s) """.formatted(paramHolder, paramHolder); final String[] bindValues = new String[streamIds.size() * 2 + 1]; - bindValues[0] = databaseName.toUpperCase(); + bindValues[0] = databaseName; System.arraycopy(namespaces, 0, bindValues, 1, namespaces.length); System.arraycopy(names, 0, bindValues, namespaces.length + 1, names.length); final List results = database.queryJsons(query, bindValues); @@ -120,14 +126,18 @@ AND table_name IN (%s) return tableRowCounts; } - public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception { + private InitialRawTableStatus getInitialRawTableState(final StreamId id, final DestinationSyncMode destinationSyncMode) throws Exception { + // Short-circuit for overwrite, table will be truncated anyway + if (destinationSyncMode == DestinationSyncMode.OVERWRITE) { + return new InitialRawTableStatus(false, false, Optional.empty()); + } final ResultSet tables = database.getMetaData().getTables( databaseName, id.rawNamespace(), id.rawName(), null); if (!tables.next()) { - return new InitialRawTableState(false, Optional.empty()); + return new InitialRawTableStatus(false, false, Optional.empty()); } // Snowflake timestamps have nanosecond precision, so decrement by 1ns // And use two explicit queries because COALESCE doesn't short-circuit. @@ -136,33 +146,55 @@ public InitialRawTableState getInitialRawTableState(final StreamId id) throws Ex conn -> conn.createStatement().executeQuery(new StringSubstitutor(Map.of( "raw_table", id.rawTableId(SnowflakeSqlGenerator.QUOTE))).replace( """ - SELECT to_varchar( - TIMESTAMPADD(NANOSECOND, -1, MIN("_airbyte_extracted_at")), - 'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM' - ) AS MIN_TIMESTAMP - FROM ${raw_table} - WHERE "_airbyte_loaded_at" IS NULL + WITH MIN_TS AS ( + SELECT TIMESTAMPADD(NANOSECOND, -1, + MIN(TIMESTAMPADD( + HOUR, + EXTRACT(timezone_hour from "_airbyte_extracted_at"), + TIMESTAMPADD( + MINUTE, + EXTRACT(timezone_minute from "_airbyte_extracted_at"), + CONVERT_TIMEZONE('UTC', "_airbyte_extracted_at") + ) + ))) AS MIN_TIMESTAMP + FROM ${raw_table} + WHERE "_airbyte_loaded_at" IS NULL + ) SELECT TO_VARCHAR(MIN_TIMESTAMP,'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM') as MIN_TIMESTAMP_UTC from MIN_TS; """)), // The query will always return exactly one record, so use .get(0) - record -> record.getString("MIN_TIMESTAMP")).get(0)); + record -> record.getString("MIN_TIMESTAMP_UTC")).get(0)); if (minUnloadedTimestamp.isPresent()) { - return new InitialRawTableState(true, minUnloadedTimestamp.map(Instant::parse)); + return new InitialRawTableStatus(true, true, minUnloadedTimestamp.map(Instant::parse)); } // If there are no unloaded raw records, then we can safely skip all existing raw records. // This second query just finds the newest raw record. + + // This is _technically_ wrong, because during the DST transition we might select + // the wrong max timestamp. We _should_ do the UTC conversion inside the CTE, but that's a lot + // of work for a very small edge case. + // We released the fix to write extracted_at in UTC before DST changed, so this is fine. final Optional maxTimestamp = Optional.ofNullable(database.queryStrings( conn -> conn.createStatement().executeQuery(new StringSubstitutor(Map.of( "raw_table", id.rawTableId(SnowflakeSqlGenerator.QUOTE))).replace( """ - SELECT to_varchar( - MAX("_airbyte_extracted_at"), - 'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM' - ) AS MIN_TIMESTAMP - FROM ${raw_table} + WITH MAX_TS AS ( + SELECT MAX("_airbyte_extracted_at") + AS MAX_TIMESTAMP + FROM ${raw_table} + ) SELECT TO_VARCHAR( + TIMESTAMPADD( + HOUR, + EXTRACT(timezone_hour from MAX_TIMESTAMP), + TIMESTAMPADD( + MINUTE, + EXTRACT(timezone_minute from MAX_TIMESTAMP), + CONVERT_TIMEZONE('UTC', MAX_TIMESTAMP) + ) + ),'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM') as MAX_TIMESTAMP_UTC from MAX_TS; """)), - record -> record.getString("MIN_TIMESTAMP")).get(0)); - return new InitialRawTableState(false, maxTimestamp.map(Instant::parse)); + record -> record.getString("MAX_TIMESTAMP_UTC")).get(0)); + return new InitialRawTableStatus(true, false, maxTimestamp.map(Instant::parse)); } @Override @@ -171,7 +203,7 @@ public void execute(final Sql sql) throws Exception { final UUID queryId = UUID.randomUUID(); for (final String transaction : transactions) { final UUID transactionId = UUID.randomUUID(); - LOGGER.debug("Executing sql {}-{}: {}", queryId, transactionId, transaction); + LOGGER.info("Executing sql {}-{}: {}", queryId, transactionId, transaction); final long startTime = System.currentTimeMillis(); try { @@ -190,7 +222,7 @@ public void execute(final Sql sql) throws Exception { throw new RuntimeException(trimmedMessage, e); } - LOGGER.debug("Sql {}-{} completed in {} ms", queryId, transactionId, System.currentTimeMillis() - startTime); + LOGGER.info("Sql {}-{} completed in {} ms", queryId, transactionId, System.currentTimeMillis() - startTime); } } @@ -250,7 +282,9 @@ protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, f } @Override - public List gatherInitialState(List streamConfigs) throws Exception { + public List> gatherInitialState(List streamConfigs) throws Exception { + final Map destinationStates = super.getAllDestinationStates(); + List streamIds = streamConfigs.stream().map(StreamConfig::id).toList(); final LinkedHashMap> existingTables = findExistingTables(database, databaseName, streamIds); final LinkedHashMap> tableRowCounts = getFinalTableRowCount(streamIds); @@ -267,8 +301,15 @@ public List gatherInitialState(List strea isSchemaMismatch = !existingSchemaMatchesStreamConfig(streamConfig, existingTable); isFinalTableEmpty = hasRowCount && tableRowCounts.get(namespace).get(name) == 0; } - final InitialRawTableState initialRawTableState = getInitialRawTableState(streamConfig.id()); - return new DestinationInitialStateImpl(streamConfig, isFinalTablePresent, initialRawTableState, isSchemaMismatch, isFinalTableEmpty); + final InitialRawTableStatus initialRawTableState = getInitialRawTableState(streamConfig.id(), streamConfig.destinationSyncMode()); + final SnowflakeState destinationState = destinationStates.getOrDefault(streamConfig.id().asPair(), toDestinationState(Jsons.emptyObject())); + return new DestinationInitialStatus<>( + streamConfig, + isFinalTablePresent, + initialRawTableState, + isSchemaMismatch, + isFinalTableEmpty, + destinationState); } catch (Exception e) { throw new RuntimeException(e); } @@ -290,6 +331,12 @@ protected String toJdbcTypeName(AirbyteType airbyteType) { }; } + @Override + protected SnowflakeState toDestinationState(JsonNode json) { + return new SnowflakeState( + json.hasNonNull("needsSoftReset") && json.get("needsSoftReset").asBoolean()); + } + private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) { return switch (airbyteProtocolType) { case STRING -> "TEXT"; diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java index 37b0bdaefff8..9c87733e6611 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java @@ -142,7 +142,7 @@ public Sql updateTable(final StreamConfig stream, dedupFinalTable = dedupFinalTable(stream.id(), finalSuffix, stream.primaryKey(), stream.cursor()); cdcDeletes = cdcDeletes(stream, finalSuffix); } - final String commitRawTable = commitRawTable(stream.id(), minRawTimestamp); + final String commitRawTable = commitRawTable(stream.id()); return transactionally(insertNewRecords, dedupFinalTable, cdcDeletes, commitRawTable); } @@ -227,6 +227,21 @@ WHEN TYPEOF(${expression}) != 'ARRAY' } } + private static String airbyteExtractedAtUtcForced(final String sqlExpression) { + return new StringSubstitutor(Map.of("expression", sqlExpression)).replace( + """ + TIMESTAMPADD( + HOUR, + EXTRACT(timezone_hour from ${expression}), + TIMESTAMPADD( + MINUTE, + EXTRACT(timezone_minute from ${expression}), + CONVERT_TIMEZONE('UTC', ${expression}) + ) + ) + """); + } + @VisibleForTesting String insertNewRecords(final StreamConfig stream, final String finalSuffix, @@ -297,14 +312,15 @@ AND TYPEOF("_airbyte_data":"_ab_cdc_deleted_at") NOT IN ('NULL', 'NULL_VALUE') "extractedAtCondition", extractedAtCondition, "column_list", columnList, "pk_list", pkList, - "cursor_order_clause", cursorOrderClause)).replace( + "cursor_order_clause", cursorOrderClause, + "airbyte_extracted_at_utc", airbyteExtractedAtUtcForced("\"_airbyte_extracted_at\""))).replace( """ WITH intermediate_data AS ( SELECT ${column_casts} ARRAY_CONSTRUCT_COMPACT(${column_errors}) as "_airbyte_cast_errors", "_airbyte_raw_id", - "_airbyte_extracted_at" + ${airbyte_extracted_at_utc} as "_airbyte_extracted_at" FROM ${raw_table_id} WHERE ( "_airbyte_loaded_at" IS NULL @@ -332,14 +348,15 @@ WITH intermediate_data AS ( "column_casts", columnCasts, "column_errors", columnErrors, "extractedAtCondition", extractedAtCondition, - "column_list", columnList)).replace( + "column_list", columnList, + "airbyte_extracted_at_utc", airbyteExtractedAtUtcForced("\"_airbyte_extracted_at\""))).replace( """ WITH intermediate_data AS ( SELECT ${column_casts} ARRAY_CONSTRUCT_COMPACT(${column_errors}) as "_airbyte_cast_errors", "_airbyte_raw_id", - "_airbyte_extracted_at" + ${airbyte_extracted_at_utc} as "_airbyte_extracted_at" FROM ${raw_table_id} WHERE "_airbyte_loaded_at" IS NULL @@ -356,7 +373,7 @@ WITH intermediate_data AS ( private static String buildExtractedAtCondition(final Optional minRawTimestamp) { return minRawTimestamp - .map(ts -> " AND \"_airbyte_extracted_at\" > '" + ts + "'") + .map(ts -> " AND " + airbyteExtractedAtUtcForced("\"_airbyte_extracted_at\"") + " > '" + ts + "'") .orElse(""); } @@ -373,13 +390,14 @@ String dedupFinalTable(final StreamId id, return new StringSubstitutor(Map.of( "final_table_id", id.finalTableId(QUOTE, finalSuffix.toUpperCase()), "pk_list", pkList, - "cursor_order_clause", cursorOrderClause)).replace( + "cursor_order_clause", cursorOrderClause, + "airbyte_extracted_at_utc", airbyteExtractedAtUtcForced("\"_AIRBYTE_EXTRACTED_AT\""))).replace( """ DELETE FROM ${final_table_id} WHERE "_AIRBYTE_RAW_ID" IN ( SELECT "_AIRBYTE_RAW_ID" FROM ( SELECT "_AIRBYTE_RAW_ID", row_number() OVER ( - PARTITION BY ${pk_list} ORDER BY ${cursor_order_clause} "_AIRBYTE_EXTRACTED_AT" DESC + PARTITION BY ${pk_list} ORDER BY ${cursor_order_clause} ${airbyte_extracted_at_utc} DESC ) as row_number FROM ${final_table_id} ) WHERE row_number != 1 @@ -406,15 +424,13 @@ private String cdcDeletes(final StreamConfig stream, final String finalSuffix) { } @VisibleForTesting - String commitRawTable(final StreamId id, final Optional minRawTimestamp) { + String commitRawTable(final StreamId id) { return new StringSubstitutor(Map.of( - "raw_table_id", id.rawTableId(QUOTE), - "extractedAtCondition", buildExtractedAtCondition(minRawTimestamp))).replace( + "raw_table_id", id.rawTableId(QUOTE))).replace( """ UPDATE ${raw_table_id} SET "_airbyte_loaded_at" = CURRENT_TIMESTAMP() WHERE "_airbyte_loaded_at" IS NULL - ${extractedAtCondition} ;"""); } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/migrations/SnowflakeState.kt b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/migrations/SnowflakeState.kt new file mode 100644 index 000000000000..d6648acb142b --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/migrations/SnowflakeState.kt @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.snowflake.typing_deduping.migrations + +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState + +// Note the nonnullable fields. Even though the underlying storage medium (a JSON blob) supports +// nullability, we don't want to deal with that in our codebase. +data class SnowflakeState(val needsSoftReset: Boolean) : MinimumDestinationState { + override fun needsSoftReset(): Boolean { + return needsSoftReset + } + + override fun withSoftReset(needsSoftReset: Boolean): T { + return copy(needsSoftReset = needsSoftReset) as T + } +} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java index 2c502d1c1ac9..de6f4f849868 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java @@ -149,7 +149,7 @@ public void testFinalTableUppercasingMigration_append() throws Exception { runSync(catalog, messages2); - final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl"); + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw_mixed_tzs.jsonl"); final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()); } finally { @@ -218,6 +218,37 @@ public void testRemovingPKNonNullIndexes() throws Exception { assertEquals(1, dumpFinalTableRecords(streamNamespace, streamName).toArray().length); } + @Test + public void testExtractedAtUtcTimezoneMigration() throws Exception { + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP) + .withPrimaryKey(List.of(List.of("id1"), List.of("id2"))) + .withCursorField(List.of("updated_at")) + .withStream(new AirbyteStream() + .withNamespace(streamNamespace) + .withName(streamName) + .withJsonSchema(SCHEMA)))); + + // First sync + final List messages1 = readMessages("dat/sync1_messages.jsonl"); + runSync(catalog, messages1, "airbyte/destination-snowflake:3.5.11"); + + final List expectedRawRecords1 = readRecords("dat/ltz_extracted_at_sync1_expectedrecords_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("dat/ltz_extracted_at_sync1_expectedrecords_dedup_final.jsonl"); + verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison()); + + // Second sync + final List messages2 = readMessages("dat/sync2_messages.jsonl"); + + runSync(catalog, messages2); + + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw_mixed_tzs.jsonl"); + final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_final_mixed_tzs.jsonl"); + verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()); + } + private String getDefaultSchema() { return getConfig().get("schema").asText(); } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java index bf204e1909d7..7277f5991957 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java @@ -22,13 +22,15 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.BaseSqlGeneratorIntegrationTest; -import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus; import io.airbyte.integrations.base.destination.typing_deduping.Sql; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; +import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeTransaction; import io.airbyte.integrations.destination.snowflake.OssCloudEnvVarConsts; import io.airbyte.integrations.destination.snowflake.SnowflakeDatabase; import io.airbyte.integrations.destination.snowflake.SnowflakeTestSourceOperations; import io.airbyte.integrations.destination.snowflake.SnowflakeTestUtils; +import io.airbyte.integrations.destination.snowflake.typing_deduping.migrations.SnowflakeState; import java.nio.file.Path; import java.sql.SQLException; import java.util.Arrays; @@ -43,9 +45,10 @@ import org.apache.commons.text.StringSubstitutor; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -public class SnowflakeSqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest { +public class SnowflakeSqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest { private static String databaseName; private static JdbcDatabase database; @@ -71,7 +74,7 @@ protected SnowflakeSqlGenerator getSqlGenerator() { @Override protected SnowflakeDestinationHandler getDestinationHandler() { - return new SnowflakeDestinationHandler(databaseName, database); + return new SnowflakeDestinationHandler(databaseName, database, namespace.toUpperCase()); } @Override @@ -387,6 +390,13 @@ record -> record.get(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID).asText(), }); } + @Disabled("We removed the optimization to only set the loaded_at column for new records after certain _extracted_at") + @Test + @Override + public void ignoreOldRawRecords() throws Exception { + super.ignoreOldRawRecords(); + } + /** * Verify that the final table does not include NON-NULL PKs (after * https://github.com/airbytehq/airbyte/pull/31082) @@ -412,9 +422,9 @@ public void ensurePKsAreIndexedUnique() throws Exception { // should be OK with new tables destinationHandler.execute(createTable); - List initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream)); + List> initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream)); assertEquals(1, initialStates.size()); - assertFalse(initialStates.get(0).isSchemaMismatch()); + assertFalse(initialStates.getFirst().isSchemaMismatch()); destinationHandler.execute(Sql.of("DROP TABLE " + streamId.finalTableId(""))); // Hack the create query to add NOT NULLs to emulate the old behavior @@ -431,4 +441,889 @@ public void ensurePKsAreIndexedUnique() throws Exception { assertTrue(initialStates.get(0).isSchemaMismatch()); } + @Test + public void dst_test_oldSyncRunsThroughTransition_thenNewSyncRuns_dedup() throws Exception { + this.createRawTable(this.streamId); + this.createFinalTable(this.incrementalDedupStream, ""); + this.insertRawTableRecords(this.streamId, List.of( + // 2 records written by a sync running on the old version of snowflake + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst local tz 1", + "_airbyte_extracted_at": "2024-03-10T02:00:00-08:00", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "string": "Alice00" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst local tz 2", + "_airbyte_extracted_at": "2024-03-10T02:01:00-07:00", + "_airbyte_data": { + "id1": 2, + "id2": 100, + "string": "Bob00" + } + } + """), + // and 2 records that got successfully loaded. + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst local tz 3", + "_airbyte_extracted_at": "2024-03-10T02:00:00-08:00", + "_airbyte_loaded_at": "1970-01-01T00:00:00Z", + "_airbyte_data": { + "id1": 3, + "id2": 100, + "string": "Charlie00" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst local tz 4", + "_airbyte_extracted_at": "2024-03-10T02:01:00-07:00", + "_airbyte_loaded_at": "1970-01-01T00:00:00Z", + "_airbyte_data": { + "id1": 4, + "id2": 100, + "string": "Dave00" + } + } + """))); + this.insertFinalTableRecords(false, this.streamId, "", List.of( + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst local tz 3", + "_airbyte_extracted_at": "2024-03-10T02:00:00-08:00", + "_airbyte_meta": {"errors": []}, + "id1": 3, + "id2": 100, + "string": "Charlie00" + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst local tz 4", + "_airbyte_extracted_at": "2024-03-10T02:01:00-07:00", + "_airbyte_meta": {"errors": []}, + "id1": 4, + "id2": 100, + "string": "Dave00" + } + """))); + // Gather initial state at the start of our updated sync + DestinationInitialStatus initialState = + this.destinationHandler.gatherInitialState(List.of(this.incrementalDedupStream)).getFirst(); + this.insertRawTableRecords(this.streamId, List.of( + // insert raw records with updates + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst utc 1", + "_airbyte_extracted_at": "2024-03-10T02:02:00Z", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "string": "Alice01" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst utc 2", + "_airbyte_extracted_at": "2024-03-10T02:02:00Z", + "_airbyte_data": { + "id1": 2, + "id2": 100, + "string": "Bob01" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst utc 3", + "_airbyte_extracted_at": "2024-03-10T02:02:00Z", + "_airbyte_data": { + "id1": 3, + "id2": 100, + "string": "Charlie01" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst utc 4", + "_airbyte_extracted_at": "2024-03-10T02:02:00Z", + "_airbyte_data": { + "id1": 4, + "id2": 100, + "string": "Dave01" + } + } + """))); + + TypeAndDedupeTransaction.executeTypeAndDedupe(this.generator, this.destinationHandler, this.incrementalDedupStream, + initialState.initialRawTableStatus().maxProcessedTimestamp(), ""); + + DIFFER.diffFinalTableRecords( + List.of( + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "post-dst utc 1", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:02:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 1, + "ID2": 100, + "STRING": "Alice01" + } + """), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "post-dst utc 2", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:02:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 2, + "ID2": 100, + "STRING": "Bob01" + } + """), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "post-dst utc 3", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:02:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 3, + "ID2": 100, + "STRING": "Charlie01" + } + """), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "post-dst utc 4", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:02:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 4, + "ID2": 100, + "STRING": "Dave01" + } + """)), + this.dumpFinalTableRecords(this.streamId, "")); + } + + @Test + public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsThroughTransition_dedup() throws Exception { + this.createRawTable(this.streamId); + this.createFinalTable(this.incrementalDedupStream, ""); + this.insertRawTableRecords(this.streamId, List.of( + // record written by a sync running on the old version of snowflake + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst local tz 1", + "_airbyte_extracted_at": "2024-03-10T01:59:00-08:00", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "string": "Alice00" + } + } + """))); + // Gather initial state at the start of our updated sync + DestinationInitialStatus initialState = + this.destinationHandler.gatherInitialState(List.of(this.incrementalDedupStream)).getFirst(); + this.insertRawTableRecords(this.streamId, List.of( + // update the record twice + // this never really happens, but verify that it works + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst utc 1", + "_airbyte_extracted_at": "2024-03-10T02:00:00Z", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "string": "Alice01" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst utc 1", + "_airbyte_extracted_at": "2024-03-10T02:01:00Z", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "string": "Alice02" + } + } + """))); + + TypeAndDedupeTransaction.executeTypeAndDedupe(this.generator, this.destinationHandler, this.incrementalDedupStream, + initialState.initialRawTableStatus().maxProcessedTimestamp(), ""); + + DIFFER.diffFinalTableRecords( + List.of( + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "post-dst utc 1", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:01:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 1, + "ID2": 100, + "STRING": "Alice02" + } + """)), + this.dumpFinalTableRecords(this.streamId, "")); + } + + @Test + public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsBeforeTransition_thenNewSyncRunsThroughTransition_dedup() throws Exception { + this.createRawTable(this.streamId); + this.createFinalTable(this.incrementalDedupStream, ""); + this.insertRawTableRecords(this.streamId, List.of( + // records written by a sync running on the old version of snowflake + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst local tz 1", + "_airbyte_extracted_at": "2024-03-10T01:59:00-08:00", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "string": "Alice00" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst local tz 2", + "_airbyte_extracted_at": "2024-03-10T01:59:00-08:00", + "_airbyte_data": { + "id1": 2, + "id2": 100, + "string": "Bob00" + } + } + """))); + + // Gather initial state at the start of our first new sync + DestinationInitialStatus initialState = + this.destinationHandler.gatherInitialState(List.of(this.incrementalDedupStream)).getFirst(); + this.insertRawTableRecords(this.streamId, List.of( + // update the records + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst utc 1", + "_airbyte_extracted_at": "2024-03-10T02:00:00Z", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "string": "Alice01" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst utc 2", + "_airbyte_extracted_at": "2024-03-10T02:00:00Z", + "_airbyte_data": { + "id1": 2, + "id2": 100, + "string": "Bob01" + } + } + """))); + + TypeAndDedupeTransaction.executeTypeAndDedupe(this.generator, this.destinationHandler, this.incrementalDedupStream, + initialState.initialRawTableStatus().maxProcessedTimestamp(), ""); + + DIFFER.diffFinalTableRecords( + List.of( + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "pre-dst utc 1", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:00:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 1, + "ID2": 100, + "STRING": "Alice01" + } + """), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "pre-dst utc 2", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:00:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 2, + "ID2": 100, + "STRING": "Bob01" + } + """)), + this.dumpFinalTableRecords(this.streamId, "")); + + // Gather initial state at the start of our second new sync + DestinationInitialStatus initialState2 = + this.destinationHandler.gatherInitialState(List.of(this.incrementalDedupStream)).getFirst(); + this.insertRawTableRecords(this.streamId, List.of( + // update the records again + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst utc 1", + "_airbyte_extracted_at": "2024-03-10T02:01:00Z", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "string": "Alice02" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst utc 2", + "_airbyte_extracted_at": "2024-03-10T02:01:00Z", + "_airbyte_data": { + "id1": 2, + "id2": 100, + "string": "Bob02" + } + } + """))); + + TypeAndDedupeTransaction.executeTypeAndDedupe(this.generator, this.destinationHandler, this.incrementalDedupStream, + initialState2.initialRawTableStatus().maxProcessedTimestamp(), ""); + + DIFFER.diffFinalTableRecords( + List.of( + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "post-dst utc 1", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:01:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 1, + "ID2": 100, + "STRING": "Alice02" + } + """), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "post-dst utc 2", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:01:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 2, + "ID2": 100, + "STRING": "Bob02" + } + """)), + this.dumpFinalTableRecords(this.streamId, "")); + } + + @Test + public void dst_test_oldSyncRunsThroughTransition_thenNewSyncRuns_append() throws Exception { + this.createRawTable(this.streamId); + this.createFinalTable(this.incrementalAppendStream, ""); + this.insertRawTableRecords(this.streamId, List.of( + // 2 records written by a sync running on the old version of snowflake + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst local tz 1", + "_airbyte_extracted_at": "2024-03-10T02:00:00-08:00", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "string": "Alice00" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst local tz 2", + "_airbyte_extracted_at": "2024-03-10T02:01:00-07:00", + "_airbyte_data": { + "id1": 2, + "id2": 100, + "string": "Bob00" + } + } + """), + // and 2 records that got successfully loaded with local TZ. + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst local tz 3", + "_airbyte_extracted_at": "2024-03-10T02:00:00-08:00", + "_airbyte_loaded_at": "1970-01-01T00:00:00Z", + "_airbyte_data": { + "id1": 3, + "id2": 100, + "string": "Charlie00" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst local tz 4", + "_airbyte_extracted_at": "2024-03-10T02:01:00-07:00", + "_airbyte_loaded_at": "1970-01-01T00:00:00Z", + "_airbyte_data": { + "id1": 4, + "id2": 100, + "string": "Dave00" + } + } + """))); + this.insertFinalTableRecords(false, this.streamId, "", List.of( + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst local tz 3", + "_airbyte_extracted_at": "2024-03-10T02:00:00-08:00", + "_airbyte_meta": {"errors": []}, + "id1": 3, + "id2": 100, + "string": "Charlie00" + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst local tz 4", + "_airbyte_extracted_at": "2024-03-10T02:01:00-07:00", + "_airbyte_meta": {"errors": []}, + "id1": 4, + "id2": 100, + "string": "Dave00" + } + """))); + // Gather initial state at the start of our updated sync + DestinationInitialStatus initialState = + this.destinationHandler.gatherInitialState(List.of(this.incrementalAppendStream)).getFirst(); + this.insertRawTableRecords(this.streamId, List.of( + // insert raw records with updates + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst utc 1", + "_airbyte_extracted_at": "2024-03-10T02:02:00Z", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "string": "Alice01" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst utc 2", + "_airbyte_extracted_at": "2024-03-10T02:02:00Z", + "_airbyte_data": { + "id1": 2, + "id2": 100, + "string": "Bob01" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst utc 3", + "_airbyte_extracted_at": "2024-03-10T02:02:00Z", + "_airbyte_data": { + "id1": 3, + "id2": 100, + "string": "Charlie01" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst utc 4", + "_airbyte_extracted_at": "2024-03-10T02:02:00Z", + "_airbyte_data": { + "id1": 4, + "id2": 100, + "string": "Dave01" + } + } + """))); + + TypeAndDedupeTransaction.executeTypeAndDedupe(this.generator, this.destinationHandler, this.incrementalAppendStream, + initialState.initialRawTableStatus().maxProcessedTimestamp(), ""); + + DIFFER.diffFinalTableRecords( + List.of( + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "pre-dst local tz 1", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:00:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 1, + "ID2": 100, + "STRING": "Alice00" + } + """), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "post-dst utc 1", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:02:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 1, + "ID2": 100, + "STRING": "Alice01" + } + """), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "post-dst local tz 2", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:01:00.000000000Z", + "_AIRBYTE_META": { + "errors": [] + }, + "ID1": 2, + "ID2": 100, + "STRING": "Bob00" + }"""), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "post-dst utc 2", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:02:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 2, + "ID2": 100, + "STRING": "Bob01" + } + """), + // note local TZ here. This record was loaded by an older version of the connector. + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "pre-dst local tz 3", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:00:00.000000000-08:00", + "_AIRBYTE_META": { + "errors": [] + }, + "ID1": 3, + "ID2": 100, + "STRING": "Charlie00" + }"""), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "post-dst utc 3", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:02:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 3, + "ID2": 100, + "STRING": "Charlie01" + } + """), + // note local TZ here. This record was loaded by an older version of the connector. + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "post-dst local tz 4", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:01:00.000000000-07:00", + "_AIRBYTE_META": { + "errors": [] + }, + "ID1": 4, + "ID2": 100, + "STRING": "Dave00" + }"""), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "post-dst utc 4", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:02:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 4, + "ID2": 100, + "STRING": "Dave01" + } + """)), + this.dumpFinalTableRecords(this.streamId, "")); + } + + @Test + public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsThroughTransition_append() throws Exception { + this.createRawTable(this.streamId); + this.createFinalTable(this.incrementalAppendStream, ""); + this.insertRawTableRecords(this.streamId, List.of( + // record written by a sync running on the old version of snowflake + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst local tz 1", + "_airbyte_extracted_at": "2024-03-10T01:59:00-08:00", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "string": "Alice00" + } + } + """))); + // Gather initial state at the start of our updated sync + DestinationInitialStatus initialState = + this.destinationHandler.gatherInitialState(List.of(this.incrementalAppendStream)).getFirst(); + this.insertRawTableRecords(this.streamId, List.of( + // update the record twice + // this never really happens, but verify that it works + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst utc 1", + "_airbyte_extracted_at": "2024-03-10T02:00:00Z", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "string": "Alice01" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst utc 1", + "_airbyte_extracted_at": "2024-03-10T02:01:00Z", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "string": "Alice02" + } + } + """))); + + TypeAndDedupeTransaction.executeTypeAndDedupe(this.generator, this.destinationHandler, this.incrementalAppendStream, + initialState.initialRawTableStatus().maxProcessedTimestamp(), ""); + + DIFFER.diffFinalTableRecords( + List.of( + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "pre-dst local tz 1", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T01:59:00.000000000Z", + "_AIRBYTE_META": { + "errors": [] + }, + "ID1": 1, + "ID2": 100, + "STRING": "Alice00" + }"""), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "pre-dst utc 1", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:00:00.000000000Z", + "_AIRBYTE_META": { + "errors": [] + }, + "ID1": 1, + "ID2": 100, + "STRING": "Alice01" + }"""), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "post-dst utc 1", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:01:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 1, + "ID2": 100, + "STRING": "Alice02" + } + """)), + this.dumpFinalTableRecords(this.streamId, "")); + } + + @Test + public void dst_test_oldSyncRunsBeforeTransition_thenNewSyncRunsBeforeTransition_thenNewSyncRunsThroughTransition_append() throws Exception { + this.createRawTable(this.streamId); + this.createFinalTable(this.incrementalAppendStream, ""); + this.insertRawTableRecords(this.streamId, List.of( + // records written by a sync running on the old version of snowflake + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst local tz 1", + "_airbyte_extracted_at": "2024-03-10T01:59:00-08:00", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "string": "Alice00" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst local tz 2", + "_airbyte_extracted_at": "2024-03-10T01:59:00-08:00", + "_airbyte_data": { + "id1": 2, + "id2": 100, + "string": "Bob00" + } + } + """))); + + // Gather initial state at the start of our first new sync + DestinationInitialStatus initialState = + this.destinationHandler.gatherInitialState(List.of(this.incrementalAppendStream)).getFirst(); + this.insertRawTableRecords(this.streamId, List.of( + // update the records + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst utc 1", + "_airbyte_extracted_at": "2024-03-10T02:00:00Z", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "string": "Alice01" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "pre-dst utc 2", + "_airbyte_extracted_at": "2024-03-10T02:00:00Z", + "_airbyte_data": { + "id1": 2, + "id2": 100, + "string": "Bob01" + } + } + """))); + + TypeAndDedupeTransaction.executeTypeAndDedupe(this.generator, this.destinationHandler, this.incrementalAppendStream, + initialState.initialRawTableStatus().maxProcessedTimestamp(), ""); + + DIFFER.diffFinalTableRecords( + List.of( + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "pre-dst local tz 1", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T01:59:00.000000000Z", + "_AIRBYTE_META": { + "errors": [] + }, + "ID1": 1, + "ID2": 100, + "STRING": "Alice00" + }"""), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "pre-dst utc 1", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:00:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 1, + "ID2": 100, + "STRING": "Alice01" + } + """), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "pre-dst local tz 2", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T01:59:00.000000000Z", + "_AIRBYTE_META": { + "errors": [] + }, + "ID1": 2, + "ID2": 100, + "STRING": "Bob00" + }"""), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "pre-dst utc 2", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:00:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 2, + "ID2": 100, + "STRING": "Bob01" + } + """)), + this.dumpFinalTableRecords(this.streamId, "")); + + // Gather initial state at the start of our second new sync + DestinationInitialStatus initialState2 = + this.destinationHandler.gatherInitialState(List.of(this.incrementalAppendStream)).getFirst(); + this.insertRawTableRecords(this.streamId, List.of( + // update the records again + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst utc 1", + "_airbyte_extracted_at": "2024-03-10T02:01:00Z", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "string": "Alice02" + } + } + """), + Jsons.deserialize(""" + { + "_airbyte_raw_id": "post-dst utc 2", + "_airbyte_extracted_at": "2024-03-10T02:01:00Z", + "_airbyte_data": { + "id1": 2, + "id2": 100, + "string": "Bob02" + } + } + """))); + + TypeAndDedupeTransaction.executeTypeAndDedupe(this.generator, this.destinationHandler, this.incrementalAppendStream, + initialState2.initialRawTableStatus().maxProcessedTimestamp(), ""); + + DIFFER.diffFinalTableRecords( + List.of( + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "pre-dst local tz 1", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T01:59:00.000000000Z", + "_AIRBYTE_META": { + "errors": [] + }, + "ID1": 1, + "ID2": 100, + "STRING": "Alice00" + }"""), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "pre-dst utc 1", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:00:00.000000000Z", + "_AIRBYTE_META": { + "errors": [] + }, + "ID1": 1, + "ID2": 100, + "STRING": "Alice01" + }"""), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "post-dst utc 1", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:01:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 1, + "ID2": 100, + "STRING": "Alice02" + } + """), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "pre-dst local tz 2", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T01:59:00.000000000Z", + "_AIRBYTE_META": { + "errors": [] + }, + "ID1": 2, + "ID2": 100, + "STRING": "Bob00" + }"""), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "pre-dst utc 2", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:00:00.000000000Z", + "_AIRBYTE_META": { + "errors": [] + }, + "ID1": 2, + "ID2": 100, + "STRING": "Bob01" + }"""), + Jsons.deserialize(""" + { + "_AIRBYTE_RAW_ID": "post-dst utc 2", + "_AIRBYTE_EXTRACTED_AT": "2024-03-10T02:01:00.000000000Z", + "_AIRBYTE_META": {"errors": []}, + "ID1": 2, + "ID2": 100, + "STRING": "Bob02" + } + """)), + this.dumpFinalTableRecords(this.streamId, "")); + } + } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/ltz_extracted_at_sync1_expectedrecords_dedup_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/ltz_extracted_at_sync1_expectedrecords_dedup_final.jsonl new file mode 100644 index 000000000000..cb50cd6fcc31 --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/ltz_extracted_at_sync1_expectedrecords_dedup_final.jsonl @@ -0,0 +1,5 @@ +// Note the -08:00 offset in extracted_at. +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:01:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Los Angeles", "state": "CA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-01T00:02:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "Boston", "state": "MA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "UPDATED_AT": "2000-01-01T00:03:00.000000000Z", "NAME": "Charlie"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 3, "ID2": 200, "UPDATED_AT": "2000-01-01T00:04:00.000000000Z", "NAME": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/ltz_extracted_at_sync1_expectedrecords_raw.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/ltz_extracted_at_sync1_expectedrecords_raw.jsonl new file mode 100644 index 000000000000..6849b1072a0b --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/ltz_extracted_at_sync1_expectedrecords_raw.jsonl @@ -0,0 +1,6 @@ +// Note the -08:00 offset in extracted_at. +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl index 7c9e93b21705..9672e61c9678 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl @@ -1,3 +1,3 @@ -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 200, "OLD_CURSOR": 1, "NAME": "Alice", "ADDRESS": {"city": "Los Angeles", "state": "CA"}} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 201, "OLD_CURSOR": 2, "NAME": "Bob", "ADDRESS": {"city": "Boston", "state": "MA"}} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "OLD_CURSOR": 3, "NAME": "Charlie"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 200, "OLD_CURSOR": 1, "NAME": "Alice", "ADDRESS": {"city": "Los Angeles", "state": "CA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 201, "OLD_CURSOR": 2, "NAME": "Bob", "ADDRESS": {"city": "Boston", "state": "MA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "OLD_CURSOR": 3, "NAME": "Charlie"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl index fcf596ac0380..2f2b22731087 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl @@ -1,4 +1,4 @@ -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl index 136fa8a99003..0338cae59ac4 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl @@ -1,5 +1,5 @@ // Keep the Alice record with more recent UPDATED_AT -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:01:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Los Angeles", "state": "CA"}} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-01T00:02:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "Boston", "state": "MA"}} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "UPDATED_AT": "2000-01-01T00:03:00.000000000Z", "NAME": "Charlie"} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 3, "ID2": 200, "UPDATED_AT": "2000-01-01T00:04:00.000000000Z", "NAME": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:01:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Los Angeles", "state": "CA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-01T00:02:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "Boston", "state": "MA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "UPDATED_AT": "2000-01-01T00:03:00.000000000Z", "NAME": "Charlie"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":[]}, "ID1": 3, "ID2": 200, "UPDATED_AT": "2000-01-01T00:04:00.000000000Z", "NAME": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final2.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final2.jsonl index 5f9395498870..83294d657935 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final2.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final2.jsonl @@ -1 +1 @@ -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2001-01-01T00:00:00.000000000Z", "NAME": "Someone completely different"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2001-01-01T00:00:00.000000000Z", "NAME": "Someone completely different"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl index 575aa338976c..ca3c0aafa537 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl @@ -1,6 +1,6 @@ -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "San Francisco", "state": "CA"}} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:01:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Los Angeles", "state": "CA"}} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-01T00:02:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "Boston", "state": "MA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "San Francisco", "state": "CA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:01:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Los Angeles", "state": "CA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-01T00:02:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "Boston", "state": "MA"}} // Invalid columns are nulled out (i.e. SQL null, not JSON null) -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "UPDATED_AT": "2000-01-01T00:03:00.000000000Z", "NAME": "Charlie"} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 3, "ID2": 200, "UPDATED_AT": "2000-01-01T00:04:00.000000000Z", "NAME": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "UPDATED_AT": "2000-01-01T00:03:00.000000000Z", "NAME": "Charlie"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":[]}, "ID1": 3, "ID2": 200, "UPDATED_AT": "2000-01-01T00:04:00.000000000Z", "NAME": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl index d1c3045997b3..8dbfcd6cbb9c 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl @@ -1,7 +1,7 @@ -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}} // Note the duplicate record. In this sync mode, we don't dedup anything. -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} // Invalid data is still allowed in the raw table. -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_raw2.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_raw2.jsonl index b0f0f8823c90..6849e306164f 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_raw2.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_raw2.jsonl @@ -1 +1 @@ -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2001-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Someone completely different"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2001-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Someone completely different"}} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl index 93e29eb904e4..a22c21dfee41 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl @@ -1,3 +1,3 @@ -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000-08:00", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Seattle", "state": "WA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000Z", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Seattle", "state": "WA"}} // Charlie wasn't reemitted with UPDATED_AT, so it still has a null cursor -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "NAME": "Charlie"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "NAME": "Charlie"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl index 347a9248d265..871f03978f60 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl @@ -1,7 +1,7 @@ -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl index 67171fa4c01b..8b2a3f160f44 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl @@ -1,9 +1,9 @@ -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "San Francisco", "state": "CA"}} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:01:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Los Angeles", "state": "CA"}} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-01T00:02:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "Boston", "state": "MA"}} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "UPDATED_AT": "2000-01-01T00:03:00.000000000Z", "NAME": "Charlie"} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 3, "ID2": 200, "UPDATED_AT": "2000-01-01T00:04:00.000000000Z", "NAME": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "San Francisco", "state": "CA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:01:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Los Angeles", "state": "CA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-01T00:02:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "Boston", "state": "MA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "UPDATED_AT": "2000-01-01T00:03:00.000000000Z", "NAME": "Charlie"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":[]}, "ID1": 3, "ID2": 200, "UPDATED_AT": "2000-01-01T00:04:00.000000000Z", "NAME": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000-08:00", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Seattle", "state": "WA"}} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000-08:00", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "New York", "state": "NY"}} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000-08:00", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-02T00:01:00.000000000Z", "_AB_CDC_DELETED_AT": "1970-01-01T00:00:00.000000000Z"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000Z", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Seattle", "state": "WA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000Z", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "New York", "state": "NY"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000Z", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-02T00:01:00.000000000Z", "_AB_CDC_DELETED_AT": "1970-01-01T00:00:00.000000000Z"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl index 61366dee9ab4..3f3fd3f1f3e7 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl @@ -1,3 +1,3 @@ -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000-08:00", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Seattle", "state": "WA"}} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000-08:00", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "New York", "state": "NY"}} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000-08:00", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-02T00:01:00.000000000Z", "_AB_CDC_DELETED_AT": "1970-01-01T00:00:00.000000000Z"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000Z", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Seattle", "state": "WA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000Z", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "New York", "state": "NY"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000Z", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-02T00:01:00.000000000Z", "_AB_CDC_DELETED_AT": "1970-01-01T00:00:00.000000000Z"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl index 2607c9f73a49..7ea21e905fe2 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl @@ -1,3 +1,3 @@ -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl index 2f7a58c51499..02e36c558939 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl @@ -1,4 +1,4 @@ -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000-08:00", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Seattle", "state": "WA"}} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000Z", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Seattle", "state": "WA"}} // Delete Bob, keep Charlie -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "UPDATED_AT": "2000-01-01T00:03:00.000000000Z", "NAME": "Charlie"} -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 3, "ID2": 200, "UPDATED_AT": "2000-01-01T00:04:00.000000000Z", "NAME": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "UPDATED_AT": "2000-01-01T00:03:00.000000000Z", "NAME": "Charlie"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000Z", "_AIRBYTE_META": {"errors":[]}, "ID1": 3, "ID2": 200, "UPDATED_AT": "2000-01-01T00:04:00.000000000Z", "NAME": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final2.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final2.jsonl index b86eb147ba89..1eefb353ce6b 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final2.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final2.jsonl @@ -1 +1 @@ -{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000-08:00", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2001-01-02T00:00:00.000000000Z", "NAME": "Someone completely different v2"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000Z", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2001-01-02T00:00:00.000000000Z", "NAME": "Someone completely different v2"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final_mixed_tzs.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final_mixed_tzs.jsonl new file mode 100644 index 000000000000..686793ed026b --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final_mixed_tzs.jsonl @@ -0,0 +1,4 @@ +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:02.000000000Z", "_AIRBYTE_META":{"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Seattle", "state": "WA"}} +// Delete Bob, keep Charlie. We continue to keep old records in PST +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "UPDATED_AT": "2000-01-01T00:03:00.000000000Z", "NAME": "Charlie"} +{"_AIRBYTE_EXTRACTED_AT": "1970-01-01T00:00:01.000000000-08:00", "_AIRBYTE_META": {"errors":[]}, "ID1": 3, "ID2": 200, "UPDATED_AT": "2000-01-01T00:04:00.000000000Z", "NAME": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl index d4bd6c49d4e7..2509cc47735e 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl @@ -1,10 +1,10 @@ // We keep the records from the first sync -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}} // And append the records from the second sync -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}} -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw2.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw2.jsonl index 4d2e3167888c..0c8fd4eceab0 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw2.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw2.jsonl @@ -1,2 +1,2 @@ -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2001-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Someone completely different"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2001-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Someone completely different v2"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2001-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Someone completely different"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2001-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Someone completely different v2"}} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw_mixed_tzs.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw_mixed_tzs.jsonl new file mode 100644 index 000000000000..8bd778660427 --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_raw_mixed_tzs.jsonl @@ -0,0 +1,10 @@ +// We keep the records from the first sync which used to be in PST TZ +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}} +// And append the records from the second sync +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}} diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index 39be90148e99..966224d41e13 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -246,6 +246,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n | Version | Date | Pull Request | Subject | |:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.6.0 | 2024-02-22 | [35308](https://github.com/airbytehq/airbyte/pull/35308) | Upgrade CDK; use utc tz for extracted_at; Migrate existing extracted_at to utc; | | 3.5.14 | 2024-02-22 | [35456](https://github.com/airbytehq/airbyte/pull/35456) | Adopt CDK 0.23.0; Gather initial state upfront, reduce information_schema calls | | 3.5.13 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. | | 3.5.12 | 2024-02-15 | [35240](https://github.com/airbytehq/airbyte/pull/35240) | Adopt CDK 0.20.9 |