Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-31987] Implement Table API support #84

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

darenwkt
Copy link
Contributor

Purpose of the change

Implements the Table API for the Kinesis Source.

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests
  • Manually verified by running the Kinesis connector on a local Flink cluster.

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)

  • [x ] Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • [x ] New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

@darenwkt darenwkt force-pushed the FLINK-31987 branch 2 times, most recently from 2b69601 to 3adffea Compare June 20, 2023 09:19
Comment on lines +79 to +86
if (!(config.containsKey(AWSConfigConstants.AWS_REGION)
|| config.containsKey(AWSConfigConstants.AWS_ENDPOINT))) {
// per validation in AwsClientBuilder
throw new IllegalArgumentException(
String.format(
"For KinesisStreamsSource AWS region ('%s') and/or AWS endpoint ('%s') must be set in the config.",
AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_ENDPOINT));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this should be duplicated, can we consolidate into a common place for all table APIs?

Comment on lines +104 to +105
"Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
+ "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error message is only valid when STREAM_TIMESTAMP_DATE_FORMAT is not given. Can we use STREAM_TIMESTAMP_DATE_FORMAT in the error message?

import java.util.Objects;

/** Internal type for enumerating available metadata. */
public enum Metadata {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing compatibility annotation (@Internal)

import java.util.Objects;

/** Internal type for enumerating available metadata. */
public enum Metadata {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is named very generically, and the package name does not tell me it is related to Table API. Can we rename/move package?

import java.util.stream.Collectors;

/** Kinesis-backed {@link ScanTableSource}. */
public class KinesisDynamicSource implements ScanTableSource, SupportsReadingMetadata {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing compatibility annotation


/** Factory for creating {@link KinesisDynamicSource}. */
public class KinesisDynamicTableSourceFactory implements DynamicTableSourceFactory {
public static final String IDENTIFIER = "kinesis-source";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a decision we can make so lightly. For the KDS sink we superseded the previous sink with the new one. We reused kinesis identifier and maintained backwards compatibility for config options, where possible. Now we have externalised connectors we have 2 options, but regardless, I do not want to change the identifier:

  1. Minor version (4.x) with backwards compatibility for old source config
  2. Major version (5.x) with breaking changes, no support for old config

@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(STREAM_ARN);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a breaking change for a minor version update

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants