Destination Snowflake: Write extracted_at in UTC (#35308)
Signed-off-by: Gireesh Sreepathi <[email protected]>
Co-authored-by: Gireesh Sreepathi <[email protected]>
2 people authored and xiaohansong committed Mar 7, 2024
1 parent af78d82 commit 04789ac
Showing 29 changed files with 1,165 additions and 115 deletions.
@@ -1,9 +1,10 @@
plugins {
id 'airbyte-java-connector'
id 'org.jetbrains.kotlin.jvm' version '1.9.22'
}

airbyteJavaConnector {
cdkVersionRequired = '0.23.2'
cdkVersionRequired = '0.23.11'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.5.14
dockerImageTag: 3.6.0
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
@@ -9,6 +9,7 @@
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
@@ -23,15 +24,18 @@
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV1V2Migrator;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV2TableMigrator;
import io.airbyte.integrations.destination.snowflake.typing_deduping.migrations.SnowflakeState;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
@@ -131,7 +135,7 @@ protected JdbcSqlGenerator getSqlGenerator() {
}

@Override
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) {
protected JdbcDestinationHandler<SnowflakeState> getDestinationHandler(String databaseName, JdbcDatabase database, String rawTableSchema) {
throw new UnsupportedOperationException("Snowflake does not yet use the native JDBC DV2 interface");
}

@@ -151,22 +155,33 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
final TyperDeduper typerDeduper;
final JdbcDatabase database = getDatabase(getDataSource(config));
final String databaseName = config.get(JdbcUtils.DATABASE_KEY).asText();
final SnowflakeDestinationHandler snowflakeDestinationHandler = new SnowflakeDestinationHandler(databaseName, database);
final String rawTableSchemaName;
final CatalogParser catalogParser;
if (TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent()) {
catalogParser = new CatalogParser(sqlGenerator, TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get());
rawTableSchemaName = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get();
catalogParser = new CatalogParser(sqlGenerator, rawTableSchemaName);
} else {
rawTableSchemaName = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
catalogParser = new CatalogParser(sqlGenerator);
}
final SnowflakeDestinationHandler snowflakeDestinationHandler = new SnowflakeDestinationHandler(databaseName, database, rawTableSchemaName);
parsedCatalog = catalogParser.parseCatalog(catalog);
final SnowflakeV1V2Migrator migrator = new SnowflakeV1V2Migrator(getNamingResolver(), database, databaseName);
final SnowflakeV2TableMigrator v2TableMigrator = new SnowflakeV2TableMigrator(database, databaseName, sqlGenerator, snowflakeDestinationHandler);
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final List<Migration<SnowflakeState>> migrations = List.of();
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
typerDeduper =
new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator, migrations);
} else {
typerDeduper =
new DefaultTyperDeduper(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
new DefaultTyperDeduper<>(
sqlGenerator,
snowflakeDestinationHandler,
parsedCatalog,
migrator,
v2TableMigrator,
migrations);
}

return StagingConsumerFactory.builder(
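This hunk resolves the raw-table schema once and threads the same value into both the CatalogParser and the new three-argument SnowflakeDestinationHandler constructor; it also introduces a (currently empty) List<Migration<SnowflakeState>> that both typer-deduper variants now accept, leaving a hook for future state-based migrations. A minimal sketch of the override-or-default resolution pattern, using only JDK types and assumed placeholder values rather than the connector's actual configuration plumbing:

import java.util.Optional;

public class RawSchemaResolution {

  // Assumed default for illustration; the connector uses JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE.
  private static final String DEFAULT_INTERNAL_NAMESPACE = "airbyte_internal";

  public static void main(String[] args) {
    // Hypothetical override source; the connector reads it via TypingAndDedupingFlag.getRawNamespaceOverride.
    Optional<String> override = Optional.ofNullable(System.getenv("RAW_SCHEMA_OVERRIDE"));

    // Resolve once, then reuse the same value everywhere that needs the raw schema,
    // so the catalog parser and the destination handler can never disagree.
    String rawTableSchemaName = override.orElse(DEFAULT_INTERNAL_NAMESPACE);
    System.out.println("raw table schema = " + rawTableSchemaName);
  }
}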
@@ -14,19 +14,22 @@
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition;
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
import io.airbyte.integrations.base.destination.typing_deduping.Array;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStateImpl;
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.Union;
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
import io.airbyte.integrations.destination.snowflake.typing_deduping.migrations.SnowflakeState;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
@@ -40,20 +43,23 @@
import java.util.stream.Collectors;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.apache.commons.text.StringSubstitutor;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnowflakeDestinationHandler extends JdbcDestinationHandler {
public class SnowflakeDestinationHandler extends JdbcDestinationHandler<SnowflakeState> {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDestinationHandler.class);
public static final String EXCEPTION_COMMON_PREFIX = "JavaScript execution error: Uncaught Execution of multiple statements failed on statement";

private final String databaseName;
private final JdbcDatabase database;

public SnowflakeDestinationHandler(final String databaseName, final JdbcDatabase database) {
super(databaseName, database);
this.databaseName = databaseName;
public SnowflakeDestinationHandler(final String databaseName, final JdbcDatabase database, final String rawTableSchema) {
// Postgres is close enough to Snowflake SQL for our purposes.
super(databaseName, database, rawTableSchema, SQLDialect.POSTGRES);
// We don't quote the database name in any queries, so just upcase it.
this.databaseName = databaseName.toUpperCase();
this.database = database;
}
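Two pragmatic choices are baked into this constructor: SQL is rendered through jOOQ's SQLDialect.POSTGRES because, per the in-line comment, Postgres is close enough to Snowflake for the statements this handler generates, and the database name is upper-cased once here because it is interpolated unquoted and Snowflake folds unquoted identifiers to upper case. A tiny, self-contained illustration of that folding assumption (example values only):

public class IdentifierFolding {

  public static void main(String[] args) {
    // Assumed example value: a mixed-case database name taken from the connector config.
    String configured = "myDatabase";

    // Unquoted identifiers are stored and matched by Snowflake in upper case, so the
    // value bound against INFORMATION_SCHEMA columns must be folded the same way.
    String folded = configured.toUpperCase();
    System.out.println(folded); // MYDATABASE
  }
}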

@@ -107,7 +113,7 @@ AND table_schema IN (%s)
AND table_name IN (%s)
""".formatted(paramHolder, paramHolder);
final String[] bindValues = new String[streamIds.size() * 2 + 1];
bindValues[0] = databaseName.toUpperCase();
bindValues[0] = databaseName;
System.arraycopy(namespaces, 0, bindValues, 1, namespaces.length);
System.arraycopy(names, 0, bindValues, namespaces.length + 1, names.length);
final List<JsonNode> results = database.queryJsons(query, bindValues);
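The metadata query above expands one bind placeholder per schema and per table name, and packs the bind values as [database, namespaces..., names...]; after this change the database name arrives already upper-cased from the constructor instead of being upper-cased at the call site. A condensed, self-contained illustration of the packing with assumed example values:

import java.util.Arrays;

public class BindValuePacking {

  public static void main(String[] args) {
    // Assumed example values for two streams sharing the same raw schema.
    String[] namespaces = {"AIRBYTE_INTERNAL", "AIRBYTE_INTERNAL"};
    String[] names = {"USERS_RAW", "ORDERS_RAW"};

    // Layout matches the hunk above: database first, then all schemas, then all table names.
    String[] bindValues = new String[namespaces.length + names.length + 1];
    bindValues[0] = "MY_DB"; // already upper-cased, so it matches INFORMATION_SCHEMA
    System.arraycopy(namespaces, 0, bindValues, 1, namespaces.length);
    System.arraycopy(names, 0, bindValues, namespaces.length + 1, names.length);

    System.out.println(Arrays.toString(bindValues));
    // [MY_DB, AIRBYTE_INTERNAL, AIRBYTE_INTERNAL, USERS_RAW, ORDERS_RAW]
  }
}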
@@ -120,14 +126,18 @@ AND table_name IN (%s)
return tableRowCounts;
}

public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception {
private InitialRawTableStatus getInitialRawTableState(final StreamId id, final DestinationSyncMode destinationSyncMode) throws Exception {
// Short-circuit for overwrite, table will be truncated anyway
if (destinationSyncMode == DestinationSyncMode.OVERWRITE) {
return new InitialRawTableStatus(false, false, Optional.empty());
}
final ResultSet tables = database.getMetaData().getTables(
databaseName,
id.rawNamespace(),
id.rawName(),
null);
if (!tables.next()) {
return new InitialRawTableState(false, Optional.empty());
return new InitialRawTableStatus(false, false, Optional.empty());
}
// Snowflake timestamps have nanosecond precision, so decrement by 1ns
// And use two explicit queries because COALESCE doesn't short-circuit.
@@ -136,33 +146,55 @@ public InitialRawTableState getInitialRawTableState(final StreamId id) throws Ex
conn -> conn.createStatement().executeQuery(new StringSubstitutor(Map.of(
"raw_table", id.rawTableId(SnowflakeSqlGenerator.QUOTE))).replace(
"""
SELECT to_varchar(
TIMESTAMPADD(NANOSECOND, -1, MIN("_airbyte_extracted_at")),
'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM'
) AS MIN_TIMESTAMP
FROM ${raw_table}
WHERE "_airbyte_loaded_at" IS NULL
WITH MIN_TS AS (
SELECT TIMESTAMPADD(NANOSECOND, -1,
MIN(TIMESTAMPADD(
HOUR,
EXTRACT(timezone_hour from "_airbyte_extracted_at"),
TIMESTAMPADD(
MINUTE,
EXTRACT(timezone_minute from "_airbyte_extracted_at"),
CONVERT_TIMEZONE('UTC', "_airbyte_extracted_at")
)
))) AS MIN_TIMESTAMP
FROM ${raw_table}
WHERE "_airbyte_loaded_at" IS NULL
) SELECT TO_VARCHAR(MIN_TIMESTAMP,'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM') as MIN_TIMESTAMP_UTC from MIN_TS;
""")),
// The query will always return exactly one record, so use .get(0)
record -> record.getString("MIN_TIMESTAMP")).get(0));
record -> record.getString("MIN_TIMESTAMP_UTC")).get(0));
if (minUnloadedTimestamp.isPresent()) {
return new InitialRawTableState(true, minUnloadedTimestamp.map(Instant::parse));
return new InitialRawTableStatus(true, true, minUnloadedTimestamp.map(Instant::parse));
}

// If there are no unloaded raw records, then we can safely skip all existing raw records.
// This second query just finds the newest raw record.

// This is _technically_ wrong, because during the DST transition we might select
// the wrong max timestamp. We _should_ do the UTC conversion inside the CTE, but that's a lot
// of work for a very small edge case.
// We released the fix to write extracted_at in UTC before DST changed, so this is fine.
final Optional<String> maxTimestamp = Optional.ofNullable(database.queryStrings(
conn -> conn.createStatement().executeQuery(new StringSubstitutor(Map.of(
"raw_table", id.rawTableId(SnowflakeSqlGenerator.QUOTE))).replace(
"""
SELECT to_varchar(
MAX("_airbyte_extracted_at"),
'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM'
) AS MIN_TIMESTAMP
FROM ${raw_table}
WITH MAX_TS AS (
SELECT MAX("_airbyte_extracted_at")
AS MAX_TIMESTAMP
FROM ${raw_table}
) SELECT TO_VARCHAR(
TIMESTAMPADD(
HOUR,
EXTRACT(timezone_hour from MAX_TIMESTAMP),
TIMESTAMPADD(
MINUTE,
EXTRACT(timezone_minute from MAX_TIMESTAMP),
CONVERT_TIMEZONE('UTC', MAX_TIMESTAMP)
)
),'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM') as MAX_TIMESTAMP_UTC from MAX_TS;
""")),
record -> record.getString("MIN_TIMESTAMP")).get(0));
return new InitialRawTableState(false, maxTimestamp.map(Instant::parse));
record -> record.getString("MAX_TIMESTAMP_UTC")).get(0));
return new InitialRawTableStatus(true, false, maxTimestamp.map(Instant::parse));
}
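The two queries above are the read-side heart of this change. Reading the SQL, the intent appears to be that legacy raw rows carry the UTC wall-clock value of extracted_at but are tagged with the warehouse session's offset, so the CTE keeps the wall-clock digits and re-labels them as UTC via CONVERT_TIMEZONE plus the TIMESTAMPADD of the extracted offset; the unloaded-records case also subtracts one nanosecond, presumably so a strictly-greater-than filter still includes the earliest unloaded record. A small, self-contained java.time sketch of that re-labelling, using assumed example values:

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class ExtractedAtNormalization {

  public static void main(String[] args) {
    // Assumed example: a legacy row whose wall-clock digits are the UTC extraction time,
    // but which was stored with the warehouse session's -08:00 offset.
    OffsetDateTime legacy = OffsetDateTime.parse("2024-03-01T18:00:00-08:00");

    // Keep the digits, swap the offset to UTC - the java.time analogue of
    // TIMESTAMPADD(HOUR/MINUTE, EXTRACT(timezone_hour/timezone_minute ...), CONVERT_TIMEZONE('UTC', ...)).
    Instant normalized = legacy.withOffsetSameLocal(ZoneOffset.UTC).toInstant();
    System.out.println(normalized); // 2024-03-01T18:00:00Z

    // A row already written in UTC by the fixed connector passes through unchanged.
    OffsetDateTime alreadyUtc = OffsetDateTime.parse("2024-03-01T18:00:00Z");
    System.out.println(alreadyUtc.withOffsetSameLocal(ZoneOffset.UTC).toInstant()); // 2024-03-01T18:00:00Z
  }
}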

@Override
Expand All @@ -171,7 +203,7 @@ public void execute(final Sql sql) throws Exception {
final UUID queryId = UUID.randomUUID();
for (final String transaction : transactions) {
final UUID transactionId = UUID.randomUUID();
LOGGER.debug("Executing sql {}-{}: {}", queryId, transactionId, transaction);
LOGGER.info("Executing sql {}-{}: {}", queryId, transactionId, transaction);
final long startTime = System.currentTimeMillis();

try {
Expand All @@ -190,7 +222,7 @@ public void execute(final Sql sql) throws Exception {
throw new RuntimeException(trimmedMessage, e);
}

LOGGER.debug("Sql {}-{} completed in {} ms", queryId, transactionId, System.currentTimeMillis() - startTime);
LOGGER.info("Sql {}-{} completed in {} ms", queryId, transactionId, System.currentTimeMillis() - startTime);
}
}

Expand Down Expand Up @@ -250,7 +282,9 @@ protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, f
}

@Override
public List<DestinationInitialState> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
public List<DestinationInitialStatus<SnowflakeState>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
final Map<AirbyteStreamNameNamespacePair, SnowflakeState> destinationStates = super.getAllDestinationStates();

List<StreamId> streamIds = streamConfigs.stream().map(StreamConfig::id).toList();
final LinkedHashMap<String, LinkedHashMap<String, TableDefinition>> existingTables = findExistingTables(database, databaseName, streamIds);
final LinkedHashMap<String, LinkedHashMap<String, Integer>> tableRowCounts = getFinalTableRowCount(streamIds);
@@ -267,8 +301,15 @@ public List<DestinationInitialState> gatherInitialState(List<StreamConfig> strea
isSchemaMismatch = !existingSchemaMatchesStreamConfig(streamConfig, existingTable);
isFinalTableEmpty = hasRowCount && tableRowCounts.get(namespace).get(name) == 0;
}
final InitialRawTableState initialRawTableState = getInitialRawTableState(streamConfig.id());
return new DestinationInitialStateImpl(streamConfig, isFinalTablePresent, initialRawTableState, isSchemaMismatch, isFinalTableEmpty);
final InitialRawTableStatus initialRawTableState = getInitialRawTableState(streamConfig.id(), streamConfig.destinationSyncMode());
final SnowflakeState destinationState = destinationStates.getOrDefault(streamConfig.id().asPair(), toDestinationState(Jsons.emptyObject()));
return new DestinationInitialStatus<>(
streamConfig,
isFinalTablePresent,
initialRawTableState,
isSchemaMismatch,
isFinalTableEmpty,
destinationState);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -290,6 +331,12 @@ protected String toJdbcTypeName(AirbyteType airbyteType) {
};
}

@Override
protected SnowflakeState toDestinationState(JsonNode json) {
return new SnowflakeState(
json.hasNonNull("needsSoftReset") && json.get("needsSoftReset").asBoolean());
}
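The new toDestinationState override rebuilds the per-stream state object that gatherInitialState reads back via getAllDestinationStates(), defaulting a missing needsSoftReset flag to false. A hedged, self-contained sketch of the shape that override implies (the committed SnowflakeState class, imported from typing_deduping.migrations above, may carry more fields than this):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SnowflakeStateRoundTrip {

  // Inferred shape only - not the committed class.
  record SnowflakeState(boolean needsSoftReset) {}

  public static void main(String[] args) throws Exception {
    JsonNode json = new ObjectMapper().readTree("{\"needsSoftReset\": true}");

    // Mirrors the defensive parsing in toDestinationState above: a missing or null
    // field yields false instead of throwing.
    SnowflakeState state = new SnowflakeState(
        json.hasNonNull("needsSoftReset") && json.get("needsSoftReset").asBoolean());

    System.out.println(state); // SnowflakeState[needsSoftReset=true]
  }
}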

private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
return switch (airbyteProtocolType) {
case STRING -> "TEXT";
(Diff truncated: the remaining changed files are not rendered on this page.)
