[FLINK-33132] Flink Connector Redshift TableSink Implementation #114

Open · wants to merge 1 commit into main

Conversation

@Samrat002 (Contributor) commented Nov 9, 2023

Purpose of the change

Flink Connector Redshift Sink Implementation

Verifying this change

JDBC mode testing

CREATE TABLE users1 (
    `id` BIGINT,
    `data` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'redshift',
    'sink.mode' = 'JDBC',
    'sink.copy-mode.aws.s3-uri' = 's3://dbsamrat-aws-bucket/redshift/flink_sink/users/',
    'sink.database-name' = 'flink_sink',
    'hostname' = 'flink-redshift.xxxxxxxxxx.xx-xxxx-x.redshift.amazonaws.com',
    'sink.aws.iam-role-arn' = 'arn:aws:iam::xxxxxxxxxxxxx:role/service-role/AmazonRedshift-CommandsAccessRole-xxxxxxxxxxxxxxx',
    'username' = 'admin',
    'password' = 'xxxxxx',
    'port' = '5439',
    'sink.batch-size' = '10',
    'sink.flush-interval' = '10',
    'sink.max-retries' = '2',
    'sink.table-name' = 'users1'
);

CREATE TABLE datagentable (
    id   INT,
    data STRING
) WITH ('connector' = 'datagen', 'number-of-rows' = '10000');

INSERT INTO users1 SELECT * FROM datagentable;

[Screenshot of the test run output, 2023-12-17]

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

@Samrat002 Samrat002 marked this pull request as draft November 9, 2023 00:07
@Samrat002 Samrat002 changed the title [WIP][FLINK-33132] Flink Connector Redshift Sink Implementation [FLINK-33132][WIP] Flink Connector Redshift Sink Implementation Nov 9, 2023
@Samrat002 Samrat002 changed the title [FLINK-33132][WIP] Flink Connector Redshift Sink Implementation [FLINK-33132] Flink Connector Redshift TableSink Implementation Nov 10, 2023
@Samrat002 Samrat002 marked this pull request as ready for review November 10, 2023 20:42
@Samrat002 Samrat002 changed the title [FLINK-33132] Flink Connector Redshift TableSink Implementation [WIP][FLINK-33132] Flink Connector Redshift TableSink Implementation Nov 15, 2023
@Samrat002 Samrat002 marked this pull request as draft November 15, 2023 12:46
@Samrat002 Samrat002 changed the title [WIP][FLINK-33132] Flink Connector Redshift TableSink Implementation [FLINK-33132] Flink Connector Redshift TableSink Implementation Dec 18, 2023
@Samrat002 Samrat002 marked this pull request as ready for review December 18, 2023 19:01
@Samrat002 (Contributor, Author) commented:

@hlteoh37 @vahmed-hamdy please review when you have a chance 🙏🏻

@melin commented Jan 20, 2024

1. A truncate-table parameter should be supported in the batch import scenario. If data already exists in the table, duplicate data will be generated, so the table must be cleared first.
2. Can data be written via upsert in batch data import scenarios?
https://docs.aws.amazon.com/redshift/latest/dg/t_updating-inserting-using-staging-tables-.html

@Samrat002 (Contributor, Author) commented Jan 30, 2024

Thank you for reviewing the PR.

> 1. A truncate-table parameter should be supported in the batch import scenario. If data already exists in the table, duplicate data will be generated, so the table must be cleared first.

If a record already exists in the table and the Redshift table was created with a primary key or composite key, the connector carries out a MERGE INTO operation. If you check the code, we perform MERGE INTO whenever the DDL contains a primary key.

> 2. Can data be written via upsert in batch data import scenarios? https://docs.aws.amazon.com/redshift/latest/dg/t_updating-inserting-using-staging-tables-.html

Can you please elaborate? As I understand it, the concern is how the staged data gets merged; in the code we use https://docs.aws.amazon.com/redshift/latest/dg/t_updating-inserting-using-staging-tables-.html#merge-method-specify-column-list. A sketch of that flow is below.
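
For reference, the "specify a column list" merge from that doc looks roughly like the following (a sketch against the users1 example table; the staging-table name is illustrative, and the exact statements the connector issues may differ):

BEGIN TRANSACTION;

-- Stage the incoming batch (temp table mirrors the target's schema).
CREATE TEMP TABLE users1_stage (LIKE users1);
COPY users1_stage
FROM 's3://dbsamrat-aws-bucket/redshift/flink_sink/users/'
IAM_ROLE 'arn:aws:iam::xxxxxxxxxxxxx:role/service-role/AmazonRedshift-CommandsAccessRole-xxxxxxxxxxxxxxx'
FORMAT AS CSV;

-- Update rows whose primary key already exists in the target.
UPDATE users1
SET data = s.data
FROM users1_stage s
WHERE users1.id = s.id;

-- Insert rows that are not present in the target yet.
INSERT INTO users1
SELECT s.id, s.data
FROM users1_stage s
LEFT JOIN users1 t ON s.id = t.id
WHERE t.id IS NULL;

DROP TABLE users1_stage;
END TRANSACTION;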

@vahmed-hamdy (Contributor) left a comment

I have left some comments; I will continue the review later.
I believe this PR is incomplete, right? We still need to add tests.

/** Dynamic Table Factory. */
@PublicEvolving
public class RedshiftDynamicTableFactory implements DynamicTableSinkFactory {
    public static final String IDENTIFIER = "redshift";

@vahmed-hamdy: nit: I would move the configs to a separate file, as in flink-connector-aws-kinesis-streams.

public static final ConfigOption<String> DATABASE_NAME =
        ConfigOptions.key("sink.database-name")
                .stringType()
                .defaultValue("dev")

@vahmed-hamdy: Why do we need to set that?

@Samrat002: dev is the default database name created by Redshift. I assumed that if the user doesn't provide a database name in the config, it should fall back to the default one.

                .noDefaultValue()
                .withDescription("AWS Redshift cluster sink table name.");

public static final ConfigOption<Integer> SINK_BATCH_SIZE =

@vahmed-hamdy: Suggestion: have you considered using AsyncDynamicTableSink? It seems you are reusing some of its properties here.

                .stringType()
                .noDefaultValue()
                .withDescription("using Redshift COPY command must provide a S3 URI.");

public static final ConfigOption<String> IAM_ROLE_ARN =

@vahmed-hamdy: Should we use the existing AWS authentication approach?

"Currently, 2 modes are supported for Flink connector redshift.\n"
+ "\t 1) COPY Mode."
+ "\t 2) JDBC Mode.");
public static final ConfigOption<String> TEMP_S3_URI =

@vahmed-hamdy: Why TEMP?

@Samrat002: In COPY mode, the data first needs to be written to a temporary S3 path. The path is only needed until the COPY command has read the data from the temporary location (S3) and loaded it into the Redshift workers. This config was mentioned in the FLIP. A sketch of the resulting COPY statement is below.
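
For context, a COPY issued against the example table from the PR description would look roughly like this (a sketch; the file format and COPY options the connector actually emits may differ):

-- The JDBC connection already targets the 'flink_sink' database from the config.
COPY users1
FROM 's3://dbsamrat-aws-bucket/redshift/flink_sink/users/'
IAM_ROLE 'arn:aws:iam::xxxxxxxxxxxxx:role/service-role/AmazonRedshift-CommandsAccessRole-xxxxxxxxxxxxxxx'
FORMAT AS CSV;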


private static final Logger LOG = LoggerFactory.getLogger(AbstractRedshiftOutputFormat.class);

protected transient volatile boolean closed = false;

@vahmed-hamdy: This is smelly; have you tested this away from local clusters and with checkpointing enabled?
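
For context, a common way to address the checkpointing half of this concern, as the JDBC connector does, is to wrap the output format in a sink function that implements CheckpointedFunction and flushes on snapshot (a sketch; outputFormat is an illustrative field name):

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Flush all buffered records before the checkpoint completes, so the
    // checkpoint never covers data that was not yet written to Redshift.
    outputFormat.flush();
}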

public synchronized void close() {
    if (!closed) {
        closed = true;


@vahmed-hamdy: nit: remove the extra newline.

try {
    flush();
} catch (Exception exception) {
    LOG.warn("Flushing records to Redshift failed.", exception);

@vahmed-hamdy: We are swallowing all exceptions here; this seems like a smell and could possibly break delivery guarantees. We should catch specific exceptions only and bubble up or wrap the rest. A sketch is below.
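
For illustration, the suggested shape could look like this (a sketch only; it assumes flush() throws SQLException, which may not match the connector's actual exception types):

try {
    flush();
} catch (SQLException e) {
    // Bubble the failure up so the task fails and recovers from the last
    // checkpoint, instead of logging and silently dropping the batch.
    throw new IOException("Flushing records to Redshift failed.", e);
}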

public void scheduledFlush(long intervalMillis, String executorName) {
    Preconditions.checkArgument(intervalMillis > 0, "flush interval must be greater than 0");
    scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(executorName));
    scheduledFuture =

@vahmed-hamdy: This breaks the execution model; you should use the mailboxExecutor instead. See the sketch below.
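
A rough sketch of that suggestion (it assumes the writer holds a MailboxExecutor handed over from the runtime; field names are illustrative):

scheduledFuture =
        scheduler.scheduleWithFixedDelay(
                // Only the trigger runs on the scheduler thread; the actual
                // flush is enqueued on the task's mailbox so it executes on
                // the task thread, preserving Flink's execution model.
                () -> mailboxExecutor.execute(this::flush, "redshift scheduled flush"),
                intervalMillis,
                intervalMillis,
                TimeUnit.MILLISECONDS);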

import java.time.Duration;
import java.util.Optional;

/** Options. */

@vahmed-hamdy: nit: Could we use a more descriptive Javadoc, for example: "Options to configure connection to redshift"?

@Samrat002 (Contributor, Author) commented:

> I have left some comments; I will continue the review later. I believe this PR is incomplete, right? We still need to add tests.

Yes, tests were not added, since they would increase the size of the PR.
As discussed offline, I will reduce the scope of this PR to only the Async v2 sink and move the rest to a succeeding PR.
