From 118e644b858be384fe36169ee5000e2fd187cf45 Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Thu, 16 Nov 2023 18:41:12 +0530 Subject: [PATCH] Serialization error issue --- .../RedshiftJdbcConnectionProvider.java | 7 +- .../RedshiftCopyModeBatchExecutor.java | 30 ++-- .../RedshiftCopyModeUpsertExecutor.java | 34 ++-- .../internal/executor/RedshiftExecutor.java | 68 ++++++-- .../RedshiftJdbcModeBatchExecutor.java | 12 +- .../RedshiftJdbcModeUpsertExecutor.java | 14 +- .../InternalRedshiftConfigOptions.java | 156 +----------------- .../RedshiftConnectionConfigOptions.java | 87 ++++++++++ .../options/RedshiftConnectorOptions.java | 12 +- .../RedshiftCopyModeConfigOptions.java | 48 ++++++ .../RedshiftExecutionConfigOptions.java | 71 ++++++++ .../table/RedshiftDynamicTableFactory.java | 129 +++++++++------ .../table/RedshiftDynamicTableSink.java | 37 +++-- .../AbstractRedshiftRichOutputFormat.java | 32 +++- .../util/RedshiftRichOutputFormat.java | 38 +++-- 15 files changed, 490 insertions(+), 285 deletions(-) create mode 100644 flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/RedshiftConnectionConfigOptions.java create mode 100644 flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/RedshiftCopyModeConfigOptions.java create mode 100644 flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/RedshiftExecutionConfigOptions.java diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/connection/RedshiftJdbcConnectionProvider.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/connection/RedshiftJdbcConnectionProvider.java index 3f07fd59..d7aff543 100644 --- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/connection/RedshiftJdbcConnectionProvider.java 
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/connection/RedshiftJdbcConnectionProvider.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.redshift.internal.connection; import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftConnectionConfigOptions; import com.amazon.redshift.core.BaseConnection; import com.amazon.redshift.jdbc.RedshiftConnectionImpl; @@ -44,10 +44,9 @@ public class RedshiftJdbcConnectionProvider implements RedshiftConnectionProvide private transient BaseConnection connection; - private final transient InternalRedshiftConfigOptions.ConnectionConfigOptions connectionOptions; + private final transient RedshiftConnectionConfigOptions connectionOptions; - public RedshiftJdbcConnectionProvider( - InternalRedshiftConfigOptions.ConnectionConfigOptions configOptions) { + public RedshiftJdbcConnectionProvider(RedshiftConnectionConfigOptions configOptions) { this.connectionOptions = configOptions; } diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftCopyModeBatchExecutor.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftCopyModeBatchExecutor.java index d5d3a225..0bac434d 100644 --- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftCopyModeBatchExecutor.java +++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftCopyModeBatchExecutor.java @@ -23,6 +23,9 @@ import org.apache.flink.connector.redshift.internal.statement.FieldNamedRedshiftPreparedStatement; import org.apache.flink.connector.redshift.internal.statement.RedshiftStatement; import 
org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftConnectionConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftCopyModeConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftExecutionConfigOptions; import org.apache.flink.connector.redshift.util.S3Util; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; @@ -43,19 +46,19 @@ public class RedshiftCopyModeBatchExecutor implements RedshiftExecutor { private static final Logger LOG = LoggerFactory.getLogger(RedshiftCopyModeBatchExecutor.class); - private final transient int maxRetries; + private final int maxRetries; - private final transient String tableName; + private final String tableName; private final transient String[] fieldNames; - private transient String sql; + private String sql; - private final transient RedshiftCopyModeRowConverter copyRowConverter; + private final RedshiftCopyModeRowConverter copyRowConverter; - private final transient String tempS3Uri; + private final String tempS3Uri; - private final transient String iamRoleArn; + private final String iamRoleArn; private final transient List csvData; @@ -64,16 +67,21 @@ public class RedshiftCopyModeBatchExecutor implements RedshiftExecutor { private transient FieldNamedRedshiftPreparedStatement statement; public RedshiftCopyModeBatchExecutor( - String[] fieldNames, LogicalType[] fieldTypes, InternalRedshiftConfigOptions options) { - this.tableName = options.getConnectionConfigOptions().getTableName(); + String[] fieldNames, + LogicalType[] fieldTypes, + InternalRedshiftConfigOptions options, + RedshiftConnectionConfigOptions connectionConfigOptions, + RedshiftExecutionConfigOptions executionConfigOptions, + RedshiftCopyModeConfigOptions copyModeConfigOptions) { + this.tableName = connectionConfigOptions.getTableName(); this.fieldNames = fieldNames; - this.maxRetries = 
options.getExecutionConfigOptions().getMaxRetries(); + this.maxRetries = executionConfigOptions.getMaxRetries(); this.csvData = new ArrayList<>(); this.s3Client = S3Client.create(); this.copyRowConverter = new RedshiftCopyModeRowConverter(fieldTypes); - this.tempS3Uri = S3Util.getS3UriWithFileName(options.getCopyModeConfigOptions().getS3Uri()); - this.iamRoleArn = options.getCopyModeConfigOptions().getIamRoleArn(); + this.tempS3Uri = S3Util.getS3UriWithFileName(copyModeConfigOptions.getS3Uri()); + this.iamRoleArn = copyModeConfigOptions.getIamRoleArn(); } @Override diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftCopyModeUpsertExecutor.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftCopyModeUpsertExecutor.java index 0579f12b..cc0e66d4 100644 --- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftCopyModeUpsertExecutor.java +++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftCopyModeUpsertExecutor.java @@ -22,7 +22,9 @@ import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider; import org.apache.flink.connector.redshift.internal.statement.FieldNamedRedshiftPreparedStatement; import org.apache.flink.connector.redshift.internal.statement.RedshiftStatement; -import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftConnectionConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftCopyModeConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftExecutionConfigOptions; import org.apache.flink.connector.redshift.util.S3Util; import org.apache.flink.table.data.RowData; import 
org.apache.flink.table.types.logical.LogicalType; @@ -45,9 +47,9 @@ public class RedshiftCopyModeUpsertExecutor implements RedshiftExecutor { private static final Logger LOG = LoggerFactory.getLogger(RedshiftCopyModeUpsertExecutor.class); - private final transient int maxRetries; + private final int maxRetries; - private final transient String tableName; + private final String tableName; private final transient String stageTableName; @@ -55,19 +57,19 @@ public class RedshiftCopyModeUpsertExecutor implements RedshiftExecutor { private final transient String[] keyFields; - private final transient String tempS3Uri; + private final String tempS3Uri; - private final transient String iamRoleArn; + private final String iamRoleArn; - private transient String copyInsertSql; + private String copyInsertSql; - private transient String updateTransactionSql; + private String updateTransactionSql; - private transient String deleteSql; + private String deleteSql; - private final transient RedshiftCopyModeRowConverter deleteConverter; + private final RedshiftCopyModeRowConverter deleteConverter; - private final transient Function deleteExtractor; + private final Function deleteExtractor; private transient FieldNamedRedshiftPreparedStatement insertStatement; @@ -89,9 +91,11 @@ public RedshiftCopyModeUpsertExecutor( LogicalType[] fieldTypes, RedshiftCopyModeRowConverter deleteConverter, Function deleteExtractor, - InternalRedshiftConfigOptions options) { + RedshiftConnectionConfigOptions connectionConfigOptions, + RedshiftExecutionConfigOptions executionConfigOptions, + RedshiftCopyModeConfigOptions copyModeConfigOptions) { - this.maxRetries = options.getExecutionConfigOptions().getMaxRetries(); + this.maxRetries = executionConfigOptions.getMaxRetries(); this.fieldNames = fieldNames; this.keyFields = keyFields; this.deleteConverter = deleteConverter; @@ -99,12 +103,12 @@ public RedshiftCopyModeUpsertExecutor( this.csvInsertData = new ArrayList<>(); this.csvUpdateData = new 
ArrayList<>(); - this.tableName = options.getConnectionConfigOptions().getTableName(); - this.iamRoleArn = options.getCopyModeConfigOptions().getIamRoleArn(); + this.tableName = connectionConfigOptions.getTableName(); + this.iamRoleArn = copyModeConfigOptions.getIamRoleArn(); this.s3Client = S3Client.create(); this.copyRowConverter = new RedshiftCopyModeRowConverter(fieldTypes); this.stageTableName = "_" + tableName + "_stage"; - this.tempS3Uri = S3Util.getS3UriWithFileName(options.getCopyModeConfigOptions().getS3Uri()); + this.tempS3Uri = S3Util.getS3UriWithFileName(copyModeConfigOptions.getS3Uri()); } @Override diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftExecutor.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftExecutor.java index 306c260c..3b53dd4e 100644 --- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftExecutor.java +++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftExecutor.java @@ -25,6 +25,9 @@ import org.apache.flink.connector.redshift.internal.statement.FieldNamedRedshiftPreparedStatement; import org.apache.flink.connector.redshift.internal.statement.RedshiftStatement; import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftConnectionConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftCopyModeConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftExecutionConfigOptions; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; @@ -96,6 +99,9 @@ default void attemptExecuteBatch( static RedshiftExecutor 
createRedshiftExecutor( InternalRedshiftConfigOptions redshiftConfigOptions, + RedshiftConnectionConfigOptions connectionConfigOptions, + RedshiftExecutionConfigOptions executionConfigOptions, + RedshiftCopyModeConfigOptions copyModeConfigOptions, String[] fieldNames, String[] primaryKeyFields, LogicalType[] fieldTypes) { @@ -104,11 +110,20 @@ static RedshiftExecutor createRedshiftExecutor( case COPY: LOG.info("Create Upload Copy UPSERT Executor."); return createCopyModeUpsertExecutor( - fieldNames, primaryKeyFields, fieldTypes, redshiftConfigOptions); + fieldNames, + primaryKeyFields, + fieldTypes, + connectionConfigOptions, + executionConfigOptions, + copyModeConfigOptions); case JDBC: LOG.info("Create pure JDBC UPSERT Executor."); return createJdbcModeUpsertExecutor( - fieldNames, primaryKeyFields, fieldTypes, redshiftConfigOptions); + fieldNames, + primaryKeyFields, + fieldTypes, + connectionConfigOptions, + executionConfigOptions); } } else { @@ -116,11 +131,19 @@ static RedshiftExecutor createRedshiftExecutor( case JDBC: LOG.info("Create pure JDBC batch Executor."); return createJdbcModeBatchExecutor( - fieldNames, fieldTypes, redshiftConfigOptions); + fieldNames, + fieldTypes, + connectionConfigOptions, + executionConfigOptions); case COPY: LOG.info("Create Upload Copy batch Executor."); return createCopyModeBatchExecutor( - fieldNames, fieldTypes, redshiftConfigOptions); + fieldNames, + fieldTypes, + redshiftConfigOptions, + connectionConfigOptions, + executionConfigOptions, + copyModeConfigOptions); } } throw new FlinkRuntimeException("No correct Executor found"); @@ -129,15 +152,26 @@ static RedshiftExecutor createRedshiftExecutor( static RedshiftCopyModeBatchExecutor createCopyModeBatchExecutor( String[] fieldNames, LogicalType[] fieldTypes, - InternalRedshiftConfigOptions redshiftConfigOptions) { - return new RedshiftCopyModeBatchExecutor(fieldNames, fieldTypes, redshiftConfigOptions); + InternalRedshiftConfigOptions redshiftConfigOptions, + 
RedshiftConnectionConfigOptions connectionConfigOptions, + RedshiftExecutionConfigOptions executionConfigOptions, + RedshiftCopyModeConfigOptions copyModeConfigOptions) { + return new RedshiftCopyModeBatchExecutor( + fieldNames, + fieldTypes, + redshiftConfigOptions, + connectionConfigOptions, + executionConfigOptions, + copyModeConfigOptions); } static RedshiftCopyModeUpsertExecutor createCopyModeUpsertExecutor( String[] fieldNames, String[] keyFields, LogicalType[] fieldTypes, - InternalRedshiftConfigOptions options) { + RedshiftConnectionConfigOptions connectionConfigOptions, + RedshiftExecutionConfigOptions executionConfigOptions, + RedshiftCopyModeConfigOptions copyModeConfigOptions) { int[] delFields = Arrays.stream(keyFields) @@ -153,24 +187,30 @@ static RedshiftCopyModeUpsertExecutor createCopyModeUpsertExecutor( fieldTypes, new RedshiftCopyModeRowConverter(delTypes), createExtractor(fieldTypes, delFields), - options); + connectionConfigOptions, + executionConfigOptions, + copyModeConfigOptions); } static RedshiftJdbcModeBatchExecutor createJdbcModeBatchExecutor( - String[] fieldNames, LogicalType[] fieldTypes, InternalRedshiftConfigOptions options) { + String[] fieldNames, + LogicalType[] fieldTypes, + RedshiftConnectionConfigOptions connectionConfigOptions, + RedshiftExecutionConfigOptions executionConfigOptions) { String insertSql = RedshiftStatement.getInsertIntoStatement( - options.getConnectionConfigOptions().getTableName(), fieldNames); + connectionConfigOptions.getTableName(), fieldNames); RedshiftRowConverter converter = new RedshiftJdbcModeRowConverter(RowType.of(fieldTypes)); - return new RedshiftJdbcModeBatchExecutor(insertSql, converter, options); + return new RedshiftJdbcModeBatchExecutor(insertSql, converter, executionConfigOptions); } static RedshiftJdbcModeUpsertExecutor createJdbcModeUpsertExecutor( String[] fieldNames, String[] keyFields, LogicalType[] fieldTypes, - InternalRedshiftConfigOptions options) { - String tableName = 
options.getConnectionConfigOptions().getTableName(); + RedshiftConnectionConfigOptions connectionConfigOptions, + RedshiftExecutionConfigOptions executionConfigOptions) { + String tableName = connectionConfigOptions.getTableName(); String insertSql = RedshiftStatement.getInsertIntoStatement(tableName, fieldNames); String updateSql = RedshiftStatement.getUpdateStatement(tableName, fieldNames, keyFields); String deleteSql = RedshiftStatement.getDeleteStatement(tableName, keyFields); @@ -200,7 +240,7 @@ static RedshiftJdbcModeUpsertExecutor createJdbcModeUpsertExecutor( new RedshiftJdbcModeRowConverter(RowType.of(delTypes)), createExtractor(fieldTypes, updFields), createExtractor(fieldTypes, delFields), - options, + executionConfigOptions, fieldNames); } diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftJdbcModeBatchExecutor.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftJdbcModeBatchExecutor.java index b8a7b425..9b6f7f5d 100644 --- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftJdbcModeBatchExecutor.java +++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftJdbcModeBatchExecutor.java @@ -21,7 +21,7 @@ import org.apache.flink.connector.redshift.converter.RedshiftRowConverter; import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider; import org.apache.flink.connector.redshift.internal.statement.FieldNamedRedshiftPreparedStatement; -import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftExecutionConfigOptions; import org.apache.flink.table.data.RowData; import org.apache.flink.util.FlinkRuntimeException; @@ -37,19 +37,21 @@ public class 
RedshiftJdbcModeBatchExecutor implements RedshiftExecutor { private static final Logger LOG = LoggerFactory.getLogger(RedshiftJdbcModeBatchExecutor.class); - private final transient String sql; + private final String sql; private final transient RedshiftRowConverter converter; - private final transient int maxRetries; + private final int maxRetries; private transient FieldNamedRedshiftPreparedStatement statement; public RedshiftJdbcModeBatchExecutor( - String sql, RedshiftRowConverter converter, InternalRedshiftConfigOptions options) { + String sql, + RedshiftRowConverter converter, + RedshiftExecutionConfigOptions executionConfigOptions) { this.sql = sql; this.converter = converter; - this.maxRetries = options.getExecutionConfigOptions().getMaxRetries(); + this.maxRetries = executionConfigOptions.getMaxRetries(); } @Override diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftJdbcModeUpsertExecutor.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftJdbcModeUpsertExecutor.java index ddd644cb..0eb168a4 100644 --- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftJdbcModeUpsertExecutor.java +++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftJdbcModeUpsertExecutor.java @@ -21,7 +21,7 @@ import org.apache.flink.connector.redshift.converter.RedshiftRowConverter; import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider; import org.apache.flink.connector.redshift.internal.statement.FieldNamedRedshiftPreparedStatement; -import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftExecutionConfigOptions; import org.apache.flink.table.data.RowData; import 
org.apache.flink.util.FlinkRuntimeException; @@ -39,11 +39,11 @@ public class RedshiftJdbcModeUpsertExecutor implements RedshiftExecutor { private static final Logger LOG = LoggerFactory.getLogger(RedshiftJdbcModeUpsertExecutor.class); - private final transient String insertSql; + private final String insertSql; - private final transient String updateSql; + private final String updateSql; - private final transient String deleteSql; + private final String deleteSql; private final transient RedshiftRowConverter insertConverter; @@ -55,7 +55,7 @@ public class RedshiftJdbcModeUpsertExecutor implements RedshiftExecutor { private final transient Function deleteExtractor; - private final transient int maxRetries; + private final int maxRetries; private final transient String[] fieldNames; private transient FieldNamedRedshiftPreparedStatement insertStatement; @@ -73,7 +73,7 @@ public RedshiftJdbcModeUpsertExecutor( RedshiftRowConverter deleteConverter, Function updateExtractor, Function deleteExtractor, - InternalRedshiftConfigOptions options, + RedshiftExecutionConfigOptions executionConfigOptions, String[] fieldNames) { this.insertSql = insertSql; this.updateSql = updateSql; @@ -83,7 +83,7 @@ public RedshiftJdbcModeUpsertExecutor( this.deleteConverter = deleteConverter; this.updateExtractor = updateExtractor; this.deleteExtractor = deleteExtractor; - this.maxRetries = options.getExecutionConfigOptions().getMaxRetries(); + this.maxRetries = executionConfigOptions.getMaxRetries(); this.fieldNames = fieldNames; } diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/InternalRedshiftConfigOptions.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/InternalRedshiftConfigOptions.java index 46cb93b7..e3a0cefe 100644 --- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/InternalRedshiftConfigOptions.java 
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/InternalRedshiftConfigOptions.java @@ -24,7 +24,6 @@ import lombok.Getter; import java.io.Serializable; -import java.time.Duration; /** Redshift Connection Option. */ @PublicEvolving @@ -34,15 +33,9 @@ public class InternalRedshiftConfigOptions implements Serializable { private static final long serialVersionUID = 1L; - private final transient SinkMode sinkMode; + private final SinkMode sinkMode; - private final transient ClassLoader classLoader; - - private final transient ConnectionConfigOptions connectionConfigOptions; - - private final transient ExecutionConfigOptions executionConfigOptions; - - private final transient CopyModeConfigOptions copyModeConfigOptions; + private final ClassLoader classLoader; public static InternalRedshiftConfigOptionsBuilder builder() { return new InternalRedshiftConfigOptionsBuilder(); @@ -51,93 +44,10 @@ public static InternalRedshiftConfigOptionsBuilder builder() { /** Builder for creating redshift Options. 
*/ public static final class InternalRedshiftConfigOptionsBuilder { - private String hostName; - private String userName; - - private String password; - - private int port; - - private String databaseName; - - private String tableName; - - private String s3Uri; - - private String iamRoleArn; - - private Duration timeout; - - private Duration flushInterval; - - private int batchSize; - - private int maxRetries; - private SinkMode sinkMode; private ClassLoader classLoader; - public InternalRedshiftConfigOptionsBuilder withHostName(String hostName) { - this.hostName = hostName; - return this; - } - - public InternalRedshiftConfigOptionsBuilder withUserName(String userName) { - this.userName = userName; - return this; - } - - public InternalRedshiftConfigOptionsBuilder withPassword(String password) { - this.password = password; - return this; - } - - public InternalRedshiftConfigOptionsBuilder withPort(int port) { - this.port = port; - return this; - } - - public InternalRedshiftConfigOptionsBuilder withDatabaseName(String databaseName) { - this.databaseName = databaseName; - return this; - } - - public InternalRedshiftConfigOptionsBuilder withTableName(String tableName) { - this.tableName = tableName; - return this; - } - - public InternalRedshiftConfigOptionsBuilder withS3Uri(String s3Uri) { - this.s3Uri = s3Uri; - return this; - } - - public InternalRedshiftConfigOptionsBuilder withIamRoleArn(String iamRoleArn) { - this.iamRoleArn = iamRoleArn; - return this; - } - - public InternalRedshiftConfigOptionsBuilder withTimeout(Duration timeout) { - this.timeout = timeout; - return this; - } - - public InternalRedshiftConfigOptionsBuilder withFlushInterval(Duration flushInterval) { - this.flushInterval = flushInterval; - return this; - } - - public InternalRedshiftConfigOptionsBuilder withBatchSize(int batchSize) { - this.batchSize = batchSize; - return this; - } - - public InternalRedshiftConfigOptionsBuilder withMaxRetries(int maxRetries) { - this.maxRetries = 
maxRetries; - return this; - } - public InternalRedshiftConfigOptionsBuilder withSinkMode(SinkMode sinkMode) { this.sinkMode = sinkMode; return this; @@ -149,66 +59,8 @@ public InternalRedshiftConfigOptionsBuilder withClassLoader(ClassLoader classLoa } public InternalRedshiftConfigOptions build() { - ConnectionConfigOptions connectionConfigOptions = - new ConnectionConfigOptions( - hostName, userName, password, port, databaseName, tableName); - ExecutionConfigOptions executionConfigOptions = - new ExecutionConfigOptions(timeout, flushInterval, batchSize, maxRetries); - CopyModeConfigOptions copyModeConfigOptions = - new CopyModeConfigOptions(s3Uri, iamRoleArn); - return new InternalRedshiftConfigOptions( - sinkMode, - classLoader, - connectionConfigOptions, - executionConfigOptions, - copyModeConfigOptions); - } - } - - /** executor config. */ - @AllArgsConstructor - @Getter - public static class ConnectionConfigOptions implements Serializable { - - private static final long serialVersionUID = 1L; - private final String hostName; - private final String userName; - - private final String password; - - private final int port; - - private final String databaseName; - - private final String tableName; - } - - /** executor config. */ - @AllArgsConstructor - @Getter - public static class CopyModeConfigOptions implements Serializable { - - private static final long serialVersionUID = 1L; - - private final String s3Uri; - - private final String iamRoleArn; - } - - /** executor config. 
*/ - @AllArgsConstructor - @Getter - public static class ExecutionConfigOptions implements Serializable { - - private static final long serialVersionUID = 1L; - - private final Duration timeout; - - private final Duration flushInterval; - - private final int batchSize; - - private final int maxRetries; + return new InternalRedshiftConfigOptions(sinkMode, classLoader); + } } } diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/RedshiftConnectionConfigOptions.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/RedshiftConnectionConfigOptions.java new file mode 100644 index 00000000..fa062db9 --- /dev/null +++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/RedshiftConnectionConfigOptions.java @@ -0,0 +1,87 @@ +package org.apache.flink.connector.redshift.options; + +import org.apache.flink.annotation.PublicEvolving; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.io.Serializable; + +/** RedshiftConnectionConfigOptions. */ +@PublicEvolving +@AllArgsConstructor +@Getter +public class RedshiftConnectionConfigOptions implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String hostName; + private final String userName; + + private final String password; + + private final int port; + + private final String databaseName; + + private final String tableName; + + public static RedshiftConnectionConfigOptionsBuilder builder() { + return new RedshiftConnectionConfigOptionsBuilder(); + } + + /** RedshiftConnectionConfigOptions. 
*/ + public static class RedshiftConnectionConfigOptionsBuilder { + private String hostName; + private String userName; + + private String password; + + private int port; + + private String databaseName; + + private String tableName; + + public RedshiftConnectionConfigOptions.RedshiftConnectionConfigOptionsBuilder withHostName( + String hostName) { + this.hostName = hostName; + return this; + } + + public RedshiftConnectionConfigOptions.RedshiftConnectionConfigOptionsBuilder withUserName( + String userName) { + this.userName = userName; + return this; + } + + public RedshiftConnectionConfigOptions.RedshiftConnectionConfigOptionsBuilder withPassword( + String password) { + this.password = password; + return this; + } + + public RedshiftConnectionConfigOptions.RedshiftConnectionConfigOptionsBuilder withPort( + int port) { + this.port = port; + return this; + } + + public RedshiftConnectionConfigOptions.RedshiftConnectionConfigOptionsBuilder + withDatabaseName(String databaseName) { + this.databaseName = databaseName; + return this; + } + + public RedshiftConnectionConfigOptions.RedshiftConnectionConfigOptionsBuilder withTableName( + String tableName) { + this.tableName = tableName; + return this; + } + + public RedshiftConnectionConfigOptions build() { + return new RedshiftConnectionConfigOptions( + hostName, userName, password, port, databaseName, tableName); + } + } +} diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/RedshiftConnectorOptions.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/RedshiftConnectorOptions.java index 115d1f12..7725ac6f 100644 --- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/RedshiftConnectorOptions.java +++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/RedshiftConnectorOptions.java @@ -36,7 +36,7 
@@ public class RedshiftConnectorOptions { public static final ConfigOption HOSTNAME = ConfigOptions.key("hostname") .stringType() - .noDefaultValue() + .noDefaultValue() .withDescription("Redshift Cluster's Hostname"); public static final ConfigOption PORT = @@ -49,13 +49,13 @@ public class RedshiftConnectorOptions { public static final ConfigOption USERNAME = ConfigOptions.key("username") .stringType() - .noDefaultValue() + .noDefaultValue() .withDescription("username for Redshift Cluster connection."); public static final ConfigOption PASSWORD = ConfigOptions.key("password") .stringType() - .noDefaultValue() + .noDefaultValue() .withDescription("Password of Redshift Cluster associated with username."); public static final ConfigOption IAM_ROLE_ARN = @@ -74,13 +74,13 @@ public class RedshiftConnectorOptions { public static final ConfigOption DATABASE_NAME = ConfigOptions.key("database-name") .stringType() - .noDefaultValue() + .noDefaultValue() .withDescription("Name of Database to which connector is intended to connect."); public static final ConfigOption TABLE_NAME = ConfigOptions.key("table-name") .stringType() - .noDefaultValue() + .noDefaultValue() .withDescription("Table Name to which sink/source to be setup."); // ----------------------------------------------------------------------------------------- @@ -100,7 +100,7 @@ public class RedshiftConnectorOptions { .defaultValue(SinkMode.JDBC) .withDescription("Mode of sink. 
Currently it only supports JDBC / COPY "); - public static final ConfigOption MAX_RETIRES = + public static final ConfigOption MAX_RETRIES = ConfigOptions.key("sink.max.retries") .intType() .defaultValue(2) diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/RedshiftCopyModeConfigOptions.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/RedshiftCopyModeConfigOptions.java new file mode 100644 index 00000000..cc634806 --- /dev/null +++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/RedshiftCopyModeConfigOptions.java @@ -0,0 +1,48 @@ +package org.apache.flink.connector.redshift.options; + +import org.apache.flink.annotation.PublicEvolving; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.io.Serializable; + +/** RedshiftCopyModeConfigOptions. */ +@PublicEvolving +@AllArgsConstructor +@Getter +public class RedshiftCopyModeConfigOptions implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String s3Uri; + + private final String iamRoleArn; + + public static RedshiftCopyModeConfigOptionsBuilder builder() { + return new RedshiftCopyModeConfigOptionsBuilder(); + } + + /** RedshiftCopyModeConfigOptionsBuilder. 
*/ + public static class RedshiftCopyModeConfigOptionsBuilder { + private String s3Uri; + + private String iamRoleArn; + + public RedshiftCopyModeConfigOptions.RedshiftCopyModeConfigOptionsBuilder withS3Uri( + String s3Uri) { + this.s3Uri = s3Uri; + return this; + } + + public RedshiftCopyModeConfigOptions.RedshiftCopyModeConfigOptionsBuilder withIamRoleArn( + String iamRoleArn) { + this.iamRoleArn = iamRoleArn; + return this; + } + + public RedshiftCopyModeConfigOptions build() { + return new RedshiftCopyModeConfigOptions(s3Uri, iamRoleArn); + } + } +} diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/RedshiftExecutionConfigOptions.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/RedshiftExecutionConfigOptions.java new file mode 100644 index 00000000..9e7af9f0 --- /dev/null +++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/options/RedshiftExecutionConfigOptions.java @@ -0,0 +1,71 @@ +package org.apache.flink.connector.redshift.options; + +import org.apache.flink.annotation.PublicEvolving; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.io.Serializable; +import java.time.Duration; + +/** RedshiftExecutionConfigOptions. */ +@PublicEvolving +@AllArgsConstructor +@Getter +public class RedshiftExecutionConfigOptions implements Serializable { + + private static final long serialVersionUID = 1L; + + private final Duration timeout; + + private final Duration flushInterval; + + private final int batchSize; + + private final int maxRetries; + + public static RedshiftExecutionConfigOptionsBuilder builder() { + return new RedshiftExecutionConfigOptionsBuilder(); + } + + /** RedshiftExecutionConfigOptionsBuilder. 
*/ + public static class RedshiftExecutionConfigOptionsBuilder { + + private Duration timeout; + + private Duration flushInterval; + + private int batchSize; + + private int maxRetries; + + public RedshiftExecutionConfigOptions.RedshiftExecutionConfigOptionsBuilder withTimeout( + Duration timeout) { + this.timeout = timeout; + return this; + } + + public RedshiftExecutionConfigOptions.RedshiftExecutionConfigOptionsBuilder + withFlushInterval(Duration flushInterval) { + this.flushInterval = flushInterval; + return this; + } + + public RedshiftExecutionConfigOptions.RedshiftExecutionConfigOptionsBuilder withBatchSize( + int batchSize) { + this.batchSize = batchSize; + return this; + } + + public RedshiftExecutionConfigOptions.RedshiftExecutionConfigOptionsBuilder withMaxRetries( + int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + public RedshiftExecutionConfigOptions build() { + return new RedshiftExecutionConfigOptions( + timeout, flushInterval, batchSize, maxRetries); + } + } +} diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableFactory.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableFactory.java index eed39ba2..52ef6648 100644 --- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableFactory.java +++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableFactory.java @@ -22,7 +22,12 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.connector.redshift.mode.SinkMode; import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftConnectionConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftCopyModeConfigOptions; 
+import org.apache.flink.connector.redshift.options.RedshiftExecutionConfigOptions; import org.apache.flink.connector.redshift.util.S3Util; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.FactoryUtil; @@ -31,7 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -40,6 +44,7 @@ import static org.apache.flink.connector.redshift.options.RedshiftConnectorOptions.FLUSH_INTERVAL; import static org.apache.flink.connector.redshift.options.RedshiftConnectorOptions.HOSTNAME; import static org.apache.flink.connector.redshift.options.RedshiftConnectorOptions.IAM_ROLE_ARN; +import static org.apache.flink.connector.redshift.options.RedshiftConnectorOptions.MAX_RETRIES; import static org.apache.flink.connector.redshift.options.RedshiftConnectorOptions.PASSWORD; import static org.apache.flink.connector.redshift.options.RedshiftConnectorOptions.PORT; import static org.apache.flink.connector.redshift.options.RedshiftConnectorOptions.S3_URI; @@ -47,7 +52,6 @@ import static org.apache.flink.connector.redshift.options.RedshiftConnectorOptions.TABLE_NAME; import static org.apache.flink.connector.redshift.options.RedshiftConnectorOptions.TIMEOUT; import static org.apache.flink.connector.redshift.options.RedshiftConnectorOptions.USERNAME; -import static org.apache.flink.table.connector.source.lookup.LookupOptions.MAX_RETRIES; /** Redshift Dynamic Table Factory. 
*/ public class RedshiftDynamicTableFactory implements DynamicTableSinkFactory { @@ -65,67 +69,101 @@ public DynamicTableSink createDynamicTableSink(Context context) { helper.validate(); validateConfigOptions(config); final int[] primaryKeyIndexes = context.getPrimaryKeyIndexes(); - final DataType physicalRowDataType = context.getPhysicalRowDataType(); + final ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema(); - final String[] primaryKeyFields = - Arrays.stream(primaryKeyIndexes) - .mapToObj(idx -> DataType.getFieldNames(physicalRowDataType).get(idx)) - .toArray(String[]::new); + final String[] primaryKeyFieldNames = + resolvedSchema + .getPrimaryKey() + .map(UniqueConstraint::getColumns) + .map(keys -> keys.toArray(new String[0])) + .orElse(new String[0]); - LOG.info("Primary Keys are ", String.join(", ", primaryKeyFields)); + final String[] fieldNames = resolvedSchema.getColumnNames().toArray(new String[0]); + + LOG.info("Primary Keys are ", String.join(", ", primaryKeyFieldNames)); + LOG.info("All fields are ", String.join(", ", fieldNames)); final InternalRedshiftConfigOptions redshiftConfigOptions = getInternalRedshiftOptions(config, context); - - LOG.info("NOW" + redshiftConfigOptions.getSinkMode().toString()); - LOG.info("NOW" + redshiftConfigOptions.getConnectionConfigOptions().getHostName()); - LOG.info("NOW" + redshiftConfigOptions.getConnectionConfigOptions().getUserName()); - - final String[] fieldNames = - DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); - - LOG.info("Field names are ", String.join(", ", fieldNames)); - - DataType[] fieldDataTypes = - context.getCatalogTable() - .getResolvedSchema() - .getColumnDataTypes() - .toArray(new DataType[0]); + final RedshiftConnectionConfigOptions connectionConfigOptions = + getConnectionConfigOptions(config); + final RedshiftExecutionConfigOptions executionConfigOptions = + getExecutionConfigOptions(config); + final RedshiftCopyModeConfigOptions 
copyModeConfigOptions = + getCopyModeConfigOptions(config); + + LOG.info("SINK MODE PROVIDED: " + redshiftConfigOptions.getSinkMode().toString()); + LOG.info("HOSTNAME :" + connectionConfigOptions.getHostName()); + LOG.info("USERNAME :" + connectionConfigOptions.getUserName()); + + DataType[] fieldDataTypes = resolvedSchema.getColumnDataTypes().toArray(new DataType[0]); return new RedshiftDynamicTableSink( redshiftConfigOptions, - context.getPhysicalRowDataType(), - primaryKeyFields, + connectionConfigOptions, + executionConfigOptions, + copyModeConfigOptions, + primaryKeyFieldNames, fieldNames, fieldDataTypes); } + private RedshiftCopyModeConfigOptions getCopyModeConfigOptions(ReadableConfig readableConfig) { + RedshiftCopyModeConfigOptions.RedshiftCopyModeConfigOptionsBuilder builder = + RedshiftCopyModeConfigOptions.builder(); + readableConfig.getOptional(S3_URI).ifPresent(builder::withS3Uri); + readableConfig.getOptional(IAM_ROLE_ARN).ifPresent(builder::withIamRoleArn); + return builder.build(); + } + + private RedshiftExecutionConfigOptions getExecutionConfigOptions( + ReadableConfig readableConfig) { + RedshiftExecutionConfigOptions.RedshiftExecutionConfigOptionsBuilder builder = + RedshiftExecutionConfigOptions.builder(); + + readableConfig.getOptional(TIMEOUT).ifPresent(builder::withTimeout); + readableConfig.getOptional(FLUSH_INTERVAL).ifPresent(builder::withFlushInterval); + readableConfig.getOptional(BATCH_SIZE).ifPresent(builder::withBatchSize); + readableConfig.getOptional(MAX_RETRIES).ifPresent(builder::withMaxRetries); + return builder.build(); + } + + private RedshiftConnectionConfigOptions getConnectionConfigOptions(ReadableConfig config) { + RedshiftConnectionConfigOptions.RedshiftConnectionConfigOptionsBuilder builder = + RedshiftConnectionConfigOptions.builder() + .withHostName(config.get(HOSTNAME)) + .withPort(config.get(PORT)) + .withDatabaseName(config.get(DATABASE_NAME)) + .withTableName(config.get(TABLE_NAME)) + 
.withUserName(config.get(USERNAME)) + .withPassword(config.get(PASSWORD)); + + config.getOptional(USERNAME).ifPresent(builder::withUserName); + config.getOptional(PASSWORD).ifPresent(builder::withPassword); + + return builder.build(); + } + private InternalRedshiftConfigOptions getInternalRedshiftOptions( ReadableConfig readableConfig, Context context) { - LOG.info("HOSTNAME : " + readableConfig.get(HOSTNAME)); - LOG.info("PORT : " + readableConfig.get(PORT)); - LOG.info("USER : " + readableConfig.get(USERNAME)); - LOG.info("PASS : " + readableConfig.get(PASSWORD)); - LOG.info("DB : " + readableConfig.get(DATABASE_NAME)); - LOG.info("TA : " + readableConfig.get(TABLE_NAME)); - LOG.info("SINKMODE : " + readableConfig.get(SINK_MODE)); + // LOG.info("HOSTNAME : " + readableConfig.get(HOSTNAME)); + // LOG.info("PORT : " + readableConfig.get(PORT)); + // LOG.info("USER : " + readableConfig.get(USERNAME)); + // LOG.info("PASS : " + readableConfig.get(PASSWORD)); + // LOG.info("DB : " + readableConfig.get(DATABASE_NAME)); + // LOG.info("TA : " + readableConfig.get(TABLE_NAME)); + // LOG.info("SINKMODE : " + readableConfig.get(SINK_MODE)); InternalRedshiftConfigOptions.InternalRedshiftConfigOptionsBuilder builder = InternalRedshiftConfigOptions.builder() - .withHostName(readableConfig.get(HOSTNAME)) - .withUserName(readableConfig.get(USERNAME)) - .withPassword(readableConfig.get(PASSWORD)) - .withPort(readableConfig.get(PORT)) - .withDatabaseName(readableConfig.get(DATABASE_NAME)) - .withTableName(readableConfig.get(TABLE_NAME)) .withSinkMode(readableConfig.get(SINK_MODE)) .withClassLoader(context.getClassLoader()); - readableConfig.getOptional(S3_URI).ifPresent(builder::withS3Uri); - readableConfig.getOptional(IAM_ROLE_ARN).ifPresent(builder::withIamRoleArn); - readableConfig.getOptional(TIMEOUT).ifPresent(builder::withTimeout); - readableConfig.getOptional(FLUSH_INTERVAL).ifPresent(builder::withFlushInterval); - 
readableConfig.getOptional(BATCH_SIZE).ifPresent(builder::withBatchSize); - readableConfig.getOptional(MAX_RETRIES).ifPresent(builder::withMaxRetries); + // readableConfig.getOptional(S3_URI).ifPresent(builder::withS3Uri); + // readableConfig.getOptional(IAM_ROLE_ARN).ifPresent(builder::withIamRoleArn); + // readableConfig.getOptional(TIMEOUT).ifPresent(builder::withTimeout); + // readableConfig.getOptional(FLUSH_INTERVAL).ifPresent(builder::withFlushInterval); + // readableConfig.getOptional(BATCH_SIZE).ifPresent(builder::withBatchSize); + // readableConfig.getOptional(MAX_RETRIES).ifPresent(builder::withMaxRetries); return builder.build(); } @@ -160,8 +198,7 @@ public Set> optionalOptions() { private void validateConfigOptions(ReadableConfig config) { - if (config.get(SINK_MODE).equals(SinkMode.COPY) - && !config.getOptional(S3_URI).isPresent()) { + if (config.get(SINK_MODE) == SinkMode.COPY && !config.getOptional(S3_URI).isPresent()) { throw new IllegalArgumentException( "A S3 URL must be provided as the COPY mode is True!"); } diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableSink.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableSink.java index 96d67caf..21c089b1 100644 --- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableSink.java +++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableSink.java @@ -19,6 +19,9 @@ package org.apache.flink.connector.redshift.table; import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftConnectionConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftCopyModeConfigOptions; +import 
org.apache.flink.connector.redshift.options.RedshiftExecutionConfigOptions; import org.apache.flink.connector.redshift.util.AbstractRedshiftRichOutputFormat; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; @@ -30,11 +33,14 @@ /** Redshift Dynamic Table Sink. */ public class RedshiftDynamicTableSink implements DynamicTableSink { - private final transient DataType physicalRowDataType; - private final transient InternalRedshiftConfigOptions redshiftConfigOptions; + private final InternalRedshiftConfigOptions redshiftConfigOptions; - private final transient String[] primaryKeyFields; + private final RedshiftCopyModeConfigOptions copyModeConfigOptions; + private final RedshiftConnectionConfigOptions connectionConfigOptions; + private final RedshiftExecutionConfigOptions executionConfigOptions; + + private final transient String[] primaryKeyFieldNames; private final transient String[] fieldNames; @@ -42,14 +48,18 @@ public class RedshiftDynamicTableSink implements DynamicTableSink { public RedshiftDynamicTableSink( InternalRedshiftConfigOptions redshiftConfigOptions, - DataType physicalRowDataType, - String[] primaryKeyFields, + RedshiftConnectionConfigOptions connectionConfigOptions, + RedshiftExecutionConfigOptions executionConfigOptions, + RedshiftCopyModeConfigOptions copyModeConfigOptions, + String[] primaryKeyFieldNames, String[] fieldNames, DataType[] fieldDataTypes) { - this.physicalRowDataType = physicalRowDataType; this.redshiftConfigOptions = redshiftConfigOptions; - this.primaryKeyFields = primaryKeyFields.length > 0 ? primaryKeyFields : null; + this.connectionConfigOptions = connectionConfigOptions; + this.executionConfigOptions = executionConfigOptions; + this.copyModeConfigOptions = copyModeConfigOptions; + this.primaryKeyFieldNames = (primaryKeyFieldNames.length > 0) ? 
primaryKeyFieldNames : null; this.fieldNames = fieldNames; this.fieldDataTypes = fieldDataTypes; } @@ -66,7 +76,7 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { private void validatePrimaryKey(ChangelogMode requestedMode) { checkState( - ChangelogMode.insertOnly().equals(requestedMode) || primaryKeyFields == null, + ChangelogMode.insertOnly().equals(requestedMode) || primaryKeyFieldNames == null, "please declare primary key for sink table when query contains update/delete record."); } @@ -77,8 +87,11 @@ public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider( AbstractRedshiftRichOutputFormat outputFormat = new AbstractRedshiftRichOutputFormat.Builder() .withRedshiftConfigOptions(redshiftConfigOptions) + .withConnectionConfigOptions(connectionConfigOptions) + .withExecutionConfigOptions(executionConfigOptions) + .withCopyModeConfigOptions(copyModeConfigOptions) .withFieldNames(fieldNames) - .withPrimaryKeyFields(primaryKeyFields) + .withPrimaryKeyFields(primaryKeyFieldNames) .withFieldDataTypes(fieldDataTypes) .build(); return OutputFormatProvider.of(outputFormat); @@ -88,8 +101,10 @@ public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider( public DynamicTableSink copy() { return new RedshiftDynamicTableSink( redshiftConfigOptions, - physicalRowDataType, - primaryKeyFields, + connectionConfigOptions, + executionConfigOptions, + copyModeConfigOptions, + primaryKeyFieldNames, fieldNames, fieldDataTypes); } diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/AbstractRedshiftRichOutputFormat.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/AbstractRedshiftRichOutputFormat.java index d9e2e15f..fb49129c 100644 --- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/AbstractRedshiftRichOutputFormat.java +++ 
b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/AbstractRedshiftRichOutputFormat.java @@ -25,6 +25,9 @@ import org.apache.flink.connector.redshift.internal.connection.RedshiftJdbcConnectionProvider; import org.apache.flink.connector.redshift.internal.executor.RedshiftExecutor; import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftConnectionConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftCopyModeConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftExecutionConfigOptions; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; @@ -138,7 +141,10 @@ public static class Builder { private transient DataType dataType; - private transient InternalRedshiftConfigOptions redshiftConfigOptions; + private InternalRedshiftConfigOptions redshiftConfigOptions; + private RedshiftConnectionConfigOptions connectionConfigOptions; + private RedshiftExecutionConfigOptions executionConfigOptions; + private RedshiftCopyModeConfigOptions copyModeConfigOptions; private transient String[] fieldNames; @@ -156,6 +162,24 @@ public Builder withRedshiftConfigOptions( return this; } + public Builder withConnectionConfigOptions( + RedshiftConnectionConfigOptions connectionConfigOptions) { + this.connectionConfigOptions = connectionConfigOptions; + return this; + } + + public Builder withExecutionConfigOptions( + RedshiftExecutionConfigOptions executionConfigOptions) { + this.executionConfigOptions = executionConfigOptions; + return this; + } + + public Builder withCopyModeConfigOptions( + RedshiftCopyModeConfigOptions copyModeConfigOptions) { + this.copyModeConfigOptions = copyModeConfigOptions; + return this; + } + public Builder withDataType(DataType dataType) { this.dataType = dataType; return this; @@ -182,8 +206,7 @@ 
public Builder withFieldDataTypes(DataType[] fieldDataTypes) { public AbstractRedshiftRichOutputFormat build() { RedshiftConnectionProvider redshiftConnectionProvider = - new RedshiftJdbcConnectionProvider( - redshiftConfigOptions.getConnectionConfigOptions()); + new RedshiftJdbcConnectionProvider(connectionConfigOptions); try { BaseConnection connection = redshiftConnectionProvider.getOrEstablishConnection(); @@ -198,6 +221,9 @@ public AbstractRedshiftRichOutputFormat build() { return new RedshiftRichOutputFormat( redshiftConnectionProvider, redshiftConfigOptions, + connectionConfigOptions, + executionConfigOptions, + copyModeConfigOptions, fieldNames, primaryKeyFields, logicalTypes); diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/RedshiftRichOutputFormat.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/RedshiftRichOutputFormat.java index 4379fdfa..7c0f9bc6 100644 --- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/RedshiftRichOutputFormat.java +++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/RedshiftRichOutputFormat.java @@ -21,6 +21,9 @@ import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider; import org.apache.flink.connector.redshift.internal.executor.RedshiftExecutor; import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftConnectionConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftCopyModeConfigOptions; +import org.apache.flink.connector.redshift.options.RedshiftExecutionConfigOptions; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.util.Preconditions; @@ -37,26 +40,35 @@ public class RedshiftRichOutputFormat 
extends AbstractRedshiftRichOutputFormat { private static final Logger LOG = LoggerFactory.getLogger(RedshiftRichOutputFormat.class); - private final transient RedshiftConnectionProvider redshiftConnectionProvider; + private final RedshiftConnectionProvider redshiftConnectionProvider; - private final transient InternalRedshiftConfigOptions redshiftConfigOptions; + private final InternalRedshiftConfigOptions redshiftConfigOptions; + private final RedshiftConnectionConfigOptions connectionConfigOptions; + private final RedshiftExecutionConfigOptions executionConfigOptions; + private final RedshiftCopyModeConfigOptions copyModeConfigOptions; - private transient RedshiftExecutor executor; + private RedshiftExecutor executor; - private transient int batchCount = 0; + private int batchCount = 0; - private final transient String[] fieldNames; - private final transient String[] primaryKeyFields; - private final transient LogicalType[] fieldDataTypes; + private final String[] fieldNames; + private final String[] primaryKeyFields; + private final LogicalType[] fieldDataTypes; protected RedshiftRichOutputFormat( RedshiftConnectionProvider redshiftConnectionProvider, InternalRedshiftConfigOptions redshiftConfigOptions, + RedshiftConnectionConfigOptions connectionConfigOptions, + RedshiftExecutionConfigOptions executionConfigOptions, + RedshiftCopyModeConfigOptions copyModeConfigOptions, String[] fieldNames, String[] primaryKeyFields, LogicalType[] fieldDataTypes) { this.redshiftConnectionProvider = Preconditions.checkNotNull(redshiftConnectionProvider); this.redshiftConfigOptions = Preconditions.checkNotNull(redshiftConfigOptions); + this.connectionConfigOptions = Preconditions.checkNotNull(connectionConfigOptions); + this.executionConfigOptions = Preconditions.checkNotNull(executionConfigOptions); + this.copyModeConfigOptions = Preconditions.checkNotNull(copyModeConfigOptions); this.fieldNames = Preconditions.checkNotNull(fieldNames); this.fieldDataTypes = 
Preconditions.checkNotNull(fieldDataTypes); this.primaryKeyFields = Preconditions.checkNotNull(primaryKeyFields); @@ -67,13 +79,17 @@ public void open(int taskNumber, int numTasks) throws IOException { try { executor = RedshiftExecutor.createRedshiftExecutor( - redshiftConfigOptions, fieldNames, primaryKeyFields, fieldDataTypes); + redshiftConfigOptions, + connectionConfigOptions, + executionConfigOptions, + copyModeConfigOptions, + fieldNames, + primaryKeyFields, + fieldDataTypes); executor.prepareStatements(redshiftConnectionProvider); LOG.info("Executor: " + executor); - InternalRedshiftConfigOptions.ExecutionConfigOptions executionConfigOptions = - redshiftConfigOptions.getExecutionConfigOptions(); long flushIntervalMillis = executionConfigOptions.getFlushInterval().toMillis(); scheduledFlush(flushIntervalMillis, executor.getName()); } catch (Exception exception) { @@ -88,7 +104,7 @@ public synchronized void writeRecord(RowData record) throws IOException { try { executor.addToBatch(record); batchCount++; - if (batchCount >= redshiftConfigOptions.getExecutionConfigOptions().getBatchSize()) { + if (batchCount >= executionConfigOptions.getBatchSize()) { flush(); } } catch (SQLException exception) {