[FLINK-33132] Flink Connector Redshift TableSink Implementation #114

Open
wants to merge 1 commit into
base: main
121 changes: 121 additions & 0 deletions flink-connector-aws/flink-connector-redshift/README.md
@@ -0,0 +1,121 @@
# Flink Redshift Connector

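This is the initial proof of concept (POC) for the Flink Redshift connector. It writes to Redshift in one of two modes, selected by the `copy-mode` option:

- JDBC mode (`copy-mode` = `false`)
- COPY mode (`copy-mode` = `true`)

This POC supports only table sinks.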

## Connector Options
| Option | Required | Default | Type | Description |
|:-------|:---------|:--------|:-----|:------------|
| hostname | required | none | String | Redshift connection hostname. |
| port | required | 5439 | Integer | Redshift connection port. |
| username | required | none | String | Redshift username. |
| password | required | none | String | Redshift user password. |
| database-name | required | dev | String | Redshift database to connect to. |
| table-name | required | none | String | Redshift table name. |
| sink.batch-size | optional | 1000 | Integer | Maximum number of buffered records; the buffer is flushed once this size is reached. |
| sink.flush-interval | optional | 1s | Duration | Flush interval; once it elapses, an asynchronous thread flushes the buffered data. |
| sink.max-retries | optional | 3 | Integer | Maximum number of retries when writing records to the database fails. |
| copy-mode | required | false | Boolean | Whether to use the Redshift COPY command to insert/upsert data. |
| copy-temp-s3-uri | conditionally required | none | String | S3 URI used by the Redshift COPY command; required when `copy-mode` is `true`. |
| iam-role-arn | conditionally required | none | String | IAM role used by the Redshift COPY command; required when `copy-mode` is `true`. The role must have the necessary privileges and be attached to the Redshift cluster. |
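
The conditional requirement on `copy-temp-s3-uri` and `iam-role-arn` could be checked along the following lines. This is a minimal sketch, not the connector's actual validation: the class `CopyModeOptionCheck` and its `validate` method are hypothetical names, and the options are assumed to arrive as a plain string map.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; not part of the connector code in this PR. */
final class CopyModeOptionCheck {

    /** Throws if copy-mode is enabled but a COPY-specific option is missing or blank. */
    static void validate(Map<String, String> options) {
        boolean copyMode = Boolean.parseBoolean(options.getOrDefault("copy-mode", "false"));
        if (!copyMode) {
            return; // JDBC mode needs no COPY-specific options
        }
        for (String key : new String[] {"copy-temp-s3-uri", "iam-role-arn"}) {
            String value = options.get(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalArgumentException(
                        "'" + key + "' is required when 'copy-mode' is true");
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("copy-mode", "true");
        options.put("copy-temp-s3-uri", "s3://bucket-name/key/temp");
        try {
            validate(options); // fails: iam-role-arn is not set
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing fast at table creation time, rather than at the first flush, would surface a missing option before any data is buffered.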

**Update/Delete Data Considerations:**
The data is updated and deleted by the primary key.
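
The SQL that the sink issues for updates and deletes is not shown in this README. As an illustration only, key-based statements could be generated as in the sketch below. `KeyedStatementSketch` and its methods are hypothetical, identifiers are assumed to need no quoting, and the table and column names are borrowed from the examples later in this document.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper; the statements the sink actually issues are not shown in this README. */
final class KeyedStatementSketch {

    /** Builds "DELETE FROM t WHERE k1 = ? AND k2 = ?" for the given key columns. */
    static String deleteByKey(String table, List<String> keyColumns) {
        return "DELETE FROM " + table + " WHERE " + keyPredicate(keyColumns);
    }

    /** Builds an UPDATE that sets every non-key column and filters on the key columns. */
    static String updateByKey(String table, List<String> columns, List<String> keyColumns) {
        String assignments =
                columns.stream()
                        .filter(c -> !keyColumns.contains(c))
                        .map(c -> c + " = ?")
                        .collect(Collectors.joining(", "));
        return "UPDATE " + table + " SET " + assignments + " WHERE " + keyPredicate(keyColumns);
    }

    private static String keyPredicate(List<String> keyColumns) {
        if (keyColumns.isEmpty()) {
            throw new IllegalArgumentException("A primary key is required for update/delete");
        }
        return keyColumns.stream().map(c -> c + " = ?").collect(Collectors.joining(" AND "));
    }

    public static void main(String[] args) {
        List<String> key = Arrays.asList("user_id");
        // prints: DELETE FROM users WHERE user_id = ?
        System.out.println(deleteByKey("users", key));
        // prints: UPDATE users SET country = ?, score = ? WHERE user_id = ?
        System.out.println(updateByKey("users", Arrays.asList("user_id", "country", "score"), key));
    }
}
```

This is why a table without a declared primary key could only be written in append fashion under such a scheme.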

## Data Type Mapping

| Flink Type | Redshift Type |
|:--------------------|:--------------------------------------------------------|
| CHAR | VARCHAR |
| VARCHAR | VARCHAR |
| STRING | VARCHAR |
| BOOLEAN | Boolean |
| BYTES | Not supported |
| DECIMAL | Decimal |
| TINYINT | Int8 |
| SMALLINT | Int16 |
| INTEGER | Int32 |
| BIGINT | Int64 |
| FLOAT | Float32 |
| DOUBLE | Float64 |
| DATE | Date |
| TIME | Timestamp |
| TIMESTAMP | Timestamp |
| TIMESTAMP_LTZ | Timestamp |
| INTERVAL_YEAR_MONTH | Int32 |
| INTERVAL_DAY_TIME | Int64 |
| ARRAY | Not supported |
| MAP | Not supported |
| ROW | Not supported |
| MULTISET | Not supported |
| RAW | Not supported |



## How the POC Is Tested

### Create and sink a table in pure JDBC mode

```SQL

-- Register the Redshift table `users` as `t_user` in Flink SQL.
CREATE TABLE t_user (
    `user_id` BIGINT,
    `user_type` INTEGER,
    `language` STRING,
    `country` STRING,
    `gender` STRING,
    `score` DOUBLE,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'redshift',
    'hostname' = 'xxxx.redshift.amazonaws.com',
    'port' = '5439',
    'username' = 'awsuser',
    'password' = 'passwordxxxx',
    'database-name' = 'tutorial',
    'table-name' = 'users',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3'
);

-- Write data into the Redshift table from the table `T`.
INSERT INTO t_user
SELECT cast(`user_id` as BIGINT), `user_type`, `lang`, `country`, `gender`, `score` FROM T;

```
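
The `sink.batch-size` and `sink.flush-interval` options used above describe a buffer that is flushed when it fills up or when enough time has passed. The sketch below shows that idea in isolation. It is not the connector's implementation: `BatchBuffer` is a hypothetical class, and instead of the asynchronous flush thread described in the options table it checks the interval on each `add` against a caller-supplied clock, which keeps the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical size- and time-triggered buffer; not the connector's actual writer. */
final class BatchBuffer<T> {
    private final int batchSize;
    private final long flushIntervalMillis;
    private final Consumer<List<T>> flusher;
    private final List<T> buffer = new ArrayList<>();
    private long lastFlushMillis;

    BatchBuffer(int batchSize, long flushIntervalMillis, Consumer<List<T>> flusher, long nowMillis) {
        this.batchSize = batchSize;
        this.flushIntervalMillis = flushIntervalMillis;
        this.flusher = flusher;
        this.lastFlushMillis = nowMillis;
    }

    /** Buffers a record and flushes if the size or the interval threshold is reached. */
    synchronized void add(T record, long nowMillis) {
        buffer.add(record);
        if (buffer.size() >= batchSize || nowMillis - lastFlushMillis >= flushIntervalMillis) {
            flush(nowMillis);
        }
    }

    /** Hands a copy of the buffered records to the flusher and resets the timer. */
    synchronized void flush(long nowMillis) {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
        lastFlushMillis = nowMillis;
    }

    public static void main(String[] args) {
        BatchBuffer<String> buffer =
                new BatchBuffer<>(
                        500, 1000L, batch -> System.out.println("flushing " + batch.size()), 0L);
        buffer.add("row-1", 10L); // buffered only
        buffer.add("row-2", 1200L); // interval elapsed, so both rows are flushed
    }
}
```

A production writer would also need to flush on checkpoint and on close, and to apply `sink.max-retries` around the flush call.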

### Create and sink a table in COPY mode

```SQL

-- Register the Redshift table `users` as `t_user` in Flink SQL.
CREATE TABLE t_user (
    `user_id` BIGINT,
    `user_type` INTEGER,
    `language` STRING,
    `country` STRING,
    `gender` STRING,
    `score` DOUBLE,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'redshift',
    'hostname' = 'xxxx.redshift.amazonaws.com',
    'port' = '5439',
    'username' = 'awsuser',
    'password' = 'passwordxxxx',
    'database-name' = 'tutorial',
    'table-name' = 'users',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3',
    'copy-mode' = 'true',
    'copy-temp-s3-uri' = 's3://bucket-name/key/temp',
    'iam-role-arn' = 'arn:aws:iam::xxxxxxxx:role/xxxxxRedshiftS3Rolexxxxx'
);
```
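
In COPY mode, the S3 URI and IAM role configured above would end up in a Redshift `COPY` statement. The sketch below shows roughly what such a statement could look like. It is an assumption, not the statement this connector generates: `CopyStatementSketch` is a hypothetical class, and CSV is assumed as the staging format.

```java
/** Hypothetical builder; the COPY statement the connector actually issues may differ. */
final class CopyStatementSketch {

    /** Builds a COPY statement that loads CSV files staged under the given S3 URI. */
    static String buildCopy(String table, String s3Uri, String iamRoleArn) {
        return "COPY " + table
                + " FROM '" + escape(s3Uri) + "'"
                + " IAM_ROLE '" + escape(iamRoleArn) + "'"
                + " FORMAT AS CSV";
    }

    /** Doubles single quotes so the value is safe inside a SQL string literal. */
    private static String escape(String literal) {
        return literal.replace("'", "''");
    }

    public static void main(String[] args) {
        System.out.println(
                buildCopy(
                        "users",
                        "s3://bucket-name/key/temp",
                        "arn:aws:iam::xxxxxxxx:role/xxxxxRedshiftS3Rolexxxxx"));
    }
}
```

Redshift reads the staged files using the given role, which is why that role must be attached to the cluster and allowed to read the bucket.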
125 changes: 125 additions & 0 deletions flink-connector-aws/flink-connector-redshift/pom.xml
@@ -0,0 +1,125 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-aws-parent</artifactId>
<version>4.3-SNAPSHOT</version>
</parent>

<artifactId>flink-connector-redshift</artifactId>
<name>Flink : Connectors : AWS : Amazon Redshift</name>

<properties>
<redshift.jdbc.version>2.1.0.17</redshift.jdbc.version>
<commons-logging.version>1.2</commons-logging.version>
<commons-csv.version>1.10.0</commons-csv.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>

<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-aws-base</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->

<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-java</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->

<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-streaming-java</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->

<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-clients</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-table-api-java-bridge</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->


<dependency>
<groupId>com.amazon.redshift</groupId>
<artifactId>redshift-jdbc42</artifactId>
<version>${redshift.jdbc.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>${commons-csv.version}</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<version>${aws.sdkv1.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>${aws.sdkv1.version}</version>
</dependency>

<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>${commons-logging.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
</dependencies>

</project>
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.redshift.connection;

import org.apache.flink.connector.redshift.options.RedshiftOptions;

import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.sql.DriverManager;
import java.sql.SQLException;

/** Provides a lazily created, cached JDBC connection to Redshift. */
public class RedshiftConnectionProvider implements Serializable {
    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(RedshiftConnectionProvider.class);

    private static final String REDSHIFT_DRIVER_NAME = "com.amazon.redshift.Driver";

    // Not serializable; created lazily on first use after deserialization.
    private transient RedshiftConnectionImpl connection;

    private final RedshiftOptions options;

    public RedshiftConnectionProvider(RedshiftOptions options) {
        this.options = options;
    }

    /** Returns the cached connection, creating it on first use. */
    public synchronized RedshiftConnectionImpl getConnection() throws SQLException {
        if (connection == null) {
            connection =
                    createConnection(
                            options.getHostname(), options.getPort(), options.getDatabaseName());
        }
        return connection;
    }

    private RedshiftConnectionImpl createConnection(String hostname, int port, String dbName)
            throws SQLException {
        String url = "jdbc:redshift://" + hostname + ":" + port + "/" + dbName;
        LOG.info("Connecting to {}", url);

        try {
            Class.forName(REDSHIFT_DRIVER_NAME);
        } catch (ClassNotFoundException e) {
            throw new SQLException("Redshift JDBC driver not found: " + REDSHIFT_DRIVER_NAME, e);
        }

        if (options.getUsername().isPresent()) {
            return (RedshiftConnectionImpl)
                    DriverManager.getConnection(
                            url, options.getUsername().get(), options.getPassword().orElse(null));
        }
        return (RedshiftConnectionImpl) DriverManager.getConnection(url);
    }

    /** Closes the cached connection, if any, so that the next call creates a new one. */
    public synchronized void closeConnection() throws SQLException {
        if (connection != null) {
            try {
                connection.close();
            } finally {
                connection = null;
            }
        }
    }

    /** Alias of {@link #getConnection()}. */
    public RedshiftConnectionImpl getOrCreateConnection() throws SQLException {
        return getConnection();
    }
}
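
The provider above caches a single connection and creates it on first use. Stripped of the JDBC specifics, the same pattern can be sketched as follows. `LazyResource` is a hypothetical class introduced only for illustration. It clears the cached reference on close so that a later `get()` creates a fresh resource instead of returning a closed one.

```java
import java.util.function.Supplier;

/** Hypothetical lazily initialized holder; illustrates the caching pattern only. */
final class LazyResource<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private T resource;

    LazyResource(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the cached resource, creating it on first use. */
    synchronized T get() {
        if (resource == null) {
            resource = factory.get();
        }
        return resource;
    }

    /** Closes the cached resource, if any, and forgets it. */
    synchronized void close() throws Exception {
        if (resource != null) {
            try {
                resource.close();
            } finally {
                resource = null;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] created = {0};
        LazyResource<AutoCloseable> lazy =
                new LazyResource<>(
                        () -> {
                            created[0]++;
                            return () -> {};
                        });
        lazy.get();
        lazy.get(); // reuses the cached resource
        lazy.close();
        lazy.get(); // creates a second resource after close
        System.out.println("resources created: " + created[0]);
    }
}
```

Keeping both accessors synchronized on the same monitor avoids two threads racing to create or close the resource.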