Skip to content

Commit

Permalink
Merge branch 'master' into baz/source-shopify-bump-to-2023-07
Browse files Browse the repository at this point in the history
  • Loading branch information
bazarnov authored Aug 21, 2023
2 parents 446e37b + 4782a2b commit 9af7bdd
Show file tree
Hide file tree
Showing 157 changed files with 5,828 additions and 2,161 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.50.18
current_version = 0.50.19
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def read_all_files_in_directory(
".hypothesis",
}

IGNORED_FILENAME_PATTERN_FOR_HTTPS_CHECKS = {"*Test.java", "*.pyc", "*.gz", "*.svg"}
IGNORED_FILENAME_PATTERN_FOR_HTTPS_CHECKS = {"*Test.java", "*.jar", "*.pyc", "*.gz", "*.svg"}
IGNORED_URLS_PREFIX = {
"http://json-schema.org",
"http://localhost",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def validate_all_tags_are_keyvalue_pairs(
def is_major_version(version: str) -> bool:
    """Check whether the version is a major release of the form N.0.0.

    A prerelease/dev build of a major version (e.g. "2.0.0-dev.cf3628ccf3")
    is NOT considered a major release.

    :param version: a semver-formatted version string
    :return: True only for versions with minor == 0, patch == 0 and no prerelease tag
    """
    semver_version = semver.Version.parse(version)
    # The diff artifact left two stacked return statements here (the second was
    # unreachable); this is the post-merge behavior: exclude prerelease builds.
    return semver_version.minor == 0 and semver_version.patch == 0 and semver_version.prerelease is None


def validate_major_version_bump_has_breaking_change_entry(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
metadataSpecVersion: 1.0
data:
name: AlloyDB for PostgreSQL
definitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
connectorType: source
dockerRepository: airbyte/image-exists-1
githubIssueLabel: source-alloydb-strict-encrypt
dockerImageTag: 2.0.0-dev.cf3628ccf3
documentationUrl: https://docs.airbyte.com/integrations/sources/alloydb
connectorSubtype: database
releaseStage: generally_available
license: MIT
releases:
breakingChanges:
2.0.0:
upgradeDeadline: 2023-08-22
message: "This version changes the connector’s authentication method from `ApiKey` to `oAuth`, per the [API guide](https://amazon-sqs.com/api/someguide)."
tags:
- language:java
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public String uploadRecordsToBucket(final SerializableBuffer recordsData,
private String loadDataIntoBucket(final String objectPath, final SerializableBuffer recordsData) throws IOException {
final long partSize = DEFAULT_PART_SIZE;
final String bucket = s3Config.getBucketName();
final String partId = getPartId(objectPath);
final String partId = UUID.randomUUID().toString();
final String fileExtension = getExtension(recordsData.getFilename());
final String fullObjectKey;
if (StringUtils.isNotBlank(s3Config.getFileNamePattern())) {
Expand Down Expand Up @@ -230,17 +230,6 @@ protected static String getExtension(final String filename) {
return "." + result;
}

// Builds a part id for the object being staged under objectPath.
// Uses the number of objects already present under the prefix as a sequential id;
// when the listing is truncated (more objects than one listing page returns),
// falls back to a random UUID.
// NOTE(review): the count-based id is not safe under concurrent uploads to the
// same prefix — two writers listing simultaneously can compute the same part id
// and overwrite each other; verify callers serialize uploads, or prefer a UUID
// unconditionally.
private String getPartId(final String objectPath) {
final String bucket = s3Config.getBucketName();
final ObjectListing objects = s3Client.listObjects(bucket, objectPath);
if (objects.isTruncated()) {
// bucket contains too many objects, use an uuid instead
return UUID.randomUUID().toString();
} else {
return Integer.toString(objects.getObjectSummaries().size());
}
}

@Override
public void dropBucketObject(final String objectPath) {
cleanUpBucketObject(objectPath, List.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ AirbyteMessageConsumer getConsumer(JsonNode config,

/**
* Default implementation allows us to not have to touch existing destinations while avoiding a lot
* of conditional statements in {@link IntegrationRunner}.
* of conditional statements in {@link IntegrationRunner}. This method is preferred over
* {@link #getConsumer} and is the default Async Framework entry point.
*
* @param config config
* @param catalog catalog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ ENV APPLICATION destination-redshift

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.4
LABEL io.airbyte.version=0.6.5
LABEL io.airbyte.name=airbyte/destination-redshift

ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 0.6.4
dockerImageTag: 0.6.5
dockerRepository: airbyte/destination-redshift
githubIssueLabel: destination-redshift
icon: redshift.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1
ENV ENABLE_SENTRY true


LABEL io.airbyte.version=1.3.3
LABEL io.airbyte.version=2.0.0
LABEL io.airbyte.name=airbyte/destination-snowflake

ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 1.3.3
dockerImageTag: 2.0.0
dockerRepository: airbyte/destination-snowflake
githubIssueLabel: destination-snowflake
icon: snowflake.svg
Expand All @@ -28,6 +28,11 @@ data:
supportsDbt: true
tags:
- language:java
releases:
breakingChanges:
2.0.0:
message: "Remove GCS/S3 loading method support."
upgradeDeadline: "2023-08-31"
ab_internal:
sl: 200
ql: 400
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,19 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;

// TODO: Remove the Switching Destination from this class as part of code cleanup.
@Slf4j
public class SnowflakeDestination extends SwitchingDestination<SnowflakeDestination.DestinationType> {

public static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);
private final String airbyteEnvironment;

enum DestinationType {
COPY_S3,
COPY_GCS,
INTERNAL_STAGING
}

Expand All @@ -40,29 +35,8 @@ public SnowflakeDestination(final String airbyteEnvironment) {
@Override
public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
log.info("destination class: {}", getClass());
final var useAsyncSnowflake = useAsyncSnowflake(config);
log.info("using async snowflake: {}", useAsyncSnowflake);
if (useAsyncSnowflake) {
return new SnowflakeInternalStagingDestination(airbyteEnvironment).getSerializedMessageConsumer(config, catalog, outputRecordCollector);
} else {
return new ShimToSerializedAirbyteMessageConsumer(getConsumer(config, catalog, outputRecordCollector));
}

}

public static boolean useAsyncSnowflake(final JsonNode config) {
final Set<String> stagingLoadingMethods = Set.of("internal staging", "internal-staging", "internal_staging");

return Optional.of(config)
.map(node -> node.get("loading_method"))
.map(node -> node.get("method"))
.map(JsonNode::asText)
.map(String::toLowerCase)
.map(loadingMethod -> stagingLoadingMethods.contains(loadingMethod))
.orElse(false);
final Consumer<AirbyteMessage> outputRecordCollector) {
return new SnowflakeInternalStagingDestination(airbyteEnvironment).getSerializedMessageConsumer(config, catalog, outputRecordCollector);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,42 +14,13 @@
public class SnowflakeDestinationResolver {

/**
 * Resolves which Snowflake loading strategy the supplied connector config selects.
 * S3 copy and GCS copy are checked first; anything else falls back to internal staging.
 */
public static DestinationType getTypeFromConfig(final JsonNode config) {
  if (isS3Copy(config)) {
    return DestinationType.COPY_S3;
  }
  if (isGcsCopy(config)) {
    return DestinationType.COPY_GCS;
  }
  return DestinationType.INTERNAL_STAGING;
}

/**
 * Returns true when the config's "loading_method" object carries an
 * "s3_bucket_name" key, i.e. the S3 copy staging method is configured.
 */
public static boolean isS3Copy(final JsonNode config) {
  final JsonNode loadingMethod = config.get("loading_method");
  return loadingMethod != null && loadingMethod.isObject() && loadingMethod.has("s3_bucket_name");
}

/**
 * Returns true when the config's "loading_method" object carries a
 * "project_id" key, i.e. the GCS copy staging method is configured.
 */
public static boolean isGcsCopy(final JsonNode config) {
  final JsonNode loadingMethod = config.get("loading_method");
  return loadingMethod != null && loadingMethod.isObject() && loadingMethod.has("project_id");
}

/**
 * Reads the optional file-buffer-count entry from the config and clamps it to
 * [DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER, MAX_CONCURRENT_STREAM_IN_BUFFER].
 * When the key is absent, the default is returned unchanged.
 */
public static int getNumberOfFileBuffers(final JsonNode config) {
  final JsonNode requested = config.get(FileBuffer.FILE_BUFFER_COUNT_KEY);
  // Cap a user-supplied value at the hard maximum; absent key means "use default".
  final int capped = requested == null
      ? FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER
      : Math.min(requested.asInt(), FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER);
  // Never go below the default, whatever the user asked for.
  return Math.max(capped, FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER);
}

public static Map<DestinationType, Destination> getTypeToDestination(
final String airbyteEnvironment) {
final SnowflakeS3StagingDestination s3StagingDestination = new SnowflakeS3StagingDestination(airbyteEnvironment);
final SnowflakeGcsStagingDestination gcsStagingDestination = new SnowflakeGcsStagingDestination(airbyteEnvironment);
public static Map<DestinationType, Destination> getTypeToDestination(final String airbyteEnvironment) {
final SnowflakeInternalStagingDestination internalStagingDestination = new SnowflakeInternalStagingDestination(airbyteEnvironment);

return ImmutableMap.of(
DestinationType.COPY_S3, s3StagingDestination,
DestinationType.COPY_GCS, gcsStagingDestination,
DestinationType.INTERNAL_STAGING, internalStagingDestination);
return ImmutableMap.of(DestinationType.INTERNAL_STAGING, internalStagingDestination);
}

}
Loading

0 comments on commit 9af7bdd

Please sign in to comment.