Merge branch 'master' into baz/source-shopify-bump-to-2023-07
bazarnov authored Aug 17, 2023
2 parents 8d82af9 + c8b5b7b commit 305f154
Showing 97 changed files with 1,290 additions and 1,917 deletions.
@@ -41,6 +41,6 @@ private JavaBaseConstants() {}
       COLUMN_NAME_AB_EXTRACTED_AT,
       COLUMN_NAME_AB_META);

-  public static final String AIRBYTE_NAMESPACE_SCHEMA = "airbyte";
+  public static final String DEFAULT_AIRBYTE_INTERNAL_NAMESPACE = "airbyte_internal";

 }
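The renamed constant is picked up later in this diff by CatalogParser, which previously defined its own DEFAULT_RAW_TABLE_NAMESPACE. A minimal sketch of the intended usage pattern (the helper class and method here are illustrative, not part of this commit; only the constant is real):

    import io.airbyte.integrations.base.JavaBaseConstants;
    import java.util.Optional;

    // Hypothetical helper: resolve the raw-table namespace, falling back to the new
    // default "airbyte_internal" (the removed AIRBYTE_NAMESPACE_SCHEMA was "airbyte").
    final class RawNamespaceResolver {

      private RawNamespaceResolver() {}

      static String resolve(final Optional<String> configuredRawNamespace) {
        return configuredRawNamespace.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE);
      }
    }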
@@ -718,20 +718,57 @@ public void weirdColumnNames() throws Exception {
         dumpFinalTableRecords(streamId, ""));
   }

+  /**
+   * A stream with no columns is weird, but we shouldn't treat it specially in any way. It should
+   * create a final table as usual, and populate it with the relevant metadata columns.
+   */
+  @Test
+  public void noColumns() throws Exception {
+    createRawTable(streamId);
+    insertRawTableRecords(
+        streamId,
+        List.of(Jsons.deserialize(
+            """
+                {
+                  "_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc",
+                  "_airbyte_extracted_at": "2023-01-01T00:00:00Z",
+                  "_airbyte_data": {}
+                }
+                """)));
+    final StreamConfig stream = new StreamConfig(
+        streamId,
+        SyncMode.INCREMENTAL,
+        DestinationSyncMode.APPEND,
+        emptyList(),
+        Optional.empty(),
+        new LinkedHashMap<>());
+
+    final String createTable = generator.createTable(stream, "");
+    destinationHandler.execute(createTable);
+    final String updateTable = generator.updateTable(stream, "");
+    destinationHandler.execute(updateTable);
+
+    verifyRecords(
+        "sqlgenerator/nocolumns_expectedrecords_raw.jsonl",
+        dumpRawTableRecords(streamId),
+        "sqlgenerator/nocolumns_expectedrecords_final.jsonl",
+        dumpFinalTableRecords(streamId, ""));
+  }
+
   @Test
   public void testV1V2migration() throws Exception {
     // This is maybe a little hacky, but it avoids having to refactor this entire class and subclasses
     // for something that is going away
-    StreamId v1RawTableStreamId = new StreamId(null, null, streamId.finalNamespace(), "v1_" + streamId.rawName(), null, null);
+    final StreamId v1RawTableStreamId = new StreamId(null, null, streamId.finalNamespace(), "v1_" + streamId.rawName(), null, null);
     createV1RawTable(v1RawTableStreamId);
     insertV1RawTableRecords(v1RawTableStreamId, singletonList(Jsons.jsonNode(Map.of(
         "_airbyte_ab_id", "v1v2",
         "_airbyte_emitted_at", "2023-01-01T00:00:00Z",
         "_airbyte_data", "{\"hello\": \"world\"}"))));
     final String migration = generator.migrateFromV1toV2(streamId, v1RawTableStreamId.rawNamespace(), v1RawTableStreamId.rawName());
     destinationHandler.execute(migration);
-    List<JsonNode> v1RawRecords = dumpRawTableRecords(v1RawTableStreamId);
-    List<JsonNode> v2RawRecords = dumpRawTableRecords(streamId);
+    final List<JsonNode> v1RawRecords = dumpRawTableRecords(v1RawTableStreamId);
+    final List<JsonNode> v2RawRecords = dumpRawTableRecords(streamId);
     assertAll(
         () -> assertEquals(1, v1RawRecords.size()),
         () -> assertEquals(1, v2RawRecords.size()),
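For context on the assertions above: the v1 raw record carries _airbyte_ab_id and _airbyte_emitted_at, while v2 raw tables use _airbyte_raw_id and _airbyte_extracted_at. The real SQL comes from each connector's SqlGenerator#migrateFromV1toV2 (the BigQuery template is truncated further down), so the following is a purely hypothetical illustration of the column mapping, with made-up table names:

    // Illustrative only; not the SQL any connector actually emits.
    final String illustrativeMigration = String.join("\n",
        "INSERT INTO airbyte_internal.users_raw", // hypothetical v2 raw table
        "  (_airbyte_raw_id, _airbyte_extracted_at, _airbyte_loaded_at, _airbyte_data)",
        "SELECT _airbyte_ab_id, _airbyte_emitted_at, NULL, _airbyte_data",
        "FROM some_namespace.v1_users_raw;"); // hypothetical v1 raw table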
@@ -15,18 +15,22 @@

 public abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> implements DestinationV1V2Migrator {

-  Logger LOGGER = LoggerFactory.getLogger(BaseDestinationV1V2Migrator.class);
+  protected static final Logger LOGGER = LoggerFactory.getLogger(BaseDestinationV1V2Migrator.class);

   @Override
   public void migrateIfNecessary(
       final SqlGenerator sqlGenerator,
       final DestinationHandler destinationHandler,
       final StreamConfig streamConfig)
       throws TableNotMigratedException, UnexpectedSchemaException {
+    LOGGER.info("Assessing whether migration is necessary for stream {}", streamConfig.id().finalName());
     if (shouldMigrate(streamConfig)) {
       LOGGER.info("Starting v2 Migration for stream {}", streamConfig.id().finalName());
       migrate(sqlGenerator, destinationHandler, streamConfig);
+    } else {
+      LOGGER.info("No Migration Required for stream: {}", streamConfig.id().finalName());
     }

   }

   /**
@@ -37,9 +41,13 @@ public void migrateIfNecessary(
    */
   protected boolean shouldMigrate(final StreamConfig streamConfig) {
     final var v1RawTable = convertToV1RawName(streamConfig);
-    return isMigrationRequiredForSyncMode(streamConfig.destinationSyncMode())
-        && !doesValidV2RawTableAlreadyExist(streamConfig)
-        && doesValidV1RawTableExist(v1RawTable.namespace(), v1RawTable.tableName());
+    LOGGER.info("Checking whether v1 raw table {} in dataset {} exists", v1RawTable.tableName(), v1RawTable.namespace());
+    final var syncModeNeedsMigration = isMigrationRequiredForSyncMode(streamConfig.destinationSyncMode());
+    final var noValidV2RawTableExists = !doesValidV2RawTableAlreadyExist(streamConfig);
+    final var aValidV1RawTableExists = doesValidV1RawTableExist(v1RawTable.namespace(), v1RawTable.tableName());
+    LOGGER.info("Migration Info: Required for Sync mode: {}, No existing v2 raw tables: {}, A v1 raw table exists: {}",
+        syncModeNeedsMigration, noValidV2RawTableExists, aValidV1RawTableExists);
+    return syncModeNeedsMigration && noValidV2RawTableExists && aValidV1RawTableExists;
   }

   /**
@@ -55,7 +63,7 @@ public void migrate(final SqlGenerator<DialectTableDefinition> sqlGenerator,
                       final StreamConfig streamConfig)
       throws TableNotMigratedException {
     final var namespacedTableName = convertToV1RawName(streamConfig);
-    final var migrateAndReset = String.join("\n",
+    final var migrateAndReset = String.join("\n\n",
         sqlGenerator.migrateFromV1toV2(streamConfig.id(), namespacedTableName.namespace(),
             namespacedTableName.tableName()),
         sqlGenerator.softReset(streamConfig));
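Note the separator change from "\n" to "\n\n": migrate() now emits the v1-to-v2 copy and the soft reset as one script with a blank line between the two generated statements, and the unit test below updates its expected string to match. A minimal sketch of the call path, assuming a DestinationHandler<String> that executes raw SQL as in the tests (the execute call itself is collapsed in this view):

    // Sketch: what migrate(...) does once shouldMigrate(...) has returned true.
    final var namespacedTableName = convertToV1RawName(streamConfig);
    final var migrateAndReset = String.join("\n\n",
        sqlGenerator.migrateFromV1toV2(streamConfig.id(), namespacedTableName.namespace(),
            namespacedTableName.tableName()),
        sqlGenerator.softReset(streamConfig));
    destinationHandler.execute(migrateAndReset); // assumed; collapsed in this diff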
@@ -4,6 +4,8 @@

 package io.airbyte.integrations.base.destination.typing_deduping;

+import static io.airbyte.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
+
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
 import java.util.ArrayList;
@@ -15,12 +17,11 @@

 public class CatalogParser {

-  public static final String DEFAULT_RAW_TABLE_NAMESPACE = "airbyte_internal";
   private final SqlGenerator<?> sqlGenerator;
   private final String rawNamespace;

   public CatalogParser(final SqlGenerator<?> sqlGenerator) {
-    this(sqlGenerator, DEFAULT_RAW_TABLE_NAMESPACE);
+    this(sqlGenerator, DEFAULT_AIRBYTE_INTERNAL_NAMESPACE);
   }

   public CatalogParser(final SqlGenerator<?> sqlGenerator, final String rawNamespace) {
@@ -73,7 +73,8 @@ public void testMigrate() {
     final var sqlGenerator = new MockSqlGenerator();
     final StreamConfig stream = new StreamConfig(STREAM_ID, null, DestinationSyncMode.APPEND_DEDUP, null, null, null);
     final DestinationHandler<String> handler = Mockito.mock(DestinationHandler.class);
-    final var sql = String.join("\n", sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table"), sqlGenerator.softReset(stream));
+    final var sql = String.join("\n\n", sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table"),
+        sqlGenerator.softReset(stream));
     // All is well
     final var migrator = noIssuesMigrator();
     migrator.migrate(sqlGenerator, handler, stream);
@@ -18,7 +18,7 @@ data:
   tags:
     - language:python
   ab_internal:
-    sl: 200
+    sl: 100
     ql: 200
-  supportLevel: certified
+  supportLevel: community
   metadataSpecVersion: "1.0"
@@ -24,7 +24,7 @@ data:
   tags:
     - language:java
   ab_internal:
-    sl: 200
+    sl: 100
     ql: 300
-  supportLevel: certified
+  supportLevel: community
   metadataSpecVersion: "1.0"
@@ -47,7 +47,7 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery

 COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.7.6
+LABEL io.airbyte.version=1.7.8
 LABEL io.airbyte.name=airbyte/destination-bigquery

 ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
@@ -2,7 +2,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
-  dockerImageTag: 1.7.6
+  dockerImageTag: 1.7.8
   dockerRepository: airbyte/destination-bigquery
   githubIssueLabel: destination-bigquery
   icon: bigquery.svg
@@ -4,7 +4,7 @@

 package io.airbyte.integrations.destination.bigquery;

-import static io.airbyte.integrations.base.JavaBaseConstants.AIRBYTE_NAMESPACE_SCHEMA;
+import static io.airbyte.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;

 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Functions;
@@ -154,11 +154,15 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery
                                           final TyperDeduper typerDeduper) {
     return () -> {
       LOGGER.info("Preparing airbyte_raw tables in destination started for {} streams", writeConfigs.size());
+      if (TypingAndDedupingFlag.isDestinationV2()) {
+        typerDeduper.prepareTables();
+      }
       for (final BigQueryWriteConfig writeConfig : writeConfigs.values()) {
         LOGGER.info("Preparing staging area in destination for schema: {}, stream: {}, target table: {}, stage: {}",
-            writeConfig.tableSchema(), writeConfig.streamName(), writeConfig.targetTableId(), writeConfig.streamName());
+            writeConfig.tableSchema(), writeConfig.streamName(), writeConfig.targetTableId(), writeConfig.streamName()
+        );
         // In Destinations V2, we will always use the 'airbyte_internal' schema/namespace for raw tables
-        final String rawDatasetId = TypingAndDedupingFlag.isDestinationV2() ? AIRBYTE_NAMESPACE_SCHEMA : writeConfig.datasetId();
+        final String rawDatasetId = TypingAndDedupingFlag.isDestinationV2() ? DEFAULT_AIRBYTE_INTERNAL_NAMESPACE : writeConfig.datasetId();
         // Regardless, ensure the schema the customer wants to write to exists
         bigQueryGcsOperations.createSchemaIfNotExists(writeConfig.datasetId(), writeConfig.datasetLocation());
         // Schema used for raw and airbyte internal tables
@@ -174,7 +178,6 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery
           bigQueryGcsOperations.truncateTableIfExists(rawDatasetId, writeConfig.targetTableId(), writeConfig.tableSchema());
         }
       }
-      typerDeduper.prepareTables();
       LOGGER.info("Preparing tables in destination completed.");
     };
   }
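Condensing the net effect of this hunk (all names as in the diff): typerDeduper.prepareTables() used to run once, unconditionally, after the per-stream staging loop; it now runs before the loop and only for Destinations V2, so the typed final tables exist before any staging work starts:

    return () -> {
      if (TypingAndDedupingFlag.isDestinationV2()) {
        typerDeduper.prepareTables(); // moved ahead of staging, and now V2-gated
      }
      for (final BigQueryWriteConfig writeConfig : writeConfigs.values()) {
        // ... create schemas, create staging/raw tables, truncate for OVERWRITE ...
      }
      // previously: typerDeduper.prepareTables() ran here, for V1 and V2 alike
      LOGGER.info("Preparing tables in destination completed.");
    };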
@@ -56,7 +56,7 @@ public class BigQuerySqlGenerator implements SqlGenerator<TableDefinition> {
    * @param datasetLocation This is technically redundant with {@link BigQueryDestinationHandler} setting the query
    *        execution location, but let's be explicit since this is typically a compliance requirement.
    */
-  public BigQuerySqlGenerator(String datasetLocation) {
+  public BigQuerySqlGenerator(final String datasetLocation) {
     this.datasetLocation = datasetLocation;
   }

@@ -343,7 +343,7 @@ private String clearLoadedAt(final StreamId streamId) {
   public String updateTable(final StreamConfig stream, final String finalSuffix) {
     return updateTable(stream, finalSuffix, true);
   }
-  private String updateTable(final StreamConfig stream, final String finalSuffix, boolean verifyPrimaryKeys) {
+  private String updateTable(final StreamConfig stream, final String finalSuffix, final boolean verifyPrimaryKeys) {
     String pkVarDeclaration = "";
     String validatePrimaryKeys = "";
     if (verifyPrimaryKeys && stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) {
@@ -424,20 +424,26 @@ String insertNewRecords(final StreamConfig stream, final String finalSuffix, fin
     final String columnCasts = streamColumns.entrySet().stream().map(
         col -> extractAndCast(col.getKey(), col.getValue()) + " as " + col.getKey().name(QUOTE) + ",")
         .collect(joining("\n"));
-    final String columnErrors = streamColumns.entrySet().stream().map(
-        col -> new StringSubstitutor(Map.of(
-            "raw_col_name", col.getKey().originalName(),
-            "col_type", toDialectType(col.getValue()).name(),
-            "json_extract", extractAndCast(col.getKey(), col.getValue()))).replace(
-            """
-            CASE
-              WHEN (JSON_QUERY(`_airbyte_data`, '$."${raw_col_name}"') IS NOT NULL)
-                AND (JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$."${raw_col_name}"')) != 'null')
-                AND (${json_extract} IS NULL)
-                THEN ["Problem with `${raw_col_name}`"]
-              ELSE []
-            END"""))
-        .collect(joining(",\n"));
+    final String columnErrors;
+    if (streamColumns.isEmpty()) {
+      // ARRAY_CONCAT doesn't like having an empty argument list, so handle that case separately
+      columnErrors = "[]";
+    } else {
+      columnErrors = "ARRAY_CONCAT(" + streamColumns.entrySet().stream().map(
+          col -> new StringSubstitutor(Map.of(
+              "raw_col_name", col.getKey().originalName(),
+              "col_type", toDialectType(col.getValue()).name(),
+              "json_extract", extractAndCast(col.getKey(), col.getValue()))).replace(
+              """
+              CASE
+                WHEN (JSON_QUERY(`_airbyte_data`, '$."${raw_col_name}"') IS NOT NULL)
+                  AND (JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$."${raw_col_name}"')) != 'null')
+                  AND (${json_extract} IS NULL)
+                  THEN ["Problem with `${raw_col_name}`"]
+                ELSE []
+              END"""))
+          .collect(joining(",\n")) + ")";
+    }
     final String columnList = streamColumns.keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n"));

     String cdcConditionalOrIncludeStatement = "";
@@ -468,9 +474,7 @@ AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
             WITH intermediate_data AS (
               SELECT
             ${column_casts}
-              array_concat(
-            ${column_errors}
-              ) as _airbyte_cast_errors,
+            ${column_errors} as _airbyte_cast_errors,
               _airbyte_raw_id,
               _airbyte_extracted_at
             FROM ${raw_table_id}
@@ -598,7 +602,7 @@ private String wrapAndQuote(final String namespace, final String tableName) {
   }

   @Override
-  public String migrateFromV1toV2(StreamId streamId, String namespace, String tableName) {
+  public String migrateFromV1toV2(final StreamId streamId, final String namespace, final String tableName) {
     return new StringSubstitutor(Map.of(
         "v2_raw_table", streamId.rawTableId(QUOTE),
         "v1_raw_table", wrapAndQuote(namespace, tableName)
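The columnErrors rework matters because BigQuery's ARRAY_CONCAT requires at least one argument: the old SQL template hard-coded array_concat(${column_errors}), which became invalid once the new noColumns case could produce an empty argument list. The decision now lives on the Java side. A condensed sketch, where perColumnCaseExpressions stands in for the per-column CASE ... END fragments built above:

    // Zero columns: emit a literal empty array, since ARRAY_CONCAT() is a syntax error.
    final String columnErrors = perColumnCaseExpressions.isEmpty()
        ? "[]"
        : "ARRAY_CONCAT(" + String.join(",\n", perColumnCaseExpressions) + ")";
    // Either way the outer template now interpolates it as:
    //   ${column_errors} as _airbyte_cast_errors,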
@@ -54,8 +54,8 @@ protected boolean schemaMatchesExpectation(TableDefinition existingTable, Collec
   @Override
   protected NamespacedTableName convertToV1RawName(StreamConfig streamConfig) {
     return new NamespacedTableName(
-        this.nameTransformer.getRawTableName(streamConfig.id().originalName()),
-        this.nameTransformer.getNamespace(streamConfig.id().originalNamespace())
+        this.nameTransformer.getNamespace(streamConfig.id().originalNamespace()),
+        this.nameTransformer.getRawTableName(streamConfig.id().originalName())
     );
   }
 }
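This one is a genuine bug fix rather than a style change: the two constructor arguments were swapped, so the transformed table name was being passed where the namespace belongs and vice versa. Assuming NamespacedTableName is a simple (namespace, tableName) pair (its namespace() and tableName() accessors are what BaseDestinationV1V2Migrator calls above; the actual definition is not part of this diff):

    // Assumed shape, for illustration only.
    public record NamespacedTableName(String namespace, String tableName) {}

    // Before: new NamespacedTableName(rawTableName, namespace) // dataset and table swapped
    // After:  new NamespacedTableName(namespace, rawTableName) // v1 table resolved correctly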
@@ -7,9 +7,8 @@
 import com.google.cloud.bigquery.QueryJobConfiguration;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableResult;
-import io.airbyte.integrations.base.TypingAndDedupingFlag;
+import io.airbyte.integrations.base.JavaBaseConstants;
 import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest;
-import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
 import io.airbyte.integrations.destination.bigquery.BigQueryDestination;
 import io.airbyte.integrations.destination.bigquery.BigQueryDestinationTestUtils;
@@ -71,6 +70,6 @@ protected void teardownStreamAndNamespace(String streamNamespace, String streamN
    * Subclasses using a config with a nonstandard raw table dataset should override this method.
    */
   protected String getRawDataset() {
-    return CatalogParser.DEFAULT_RAW_TABLE_NAMESPACE;
+    return JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
   }
 }
@@ -0,0 +1 @@
+{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": {"errors": []}}
@@ -0,0 +1 @@
+{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {}}
@@ -24,7 +24,7 @@ data:
   tags:
     - language:java
   ab_internal:
-    sl: 200
+    sl: 100
     ql: 200
-  supportLevel: certified
+  supportLevel: community
   metadataSpecVersion: "1.0"
@@ -18,7 +18,7 @@ data:
   tags:
     - language:java
   ab_internal:
-    sl: 200
+    sl: 100
     ql: 200
-  supportLevel: certified
+  supportLevel: community
   metadataSpecVersion: "1.0"
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/destination-gcs/metadata.yaml
@@ -24,7 +24,7 @@ data:
   tags:
     - language:java
   ab_internal:
-    sl: 200
+    sl: 100
     ql: 300
-  supportLevel: certified
+  supportLevel: community
   metadataSpecVersion: "1.0"
@@ -18,7 +18,7 @@ data:
   tags:
     - language:python
   ab_internal:
-    sl: 300
+    sl: 100
     ql: 200
-  supportLevel: certified
+  supportLevel: community
   metadataSpecVersion: "1.0"
@@ -24,7 +24,7 @@ data:
   tags:
     - language:java
   ab_internal:
-    sl: 300
+    sl: 100
     ql: 200
-  supportLevel: certified
+  supportLevel: community
   metadataSpecVersion: "1.0"
@@ -24,7 +24,7 @@ data:
   tags:
     - language:java
   ab_internal:
-    sl: 300
+    sl: 100
     ql: 200
-  supportLevel: certified
+  supportLevel: community
   metadataSpecVersion: "1.0"