From 5400e445ff7e6610b36a42a6dc8cd03044bb0cf4 Mon Sep 17 00:00:00 2001 From: Ahmed Hamdy Date: Sat, 17 Aug 2024 10:52:41 +0100 Subject: [PATCH] [FLINK-35401] Add Sqs Table API connector --- .../flink-connector-sqs/pom.xml | 22 ++ .../sqs/table/SqsConnectorOptions.java | 52 ++++ .../connector/sqs/table/SqsDynamicSink.java | 261 ++++++++++++++++++ .../sqs/table/SqsDynamicTableFactory.java | 140 ++++++++++ .../org.apache.flink.table.factories.Factory | 16 ++ .../sqs/table/SqsDynamicTableFactoryTest.java | 257 +++++++++++++++++ 6 files changed, 748 insertions(+) create mode 100644 flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/table/SqsConnectorOptions.java create mode 100644 flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/table/SqsDynamicSink.java create mode 100644 flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/table/SqsDynamicTableFactory.java create mode 100644 flink-connector-aws/flink-connector-sqs/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/table/SqsDynamicTableFactoryTest.java diff --git a/flink-connector-aws/flink-connector-sqs/pom.xml b/flink-connector-aws/flink-connector-sqs/pom.xml index 00e3c298..6ab20276 100644 --- a/flink-connector-aws/flink-connector-sqs/pom.xml +++ b/flink-connector-aws/flink-connector-sqs/pom.xml @@ -103,6 +103,28 @@ under the License. jackson-datatype-jsr310 + + + org.apache.flink + flink-table-common + ${flink.version} + + + + org.apache.flink + flink-table-runtime + ${flink.version} + test + + + + org.apache.flink + flink-table-common + ${flink.version} + test + test-jar + + diff --git a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/table/SqsConnectorOptions.java b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/table/SqsConnectorOptions.java new file mode 100644 index 00000000..dcfb36a8 --- /dev/null +++ b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/table/SqsConnectorOptions.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.table; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +import java.util.Map; + +/** Options for the SQS connector. */ +@PublicEvolving +public class SqsConnectorOptions { + public static final ConfigOption QUEUE_URL = + ConfigOptions.key("queue-url") + .stringType() + .noDefaultValue() + .withDescription("The URL of the SQS queue."); + + public static final ConfigOption AWS_REGION = + ConfigOptions.key("aws.region") + .stringType() + .noDefaultValue() + .withDescription("AWS region of used SQS queue."); + + public static final ConfigOption> AWS_CONFIG_PROPERTIES = + ConfigOptions.key("aws") + .mapType() + .noDefaultValue() + .withDescription("AWS configuration properties."); + + public static final ConfigOption FAIL_ON_ERROR = + ConfigOptions.key("sink.fail-on-error") + .booleanType() + .defaultValue(false) + .withDescription("Flag to trigger global failure on error."); +} diff --git a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/table/SqsDynamicSink.java b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/table/SqsDynamicSink.java new file mode 100644 index 00000000..719fee18 --- /dev/null +++ b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/table/SqsDynamicSink.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder; +import org.apache.flink.connector.sqs.sink.SqsSink; +import org.apache.flink.connector.sqs.sink.SqsSinkBuilder; +import org.apache.flink.connector.sqs.sink.SqsSinkElementConverter; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkV2Provider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; + +import javax.annotation.Nullable; + +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; + +/** A {@link DynamicTableSink} for SQS. */ +@Internal +public class SqsDynamicSink extends AsyncDynamicTableSink { + + /** Consumed data type of the table. */ + private final DataType consumedDataType; + + /** Url of Sqs queue to write to. */ + private final String sqsUrl; + + /** Properties for the Sqs Aws Client. */ + private final Properties sqsClientProps; + + /** Encoding format to convert between row data and byte array. */ + EncodingFormat> encodingFormat; + + /** Flag to determine whether to fail on error. */ + private final Boolean failOnError; + + protected SqsDynamicSink( + @Nullable Integer maxBatchSize, + @Nullable Integer maxInFlightRequests, + @Nullable Integer maxBufferedRequests, + @Nullable Long maxBufferSizeInBytes, + @Nullable Long maxTimeInBufferMS, + @Nullable Boolean failOnError, + @Nullable DataType consumedDataType, + EncodingFormat> encodingFormat, + String sqsUrl, + @Nullable Properties sqsClientProps) { + super( + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBufferSizeInBytes, + maxTimeInBufferMS); + this.consumedDataType = consumedDataType; + this.sqsUrl = sqsUrl; + this.sqsClientProps = sqsClientProps; + this.failOnError = failOnError; + this.encodingFormat = encodingFormat; + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode changelogMode) { + return encodingFormat.getChangelogMode(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + SqsSinkBuilder builder = SqsSink.builder(); + builder.setSqsUrl(sqsUrl); + Optional.ofNullable(sqsClientProps).ifPresent(builder::setSqsClientProperties); + builder.setSqsSinkElementConverter( + SqsSinkElementConverter.builder() + .setSerializationSchema( + encodingFormat.createRuntimeEncoder(context, consumedDataType)) + .build()); + Optional.ofNullable(failOnError).ifPresent(builder::setFailOnError); + return SinkV2Provider.of(builder.build()); + } + + @Override + public DynamicTableSink copy() { + return new SqsDynamicSink( + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBufferSizeInBytes, + maxTimeInBufferMS, + failOnError, + consumedDataType, + encodingFormat, + sqsUrl, + sqsClientProps); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SqsDynamicSink that = (SqsDynamicSink) o; + return super.equals(o) + && failOnError == that.failOnError + && Objects.equals(consumedDataType, that.consumedDataType) + && Objects.equals(sqsUrl, that.sqsUrl) + && Objects.equals(sqsClientProps, that.sqsClientProps) + && Objects.equals(encodingFormat, that.encodingFormat); + } + + @Override + public int hashCode() { + return Objects.hash( + super.hashCode(), + consumedDataType, + sqsUrl, + sqsClientProps, + encodingFormat, + failOnError); + } + + @Override + public String asSummaryString() { + StringBuilder sb = new StringBuilder(); + sb.append("SqsDynamicSink{"); + sb.append("sqsUrl='").append(sqsUrl).append('\''); + sb.append(", consumedDataType=").append(consumedDataType); + sb.append(", encodingFormat=").append(encodingFormat); + sb.append(", failOnError=").append(failOnError); + Optional.ofNullable(sqsClientProps) + .ifPresent( + props -> + props.forEach( + (k, v) -> sb.append(", ").append(k).append("=").append(v))); + sb.append(", maxBatchSize=").append(maxBatchSize); + sb.append(", maxInFlightRequests=").append(maxInFlightRequests); + sb.append(", maxBufferedRequests=").append(maxBufferedRequests); + sb.append(", maxBufferSizeInBytes=").append(maxBufferSizeInBytes); + sb.append(", maxTimeInBufferMS=").append(maxTimeInBufferMS); + sb.append('}'); + return sb.toString(); + } + + @Override + public String toString() { + return asSummaryString(); + } + + public static SqsQueueUrlConfigurator builder() { + return new SqsDynamicSinkBuilder(); + } + + /** Builder for {@link SqsDynamicSink}. */ + @Internal + public static class SqsDynamicSinkBuilder + extends AsyncDynamicTableSinkBuilder< + SendMessageBatchRequestEntry, SqsDynamicSinkBuilder> + implements SqsQueueUrlConfigurator, SqsSinkEncodingFormatConfigurator { + + private String sqsUrl; + + private Properties sqsClientProps; + + private EncodingFormat> encodingFormat; + + private Boolean failOnError; + + private DataType consumedDataType; + + @Override + public SqsSinkEncodingFormatConfigurator setSqsQueueUrl(String sqsUrl) { + this.sqsUrl = sqsUrl; + return this; + } + + @Override + public SqsDynamicSinkBuilder setEncodingFormat( + EncodingFormat> encodingFormat) { + this.encodingFormat = encodingFormat; + return this; + } + + public SqsDynamicSinkBuilder setFailOnError(boolean failOnError) { + this.failOnError = failOnError; + return this; + } + + public SqsDynamicSinkBuilder setSqsClientProperties(Properties sqsClientProps) { + this.sqsClientProps = sqsClientProps; + return this; + } + + public SqsDynamicSinkBuilder setConsumedDataType(DataType consumedDataType) { + this.consumedDataType = consumedDataType; + return this; + } + + @Override + public SqsDynamicSink build() { + return new SqsDynamicSink( + getMaxBatchSize(), + getMaxInFlightRequests(), + getMaxBufferedRequests(), + getMaxBufferSizeInBytes(), + getMaxTimeInBufferMS(), + failOnError, + consumedDataType, + encodingFormat, + sqsUrl, + sqsClientProps); + } + } + + /** Configurator for the required Sqs queue url. */ + @Internal + public interface SqsQueueUrlConfigurator { + /** + * Configures the Sqs queue url. + * + * @param sqsUrl the url of the Sqs queue + */ + SqsSinkEncodingFormatConfigurator setSqsQueueUrl(String sqsUrl); + } + + /** Configurator for the required encoding format. */ + @Internal + public interface SqsSinkEncodingFormatConfigurator { + /** + * Configures the encoding format. + * + * @param encodingFormat the encoding format + */ + SqsDynamicSinkBuilder setEncodingFormat( + EncodingFormat> encodingFormat); + } +} diff --git a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/table/SqsDynamicTableFactory.java b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/table/SqsDynamicTableFactory.java new file mode 100644 index 00000000..16e871b6 --- /dev/null +++ b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/table/SqsDynamicTableFactory.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory; +import org.apache.flink.connector.base.table.AsyncSinkConnectorOptions; +import org.apache.flink.table.connector.sink.DynamicTableSink; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +import static org.apache.flink.table.factories.FactoryUtil.FORMAT; + +/** Factory for creating configured instances of {@link SqsDynamicSink}. */ +@Internal +public class SqsDynamicTableFactory extends AsyncDynamicTableSinkFactory { + private static final String IDENTIFIER = "sqs"; + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + AsyncDynamicSinkContext factoryContext = new AsyncDynamicSinkContext(this, context); + factoryContext.getFactoryHelper().validate(); + ReadableConfig config = factoryContext.getTableOptions(); + Properties clientProperties = getSqsClientProperties(config); + AWSGeneralUtil.validateAwsConfiguration(clientProperties); + + SqsDynamicSink.SqsDynamicSinkBuilder builder = + SqsDynamicSink.builder() + .setSqsQueueUrl(config.get(SqsConnectorOptions.QUEUE_URL)) + .setEncodingFormat(factoryContext.getEncodingFormat()) + .setSqsClientProperties(clientProperties) + .setConsumedDataType(factoryContext.getPhysicalDataType()) + .setFailOnError(config.get(SqsConnectorOptions.FAIL_ON_ERROR)); + + addAsyncOptionsToBuilder(getAsyncSinkOptions(config), builder); + return builder.build(); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + Set> options = new HashSet<>(); + options.add(SqsConnectorOptions.QUEUE_URL); + options.add(SqsConnectorOptions.AWS_REGION); + options.add(FORMAT); + return options; + } + + @Override + public Set> optionalOptions() { + Set> options = super.optionalOptions(); + options.add(SqsConnectorOptions.FAIL_ON_ERROR); + options.add(SqsConnectorOptions.AWS_CONFIG_PROPERTIES); + return options; + } + + @Override + public Set> forwardOptions() { + Set> options = new HashSet<>(); + options.add(SqsConnectorOptions.QUEUE_URL); + options.add(SqsConnectorOptions.AWS_REGION); + options.add(SqsConnectorOptions.AWS_CONFIG_PROPERTIES); + return options; + } + + private Properties getSqsClientProperties(ReadableConfig config) { + Properties properties = new Properties(); + properties.putAll( + appendAwsPrefixToOptions(config.get(SqsConnectorOptions.AWS_CONFIG_PROPERTIES))); + return properties; + } + + private Map appendAwsPrefixToOptions(Map options) { + Map prefixedProperties = new HashMap<>(); + options.forEach((key, value) -> prefixedProperties.put("aws" + "." + key, value)); + return prefixedProperties; + } + + private Properties getAsyncSinkOptions(ReadableConfig config) { + Properties properties = new Properties(); + Optional.ofNullable(config.get(AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE)) + .ifPresent( + flushBufferSize -> + properties.put( + AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE.key(), + flushBufferSize)); + Optional.ofNullable(config.get(AsyncSinkConnectorOptions.MAX_BATCH_SIZE)) + .ifPresent( + maxBatchSize -> + properties.put( + AsyncSinkConnectorOptions.MAX_BATCH_SIZE.key(), + maxBatchSize)); + Optional.ofNullable(config.get(AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS)) + .ifPresent( + maxInflightRequests -> + properties.put( + AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS.key(), + maxInflightRequests)); + Optional.ofNullable(config.get(AsyncSinkConnectorOptions.MAX_BUFFERED_REQUESTS)) + .ifPresent( + maxBufferedRequests -> + properties.put( + AsyncSinkConnectorOptions.MAX_BUFFERED_REQUESTS.key(), + maxBufferedRequests)); + Optional.ofNullable(config.get(AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT)) + .ifPresent( + timeout -> + properties.put( + AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT.key(), + timeout)); + return properties; + } +} diff --git a/flink-connector-aws/flink-connector-sqs/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector-aws/flink-connector-sqs/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 00000000..5f3b61c4 --- /dev/null +++ b/flink-connector-aws/flink-connector-sqs/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.sqs.table.SqsDynamicTableFactory \ No newline at end of file diff --git a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/table/SqsDynamicTableFactoryTest.java b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/table/SqsDynamicTableFactoryTest.java new file mode 100644 index 00000000..a4f4b49f --- /dev/null +++ b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/table/SqsDynamicTableFactoryTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.table; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.sqs.sink.SqsSink; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkV2Provider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.TableOptionsBuilder; +import org.apache.flink.table.factories.TestFormatFactory; +import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext; +import org.apache.flink.table.types.DataType; + +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +/** Test class for {@link SqsDynamicTableFactory}. */ +public class SqsDynamicTableFactoryTest { + private static final String SQS_QUEUE_URL = "sqs_queue_url"; + + @Test + public void createSqsDynamicTableSinkWithDefaultOptions() { + ResolvedSchema sinkSchema = defaultSinkSchema(); + DataType physicalDataType = sinkSchema.toPhysicalRowDataType(); + Map sinkOptions = getDefaultTableOptionsBuilder().build(); + + // Construct actual DynamicTableSink using FactoryUtil + SqsDynamicSink actualSink = (SqsDynamicSink) createTableSink(sinkSchema, sinkOptions); + // Construct expected DynamicTableSink using factory under test + SqsDynamicSink expectedSink = constructExpectedSink(physicalDataType, false); + assertTableSinkEqualsAndOfCorrectType(actualSink, expectedSink); + } + + @Test + public void createSqsTableSinkWithoutQueueUrlFails() { + ResolvedSchema sinkSchema = defaultSinkSchema(); + Map sinkOptions = getDefaultTableOptionsBuilder().build(); + sinkOptions.remove("queue-url"); + assertThatExceptionOfType(ValidationException.class) + .isThrownBy(() -> createTableSink(sinkSchema, sinkOptions)) + .havingCause() + .withMessageMatching( + "One or more required options are missing\\.[\\s\\n]*Missing required options are:[\\s\\n]*queue-url"); + } + + @Test + public void createSqsTableSinkWithoutRegionFails() { + ResolvedSchema sinkSchema = defaultSinkSchema(); + Map sinkOptions = getDefaultTableOptionsBuilder().build(); + sinkOptions.remove("aws.region"); + assertThatExceptionOfType(ValidationException.class) + .isThrownBy(() -> createTableSink(sinkSchema, sinkOptions)) + .havingCause() + .withMessageMatching( + "One or more required options are missing\\.[\\s\\n]*Missing required options are:[\\s\\n]*aws.region"); + } + + @Test + public void createSqsTableSinkWithoutFormatFails() { + ResolvedSchema sinkSchema = defaultSinkSchema(); + Map sinkOptions = getDefaultTableOptionsBuilder().build(); + sinkOptions.remove("format"); + assertThatExceptionOfType(ValidationException.class) + .isThrownBy(() -> createTableSink(sinkSchema, sinkOptions)) + .havingCause() + .withMessageContaining("Could not find required sink format 'format'"); + } + + @Test + public void createSqsTableSinkWithInvalidOptionFails() { + ResolvedSchema sinkSchema = defaultSinkSchema(); + Map sinkOptions = getDefaultTableOptionsBuilder().build(); + sinkOptions.put("invalid-option", "invalid-value"); + assertThatExceptionOfType(ValidationException.class) + .isThrownBy(() -> createTableSink(sinkSchema, sinkOptions)) + .havingCause() + .withMessageContaining("Unsupported options:\n\ninvalid-option"); + } + + @Test + public void createSqsTableSinkWithAwsOptionIsPropagatedToClientProperties() { + ResolvedSchema sinkSchema = defaultSinkSchema(); + DataType physicalDataType = sinkSchema.toPhysicalRowDataType(); + Map sinkOptions = getDefaultTableOptionsBuilder().build(); + sinkOptions.put("aws.http-client.read-timeout", "1000"); + + // Construct actual DynamicTableSink using FactoryUtil + SqsDynamicSink actualSink = (SqsDynamicSink) createTableSink(sinkSchema, sinkOptions); + // Construct expected DynamicTableSink using factory under test + Properties clientProperties = getDefaultAwsClientProperties(); + clientProperties.put("aws.http-client.read-timeout", "1000"); + SqsDynamicSink expectedSink = + getDefaultExpectedSinkBuilder(physicalDataType, clientProperties, false).build(); + assertTableSinkEqualsAndOfCorrectType(actualSink, expectedSink); + } + + @Test + public void createSqsTableSinkWithFailOnErrorOptionIsPropagated() { + ResolvedSchema sinkSchema = defaultSinkSchema(); + DataType physicalDataType = sinkSchema.toPhysicalRowDataType(); + Map sinkOptions = getDefaultTableOptionsBuilder().build(); + sinkOptions.put("sink.fail-on-error", "true"); + + // Construct actual DynamicTableSink using FactoryUtil + SqsDynamicSink actualSink = (SqsDynamicSink) createTableSink(sinkSchema, sinkOptions); + // Construct expected DynamicTableSink using factory under test + SqsDynamicSink expectedSink = constructExpectedSink(physicalDataType, true); + assertTableSinkEqualsAndOfCorrectType(actualSink, expectedSink); + } + + @Test + public void createSqsTableSinkWithUnknownAwsOptionSucceeds() { + ResolvedSchema sinkSchema = defaultSinkSchema(); + DataType physicalDataType = sinkSchema.toPhysicalRowDataType(); + Map sinkOptions = getDefaultTableOptionsBuilder().build(); + sinkOptions.put("aws.unknown-option", "unknown-value"); + + // Construct actual DynamicTableSink using FactoryUtil + SqsDynamicSink actualSink = (SqsDynamicSink) createTableSink(sinkSchema, sinkOptions); + // Construct expected DynamicTableSink using factory under test + Properties clientProperties = getDefaultAwsClientProperties(); + clientProperties.put("aws.unknown-option", "unknown-value"); + SqsDynamicSink expectedSink = + getDefaultExpectedSinkBuilder(physicalDataType, clientProperties, false).build(); + assertTableSinkEqualsAndOfCorrectType(actualSink, expectedSink); + } + + @Test + public void createSqsTableSinkWithAsyncSinkOptionsArePropagated() { + ResolvedSchema sinkSchema = defaultSinkSchema(); + DataType physicalDataType = sinkSchema.toPhysicalRowDataType(); + Map sinkOptions = getDefaultTableOptionsBuilder().build(); + sinkOptions.put("sink.batch.max-size", "1000"); + sinkOptions.put("sink.requests.max-inflight", "10"); + sinkOptions.put("sink.requests.max-buffered", "100"); + sinkOptions.put("sink.flush-buffer.size", "100"); + sinkOptions.put("sink.flush-buffer.timeout", "1000"); + + SqsDynamicSink actualSink = (SqsDynamicSink) createTableSink(sinkSchema, sinkOptions); + SqsDynamicSink expectedSink = + getDefaultExpectedSinkBuilder( + physicalDataType, getDefaultAwsClientProperties(), false) + .setMaxBatchSize(1000) + .setMaxInFlightRequests(10) + .setMaxBufferedRequests(100) + .setMaxBufferSizeInBytes(100) + .setMaxTimeInBufferMS(1000) + .build(); + + assertTableSinkEqualsAndOfCorrectType(actualSink, expectedSink); + } + + @Test + public void createSqsTableSinkWithInvalidRegionFailsOnCreate() { + ResolvedSchema sinkSchema = defaultSinkSchema(); + Map sinkOptions = getDefaultTableOptionsBuilder().build(); + sinkOptions.put("aws.region", "invalid-region"); + assertThatExceptionOfType(ValidationException.class) + .isThrownBy(() -> createTableSink(sinkSchema, sinkOptions)) + .havingCause() + .withMessageContaining("Invalid AWS region"); + } + + @Test + public void createSqsTableSinkWithInvalidCredentialsProviderFailsOnCreate() { + ResolvedSchema sinkSchema = defaultSinkSchema(); + Map sinkOptions = getDefaultTableOptionsBuilder().build(); + sinkOptions.put("aws.credentials.provider", "invalid-provider"); + assertThatExceptionOfType(ValidationException.class) + .isThrownBy(() -> createTableSink(sinkSchema, sinkOptions)) + .havingCause() + .withMessageContaining("Invalid AWS Credential Provider Type"); + } + + private ResolvedSchema defaultSinkSchema() { + return ResolvedSchema.of( + Column.physical("name", DataTypes.STRING()), + Column.physical("curr_id", DataTypes.BIGINT()), + Column.physical("time", DataTypes.TIMESTAMP(3))); + } + + private TableOptionsBuilder getDefaultTableOptionsBuilder() { + String format = TestFormatFactory.IDENTIFIER; + TableOptionsBuilder builder = + new TableOptionsBuilder("sqs", format) + // default table options + .withTableOption("queue-url", SQS_QUEUE_URL) + .withFormatOption(TestFormatFactory.DELIMITER, ",") + .withFormatOption(TestFormatFactory.FAIL_ON_MISSING, "true"); + Properties clientProperties = getDefaultAwsClientProperties(); + clientProperties.forEach((k, v) -> builder.withTableOption(k.toString(), v.toString())); + return builder; + } + + private SqsDynamicSink constructExpectedSink(DataType physicalDataType, boolean failOnError) { + return getDefaultExpectedSinkBuilder( + physicalDataType, getDefaultAwsClientProperties(), failOnError) + .build(); + } + + private Properties getDefaultAwsClientProperties() { + Properties clientProperties = new Properties(); + clientProperties.put("aws.region", "us-west-2"); + clientProperties.put("aws.credentials.provider", "BASIC"); + clientProperties.put("aws.credentials.provider.basic.accesskeyid", "aws_access_key_id"); + clientProperties.put("aws.credentials.provider.basic.secretkey", "secret_access_key"); + return clientProperties; + } + + private SqsDynamicSink.SqsDynamicSinkBuilder getDefaultExpectedSinkBuilder( + DataType physicalDataType, Properties clientProperties, boolean failOnError) { + return SqsDynamicSink.builder() + .setSqsQueueUrl(SQS_QUEUE_URL) + .setEncodingFormat(new TestFormatFactory.EncodingFormatMock(",")) + .setSqsClientProperties(clientProperties) + .setConsumedDataType(physicalDataType) + .setFailOnError(failOnError); + } + + private void assertTableSinkEqualsAndOfCorrectType( + DynamicTableSink actualSink, DynamicTableSink expectedSink) { + // verify that the constructed DynamicTableSink is as expected + assertThat(actualSink).isEqualTo(expectedSink); + + // verify the produced sink + DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider = + actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); + Sink sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink(); + assertThat(sinkFunction).isInstanceOf(SqsSink.class); + } +}