diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
index 6c0a944d..5f24a435 100644
--- a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
+++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
@@ -21,9 +21,9 @@
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.connector.testframe.container.FlinkContainers;
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
-import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.util.DockerImageVersions;
diff --git a/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
index 5597b57f..3b217a67 100644
--- a/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
+++ b/flink-connector-aws-e2e-tests/flink-connector-kinesis-e2e-tests/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
@@ -19,9 +19,9 @@
package org.apache.flink.streaming.kinesis.test;
import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.connector.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.connector.testframe.container.FlinkContainers;
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
-import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
import org.apache.flink.streaming.kinesis.test.model.Order;
import org.apache.flink.test.resources.ResourceTestUtils;
diff --git a/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java b/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java
index 3a41abc2..0a745776 100644
--- a/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java
+++ b/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java
@@ -18,7 +18,7 @@
package org.apache.flink.glue.schema.registry.test;
import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
+import org.apache.flink.connector.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
diff --git a/flink-connector-aws-e2e-tests/flink-formats-json-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java b/flink-connector-aws-e2e-tests/flink-formats-json-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java
index c9336b9e..397b77a8 100644
--- a/flink-connector-aws-e2e-tests/flink-formats-json-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java
+++ b/flink-connector-aws-e2e-tests/flink-formats-json-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java
@@ -18,7 +18,7 @@
package org.apache.flink.glue.schema.registry.test.json;
import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
+import org.apache.flink.connector.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.formats.json.glue.schema.registry.GlueSchemaRegistryJsonDeserializationSchema;
import org.apache.flink.formats.json.glue.schema.registry.GlueSchemaRegistryJsonSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
diff --git a/flink-connector-aws-kinesis-streams/pom.xml b/flink-connector-aws-kinesis-streams/pom.xml
index 5e8c4542..e5fdcad6 100644
--- a/flink-connector-aws-kinesis-streams/pom.xml
+++ b/flink-connector-aws-kinesis-streams/pom.xml
@@ -57,6 +57,10 @@ under the License.
<groupId>software.amazon.awssdk</groupId>
<artifactId>kinesis</artifactId>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>aws-core</artifactId>
+ </dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
@@ -69,6 +73,11 @@ under the License.
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
diff --git a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java
index aa8079c6..77f6df5c 100644
--- a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java
+++ b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java
@@ -20,14 +20,20 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.util.Preconditions;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.Properties;
+import static org.apache.flink.connector.aws.util.AWSGeneralUtil.validateAwsConfiguration;
+import static org.apache.flink.connector.base.table.util.ConfigurationValidatorUtil.validateOptionalDateProperty;
+import static org.apache.flink.connector.base.table.util.ConfigurationValidatorUtil.validateOptionalPositiveLongProperty;
import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP;
import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/** Utility functions to use with {@link KinesisStreamsSourceConfigConstants}. */
@Internal
@@ -58,4 +64,51 @@ public static Date parseStreamTimestampStartingPosition(final Configuration sour
return new Date((long) (Double.parseDouble(timestamp) * 1000));
}
}
+
+ /**
+ * Validate configuration properties for {@link
+ * org.apache.flink.connector.kinesis.source.KinesisStreamsSource}.
+ */
+ public static void validateStreamSourceConfiguration(Configuration config) {
+ checkNotNull(config, "config can not be null");
+
+ Properties consumerProperties = new Properties();
+ config.addAllToProperties(consumerProperties);
+ validateAwsConfiguration(consumerProperties);
+
+ if (!(config.containsKey(AWSConfigConstants.AWS_REGION)
+ || config.containsKey(AWSConfigConstants.AWS_ENDPOINT))) {
+ // per validation in AwsClientBuilder
+ throw new IllegalArgumentException(
+ String.format(
+ "For KinesisStreamsSource AWS region ('%s') and/or AWS endpoint ('%s') must be set in the config.",
+ AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_ENDPOINT));
+ }
+
+ if (config.contains(KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION)) {
+ KinesisStreamsSourceConfigConstants.InitialPosition initPosType =
+ config.get(KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION);
+
+ // an initial timestamp must be specified when using AT_TIMESTAMP
+ if (initPosType == KinesisStreamsSourceConfigConstants.InitialPosition.AT_TIMESTAMP) {
+ if (!config.contains(STREAM_INITIAL_TIMESTAMP)) {
+ throw new IllegalArgumentException(
+ "Please set value for initial timestamp ('"
+ + STREAM_INITIAL_TIMESTAMP
+ + "') when using AT_TIMESTAMP initial position.");
+ }
+ validateOptionalDateProperty(
+ consumerProperties,
+ String.valueOf(STREAM_INITIAL_TIMESTAMP),
+ config.getString(STREAM_TIMESTAMP_DATE_FORMAT),
+ "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
+ + "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
+ }
+ }
+
+ validateOptionalPositiveLongProperty(
+ consumerProperties,
+ String.valueOf(KinesisStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS),
+ "Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value.");
+ }
}
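
Note: the validation above operates on plain `Configuration` keys, so it can be exercised without building a source. A minimal sketch of the config shape it expects (an assumption for illustration, not part of this diff; it presumes `STREAM_INITIAL_POSITION` and `STREAM_INITIAL_TIMESTAMP` are `ConfigOption`s, as the `config.contains(...)` calls imply):

```java
// Sketch: a source config using AT_TIMESTAMP. A region (or endpoint) and the
// initial timestamp are required; the shard discovery interval is optional.
Configuration config = new Configuration();
config.setString(AWSConfigConstants.AWS_REGION, "us-east-1");
config.set(
        KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION,
        KinesisStreamsSourceConfigConstants.InitialPosition.AT_TIMESTAMP);
config.set(KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP, "2014-10-22T12:00:00");
KinesisStreamsSourceConfigUtil.validateStreamSourceConfiguration(config);
```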
diff --git a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/model/Metadata.java b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/model/Metadata.java
new file mode 100644
index 00000000..e4f42951
--- /dev/null
+++ b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/model/Metadata.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.model;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/** Internal type for enumerating available metadata. */
+public enum Metadata {
+ Timestamp("timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull()),
+ SequenceNumber("sequence-number", DataTypes.VARCHAR(128).notNull()),
+ ShardId("shard-id", DataTypes.VARCHAR(128).notNull());
+
+ private final String fieldName;
+ private final DataType dataType;
+
+ Metadata(String fieldName, DataType dataType) {
+ this.fieldName = fieldName;
+ this.dataType = dataType;
+ }
+
+ public String getFieldName() {
+ return this.fieldName;
+ }
+
+ public DataType getDataType() {
+ return this.dataType;
+ }
+
+ public static Metadata of(String fieldName) {
+ return Arrays.stream(Metadata.values())
+ .filter(m -> Objects.equals(m.fieldName, fieldName))
+ .findFirst()
+ .orElseThrow(
+ () -> {
+ String msg =
+ "Cannot find Metadata instance for field name '"
+ + fieldName
+ + "'";
+ return new IllegalArgumentException(msg);
+ });
+ }
+}
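
For illustration, `Metadata.of` is the lookup the table source below uses to resolve metadata keys declared in DDL; a quick sketch of its behavior:

```java
// Sketch only: resolving DDL-facing metadata keys to enum instances.
Metadata shardId = Metadata.of("shard-id");    // Metadata.ShardId
DataType type = shardId.getDataType();         // VARCHAR(128) NOT NULL
Metadata.of("unknown-key");                    // throws IllegalArgumentException
```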
diff --git a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java
index 4f5bb8a4..353d4888 100644
--- a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java
+++ b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java
@@ -42,6 +42,12 @@ public class KinesisConnectorOptions extends AsyncSinkConnectorOptions {
.noDefaultValue()
.withDescription("Name of the Kinesis stream backing this table.");
+ public static final ConfigOption<String> STREAM_ARN =
+ ConfigOptions.key("stream.arn")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("ARN of the Kinesis stream backing this table.");
+
public static final ConfigOption<String> AWS_REGION =
ConfigOptions.key("aws.region")
.stringType()
diff --git a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSource.java b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSource.java
new file mode 100644
index 00000000..52bd2b76
--- /dev/null
+++ b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSource.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.kinesis.source.KinesisStreamsSource;
+import org.apache.flink.connector.kinesis.source.model.Metadata;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Kinesis-backed {@link ScanTableSource}. */
+public class KinesisDynamicSource implements ScanTableSource, SupportsReadingMetadata {
+
+ private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
+ private final DataType physicalDataType;
+ private final String stream;
+
+ /** Configuration for the Kinesis consumer. */
+ private final Configuration sourceConfig;
+
+ /** Read-only metadata fields that the source can provide upstream upon request, keyed by field name. */
+ private static final Map<String, DataType> READABLE_METADATA =
+ new HashMap<String, DataType>() {
+ {
+ for (Metadata metadata : Metadata.values()) {
+ put(metadata.getFieldName(), metadata.getDataType());
+ }
+ }
+ };
+
+ // --------------------------------------------------------------------------------------------
+ // Mutable attributes
+ // --------------------------------------------------------------------------------------------
+
+ /** Data type that describes the final output of the source. */
+ private DataType producedDataType;
+
+ /** Metadata that is requested to be appended at the end of a physical source row. */
+ private List<Metadata> requestedMetadataFields;
+
+ public KinesisDynamicSource(
+ @Nullable DataType physicalDataType,
+ String stream,
+ Configuration sourceConfig,
+ DecodingFormat<DeserializationSchema<RowData>> decodingFormat) {
+ this(
+ physicalDataType,
+ stream,
+ sourceConfig,
+ decodingFormat,
+ physicalDataType,
+ Collections.emptyList());
+ }
+
+ public KinesisDynamicSource(
+ @Nullable DataType physicalDataType,
+ String stream,
+ Configuration sourceConfig,
+ DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
+ DataType producedDataType,
+ List<Metadata> requestedMetadataFields) {
+ this.physicalDataType =
+ Preconditions.checkNotNull(
+ physicalDataType, "Physical data type must not be null.");
+ this.stream = Preconditions.checkNotNull(stream, "Stream must not be null.");
+ this.sourceConfig =
+ Preconditions.checkNotNull(
+ sourceConfig,
+ "Properties for the Flink Kinesis consumer must not be null.");
+ this.decodingFormat =
+ Preconditions.checkNotNull(decodingFormat, "Decoding format must not be null.");
+ this.producedDataType =
+ Preconditions.checkNotNull(
+ producedDataType, "Produced data type must not be null.");
+ this.requestedMetadataFields =
+ Preconditions.checkNotNull(
+ requestedMetadataFields, "Requested metadata fields must not be null.");
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+ DeserializationSchema<RowData> deserializationSchema =
+ decodingFormat.createRuntimeDecoder(scanContext, physicalDataType);
+
+ return new DataStreamScanProvider() {
+ @Override
+ public DataStream<RowData> produceDataStream(
+ ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
+
+ KinesisStreamsSource<RowData> kdsSource =
+ KinesisStreamsSource.<RowData>builder()
+ .setStreamArn(stream)
+ .setSourceConfig(sourceConfig)
+ .setDeserializationSchema(deserializationSchema)
+ .build();
+
+ DataStreamSource<RowData> sourceStream =
+ execEnv.fromSource(
+ kdsSource, WatermarkStrategy.noWatermarks(), "Kinesis source");
+
+ return sourceStream;
+ }
+
+ @Override
+ public boolean isBounded() {
+ return false;
+ }
+ };
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ return new KinesisDynamicSource(
+ physicalDataType,
+ stream,
+ sourceConfig,
+ decodingFormat,
+ producedDataType,
+ requestedMetadataFields);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Kinesis table source";
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // SupportsReadingMetadata
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ return READABLE_METADATA;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+ this.requestedMetadataFields =
+ metadataKeys.stream().map(Metadata::of).collect(Collectors.toList());
+ this.producedDataType = producedDataType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KinesisDynamicSource that = (KinesisDynamicSource) o;
+ return Objects.equals(producedDataType, that.producedDataType)
+ && Objects.equals(requestedMetadataFields, that.requestedMetadataFields)
+ && Objects.equals(stream, that.stream)
+ && Objects.equals(sourceConfig, that.sourceConfig)
+ && Objects.equals(decodingFormat, that.decodingFormat);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ requestedMetadataFields, producedDataType, stream, sourceConfig, decodingFormat);
+ }
+
+ /** Builder class for {@link KinesisDynamicSource}. */
+ @Internal
+ public static class KinesisDynamicTableSourceBuilder {
+ private DataType consumedDataType = null;
+ private String stream = null;
+ private DecodingFormat<DeserializationSchema<RowData>> decodingFormat = null;
+
+ private Configuration sourceConfig = null;
+
+ public KinesisDynamicSource.KinesisDynamicTableSourceBuilder setConsumedDataType(
+ DataType consumedDataType) {
+ this.consumedDataType = consumedDataType;
+ return this;
+ }
+
+ public KinesisDynamicSource.KinesisDynamicTableSourceBuilder setStream(String stream) {
+ this.stream = stream;
+ return this;
+ }
+
+ public KinesisDynamicSource.KinesisDynamicTableSourceBuilder setDecodingFormat(
+ DecodingFormat<DeserializationSchema<RowData>> decodingFormat) {
+ this.decodingFormat = decodingFormat;
+ return this;
+ }
+
+ public KinesisDynamicSource.KinesisDynamicTableSourceBuilder setSourceConfig(
+ Configuration sourceConfig) {
+ this.sourceConfig = sourceConfig;
+ return this;
+ }
+
+ public KinesisDynamicSource build() {
+ return new KinesisDynamicSource(consumedDataType, stream, sourceConfig, decodingFormat);
+ }
+ }
+}
diff --git a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java
index 5aa0931d..1b990970 100644
--- a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java
+++ b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java
@@ -23,10 +23,11 @@
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory;
import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator;
-import org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils;
+import org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorSinkOptionsUtils;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.logical.RowType;
import java.util.HashSet;
@@ -39,12 +40,13 @@
import static org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.SINK_PARTITIONER;
import static org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.SINK_PARTITIONER_FIELD_DELIMITER;
import static org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.STREAM;
-import static org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.KINESIS_CLIENT_PROPERTIES_KEY;
+import static org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorSinkOptionsUtils.KINESIS_CLIENT_PROPERTIES_KEY;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
/** Factory for creating {@link KinesisDynamicSink}. */
@Internal
-public class KinesisDynamicTableSinkFactory extends AsyncDynamicTableSinkFactory {
+public class KinesisDynamicTableSinkFactory extends AsyncDynamicTableSinkFactory
+ implements DynamicTableFactory {
public static final String IDENTIFIER = "kinesis";
@Override
@@ -52,8 +54,8 @@ public DynamicTableSink createDynamicTableSink(Context context) {
AsyncDynamicSinkContext factoryContext = new AsyncDynamicSinkContext(this, context);
- KinesisStreamsConnectorOptionsUtils optionsUtils =
- new KinesisStreamsConnectorOptionsUtils(
+ KinesisStreamsConnectorSinkOptionsUtils optionsUtils =
+ new KinesisStreamsConnectorSinkOptionsUtils(
factoryContext.getResolvedOptions(),
factoryContext.getTableOptions(),
(RowType) factoryContext.getPhysicalDataType().getLogicalType(),
@@ -105,8 +107,8 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SINK_PARTITIONER);
options.add(SINK_PARTITIONER_FIELD_DELIMITER);
options.add(SINK_FAIL_ON_ERROR);
- return KinesisStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper.addDeprecatedKeys(
- options);
+ return KinesisStreamsConnectorSinkOptionsUtils.KinesisProducerOptionsMapper
+ .addDeprecatedKeys(options);
}
private static void validateKinesisPartitioner(
diff --git a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSourceFactory.java b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSourceFactory.java
new file mode 100644
index 00000000..f44c7634
--- /dev/null
+++ b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSourceFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorSourceOptionsUtil;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.STREAM_ARN;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+
+/** Factory for creating {@link KinesisDynamicSource}. */
+public class KinesisDynamicTableSourceFactory implements DynamicTableSourceFactory {
+ public static final String IDENTIFIER = "kinesis-source";
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+
+ ReadableConfig tableOptions = helper.getOptions();
+
+ DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
+ helper.discoverDecodingFormat(DeserializationFormatFactory.class, FORMAT);
+
+ ResolvedCatalogTable catalogTable = context.getCatalogTable();
+
+ KinesisStreamsConnectorSourceOptionsUtil kinesisStreamsConnectorSourceOptionsUtil =
+ new KinesisStreamsConnectorSourceOptionsUtil(
+ catalogTable.getOptions(), tableOptions.get(STREAM_ARN));
+
+ Configuration sourceConfig =
+ kinesisStreamsConnectorSourceOptionsUtil.getValidatedSourceConfigurations();
+
+ KinesisDynamicSource.KinesisDynamicTableSourceBuilder builder =
+ new KinesisDynamicSource.KinesisDynamicTableSourceBuilder();
+
+ builder.setStream(tableOptions.get(STREAM_ARN))
+ .setDecodingFormat(decodingFormat)
+ .setConsumedDataType(context.getPhysicalRowDataType())
+ .setSourceConfig(sourceConfig);
+
+ KinesisDynamicSource kinesisDynamicSource = builder.build();
+ return kinesisDynamicSource;
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(STREAM_ARN);
+ options.add(FORMAT);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return new HashSet<>();
+ }
+}
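
Given the identifier `kinesis-source` and the required `stream.arn` and `format` options, a table served by this factory would be declared roughly as follows (hypothetical DDL; the table name, columns, and the `json` format are illustrative, not taken from this diff):

```java
// Sketch: DDL that FactoryUtil would route to KinesisDynamicTableSourceFactory.
tableEnv.executeSql(
        "CREATE TABLE orders ("
                + " code STRING,"
                + " quantity BIGINT,"
                + " shard_id STRING METADATA FROM 'shard-id' VIRTUAL"
                + ") WITH ("
                + " 'connector' = 'kinesis-source',"
                + " 'stream.arn' = 'arn:aws:kinesis:us-east-1:123456789123:stream/myStream',"
                + " 'aws.region' = 'us-east-1',"
                + " 'format' = 'json'"
                + ")");
```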
diff --git a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorSinkOptionsUtils.java
similarity index 99%
rename from flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java
rename to flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorSinkOptionsUtils.java
index 4b30fe05..b295e4f9 100644
--- a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java
+++ b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorSinkOptionsUtils.java
@@ -58,7 +58,7 @@
* for handling each specified set of options.
*/
@Internal
-public class KinesisStreamsConnectorOptionsUtils {
+public class KinesisStreamsConnectorSinkOptionsUtils {
/** Key for accessing kinesisAsyncClient properties. */
public static final String KINESIS_CLIENT_PROPERTIES_KEY = "sink.client.properties";
@@ -79,7 +79,7 @@ public class KinesisStreamsConnectorOptionsUtils {
KinesisProducerOptionsMapper.KINESIS_PRODUCER_PREFIX
};
- public KinesisStreamsConnectorOptionsUtils(
+ public KinesisStreamsConnectorSinkOptionsUtils(
Map<String, String> options,
ReadableConfig tableOptions,
RowType physicalType,
diff --git a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorSourceOptionsUtil.java b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorSourceOptionsUtil.java
new file mode 100644
index 00000000..4acd6751
--- /dev/null
+++ b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorSourceOptionsUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.table.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.connector.aws.table.util.AWSOptionUtils;
+import org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigUtil;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/** Class for handling Kinesis Consumer specific table options. */
+@PublicEvolving
+public class KinesisStreamsConnectorSourceOptionsUtil extends AWSOptionUtils {
+ private final Map<String, String> resolvedOptions;
+ private final String streamArn;
+ /**
+ * Prefix for properties defined in {@code
+ * org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants} that are
+ * delegated to {@code org.apache.flink.connector.kinesis.source.KinesisStreamsSource}.
+ */
+ public static final String CONSUMER_PREFIX = "scan.";
+
+ public KinesisStreamsConnectorSourceOptionsUtil(
+ Map<String, String> resolvedOptions, String streamArn) {
+ super(resolvedOptions);
+ this.resolvedOptions = resolvedOptions;
+ this.streamArn = streamArn;
+ }
+
+ @Override
+ public Map<String, String> getProcessedResolvedOptions() {
+ Map<String, String> mappedResolvedOptions = super.getProcessedResolvedOptions();
+ for (String key : resolvedOptions.keySet()) {
+ if (key.startsWith(CONSUMER_PREFIX)) {
+ mappedResolvedOptions.put(translateConsumerKey(key), resolvedOptions.get(key));
+ }
+ }
+ return mappedResolvedOptions;
+ }
+
+ @Override
+ public List<String> getNonValidatedPrefixes() {
+ return Arrays.asList(AWS_PROPERTIES_PREFIX, CONSUMER_PREFIX);
+ }
+
+ public Configuration getValidatedSourceConfigurations() {
+ Configuration sourceConfig = Configuration.fromMap(this.getProcessedResolvedOptions());
+ sourceConfig.addAll(
+ ConfigurationUtils.createConfiguration(super.getValidatedConfigurations()));
+
+ KinesisStreamsSourceConfigUtil.validateStreamSourceConfiguration(sourceConfig);
+
+ return sourceConfig;
+ }
+
+ /** Map {@code scan.foo.bar} to {@code flink.foo.bar}. */
+ private static String translateConsumerKey(String key) {
+ String result = "flink." + key.substring(CONSUMER_PREFIX.length());
+
+ if (result.endsWith("initpos-timestamp-format")) {
+ return result.replace("initpos-timestamp-format", "initpos.timestamp.format");
+ } else if (result.endsWith("initpos-timestamp")) {
+ return result.replace("initpos-timestamp", "initpos.timestamp");
+ } else {
+ return result;
+ }
+ }
+}
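
Concretely, `translateConsumerKey` turns the table option `scan.stream.initpos-timestamp` into `flink.stream.initpos.timestamp` and `scan.stream.initpos-timestamp-format` into `flink.stream.initpos.timestamp.format`; every other `scan.*` key only has its prefix swapped. These are exactly the keys asserted in `defaultSourceConfig()` of the factory test below.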
diff --git a/flink-connector-aws-kinesis-streams/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector-aws-kinesis-streams/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index adb7ea21..9bcab051 100644
--- a/flink-connector-aws-kinesis-streams/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-connector-aws-kinesis-streams/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory
+org.apache.flink.connector.kinesis.table.KinesisDynamicTableSourceFactory
\ No newline at end of file
diff --git a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
index cee11e55..5b7135c1 100644
--- a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
+++ b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java
@@ -23,7 +23,7 @@
import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
-import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
+import org.apache.flink.connector.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
diff --git a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSourceFactoryTest.java b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSourceFactoryTest.java
new file mode 100644
index 00000000..52fee6af
--- /dev/null
+++ b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSourceFactoryTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.table;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.connector.kinesis.source.KinesisStreamsSource;
+import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState;
+import org.apache.flink.connector.kinesis.source.model.Metadata;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.WatermarkSpec;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
+import org.apache.flink.table.factories.TableOptionsBuilder;
+import org.apache.flink.table.factories.TestFormatFactory;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.connector.kinesis.source.model.Metadata.ShardId;
+import static org.apache.flink.connector.kinesis.source.model.Metadata.Timestamp;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link KinesisDynamicSource} created by {@link KinesisDynamicTableSourceFactory}. */
+public class KinesisDynamicTableSourceFactoryTest extends TestLogger {
+
+ private static final String STREAM_ARN =
+ "arn:aws:kinesis:us-east-1:123456789123:stream/myStream";
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ // --------------------------------------------------------------------------------------------
+ // Positive tests
+ // --------------------------------------------------------------------------------------------
+
+ @Test
+ public void testGoodTableSource() {
+ ResolvedSchema sourceSchema = defaultSourceSchema();
+ Map<String, String> sourceOptions = defaultTableOptions().build();
+
+ // Construct actual DynamicTableSource using FactoryUtil
+ KinesisDynamicSource actualSource =
+ (KinesisDynamicSource) createTableSource(sourceSchema, sourceOptions);
+
+ // Construct expected DynamicTableSource for comparison
+ KinesisDynamicSource expectedSource =
+ new KinesisDynamicSource(
+ sourceSchema.toPhysicalRowDataType(),
+ STREAM_ARN,
+ defaultSourceConfig(),
+ new TestFormatFactory.DecodingFormatMock(",", true));
+
+ // verify that the constructed DynamicTableSource is as expected
+ assertThat(actualSource).isEqualTo(expectedSource);
+
+ // verify that the copy of the constructed DynamicTableSource is as expected
+ assertThat(actualSource.copy()).isEqualTo(expectedSource);
+
+ // verify produced source
+ ScanTableSource.ScanRuntimeProvider functionProvider =
+ actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+
+ assertKinesisStreamsSource(functionProvider);
+ }
+
+ @Test
+ public void testGoodTableSourceWithMetadataFields() {
+ ResolvedSchema sourceSchema = defaultSourceSchema();
+ Map<String, String> sourceOptions = defaultTableOptions().build();
+
+ Metadata[] requestedMetadata = new Metadata[] {ShardId, Timestamp};
+ List<String> metadataKeys = Arrays.asList(ShardId.getFieldName(), Timestamp.getFieldName());
+ DataType producedDataType = getProducedType(sourceSchema, requestedMetadata);
+
+ // Construct actual DynamicTableSource using FactoryUtil
+ KinesisDynamicSource actualSource =
+ (KinesisDynamicSource) createTableSource(sourceSchema, sourceOptions);
+ actualSource.applyReadableMetadata(metadataKeys, producedDataType);
+
+ // Construct expected DynamicTableSource for comparison
+ KinesisDynamicSource expectedSource =
+ new KinesisDynamicSource(
+ sourceSchema.toPhysicalRowDataType(),
+ STREAM_ARN,
+ defaultSourceConfig(),
+ new TestFormatFactory.DecodingFormatMock(",", true),
+ producedDataType,
+ Arrays.asList(requestedMetadata));
+
+ // verify that the constructed DynamicTableSource is as expected
+ assertThat(actualSource).isEqualTo(expectedSource);
+
+ // verify that the copy of the constructed DynamicTableSource is as expected
+ assertThat(actualSource.copy()).isEqualTo(expectedSource);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------------------------
+
+ private ResolvedSchema defaultSourceSchema() {
+ return new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("name", DataTypes.STRING()),
+ Column.physical("curr_id", DataTypes.BIGINT()),
+ Column.physical("time", DataTypes.TIMESTAMP(3)),
+ Column.computed(
+ "next_id",
+ ResolvedExpressionMock.of(DataTypes.BIGINT(), "curr_id + 1"))),
+ Collections.singletonList(
+ WatermarkSpec.of(
+ "time",
+ ResolvedExpressionMock.of(
+ DataTypes.TIMESTAMP(3), "time - INTERVAL '5' SECOND"))),
+ null);
+ }
+
+ private TableOptionsBuilder defaultTableOptions() {
+ String connector = KinesisDynamicTableSourceFactory.IDENTIFIER;
+ String format = TestFormatFactory.IDENTIFIER;
+ return new TableOptionsBuilder(connector, format)
+ // default table options
+ .withTableOption(KinesisConnectorOptions.STREAM_ARN, STREAM_ARN)
+ .withTableOption("aws.region", "us-west-2")
+ .withTableOption("aws.credentials.provider", "BASIC")
+ .withTableOption("aws.credentials.basic.accesskeyid", "ververicka")
+ .withTableOption("aws.credentials.basic.secretkey", "SuperSecretSecretSquirrel")
+ .withTableOption("scan.stream.initpos", "AT_TIMESTAMP")
+ .withTableOption("scan.stream.initpos-timestamp-format", "yyyy-MM-dd'T'HH:mm:ss")
+ .withTableOption("scan.stream.initpos-timestamp", "2014-10-22T12:00:00")
+
+ // default format options
+ .withFormatOption(TestFormatFactory.DELIMITER, ",")
+ .withFormatOption(TestFormatFactory.FAIL_ON_MISSING, "true");
+ }
+
+ private Configuration defaultSourceConfig() {
+ return ConfigurationUtils.createConfiguration(
+ new Properties() {
+ {
+ setProperty("aws.region", "us-west-2");
+ setProperty("aws.credentials.provider", "BASIC");
+ setProperty("aws.credentials.provider.basic.accesskeyid", "ververicka");
+ setProperty(
+ "aws.credentials.provider.basic.secretkey",
+ "SuperSecretSecretSquirrel");
+ setProperty("flink.stream.initpos", "AT_TIMESTAMP");
+ setProperty(
+ "flink.stream.initpos.timestamp.format", "yyyy-MM-dd'T'HH:mm:ss");
+ setProperty("flink.stream.initpos.timestamp", "2014-10-22T12:00:00");
+ }
+ });
+ }
+
+ private DataType getProducedType(ResolvedSchema schema, Metadata... requestedMetadata) {
+ Stream<DataTypes.Field> physicalFields =
+ IntStream.range(0, schema.getColumnCount())
+ .mapToObj(
+ i ->
+ DataTypes.FIELD(
+ schema.getColumnNames().get(i),
+ schema.getColumnDataTypes().get(i)));
+ Stream<DataTypes.Field> metadataFields =
+ Arrays.stream(requestedMetadata)
+ .map(m -> DataTypes.FIELD(m.name(), m.getDataType()));
+ Stream<DataTypes.Field> allFields = Stream.concat(physicalFields, metadataFields);
+
+ return DataTypes.ROW(allFields.toArray(DataTypes.Field[]::new));
+ }
+
+ private <T> T as(Object object, Class<T> clazz) {
+ assertThat(object).isInstanceOf(clazz);
+ return clazz.cast(object);
+ }
+
+ private KinesisStreamsSource<RowData> assertKinesisStreamsSource(
+ ScanTableSource.ScanRuntimeProvider provider) {
+ assertThat(provider).isInstanceOf(DataStreamScanProvider.class);
+ final DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider) provider;
+ final Transformation<RowData> transformation =
+ dataStreamScanProvider
+ .produceDataStream(
+ n -> Optional.empty(),
+ StreamExecutionEnvironment.createLocalEnvironment())
+ .getTransformation();
+ assertThat(transformation).isInstanceOf(SourceTransformation.class);
+ SourceTransformation<RowData, KinesisShardSplit, KinesisStreamsSourceEnumeratorState>
+ sourceTransformation =
+ (SourceTransformation<
+ RowData,
+ KinesisShardSplit,
+ KinesisStreamsSourceEnumeratorState>)
+ transformation;
+ assertThat(sourceTransformation.getSource()).isInstanceOf(KinesisStreamsSource.class);
+ return (KinesisStreamsSource<RowData>) sourceTransformation.getSource();
+ }
+}
diff --git a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java
index 141e8709..ad8e9fe7 100644
--- a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java
+++ b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.connector.kinesis.table.util;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
-import org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper;
+import org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorSinkOptionsUtils.KinesisProducerOptionsMapper;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -38,7 +38,7 @@ void testProducerVerifyCertificateOptionsMapping() {
expectedOptions.put(AWSConfigConstants.TRUST_ALL_CERTIFICATES, "true");
KinesisProducerOptionsMapper producerOptionsMapper =
- new KinesisStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper(
+ new KinesisStreamsConnectorSinkOptionsUtils.KinesisProducerOptionsMapper(
deprecatedOptions);
Map actualMappedProperties =
producerOptionsMapper.mapDeprecatedClientOptions();
diff --git a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/testutils/KinesaliteContainer.java
similarity index 99%
rename from flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
rename to flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/testutils/KinesaliteContainer.java
index b4044a52..f72fe2b9 100644
--- a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java
+++ b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/testutils/KinesaliteContainer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.connectors.kinesis.testutils;
+package org.apache.flink.connector.kinesis.testutils;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
diff --git a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
index 6066d826..9405e801 100644
--- a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
+++ b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -21,7 +21,7 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
+import org.apache.flink.connector.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.state.FunctionInitializationContext;