From 1006e6ae47bcc6185b2782188958dd1e6f45634a Mon Sep 17 00:00:00 2001 From: Hong Liang Teoh Date: Mon, 12 Jun 2023 22:16:54 +0100 Subject: [PATCH] [FLINK-31990][Connectors/Kinesis] Use Configuration object instead of properties in KDS Source --- .../kinesis/source/KinesisStreamsSource.java | 27 ++++---- .../source/KinesisStreamsSourceBuilder.java | 11 ++-- .../KinesisStreamsSourceConfigConstants.java | 53 ++++++++-------- .../KinesisStreamsSourceConfigUtil.java | 18 +++--- .../KinesisStreamsSourceEnumerator.java | 26 +++----- .../source/KinesisStreamsSourceTest.java | 7 +-- .../KinesisStreamsSourceConfigUtilTest.java | 36 +++++------ .../KinesisStreamsSourceEnumeratorTest.java | 61 +++++++++---------- .../source/examples/SourceFromKinesis.java | 9 ++- 9 files changed, 115 insertions(+), 133 deletions(-) diff --git a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java index 82e17310..4ed24c01 100644 --- a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java +++ b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java @@ -26,7 +26,7 @@ import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.aws.util.AWSClientUtil; import org.apache.flink.connector.aws.util.AWSGeneralUtil; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; @@ -81,22 +81,22 @@ public class KinesisStreamsSource implements Source { private final String streamArn; - private final Properties consumerConfig; + private final Configuration sourceConfig; private final KinesisDeserializationSchema deserializationSchema; private final KinesisShardAssigner kinesisShardAssigner; KinesisStreamsSource( String streamArn, - Properties consumerConfig, + Configuration sourceConfig, KinesisDeserializationSchema deserializationSchema, KinesisShardAssigner kinesisShardAssigner) { Preconditions.checkNotNull(streamArn); Preconditions.checkArgument(!streamArn.isEmpty(), "stream ARN cannot be empty string"); - Preconditions.checkNotNull(consumerConfig); + Preconditions.checkNotNull(sourceConfig); Preconditions.checkNotNull(deserializationSchema); Preconditions.checkNotNull(kinesisShardAssigner); this.streamArn = streamArn; - this.consumerConfig = consumerConfig; + this.sourceConfig = sourceConfig; this.deserializationSchema = deserializationSchema; this.kinesisShardAssigner = kinesisShardAssigner; } @@ -127,7 +127,7 @@ public SourceReader createReader(SourceReaderContext reade // We create a new stream proxy for each split reader since they have their own independent // lifecycle. Supplier splitReaderSupplier = - () -> new PollingKinesisShardSplitReader(createKinesisStreamProxy(consumerConfig)); + () -> new PollingKinesisShardSplitReader(createKinesisStreamProxy(sourceConfig)); KinesisStreamsRecordEmitter recordEmitter = new KinesisStreamsRecordEmitter<>(deserializationSchema); @@ -135,7 +135,7 @@ public SourceReader createReader(SourceReaderContext reade elementsQueue, new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier::get), recordEmitter, - ConfigurationUtils.createConfiguration(consumerConfig), + sourceConfig, readerContext); } @@ -154,8 +154,8 @@ public SplitEnumerator c return new KinesisStreamsSourceEnumerator( enumContext, streamArn, - consumerConfig, - createKinesisStreamProxy(consumerConfig), + sourceConfig, + createKinesisStreamProxy(sourceConfig), kinesisShardAssigner, checkpoint); } @@ -171,15 +171,18 @@ public SimpleVersionedSerializer getSplitSerializer() { return new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer()); } - private KinesisStreamProxy createKinesisStreamProxy(Properties consumerConfig) { + private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig) { SdkHttpClient httpClient = AWSGeneralUtil.createSyncHttpClient( AttributeMap.builder().build(), ApacheHttpClient.builder()); - AWSGeneralUtil.validateAwsCredentials(consumerConfig); + Properties kinesisClientProperties = new Properties(); + consumerConfig.addAllToProperties(kinesisClientProperties); + + AWSGeneralUtil.validateAwsCredentials(kinesisClientProperties); KinesisClient kinesisClient = AWSClientUtil.createAwsSyncClient( - consumerConfig, + kinesisClientProperties, httpClient, KinesisClient.builder(), KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT, diff --git a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java index c9c0a5e7..dabfedac 100644 --- a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java +++ b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java @@ -20,13 +20,12 @@ import org.apache.flink.annotation.Experimental; import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner; import org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory; import org.apache.flink.connector.kinesis.source.enumerator.assigner.UniformShardAssigner; import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema; -import java.util.Properties; - /** * Builder to construct the {@link KinesisStreamsSource}. * @@ -53,7 +52,7 @@ @Experimental public class KinesisStreamsSourceBuilder { private String streamArn; - private Properties consumerConfig; + private Configuration sourceConfig; private KinesisDeserializationSchema deserializationSchema; private KinesisShardAssigner kinesisShardAssigner = ShardAssignerFactory.uniformShardAssigner(); @@ -62,8 +61,8 @@ public KinesisStreamsSourceBuilder setStreamArn(String streamArn) { return this; } - public KinesisStreamsSourceBuilder setConsumerConfig(Properties consumerConfig) { - this.consumerConfig = consumerConfig; + public KinesisStreamsSourceBuilder setSourceConfig(Configuration sourceConfig) { + this.sourceConfig = sourceConfig; return this; } @@ -87,6 +86,6 @@ public KinesisStreamsSourceBuilder setKinesisShardAssigner( public KinesisStreamsSource build() { return new KinesisStreamsSource<>( - streamArn, consumerConfig, deserializationSchema, kinesisShardAssigner); + streamArn, sourceConfig, deserializationSchema, kinesisShardAssigner); } } diff --git a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigConstants.java b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigConstants.java index 6e210ddd..76ab546d 100644 --- a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigConstants.java +++ b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigConstants.java @@ -19,6 +19,8 @@ package org.apache.flink.connector.kinesis.source.config; import org.apache.flink.annotation.Experimental; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; /** Constants to be used with the KinesisStreamsSource. */ @Experimental @@ -30,30 +32,29 @@ public enum InitialPosition { AT_TIMESTAMP } - /** The initial position to start reading Kinesis streams from. */ - public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos"; - - public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString(); - - /** - * The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for - * STREAM_INITIAL_POSITION). - */ - public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp"; - - /** - * The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP - * is set for STREAM_INITIAL_POSITION). - */ - public static final String STREAM_TIMESTAMP_DATE_FORMAT = - "flink.stream.initpos.timestamp.format"; - - public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = - "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; - - /** The interval between each attempt to discover new shards. */ - public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = - "flink.shard.discovery.intervalmillis"; - - public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L; + public static final ConfigOption STREAM_INITIAL_POSITION = + ConfigOptions.key("flink.stream.initpos") + .enumType(InitialPosition.class) + .defaultValue(InitialPosition.LATEST) + .withDescription("The initial position to start reading Kinesis streams."); + + public static final ConfigOption STREAM_INITIAL_TIMESTAMP = + ConfigOptions.key("flink.stream.initpos.timestamp") + .stringType() + .noDefaultValue() + .withDescription( + "The initial timestamp at which to start reading from the Kinesis stream. This is used when AT_TIMESTAMP is configured for the STREAM_INITIAL_POSITION."); + + public static final ConfigOption STREAM_TIMESTAMP_DATE_FORMAT = + ConfigOptions.key("flink.stream.initpos.timestamp.format") + .stringType() + .defaultValue("yyyy-MM-dd'T'HH:mm:ss.SSSXXX") + .withDescription( + "The date format used to parse the initial timestamp at which to start reading from the Kinesis stream. This is used when AT_TIMESTAMP is configured for the STREAM_INITIAL_POSITION."); + + public static final ConfigOption SHARD_DISCOVERY_INTERVAL_MILLIS = + ConfigOptions.key("flink.shard.discovery.intervalmillis") + .longType() + .defaultValue(10000L) + .withDescription("The interval between each attempt to discover new shards."); } diff --git a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java index fc85d1f7..aa8079c6 100644 --- a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java +++ b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtil.java @@ -19,14 +19,13 @@ package org.apache.flink.connector.kinesis.source.config; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Preconditions; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; -import java.util.Properties; -import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT; import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP; import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT; @@ -39,19 +38,18 @@ private KinesisStreamsSourceConfigUtil() { } /** - * Parses the timestamp in which to start consuming from the stream, from the given properties. + * Parses the timestamp in which to start consuming from the stream, from the given + * configuration. * - * @param consumerConfig the properties to parse timestamp from + * @param sourceConfig the configuration to parse timestamp from * @return the timestamp */ - public static Date parseStreamTimestampStartingPosition(final Properties consumerConfig) { - Preconditions.checkNotNull(consumerConfig); - String timestamp = consumerConfig.getProperty(STREAM_INITIAL_TIMESTAMP); + public static Date parseStreamTimestampStartingPosition(final Configuration sourceConfig) { + Preconditions.checkNotNull(sourceConfig); + String timestamp = sourceConfig.get(STREAM_INITIAL_TIMESTAMP); try { - String format = - consumerConfig.getProperty( - STREAM_TIMESTAMP_DATE_FORMAT, DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT); + String format = sourceConfig.get(STREAM_TIMESTAMP_DATE_FORMAT); SimpleDateFormat customDateFormat = new SimpleDateFormat(format); return customDateFormat.parse(timestamp); } catch (IllegalArgumentException | NullPointerException exception) { diff --git a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java index 5d5c0be7..14bc65db 100644 --- a/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java +++ b/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java @@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.InitialPosition; import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException; import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; @@ -48,11 +49,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Properties; import java.util.Set; -import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS; -import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION; import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS; import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION; import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition; @@ -69,7 +67,7 @@ public class KinesisStreamsSourceEnumerator private final SplitEnumeratorContext context; private final String streamArn; - private final Properties consumerConfig; + private final Configuration sourceConfig; private final StreamProxy streamProxy; private final KinesisShardAssigner shardAssigner; private final ShardAssignerContext shardAssignerContext; @@ -83,13 +81,13 @@ public class KinesisStreamsSourceEnumerator public KinesisStreamsSourceEnumerator( SplitEnumeratorContext context, String streamArn, - Properties consumerConfig, + Configuration sourceConfig, StreamProxy streamProxy, KinesisShardAssigner shardAssigner, KinesisStreamsSourceEnumeratorState state) { this.context = context; this.streamArn = streamArn; - this.consumerConfig = consumerConfig; + this.sourceConfig = sourceConfig; this.streamProxy = streamProxy; this.shardAssigner = shardAssigner; this.shardAssignerContext = new ShardAssignerContext(splitAssignment, context); @@ -108,11 +106,7 @@ public void start() { context.callAsync(this::initialDiscoverSplits, this::assignSplits); } - final long shardDiscoveryInterval = - Long.parseLong( - consumerConfig.getProperty( - SHARD_DISCOVERY_INTERVAL_MILLIS, - String.valueOf(DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS))); + final long shardDiscoveryInterval = sourceConfig.get(SHARD_DISCOVERY_INTERVAL_MILLIS); context.callAsync( this::periodicallyDiscoverSplits, this::assignSplits, @@ -163,13 +157,7 @@ public void close() throws IOException { private List initialDiscoverSplits() { List shards = streamProxy.listShards(streamArn, lastSeenShardId); - return mapToSplits( - shards, - InitialPosition.valueOf( - consumerConfig - .getOrDefault( - STREAM_INITIAL_POSITION, DEFAULT_STREAM_INITIAL_POSITION) - .toString())); + return mapToSplits(shards, sourceConfig.get(STREAM_INITIAL_POSITION)); } /** @@ -199,7 +187,7 @@ private List mapToSplits( case AT_TIMESTAMP: startingPosition = StartingPosition.fromTimestamp( - parseStreamTimestampStartingPosition(consumerConfig).toInstant()); + parseStreamTimestampStartingPosition(sourceConfig).toInstant()); break; case TRIM_HORIZON: default: diff --git a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceTest.java b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceTest.java index 58396563..c321e873 100644 --- a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceTest.java +++ b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceTest.java @@ -20,12 +20,11 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory; import org.junit.jupiter.api.Test; -import java.util.Properties; - import static org.apache.flink.connector.kinesis.source.util.TestUtil.STREAM_ARN; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -33,11 +32,11 @@ class KinesisStreamsSourceTest { @Test void testSupportsContinuousUnboundedOnly() throws Exception { - final Properties sourceConfig = new Properties(); + final Configuration sourceConfig = new Configuration(); final KinesisStreamsSource source = KinesisStreamsSource.builder() .setStreamArn(STREAM_ARN) - .setConsumerConfig(sourceConfig) + .setSourceConfig(sourceConfig) .setDeserializationSchema(new SimpleStringSchema()) .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) .build(); diff --git a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtilTest.java b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtilTest.java index dd344edd..89a4a2ff 100644 --- a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtilTest.java +++ b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigUtilTest.java @@ -18,13 +18,13 @@ package org.apache.flink.connector.kinesis.source.config; +import org.apache.flink.configuration.Configuration; + import org.junit.jupiter.api.Test; import java.text.SimpleDateFormat; import java.util.Date; -import java.util.Properties; -import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT; import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP; import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -36,14 +36,14 @@ class KinesisStreamsSourceConfigUtilTest { void testParseStreamTimestampUsingDefaultFormat() throws Exception { String timestamp = "2023-04-13T09:18:00.0+01:00"; Date expectedTimestamp = - new SimpleDateFormat(DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT).parse(timestamp); + new SimpleDateFormat(STREAM_TIMESTAMP_DATE_FORMAT.defaultValue()).parse(timestamp); - Properties consumerProperties = new Properties(); - consumerProperties.setProperty(STREAM_INITIAL_TIMESTAMP, timestamp); + Configuration sourceConfig = new Configuration(); + sourceConfig.set(STREAM_INITIAL_TIMESTAMP, timestamp); assertThat( KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition( - consumerProperties)) + sourceConfig)) .isEqualTo(expectedTimestamp); } @@ -53,13 +53,13 @@ void testParseStreamTimestampUsingCustomFormat() throws Exception { String timestamp = "2023-04-13T09:23"; Date expectedTimestamp = new SimpleDateFormat(format).parse(timestamp); - Properties consumerProperties = new Properties(); - consumerProperties.setProperty(STREAM_INITIAL_TIMESTAMP, timestamp); - consumerProperties.setProperty(STREAM_TIMESTAMP_DATE_FORMAT, format); + Configuration sourceConfig = new Configuration(); + sourceConfig.set(STREAM_INITIAL_TIMESTAMP, timestamp); + sourceConfig.set(STREAM_TIMESTAMP_DATE_FORMAT, format); assertThat( KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition( - consumerProperties)) + sourceConfig)) .isEqualTo(expectedTimestamp); } @@ -68,12 +68,12 @@ void testParseStreamTimestampEpoch() { long epoch = 1681910583L; Date expectedTimestamp = new Date(epoch * 1000); - Properties consumerProperties = new Properties(); - consumerProperties.setProperty(STREAM_INITIAL_TIMESTAMP, String.valueOf(epoch)); + Configuration sourceConfig = new Configuration(); + sourceConfig.set(STREAM_INITIAL_TIMESTAMP, String.valueOf(epoch)); assertThat( KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition( - consumerProperties)) + sourceConfig)) .isEqualTo(expectedTimestamp); } @@ -81,24 +81,24 @@ void testParseStreamTimestampEpoch() { void testParseStreamTimestampParseError() { String badTimestamp = "badTimestamp"; - Properties consumerProperties = new Properties(); - consumerProperties.setProperty(STREAM_INITIAL_TIMESTAMP, badTimestamp); + Configuration sourceConfig = new Configuration(); + sourceConfig.set(STREAM_INITIAL_TIMESTAMP, badTimestamp); assertThatExceptionOfType(NumberFormatException.class) .isThrownBy( () -> KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition( - consumerProperties)); + sourceConfig)); } @Test void testParseStreamTimestampTimestampNotSpecified() { - Properties consumerProperties = new Properties(); + Configuration sourceConfig = new Configuration(); assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy( () -> KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition( - consumerProperties)); + sourceConfig)); } } diff --git a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java index d21594ca..da9d344c 100644 --- a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java +++ b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java @@ -21,7 +21,7 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitsAssignment; import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; -import org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants; +import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.InitialPosition; import org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory; import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; @@ -42,10 +42,11 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Properties; import java.util.Set; import java.util.stream.Stream; +import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION; +import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP; import static org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.TestKinesisStreamProxy; import static org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.getTestStreamProxy; import static org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId; @@ -70,19 +71,16 @@ void testStartWithoutStateDiscoversAndAssignsShards( try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) { TestKinesisStreamProxy streamProxy = getTestStreamProxy(); - final Properties consumerConfig = new Properties(); - consumerConfig.setProperty( - KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, - initialPosition.name()); - consumerConfig.setProperty( - KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP, initialTimestamp); + final Configuration sourceConfig = new Configuration(); + sourceConfig.set(STREAM_INITIAL_POSITION, initialPosition); + sourceConfig.set(STREAM_INITIAL_TIMESTAMP, initialTimestamp); // Given enumerator is initialized with no state KinesisStreamsSourceEnumerator enumerator = new KinesisStreamsSourceEnumerator( context, STREAM_ARN, - consumerConfig, + sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), null); @@ -170,19 +168,16 @@ void testStartWithStateDoesNotAssignCompletedShards( KinesisStreamsSourceEnumeratorState state = new KinesisStreamsSourceEnumeratorState(Collections.emptySet(), lastSeenShard); - final Properties consumerConfig = new Properties(); - consumerConfig.setProperty( - KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, - initialPosition.name()); - consumerConfig.setProperty( - KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP, initialTimestamp); + final Configuration sourceConfig = new Configuration(); + sourceConfig.set(STREAM_INITIAL_POSITION, initialPosition); + sourceConfig.set(STREAM_INITIAL_TIMESTAMP, initialTimestamp); // Given enumerator is initialised with state KinesisStreamsSourceEnumerator enumerator = new KinesisStreamsSourceEnumerator( context, STREAM_ARN, - consumerConfig, + sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), state); @@ -354,12 +349,12 @@ void testAssignSplitsSurfacesThrowableIfUnableToListShards() throws Throwable { try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) { TestKinesisStreamProxy streamProxy = getTestStreamProxy(); - final Properties consumerConfig = new Properties(); + final Configuration sourceConfig = new Configuration(); KinesisStreamsSourceEnumerator enumerator = new KinesisStreamsSourceEnumerator( context, STREAM_ARN, - consumerConfig, + sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), null); @@ -383,12 +378,12 @@ void testAssignSplitsHandlesRepeatSplitsGracefully() throws Throwable { try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) { TestKinesisStreamProxy streamProxy = getTestStreamProxy(); - final Properties consumerConfig = new Properties(); + final Configuration sourceConfig = new Configuration(); KinesisStreamsSourceEnumerator enumerator = new KinesisStreamsSourceEnumerator( context, STREAM_ARN, - consumerConfig, + sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), null); @@ -436,12 +431,12 @@ void testAssignSplitWithoutRegisteredReaders() throws Throwable { try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) { TestKinesisStreamProxy streamProxy = getTestStreamProxy(); - final Properties consumerConfig = new Properties(); + final Configuration sourceConfig = new Configuration(); KinesisStreamsSourceEnumerator enumerator = new KinesisStreamsSourceEnumerator( context, STREAM_ARN, - consumerConfig, + sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), null); @@ -494,12 +489,12 @@ void testAssignSplitWithInsufficientRegisteredReaders() throws Throwable { try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(2)) { TestKinesisStreamProxy streamProxy = getTestStreamProxy(); - final Properties consumerConfig = new Properties(); + final Configuration sourceConfig = new Configuration(); KinesisStreamsSourceEnumerator enumerator = new KinesisStreamsSourceEnumerator( context, STREAM_ARN, - consumerConfig, + sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), null); @@ -560,12 +555,12 @@ void testRestoreFromStateRemembersLastSeenShardId() throws Throwable { MockSplitEnumeratorContext restoredContext = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) { TestKinesisStreamProxy streamProxy = getTestStreamProxy(); - final Properties consumerConfig = new Properties(); + final Configuration sourceConfig = new Configuration(); KinesisStreamsSourceEnumerator enumerator = new KinesisStreamsSourceEnumerator( context, STREAM_ARN, - consumerConfig, + sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), null); @@ -592,7 +587,7 @@ void testRestoreFromStateRemembersLastSeenShardId() throws Throwable { new KinesisStreamsSourceEnumerator( restoredContext, STREAM_ARN, - consumerConfig, + sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), snapshottedState); @@ -613,12 +608,12 @@ void testHandleUnrecognisedSourceEventIsNoOp() throws Throwable { try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) { TestKinesisStreamProxy streamProxy = getTestStreamProxy(); - final Properties consumerConfig = new Properties(); + final Configuration sourceConfig = new Configuration(); KinesisStreamsSourceEnumerator enumerator = new KinesisStreamsSourceEnumerator( context, STREAM_ARN, - consumerConfig, + sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), null); @@ -633,12 +628,12 @@ void testCloseClosesStreamProxy() throws Throwable { try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) { TestKinesisStreamProxy streamProxy = getTestStreamProxy(); - final Properties consumerConfig = new Properties(); + final Configuration sourceConfig = new Configuration(); KinesisStreamsSourceEnumerator enumerator = new KinesisStreamsSourceEnumerator( context, STREAM_ARN, - consumerConfig, + sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), null); @@ -651,12 +646,12 @@ void testCloseClosesStreamProxy() throws Throwable { private KinesisStreamsSourceEnumerator getSimpleEnumeratorWithNoState( MockSplitEnumeratorContext context, StreamProxy streamProxy) { - final Properties consumerConfig = new Properties(); + final Configuration sourceConfig = new Configuration(); KinesisStreamsSourceEnumerator enumerator = new KinesisStreamsSourceEnumerator( context, STREAM_ARN, - consumerConfig, + sourceConfig, streamProxy, ShardAssignerFactory.uniformShardAssigner(), null); diff --git a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/examples/SourceFromKinesis.java b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/examples/SourceFromKinesis.java index 9001f5d7..bd4501fe 100644 --- a/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/examples/SourceFromKinesis.java +++ b/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/examples/SourceFromKinesis.java @@ -21,13 +21,12 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.aws.config.AWSConfigConstants; import org.apache.flink.connector.kinesis.source.KinesisStreamsSource; import org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import java.util.Properties; - /** * An example application demonstrating how to use the {@link KinesisStreamsSource} to read from * KDS. @@ -39,13 +38,13 @@ public static void main(String[] args) throws Exception { env.enableCheckpointing(10_000); env.setParallelism(2); - Properties consumerConfig = new Properties(); - consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); + Configuration sourceConfig = new Configuration(); + sourceConfig.setString(AWSConfigConstants.AWS_REGION, "us-east-1"); KinesisStreamsSource kdsSource = KinesisStreamsSource.builder() .setStreamArn( "arn:aws:kinesis:us-east-1:290038087681:stream/LoadTestBeta_Input_35") - .setConsumerConfig(consumerConfig) + .setSourceConfig(sourceConfig) .setDeserializationSchema(new SimpleStringSchema()) .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) .build();