
Commit f9cc957: Automated Commit - Formatting Changes
aaronsteers authored and octavia-squidington-iii committed Sep 22, 2023
1 parent 19e8eca commit f9cc957
Showing 88 changed files with 2,279 additions and 2,353 deletions.
30 changes: 15 additions & 15 deletions .devcontainer/destination-duckdb/devcontainer.json
@@ -23,21 +23,21 @@
"customizations": {
"vscode": {
"extensions": [
// Python extensions:
"charliermarsh.ruff",
"matangover.mypy",
"ms-python.black-formatter",
"ms-python.python",
"ms-python.vscode-pylance",
// Python extensions:
"charliermarsh.ruff",
"matangover.mypy",
"ms-python.black-formatter",
"ms-python.python",
"ms-python.vscode-pylance",

// Toml support
"tamasfe.even-better-toml",

// Yaml and JSON Schema support:
"redhat.vscode-yaml",
// Toml support
"tamasfe.even-better-toml",

// Contributing:
"GitHub.vscode-pull-request-github"
// Yaml and JSON Schema support:
"redhat.vscode-yaml",

// Contributing:
"GitHub.vscode-pull-request-github"
],
"settings": {
"extensions.ignoreRecommendations": true,
@@ -53,8 +53,8 @@
"."
],
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter"
}
"editor.defaultFormatter": "ms-python.black-formatter"
}
}
}
},

Large diffs are not rendered by default.

@@ -19,15 +19,11 @@
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory;
import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig;
import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import java.io.IOException;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -13,7 +13,6 @@
import io.airbyte.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
@@ -395,7 +395,7 @@ private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator,
parsedCatalog,
migrator,
v2RawTableMigrator,
8);
8);

}

@@ -133,29 +133,29 @@ public void copyIntoTableFromStage(final String datasetId,
LOGGER.info("Uploading records from staging files to target table {} (dataset {}): {}",
tableId, datasetId, stagedFileName);

final String fullFilePath = String.format("gs://%s/%s%s", gcsConfig.getBucketName(), getStagingFullPath(datasetId, stream), stagedFileName);
LOGGER.info("Uploading staged file: {}", fullFilePath);
final LoadJobConfiguration configuration = LoadJobConfiguration.builder(tableId, fullFilePath)
.setFormatOptions(FormatOptions.csv())
.setSchema(tableSchema)
.setWriteDisposition(WriteDisposition.WRITE_APPEND)
.setJobTimeoutMs(60000L)
.build();

final Job loadJob = this.bigQuery.create(JobInfo.of(configuration));
LOGGER.info("[{}] Created a new job to upload record(s) to target table {} (dataset {}): {}", loadJob.getJobId(),
tableId, datasetId, loadJob);

try {
BigQueryUtils.waitForJobFinish(loadJob);
LOGGER.info("[{}] Target table {} (dataset {}) is successfully appended with staging files", loadJob.getJobId(),
tableId, datasetId);
} catch (final BigQueryException | InterruptedException e) {
throw new RuntimeException(
String.format("[%s] Failed to upload staging files to destination table %s (%s)", loadJob.getJobId(),
tableId, datasetId),
e);
}
final String fullFilePath = String.format("gs://%s/%s%s", gcsConfig.getBucketName(), getStagingFullPath(datasetId, stream), stagedFileName);
LOGGER.info("Uploading staged file: {}", fullFilePath);
final LoadJobConfiguration configuration = LoadJobConfiguration.builder(tableId, fullFilePath)
.setFormatOptions(FormatOptions.csv())
.setSchema(tableSchema)
.setWriteDisposition(WriteDisposition.WRITE_APPEND)
.setJobTimeoutMs(60000L)
.build();

final Job loadJob = this.bigQuery.create(JobInfo.of(configuration));
LOGGER.info("[{}] Created a new job to upload record(s) to target table {} (dataset {}): {}", loadJob.getJobId(),
tableId, datasetId, loadJob);

try {
BigQueryUtils.waitForJobFinish(loadJob);
LOGGER.info("[{}] Target table {} (dataset {}) is successfully appended with staging files", loadJob.getJobId(),
tableId, datasetId);
} catch (final BigQueryException | InterruptedException e) {
throw new RuntimeException(
String.format("[%s] Failed to upload staging files to destination table %s (%s)", loadJob.getJobId(),
tableId, datasetId),
e);
}
}

@Override
@@ -69,8 +69,8 @@ public SerializedAirbyteMessageConsumer createAsync(
}

/**
* Out BigQuery's uploader threads use a fair amount of memory. We believe this is largely
* due to the sdk client we use.
* Out BigQuery's uploader threads use a fair amount of memory. We believe this is largely due to
* the sdk client we use.
*
* @return number of bytes to make available for message buffering.
*/
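
The body of getBigQueryBufferMemoryLimit() is collapsed out of this diff. Purely as a hypothetical illustration of the kind of limit the Javadoc above describes (the 0.4 ratio and the method name suffix are assumptions, not the connector's actual values), a sketch might look like:

// Hypothetical sketch only; the real method body is not shown in this diff.
private long getBigQueryBufferMemoryLimitSketch() {
  // Reserve a fixed share of the JVM's max heap for message buffering, leaving the
  // rest for the BigQuery SDK client's own allocations. The 0.4 ratio is assumed.
  return (long) (Runtime.getRuntime().maxMemory() * 0.4);
}
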
@@ -79,10 +79,10 @@ private long getBigQueryBufferMemoryLimit() {
}

private Map<StreamDescriptor, BigQueryWriteConfig> createWriteConfigs(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final ParsedCatalog parsedCatalog,
final Function<JsonNode, BigQueryRecordFormatter> recordFormatterCreator,
final Function<String, String> tmpTableNameTransformer) {
final ConfiguredAirbyteCatalog catalog,
final ParsedCatalog parsedCatalog,
final Function<JsonNode, BigQueryRecordFormatter> recordFormatterCreator,
final Function<String, String> tmpTableNameTransformer) {
return catalog.getStreams().stream()
.map(configuredStream -> {
Preconditions.checkNotNull(configuredStream.getDestinationSyncMode(), "Undefined destination sync mode");
@@ -46,4 +46,5 @@ public BigQueryWriteConfig(final String streamName,
tableSchema,
syncMode);
}

}
@@ -72,8 +72,7 @@ public void execute(final String sql) throws InterruptedException {
if (job.getStatus().getError() != null) {
throw new BigQueryException(Streams.concat(
Stream.of(job.getStatus().getError()),
job.getStatus().getExecutionErrors().stream()
).toList());
job.getStatus().getExecutionErrors().stream()).toList());
}

final JobStatistics.QueryStatistics statistics = job.getStatistics();
@@ -17,7 +17,6 @@
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.nio.file.Path;
import java.util.HashSet;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@@ -14,14 +14,7 @@
import duckdb
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteMessage,
ConfiguredAirbyteCatalog,
DestinationSyncMode,
Status,
Type,
)
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type

logger = getLogger("airbyte")

@@ -46,9 +39,7 @@ def _get_destination_path(destination_path: str) -> str:
Get a normalized version of the destination path.
Automatically append /local/ to the start of the path
"""
if destination_path.startswith("md:") or destination_path.startswith(
"motherduck:"
):
if destination_path.startswith("md:") or destination_path.startswith("motherduck:"):
return destination_path

if not destination_path.startswith("/local"):
@@ -57,8 +48,7 @@ def _get_destination_path(destination_path: str) -> str:
destination_path = os.path.normpath(destination_path)
if not destination_path.startswith("/local"):
raise ValueError(
f"destination_path={destination_path} is not a valid path."
"A valid path shall start with /local or no / prefix"
f"destination_path={destination_path} is not a valid path." "A valid path shall start with /local or no / prefix"
)

return destination_path
@@ -140,9 +130,7 @@ def write(
data = message.record.data
stream = message.record.stream
if stream not in streams:
logger.debug(
f"Stream {stream} was not present in configured streams, skipping"
)
logger.debug(f"Stream {stream} was not present in configured streams, skipping")
continue

# add to buffer
@@ -167,9 +155,7 @@ def write(
con.executemany(query, buffer[stream_name])
con.commit()

def check(
self, logger: AirbyteLogger, config: Mapping[str, Any]
) -> AirbyteConnectionStatus:
def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""
Tests if the input configuration can be used to successfully connect to the destination with the needed permissions
e.g: if a provided API token or password can be used to connect and write to the destination.
@@ -198,6 +184,4 @@ def check(
return AirbyteConnectionStatus(status=Status.SUCCEEDED)

except Exception as e:
return AirbyteConnectionStatus(
status=Status.FAILED, message=f"An exception occurred: {repr(e)}"
)
return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}")
@@ -85,7 +85,8 @@ public void execute(final String sql) throws Exception {
database.execute(sql);
} catch (final SnowflakeSQLException e) {
LOGGER.error("Sql {} failed", queryId, e);
// Snowflake SQL exceptions by default may not be super helpful, so we try to extract the relevant part of the message.
// Snowflake SQL exceptions by default may not be super helpful, so we try to extract the relevant
// part of the message.
final String trimmedMessage;
if (e.getMessage().startsWith(EXCEPTION_COMMON_PREFIX)) {
// The first line is a pretty generic message, so just remove it
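
The rest of this catch block is collapsed in the diff. As a standalone, hypothetical sketch of the trimming the comment describes (the prefix value and the exact slicing are assumptions, not the connector's actual code):

// Hypothetical helper, for illustration only.
static String trimSnowflakeErrorMessage(final String message) {
  final String assumedCommonPrefix = "JDBC driver internal error:"; // assumed value of EXCEPTION_COMMON_PREFIX
  if (message.startsWith(assumedCommonPrefix)) {
    // The first line is generic boilerplate, so keep only what follows it.
    final int firstNewline = message.indexOf('\n');
    return firstNewline >= 0 ? message.substring(firstNewline + 1).trim() : message;
  }
  return message;
}
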
@@ -36,7 +36,8 @@ public class SnowflakeSqlGenerator implements SqlGenerator<SnowflakeTableDefinit
private final ColumnId CDC_DELETED_AT_COLUMN = buildColumnId("_ab_cdc_deleted_at");

// See https://docs.snowflake.com/en/sql-reference/reserved-keywords.html
// and https://github.com/airbytehq/airbyte/blob/f226503bd1d4cd9c7412b04d47de584523988443/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/reserved_keywords.py
// and
// https://github.com/airbytehq/airbyte/blob/f226503bd1d4cd9c7412b04d47de584523988443/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/reserved_keywords.py
private static final List<String> RESERVED_COLUMN_NAMES = ImmutableList.of(
"CURRENT_DATE",
"CURRENT_TIME",
@@ -62,8 +63,8 @@ public ColumnId buildColumnId(final String name, final String suffix) {
// No escaping needed, as far as I can tell. We quote all our identifier names.
final String nameWithSuffix = name + suffix;
return new ColumnId(prefixReservedColumnName(escapeSqlIdentifier(name).toUpperCase()) + suffix,
nameWithSuffix,
nameWithSuffix.toUpperCase());
nameWithSuffix,
nameWithSuffix.toUpperCase());
}

public String toDialectType(final AirbyteType type) {
@@ -196,18 +197,22 @@ private String extractAndCast(final ColumnId column, final AirbyteType airbyteTy
}

/**
* The `${` bigram causes problems inside script blocks. For example, a perfectly innocuous query like
* `SELECT "_airbyte_data":"${foo}" FROM ...` works fine normally, but running this block will throw
* an error:
* <pre>{@code
* The `${` bigram causes problems inside script blocks. For example, a perfectly innocuous query
* like `SELECT "_airbyte_data":"${foo}" FROM ...` works fine normally, but running this block will
* throw an error:
*
* <pre>
* {@code
* EXECUTE IMMEDIATE 'BEGIN
* LET x INTEGER := (SELECT "_airbyte_data":"${foo}" FROM ...);
* END;';
* }</pre>
* }
* </pre>
* <p>
* This method is a workaround for this behavior. We switch to using the {@code get} method to extract
* JSON values, and avoid the `${` sequence by using string concatenation. This generates a sql statement
* like {@code SELECT TRY_CAST((get("_airbyte_data", '$' + '{foo}')::text) as INTEGER) FROM ...}.
* This method is a workaround for this behavior. We switch to using the {@code get} method to
* extract JSON values, and avoid the `${` sequence by using string concatenation. This generates a
* sql statement like {@code SELECT TRY_CAST((get("_airbyte_data", '$' + '{foo}')::text) as INTEGER)
* FROM ...}.
*/
private String extractAndCastInsideScript(final ColumnId column, final AirbyteType airbyteType) {
final String[] parts = column.originalName().split("\\$\\{", -1);
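
To make the workaround concrete, here is a minimal, hypothetical sketch (not the connector's actual implementation) of how the parts produced by that split could be re-assembled so the generated SQL never contains the `${` bigram:

// Hypothetical sketch: rebuild the JSON path via string concatenation so the literal
// "${" bigram never appears in the generated script. Escaping of quotes inside the
// individual parts is omitted for brevity.
static String extractWithoutDollarBrace(final String columnName) {
  final String[] parts = columnName.split("\\$\\{", -1); // "${foo}" -> ["", "foo}"]
  final StringBuilder path = new StringBuilder("'").append(parts[0]).append("'");
  for (int i = 1; i < parts.length; i++) {
    path.append(" + '$' + '{").append(parts[i]).append("'");
  }
  // e.g. "${foo}" becomes: get("_airbyte_data", '' + '$' + '{foo}')
  return "get(\"_airbyte_data\", " + path + ")";
}
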
@@ -561,8 +566,8 @@ public String migrateFromV1toV2(final StreamId streamId, final String namespace,
}

/**
* Snowflake json object access is done using double-quoted strings, e.g. `SELECT "_airbyte_data":"foo"`.
* As such, we need to escape double-quotes in the field name.
* Snowflake json object access is done using double-quoted strings, e.g. `SELECT
* "_airbyte_data":"foo"`. As such, we need to escape double-quotes in the field name.
*/
public static String escapeJsonIdentifier(final String identifier) {
// Note that we don't need to escape backslashes here!
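
A minimal sketch of the escaping rule that Javadoc describes (the method body itself is collapsed out of this diff, so doubling the quote character is an assumption):

// Hypothetical sketch: inside a double-quoted Snowflake identifier, an embedded
// double quote is escaped by doubling it; backslashes are left untouched.
public static String escapeJsonIdentifierSketch(final String identifier) {
  return identifier.replace("\"", "\"\"");
}
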
@@ -592,8 +597,7 @@ public static String escapeSqlIdentifier(String identifier) {
}

private static String prefixReservedColumnName(final String columnName) {
return RESERVED_COLUMN_NAMES.stream().anyMatch(k -> k.equalsIgnoreCase(columnName)) ?
"_" + columnName : columnName;
return RESERVED_COLUMN_NAMES.stream().anyMatch(k -> k.equalsIgnoreCase(columnName)) ? "_" + columnName : columnName;
}

public static String escapeSingleQuotedString(final String str) {
@@ -37,7 +37,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.sql.DataSource;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringSubstitutor;
import org.junit.jupiter.api.AfterAll;
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.snowflake.typing_deduping;

import static org.junit.jupiter.api.Assertions.assertAll;
@@ -19,20 +23,15 @@ void columnNameSpecialCharacterHandling() {
new ColumnId(
"__FOO_",
"${foo}",
"${FOO}"
),
generator.buildColumnId("${foo}")
),
"${FOO}"),
generator.buildColumnId("${foo}")),
// But normally, we should leave those characters untouched.
() -> assertEquals(
new ColumnId(
"{FO$O}",
"{fo$o}",
"{FO$O}"
),
generator.buildColumnId("{fo$o}")
)
);
"{FO$O}"),
generator.buildColumnId("{fo$o}")));
}

/**
@@ -48,21 +47,17 @@ void streamNameSpecialCharacterHandling() {
"airbyte_internal",
"__foo__raw__stream___bar_",
"${foo}",
"${bar}"
),
generator.buildStreamId("${foo}", "${bar}", "airbyte_internal")
),
"${bar}"),
generator.buildStreamId("${foo}", "${bar}", "airbyte_internal")),
() -> assertEquals(
new StreamId(
"{FO$O}",
"{BA$R}",
"airbyte_internal",
"{fo$o}_raw__stream_{ba$r}",
"{fo$o}",
"{ba$r}"
),
generator.buildStreamId("{fo$o}", "{ba$r}", "airbyte_internal")
)
);
"{ba$r}"),
generator.buildStreamId("{fo$o}", "{ba$r}", "airbyte_internal")));
}

}
