Skip to content

Commit

Permalink
[FLINK-31990][Connectors/Kinesis] Use Configuration object instead of properties in KDS Source
Browse files Browse the repository at this point in the history
  • Loading branch information
hlteoh37 authored and dannycranmer committed Jun 15, 2023
1 parent d1a3f63 commit 1006e6a
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.aws.util.AWSClientUtil;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
Expand Down Expand Up @@ -81,22 +81,22 @@ public class KinesisStreamsSource<T>
implements Source<T, KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {

private final String streamArn;
private final Properties consumerConfig;
private final Configuration sourceConfig;
private final KinesisDeserializationSchema<T> deserializationSchema;
private final KinesisShardAssigner kinesisShardAssigner;

KinesisStreamsSource(
String streamArn,
Properties consumerConfig,
Configuration sourceConfig,
KinesisDeserializationSchema<T> deserializationSchema,
KinesisShardAssigner kinesisShardAssigner) {
Preconditions.checkNotNull(streamArn);
Preconditions.checkArgument(!streamArn.isEmpty(), "stream ARN cannot be empty string");
Preconditions.checkNotNull(consumerConfig);
Preconditions.checkNotNull(sourceConfig);
Preconditions.checkNotNull(deserializationSchema);
Preconditions.checkNotNull(kinesisShardAssigner);
this.streamArn = streamArn;
this.consumerConfig = consumerConfig;
this.sourceConfig = sourceConfig;
this.deserializationSchema = deserializationSchema;
this.kinesisShardAssigner = kinesisShardAssigner;
}
Expand Down Expand Up @@ -127,15 +127,15 @@ public SourceReader<T, KinesisShardSplit> createReader(SourceReaderContext reade
// We create a new stream proxy for each split reader since they have their own independent
// lifecycle.
Supplier<PollingKinesisShardSplitReader> splitReaderSupplier =
() -> new PollingKinesisShardSplitReader(createKinesisStreamProxy(consumerConfig));
() -> new PollingKinesisShardSplitReader(createKinesisStreamProxy(sourceConfig));
KinesisStreamsRecordEmitter<T> recordEmitter =
new KinesisStreamsRecordEmitter<>(deserializationSchema);

return new KinesisStreamsSourceReader<>(
elementsQueue,
new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier::get),
recordEmitter,
ConfigurationUtils.createConfiguration(consumerConfig),
sourceConfig,
readerContext);
}

Expand All @@ -154,8 +154,8 @@ public SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> c
return new KinesisStreamsSourceEnumerator(
enumContext,
streamArn,
consumerConfig,
createKinesisStreamProxy(consumerConfig),
sourceConfig,
createKinesisStreamProxy(sourceConfig),
kinesisShardAssigner,
checkpoint);
}
Expand All @@ -171,15 +171,18 @@ public SimpleVersionedSerializer<KinesisShardSplit> getSplitSerializer() {
return new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer());
}

