-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-31987] Implement Table API support #84
base: main
Are you sure you want to change the base?
Conversation
2b69601
to
3adffea
Compare
if (!(config.containsKey(AWSConfigConstants.AWS_REGION) | ||
|| config.containsKey(AWSConfigConstants.AWS_ENDPOINT))) { | ||
// per validation in AwsClientBuilder | ||
throw new IllegalArgumentException( | ||
String.format( | ||
"For KinesisStreamsSource AWS region ('%s') and/or AWS endpoint ('%s') must be set in the config.", | ||
AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_ENDPOINT)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like this should be duplicated, can we consolidate into a common place for all table APIs?
"Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. " | ||
+ "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 ."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This error message is only valid when STREAM_TIMESTAMP_DATE_FORMAT
is not given. Can we use STREAM_TIMESTAMP_DATE_FORMAT
in the error message?
import java.util.Objects; | ||
|
||
/** Internal type for enumerating available metadata. */ | ||
public enum Metadata { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing compatibility annotation (@Internal
)
import java.util.Objects; | ||
|
||
/** Internal type for enumerating available metadata. */ | ||
public enum Metadata { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class is named very generically, and the package name does not tell me it is related to Table API. Can we rename/move package?
import java.util.stream.Collectors; | ||
|
||
/** Kinesis-backed {@link ScanTableSource}. */ | ||
public class KinesisDynamicSource implements ScanTableSource, SupportsReadingMetadata { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing compatibility annotation
|
||
/** Factory for creating {@link KinesisDynamicSource}. */ | ||
public class KinesisDynamicTableSourceFactory implements DynamicTableSourceFactory { | ||
public static final String IDENTIFIER = "kinesis-source"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not a decision we can make so lightly. For the KDS sink we superseded the previous sink with the new one. We reused kinesis
identifier and maintained backwards compatibility for config options, where possible. Now we have externalised connectors we have 2 options, but regardless, I do not want to change the identifier:
- Minor version (4.x) with backwards compatibility for old source config
- Major version (5.x) with breaking changes, no support for old config
@Override | ||
public Set<ConfigOption<?>> requiredOptions() { | ||
final Set<ConfigOption<?>> options = new HashSet<>(); | ||
options.add(STREAM_ARN); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be a breaking change for a minor version update
Purpose of the change
Implements the Table API for the Kinesis Source.
Verifying this change
This change added tests and can be verified as follows:
Significant changes
(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)
@Public(Evolving)
)