
Commit 118e644
Serialization error issue
Samrat002 committed Nov 16, 2023
1 parent 1671a65 commit 118e644
Showing 15 changed files with 490 additions and 285 deletions.
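
Context for the diffs below: Flink serializes operator instances on the client and deserializes them on each TaskManager, and Java serialization skips any field marked transient, so such a field arrives as null on the worker. That is the class of failure this commit addresses: it drops transient from configuration-derived executor fields and replaces most uses of the aggregate InternalRedshiftConfigOptions with dedicated RedshiftConnectionConfigOptions, RedshiftExecutionConfigOptions, and RedshiftCopyModeConfigOptions objects. A minimal, self-contained sketch of the pitfall follows; every class and field name in it is illustrative, not taken from the connector.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustrative sketch (hypothetical names): how a transient field in a
// serialized operator surfaces as null after deserialization.
public class TransientFieldPitfall {

    static class SketchSink implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String tableName;            // serialized normally
        private final transient String iamRoleArn; // skipped: null on the worker

        SketchSink(String tableName, String iamRoleArn) {
            this.tableName = tableName;
            this.iamRoleArn = iamRoleArn;
        }
    }

    private static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        SketchSink local = new SketchSink("orders", "arn:aws:iam::000000000000:role/example");
        byte[] bytes = serialize(local);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            SketchSink remote = (SketchSink) ois.readObject();
            System.out.println(remote.tableName);  // prints "orders"
            System.out.println(remote.iamRoleArn); // prints "null"
        }
    }
}

After the round-trip, tableName is intact while iamRoleArn is null, and the failure would surface only later, for example when the COPY statement is assembled on the worker. This is consistent with the executors below switching such fields to plain non-transient members.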
File: RedshiftJdbcConnectionProvider.java

@@ -18,7 +18,7 @@
 package org.apache.flink.connector.redshift.internal.connection;

 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions;
+import org.apache.flink.connector.redshift.options.RedshiftConnectionConfigOptions;

 import com.amazon.redshift.core.BaseConnection;
 import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
@@ -44,10 +44,9 @@ public class RedshiftJdbcConnectionProvider implements RedshiftConnectionProvider {

     private transient BaseConnection connection;

-    private final transient InternalRedshiftConfigOptions.ConnectionConfigOptions connectionOptions;
+    private final transient RedshiftConnectionConfigOptions connectionOptions;

-    public RedshiftJdbcConnectionProvider(
-            InternalRedshiftConfigOptions.ConnectionConfigOptions configOptions) {
+    public RedshiftJdbcConnectionProvider(RedshiftConnectionConfigOptions configOptions) {
         this.connectionOptions = configOptions;
     }

File: RedshiftCopyModeBatchExecutor.java

@@ -23,6 +23,9 @@
 import org.apache.flink.connector.redshift.internal.statement.FieldNamedRedshiftPreparedStatement;
 import org.apache.flink.connector.redshift.internal.statement.RedshiftStatement;
 import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions;
+import org.apache.flink.connector.redshift.options.RedshiftConnectionConfigOptions;
+import org.apache.flink.connector.redshift.options.RedshiftCopyModeConfigOptions;
+import org.apache.flink.connector.redshift.options.RedshiftExecutionConfigOptions;
 import org.apache.flink.connector.redshift.util.S3Util;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -43,19 +46,19 @@ public class RedshiftCopyModeBatchExecutor implements RedshiftExecutor {

     private static final Logger LOG = LoggerFactory.getLogger(RedshiftCopyModeBatchExecutor.class);

-    private final transient int maxRetries;
+    private final int maxRetries;

-    private final transient String tableName;
+    private final String tableName;

     private final transient String[] fieldNames;

-    private transient String sql;
+    private String sql;

-    private final transient RedshiftCopyModeRowConverter copyRowConverter;
+    private final RedshiftCopyModeRowConverter copyRowConverter;

-    private final transient String tempS3Uri;
+    private final String tempS3Uri;

-    private final transient String iamRoleArn;
+    private final String iamRoleArn;

     private final transient List<String[]> csvData;

@@ -64,16 +67,21 @@ public class RedshiftCopyModeBatchExecutor implements RedshiftExecutor {
     private transient FieldNamedRedshiftPreparedStatement statement;

     public RedshiftCopyModeBatchExecutor(
-            String[] fieldNames, LogicalType[] fieldTypes, InternalRedshiftConfigOptions options) {
-        this.tableName = options.getConnectionConfigOptions().getTableName();
+            String[] fieldNames,
+            LogicalType[] fieldTypes,
+            InternalRedshiftConfigOptions options,
+            RedshiftConnectionConfigOptions connectionConfigOptions,
+            RedshiftExecutionConfigOptions executionConfigOptions,
+            RedshiftCopyModeConfigOptions copyModeConfigOptions) {
+        this.tableName = connectionConfigOptions.getTableName();
         this.fieldNames = fieldNames;
-        this.maxRetries = options.getExecutionConfigOptions().getMaxRetries();
+        this.maxRetries = executionConfigOptions.getMaxRetries();
         this.csvData = new ArrayList<>();
         this.s3Client = S3Client.create();
         this.copyRowConverter = new RedshiftCopyModeRowConverter(fieldTypes);

-        this.tempS3Uri = S3Util.getS3UriWithFileName(options.getCopyModeConfigOptions().getS3Uri());
-        this.iamRoleArn = options.getCopyModeConfigOptions().getIamRoleArn();
+        this.tempS3Uri = S3Util.getS3UriWithFileName(copyModeConfigOptions.getS3Uri());
+        this.iamRoleArn = copyModeConfigOptions.getIamRoleArn();
     }

     @Override
File: RedshiftCopyModeUpsertExecutor.java

@@ -22,7 +22,9 @@
 import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider;
 import org.apache.flink.connector.redshift.internal.statement.FieldNamedRedshiftPreparedStatement;
 import org.apache.flink.connector.redshift.internal.statement.RedshiftStatement;
-import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions;
+import org.apache.flink.connector.redshift.options.RedshiftConnectionConfigOptions;
+import org.apache.flink.connector.redshift.options.RedshiftCopyModeConfigOptions;
+import org.apache.flink.connector.redshift.options.RedshiftExecutionConfigOptions;
 import org.apache.flink.connector.redshift.util.S3Util;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -45,29 +47,29 @@ public class RedshiftCopyModeUpsertExecutor implements RedshiftExecutor {

     private static final Logger LOG = LoggerFactory.getLogger(RedshiftCopyModeUpsertExecutor.class);

-    private final transient int maxRetries;
+    private final int maxRetries;

-    private final transient String tableName;
+    private final String tableName;

     private final transient String stageTableName;

     private final transient String[] fieldNames;

     private final transient String[] keyFields;

-    private final transient String tempS3Uri;
+    private final String tempS3Uri;

-    private final transient String iamRoleArn;
+    private final String iamRoleArn;

-    private transient String copyInsertSql;
+    private String copyInsertSql;

-    private transient String updateTransactionSql;
+    private String updateTransactionSql;

-    private transient String deleteSql;
+    private String deleteSql;

-    private final transient RedshiftCopyModeRowConverter deleteConverter;
+    private final RedshiftCopyModeRowConverter deleteConverter;

-    private final transient Function<RowData, RowData> deleteExtractor;
+    private final Function<RowData, RowData> deleteExtractor;

     private transient FieldNamedRedshiftPreparedStatement insertStatement;

@@ -89,22 +91,24 @@ public RedshiftCopyModeUpsertExecutor(
             LogicalType[] fieldTypes,
             RedshiftCopyModeRowConverter deleteConverter,
             Function<RowData, RowData> deleteExtractor,
-            InternalRedshiftConfigOptions options) {
+            RedshiftConnectionConfigOptions connectionConfigOptions,
+            RedshiftExecutionConfigOptions executionConfigOptions,
+            RedshiftCopyModeConfigOptions copyModeConfigOptions) {

-        this.maxRetries = options.getExecutionConfigOptions().getMaxRetries();
+        this.maxRetries = executionConfigOptions.getMaxRetries();
         this.fieldNames = fieldNames;
         this.keyFields = keyFields;
         this.deleteConverter = deleteConverter;
         this.deleteExtractor = deleteExtractor;
         this.csvInsertData = new ArrayList<>();
         this.csvUpdateData = new ArrayList<>();

-        this.tableName = options.getConnectionConfigOptions().getTableName();
-        this.iamRoleArn = options.getCopyModeConfigOptions().getIamRoleArn();
+        this.tableName = connectionConfigOptions.getTableName();
+        this.iamRoleArn = copyModeConfigOptions.getIamRoleArn();
         this.s3Client = S3Client.create();
         this.copyRowConverter = new RedshiftCopyModeRowConverter(fieldTypes);
         this.stageTableName = "_" + tableName + "_stage";
-        this.tempS3Uri = S3Util.getS3UriWithFileName(options.getCopyModeConfigOptions().getS3Uri());
+        this.tempS3Uri = S3Util.getS3UriWithFileName(copyModeConfigOptions.getS3Uri());
     }

     @Override
File: RedshiftExecutor.java

@@ -25,6 +25,9 @@
 import org.apache.flink.connector.redshift.internal.statement.FieldNamedRedshiftPreparedStatement;
 import org.apache.flink.connector.redshift.internal.statement.RedshiftStatement;
 import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions;
+import org.apache.flink.connector.redshift.options.RedshiftConnectionConfigOptions;
+import org.apache.flink.connector.redshift.options.RedshiftCopyModeConfigOptions;
+import org.apache.flink.connector.redshift.options.RedshiftExecutionConfigOptions;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -96,6 +99,9 @@ default void attemptExecuteBatch(

     static RedshiftExecutor createRedshiftExecutor(
             InternalRedshiftConfigOptions redshiftConfigOptions,
+            RedshiftConnectionConfigOptions connectionConfigOptions,
+            RedshiftExecutionConfigOptions executionConfigOptions,
+            RedshiftCopyModeConfigOptions copyModeConfigOptions,
             String[] fieldNames,
             String[] primaryKeyFields,
             LogicalType[] fieldTypes) {
@@ -104,23 +110,40 @@ static RedshiftExecutor createRedshiftExecutor(
                 case COPY:
                     LOG.info("Create Upload Copy UPSERT Executor.");
                     return createCopyModeUpsertExecutor(
-                            fieldNames, primaryKeyFields, fieldTypes, redshiftConfigOptions);
+                            fieldNames,
+                            primaryKeyFields,
+                            fieldTypes,
+                            connectionConfigOptions,
+                            executionConfigOptions,
+                            copyModeConfigOptions);
                 case JDBC:
                     LOG.info("Create pure JDBC UPSERT Executor.");
                     return createJdbcModeUpsertExecutor(
-                            fieldNames, primaryKeyFields, fieldTypes, redshiftConfigOptions);
+                            fieldNames,
+                            primaryKeyFields,
+                            fieldTypes,
+                            connectionConfigOptions,
+                            executionConfigOptions);
             }

         } else {
             switch (redshiftConfigOptions.getSinkMode()) {
                 case JDBC:
                     LOG.info("Create pure JDBC batch Executor.");
                     return createJdbcModeBatchExecutor(
-                            fieldNames, fieldTypes, redshiftConfigOptions);
+                            fieldNames,
+                            fieldTypes,
+                            connectionConfigOptions,
+                            executionConfigOptions);
                 case COPY:
                     LOG.info("Create Upload Copy batch Executor.");
                     return createCopyModeBatchExecutor(
-                            fieldNames, fieldTypes, redshiftConfigOptions);
+                            fieldNames,
+                            fieldTypes,
+                            redshiftConfigOptions,
+                            connectionConfigOptions,
+                            executionConfigOptions,
+                            copyModeConfigOptions);
             }
         }
         throw new FlinkRuntimeException("No correct Executor found");
@@ -129,15 +152,26 @@
     static RedshiftCopyModeBatchExecutor createCopyModeBatchExecutor(
             String[] fieldNames,
             LogicalType[] fieldTypes,
-            InternalRedshiftConfigOptions redshiftConfigOptions) {
-        return new RedshiftCopyModeBatchExecutor(fieldNames, fieldTypes, redshiftConfigOptions);
+            InternalRedshiftConfigOptions redshiftConfigOptions,
+            RedshiftConnectionConfigOptions connectionConfigOptions,
+            RedshiftExecutionConfigOptions executionConfigOptions,
+            RedshiftCopyModeConfigOptions copyModeConfigOptions) {
+        return new RedshiftCopyModeBatchExecutor(
+                fieldNames,
+                fieldTypes,
+                redshiftConfigOptions,
+                connectionConfigOptions,
+                executionConfigOptions,
+                copyModeConfigOptions);
     }

     static RedshiftCopyModeUpsertExecutor createCopyModeUpsertExecutor(
             String[] fieldNames,
             String[] keyFields,
             LogicalType[] fieldTypes,
-            InternalRedshiftConfigOptions options) {
+            RedshiftConnectionConfigOptions connectionConfigOptions,
+            RedshiftExecutionConfigOptions executionConfigOptions,
+            RedshiftCopyModeConfigOptions copyModeConfigOptions) {

         int[] delFields =
                 Arrays.stream(keyFields)
@@ -153,24 +187,30 @@ static RedshiftCopyModeUpsertExecutor createCopyModeUpsertExecutor(
                 fieldTypes,
                 new RedshiftCopyModeRowConverter(delTypes),
                 createExtractor(fieldTypes, delFields),
-                options);
+                connectionConfigOptions,
+                executionConfigOptions,
+                copyModeConfigOptions);
     }

     static RedshiftJdbcModeBatchExecutor createJdbcModeBatchExecutor(
-            String[] fieldNames, LogicalType[] fieldTypes, InternalRedshiftConfigOptions options) {
+            String[] fieldNames,
+            LogicalType[] fieldTypes,
+            RedshiftConnectionConfigOptions connectionConfigOptions,
+            RedshiftExecutionConfigOptions executionConfigOptions) {
         String insertSql =
                 RedshiftStatement.getInsertIntoStatement(
-                        options.getConnectionConfigOptions().getTableName(), fieldNames);
+                        connectionConfigOptions.getTableName(), fieldNames);
         RedshiftRowConverter converter = new RedshiftJdbcModeRowConverter(RowType.of(fieldTypes));
-        return new RedshiftJdbcModeBatchExecutor(insertSql, converter, options);
+        return new RedshiftJdbcModeBatchExecutor(insertSql, converter, executionConfigOptions);
     }

     static RedshiftJdbcModeUpsertExecutor createJdbcModeUpsertExecutor(
             String[] fieldNames,
             String[] keyFields,
             LogicalType[] fieldTypes,
-            InternalRedshiftConfigOptions options) {
-        String tableName = options.getConnectionConfigOptions().getTableName();
+            RedshiftConnectionConfigOptions connectionConfigOptions,
+            RedshiftExecutionConfigOptions executionConfigOptions) {
+        String tableName = connectionConfigOptions.getTableName();
         String insertSql = RedshiftStatement.getInsertIntoStatement(tableName, fieldNames);
         String updateSql = RedshiftStatement.getUpdateStatement(tableName, fieldNames, keyFields);
         String deleteSql = RedshiftStatement.getDeleteStatement(tableName, keyFields);
@@ -200,7 +240,7 @@ static RedshiftJdbcModeUpsertExecutor createJdbcModeUpsertExecutor(
                 new RedshiftJdbcModeRowConverter(RowType.of(delTypes)),
                 createExtractor(fieldTypes, updFields),
                 createExtractor(fieldTypes, delFields),
-                options,
+                executionConfigOptions,
                 fieldNames);
     }

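Taken together, the factory now receives each option group explicitly. A hedged sketch of a caller follows: only the createRedshiftExecutor call matches the signature in the diff above, while the wrapper class, its name, the use of RichSinkFunction, and the assumption that the new option classes implement Serializable (which dropping transient from executor fields implies) are all illustrative.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions;
import org.apache.flink.connector.redshift.options.RedshiftConnectionConfigOptions;
import org.apache.flink.connector.redshift.options.RedshiftCopyModeConfigOptions;
import org.apache.flink.connector.redshift.options.RedshiftExecutionConfigOptions;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;

// Hedged sketch: a sink function that carries the split option objects across
// the serialize/ship/deserialize cycle and rebuilds its executor on the worker.
// The RedshiftExecutor import is omitted because its package is not shown in
// this diff; assume it is in scope.
public class SketchRedshiftSinkFunction extends RichSinkFunction<RowData> {

    // Non-transient so they survive serialization (assumes the option classes
    // are Serializable).
    private final InternalRedshiftConfigOptions redshiftConfigOptions;
    private final RedshiftConnectionConfigOptions connectionConfigOptions;
    private final RedshiftExecutionConfigOptions executionConfigOptions;
    private final RedshiftCopyModeConfigOptions copyModeConfigOptions;
    private final String[] fieldNames;
    private final String[] primaryKeyFields;
    private final LogicalType[] fieldTypes;

    // Rebuilt per task in open(); never serialized.
    private transient RedshiftExecutor executor;

    public SketchRedshiftSinkFunction(
            InternalRedshiftConfigOptions redshiftConfigOptions,
            RedshiftConnectionConfigOptions connectionConfigOptions,
            RedshiftExecutionConfigOptions executionConfigOptions,
            RedshiftCopyModeConfigOptions copyModeConfigOptions,
            String[] fieldNames,
            String[] primaryKeyFields,
            LogicalType[] fieldTypes) {
        this.redshiftConfigOptions = redshiftConfigOptions;
        this.connectionConfigOptions = connectionConfigOptions;
        this.executionConfigOptions = executionConfigOptions;
        this.copyModeConfigOptions = copyModeConfigOptions;
        this.fieldNames = fieldNames;
        this.primaryKeyFields = primaryKeyFields;
        this.fieldTypes = fieldTypes;
    }

    @Override
    public void open(Configuration parameters) {
        // Argument order taken from the refactored factory in the diff above;
        // upsert-vs-batch and COPY-vs-JDBC dispatch happens inside the factory.
        executor = RedshiftExecutor.createRedshiftExecutor(
                redshiftConfigOptions,
                connectionConfigOptions,
                executionConfigOptions,
                copyModeConfigOptions,
                fieldNames,
                primaryKeyFields,
                fieldTypes);
    }
}
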
File: RedshiftJdbcModeBatchExecutor.java

@@ -21,7 +21,7 @@
 import org.apache.flink.connector.redshift.converter.RedshiftRowConverter;
 import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider;
 import org.apache.flink.connector.redshift.internal.statement.FieldNamedRedshiftPreparedStatement;
-import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions;
+import org.apache.flink.connector.redshift.options.RedshiftExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.FlinkRuntimeException;

@@ -37,19 +37,21 @@ public class RedshiftJdbcModeBatchExecutor implements RedshiftExecutor {

     private static final Logger LOG = LoggerFactory.getLogger(RedshiftJdbcModeBatchExecutor.class);

-    private final transient String sql;
+    private final String sql;

     private final transient RedshiftRowConverter converter;

-    private final transient int maxRetries;
+    private final int maxRetries;

     private transient FieldNamedRedshiftPreparedStatement statement;

     public RedshiftJdbcModeBatchExecutor(
-            String sql, RedshiftRowConverter converter, InternalRedshiftConfigOptions options) {
+            String sql,
+            RedshiftRowConverter converter,
+            RedshiftExecutionConfigOptions executionConfigOptions) {
         this.sql = sql;
         this.converter = converter;
-        this.maxRetries = options.getExecutionConfigOptions().getMaxRetries();
+        this.maxRetries = executionConfigOptions.getMaxRetries();
     }

     @Override
File: RedshiftJdbcModeUpsertExecutor.java

@@ -21,7 +21,7 @@
 import org.apache.flink.connector.redshift.converter.RedshiftRowConverter;
 import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider;
 import org.apache.flink.connector.redshift.internal.statement.FieldNamedRedshiftPreparedStatement;
-import org.apache.flink.connector.redshift.options.InternalRedshiftConfigOptions;
+import org.apache.flink.connector.redshift.options.RedshiftExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.FlinkRuntimeException;

@@ -39,11 +39,11 @@ public class RedshiftJdbcModeUpsertExecutor implements RedshiftExecutor {

     private static final Logger LOG = LoggerFactory.getLogger(RedshiftJdbcModeUpsertExecutor.class);

-    private final transient String insertSql;
+    private final String insertSql;

-    private final transient String updateSql;
+    private final String updateSql;

-    private final transient String deleteSql;
+    private final String deleteSql;

     private final transient RedshiftRowConverter insertConverter;

@@ -55,7 +55,7 @@ public class RedshiftJdbcModeUpsertExecutor implements RedshiftExecutor {

     private final transient Function<RowData, RowData> deleteExtractor;

-    private final transient int maxRetries;
+    private final int maxRetries;
     private final transient String[] fieldNames;

     private transient FieldNamedRedshiftPreparedStatement insertStatement;
@@ -73,7 +73,7 @@ public RedshiftJdbcModeUpsertExecutor(
             RedshiftRowConverter deleteConverter,
             Function<RowData, RowData> updateExtractor,
             Function<RowData, RowData> deleteExtractor,
-            InternalRedshiftConfigOptions options,
+            RedshiftExecutionConfigOptions executionConfigOptions,
             String[] fieldNames) {
         this.insertSql = insertSql;
         this.updateSql = updateSql;
@@ -83,7 +83,7 @@ public RedshiftJdbcModeUpsertExecutor(
         this.deleteConverter = deleteConverter;
         this.updateExtractor = updateExtractor;
         this.deleteExtractor = deleteExtractor;
-        this.maxRetries = options.getExecutionConfigOptions().getMaxRetries();
+        this.maxRetries = executionConfigOptions.getMaxRetries();
         this.fieldNames = fieldNames;
     }

(9 remaining changed files not shown.)