private KinesisStreamProxy createKinesisStreamProxy(Properties consumerConfig) {
private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig) {
SdkHttpClient httpClient =
AWSGeneralUtil.createSyncHttpClient(
AttributeMap.builder().build(), ApacheHttpClient.builder());

This comment has been minimized.

Copy link
@IgorWalkowiak

IgorWalkowiak Mar 4, 2024

Hello, I'm absolutely new here. I'm trying to understand this code, and I don't understand why the config is not passed to the httpClient?


AWSGeneralUtil.validateAwsCredentials(consumerConfig);
Properties kinesisClientProperties = new Properties();
consumerConfig.addAllToProperties(kinesisClientProperties);

AWSGeneralUtil.validateAwsCredentials(kinesisClientProperties);
KinesisClient kinesisClient =
AWSClientUtil.createAwsSyncClient(
consumerConfig,
kinesisClientProperties,
httpClient,
KinesisClient.builder(),
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
import org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory;
import org.apache.flink.connector.kinesis.source.enumerator.assigner.UniformShardAssigner;
import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema;

import java.util.Properties;

/**
* Builder to construct the {@link KinesisStreamsSource}.
*
Expand All @@ -53,7 +52,7 @@
@Experimental
public class KinesisStreamsSourceBuilder<T> {
private String streamArn;
private Properties consumerConfig;
private Configuration sourceConfig;
private KinesisDeserializationSchema<T> deserializationSchema;
private KinesisShardAssigner kinesisShardAssigner = ShardAssignerFactory.uniformShardAssigner();

Expand All @@ -62,8 +61,8 @@ public KinesisStreamsSourceBuilder<T> setStreamArn(String streamArn) {
return this;
}

public KinesisStreamsSourceBuilder<T> setConsumerConfig(Properties consumerConfig) {
this.consumerConfig = consumerConfig;
public KinesisStreamsSourceBuilder<T> setSourceConfig(Configuration sourceConfig) {
this.sourceConfig = sourceConfig;
return this;
}

Expand All @@ -87,6 +86,6 @@ public KinesisStreamsSourceBuilder<T> setKinesisShardAssigner(

public KinesisStreamsSource<T> build() {
return new KinesisStreamsSource<>(
streamArn, consumerConfig, deserializationSchema, kinesisShardAssigner);
streamArn, sourceConfig, deserializationSchema, kinesisShardAssigner);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.connector.kinesis.source.config;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Constants to be used with the KinesisStreamsSource. */
@Experimental
Expand All @@ -30,30 +32,29 @@ public enum InitialPosition {
AT_TIMESTAMP
}

/** The initial position to start reading Kinesis streams from. */
public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString();

/**
* The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for
* STREAM_INITIAL_POSITION).
*/
public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";

/**
* The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP
* is set for STREAM_INITIAL_POSITION).
*/
public static final String STREAM_TIMESTAMP_DATE_FORMAT =
"flink.stream.initpos.timestamp.format";

public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT =
"yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

/** The interval between each attempt to discover new shards. */
public static final String SHARD_DISCOVERY_INTERVAL_MILLIS =
"flink.shard.discovery.intervalmillis";

public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L;
public static final ConfigOption<InitialPosition> STREAM_INITIAL_POSITION =
ConfigOptions.key("flink.stream.initpos")
.enumType(InitialPosition.class)
.defaultValue(InitialPosition.LATEST)
.withDescription("The initial position to start reading Kinesis streams.");

public static final ConfigOption<String> STREAM_INITIAL_TIMESTAMP =
ConfigOptions.key("flink.stream.initpos.timestamp")
.stringType()
.noDefaultValue()
.withDescription(
"The initial timestamp at which to start reading from the Kinesis stream. This is used when AT_TIMESTAMP is configured for the STREAM_INITIAL_POSITION.");

public static final ConfigOption<String> STREAM_TIMESTAMP_DATE_FORMAT =
ConfigOptions.key("flink.stream.initpos.timestamp.format")
.stringType()
.defaultValue("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
.withDescription(
"The date format used to parse the initial timestamp at which to start reading from the Kinesis stream. This is used when AT_TIMESTAMP is configured for the STREAM_INITIAL_POSITION.");

public static final ConfigOption<Long> SHARD_DISCOVERY_INTERVAL_MILLIS =
ConfigOptions.key("flink.shard.discovery.intervalmillis")
.longType()
.defaultValue(10000L)
.withDescription("The interval between each attempt to discover new shards.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
package org.apache.flink.connector.kinesis.source.config;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT;
import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_TIMESTAMP;
import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT;

Expand All @@ -39,19 +38,18 @@ private KinesisStreamsSourceConfigUtil() {
}

/**
* Parses the timestamp in which to start consuming from the stream, from the given properties.
* Parses the timestamp in which to start consuming from the stream, from the given
* configuration.
*
* @param consumerConfig the properties to parse timestamp from
* @param sourceConfig the configuration to parse timestamp from
* @return the timestamp
*/
public static Date parseStreamTimestampStartingPosition(final Properties consumerConfig) {
Preconditions.checkNotNull(consumerConfig);
String timestamp = consumerConfig.getProperty(STREAM_INITIAL_TIMESTAMP);
public static Date parseStreamTimestampStartingPosition(final Configuration sourceConfig) {
Preconditions.checkNotNull(sourceConfig);
String timestamp = sourceConfig.get(STREAM_INITIAL_TIMESTAMP);

try {
String format =
consumerConfig.getProperty(
STREAM_TIMESTAMP_DATE_FORMAT, DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
String format = sourceConfig.get(STREAM_TIMESTAMP_DATE_FORMAT);
SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
return customDateFormat.parse(timestamp);
} catch (IllegalArgumentException | NullPointerException exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.InitialPosition;
import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
Expand All @@ -48,11 +49,8 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;

import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS;
import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION;
import static org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition;
Expand All @@ -69,7 +67,7 @@ public class KinesisStreamsSourceEnumerator

private final SplitEnumeratorContext<KinesisShardSplit> context;
private final String streamArn;
private final Properties consumerConfig;
private final Configuration sourceConfig;
private final StreamProxy streamProxy;
private final KinesisShardAssigner shardAssigner;
private final ShardAssignerContext shardAssignerContext;
Expand All @@ -83,13 +81,13 @@ public class KinesisStreamsSourceEnumerator
public KinesisStreamsSourceEnumerator(
SplitEnumeratorContext<KinesisShardSplit> context,
String streamArn,
Properties consumerConfig,
Configuration sourceConfig,
StreamProxy streamProxy,
KinesisShardAssigner shardAssigner,
KinesisStreamsSourceEnumeratorState state) {
this.context = context;
this.streamArn = streamArn;
this.consumerConfig = consumerConfig;
this.sourceConfig = sourceConfig;
this.streamProxy = streamProxy;
this.shardAssigner = shardAssigner;
this.shardAssignerContext = new ShardAssignerContext(splitAssignment, context);
Expand All @@ -108,11 +106,7 @@ public void start() {
context.callAsync(this::initialDiscoverSplits, this::assignSplits);
}

final long shardDiscoveryInterval =
Long.parseLong(
consumerConfig.getProperty(
SHARD_DISCOVERY_INTERVAL_MILLIS,
String.valueOf(DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
final long shardDiscoveryInterval = sourceConfig.get(SHARD_DISCOVERY_INTERVAL_MILLIS);
context.callAsync(
this::periodicallyDiscoverSplits,
this::assignSplits,
Expand Down Expand Up @@ -163,13 +157,7 @@ public void close() throws IOException {

private List<KinesisShardSplit> initialDiscoverSplits() {
List<Shard> shards = streamProxy.listShards(streamArn, lastSeenShardId);
return mapToSplits(
shards,
InitialPosition.valueOf(
consumerConfig
.getOrDefault(
STREAM_INITIAL_POSITION, DEFAULT_STREAM_INITIAL_POSITION)
.toString()));
return mapToSplits(shards, sourceConfig.get(STREAM_INITIAL_POSITION));
}

/**
Expand Down Expand Up @@ -199,7 +187,7 @@ private List<KinesisShardSplit> mapToSplits(
case AT_TIMESTAMP:
startingPosition =
StartingPosition.fromTimestamp(
parseStreamTimestampStartingPosition(consumerConfig).toInstant());
parseStreamTimestampStartingPosition(sourceConfig).toInstant());
break;
case TRIM_HORIZON:
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,23 @@

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory;

import org.junit.jupiter.api.Test;

import java.util.Properties;

import static org.apache.flink.connector.kinesis.source.util.TestUtil.STREAM_ARN;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

class KinesisStreamsSourceTest {

@Test
void testSupportsContinuousUnboundedOnly() throws Exception {
final Properties sourceConfig = new Properties();
final Configuration sourceConfig = new Configuration();
final KinesisStreamsSource<String> source =
KinesisStreamsSource.<String>builder()
.setStreamArn(STREAM_ARN)
.setConsumerConfig(sourceConfig)
.setSourceConfig(sourceConfig)
.setDeserializationSchema(new SimpleStringSchema())
.setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner())
.build();
Expand Down
Loading

0 comments on commit 1006e6a

Please sign in to comment.