Skip to content

Commit

Permalink
Merge pull request #519 from cloudsufi/oracle-date-issue-release
Browse files Browse the repository at this point in the history
[cherrypick][PLUGIN-1812] Added fix for date datatype in oracle sink
  • Loading branch information
psainics authored Oct 24, 2024
2 parents 1f731da + d0243cb commit bb20d77
Show file tree
Hide file tree
Showing 27 changed files with 161 additions and 28 deletions.
2 changes: 1 addition & 1 deletion amazon-redshift-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>Amazon Redshift plugin</name>
Expand Down
2 changes: 1 addition & 1 deletion aurora-mysql-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>Aurora DB MySQL plugin</name>
Expand Down
2 changes: 1 addition & 1 deletion aurora-postgresql-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>Aurora DB PostgreSQL plugin</name>
Expand Down
2 changes: 1 addition & 1 deletion cloudsql-mysql-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>CloudSQL MySQL plugin</name>
Expand Down
2 changes: 1 addition & 1 deletion cloudsql-postgresql-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>CloudSQL PostgreSQL plugin</name>
Expand Down
2 changes: 1 addition & 1 deletion database-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>Database Commons</name>
Expand Down
2 changes: 1 addition & 1 deletion db2-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>IBM DB2 plugin</name>
Expand Down
2 changes: 1 addition & 1 deletion generic-database-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>Generic database plugin</name>
Expand Down
2 changes: 1 addition & 1 deletion generic-db-argument-setter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>Generic database argument setter plugin</name>
Expand Down
2 changes: 1 addition & 1 deletion mariadb-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>Maria DB plugin</name>
Expand Down
2 changes: 1 addition & 1 deletion memsql-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>Memsql plugin</name>
Expand Down
2 changes: 1 addition & 1 deletion mssql-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>Microsoft SQL Server plugin</name>
Expand Down
2 changes: 1 addition & 1 deletion mysql-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>Mysql plugin</name>
Expand Down
2 changes: 1 addition & 1 deletion netezza-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>Netezza plugin</name>
Expand Down
2 changes: 1 addition & 1 deletion oracle-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>Oracle plugin</name>
Expand Down
51 changes: 51 additions & 0 deletions oracle-plugin/src/e2e-test/features/sink/OracleRunTime.feature
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,54 @@ Feature: Oracle - Verify data transfer from BigQuery source to Oracle sink
Then Verify the pipeline status is "Succeeded"
Then Validate records transferred to target table with record counts of BigQuery table
Then Validate the values of records transferred to target Oracle table is equal to the values from source BigQuery table with case


@BQ_SOURCE_TEST_DATE @ORACLE_DATE_TABLE
Scenario: To verify data is getting transferred from BigQuery source to Oracle sink successfully when schema is having date and timestamp fields
Given Open Datafusion Project to configure pipeline
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "BigQuery" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "Oracle" from the plugins list as: "Sink"
Then Connect plugins: "BigQuery" and "Oracle" to establish connection
Then Navigate to the properties page of plugin: "BigQuery"
Then Replace input plugin property: "project" with value: "projectId"
Then Enter input plugin property: "datasetProject" with value: "projectId"
Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
Then Enter input plugin property: "dataset" with value: "dataset"
Then Enter input plugin property: "table" with value: "bqSourceTable"
Then Click on the Get Schema button
Then Verify the Output Schema matches the Expected Schema: "outputDatatypesDateTimeSchema"
Then Validate "BigQuery" plugin properties
Then Close the Plugin Properties page
Then Navigate to the properties page of plugin: "Oracle"
Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName"
Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields
Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields
Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
Then Select radio button plugin property: "connectionType" with value: "service"
Then Select radio button plugin property: "role" with value: "normal"
Then Enter input plugin property: "referenceName" with value: "sourceRef"
Then Replace input plugin property: "database" with value: "databaseName"
Then Replace input plugin property: "tableName" with value: "targetTable"
Then Replace input plugin property: "dbSchemaName" with value: "schema"
Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
Then Enter input plugin property: "referenceName" with value: "targetRef"
Then Select radio button plugin property: "connectionType" with value: "service"
Then Select radio button plugin property: "role" with value: "normal"
Then Validate "Oracle" plugin properties
Then Close the Plugin Properties page
Then Save the pipeline
Then Preview and run the pipeline
Then Verify the preview of pipeline is "success"
Then Click on preview data for Oracle sink
Then Close the preview data
Then Deploy the pipeline
Then Run the Pipeline in Runtime
Then Wait till pipeline is in running state
Then Open and capture logs
Then Verify the pipeline status is "Succeeded"
Then Validate records transferred to target table with record counts of BigQuery table
Then Validate the values of records transferred to target Oracle table is equal to the values from source BigQuery table
48 changes: 41 additions & 7 deletions oracle-plugin/src/e2e-test/java/io.cdap.plugin/BQValidation.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Date;
import java.util.List;
Expand All @@ -44,6 +49,13 @@

public class BQValidation {

private static final List<SimpleDateFormat> TIMESTAMP_DATE_FORMATS = Arrays.asList(
new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss"),
new SimpleDateFormat("yyyy-MM-dd"));
private static final List<DateTimeFormatter> TIMESTAMP_TZ_DATE_FORMATS = Arrays.asList(
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX"),
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX"));

/**
* Extracts entire data from source and target tables.
*
Expand Down Expand Up @@ -173,21 +185,43 @@ public static boolean compareResultSetAndJsonData(ResultSet rsSource, List<JsonO

case Types.TIMESTAMP:
Timestamp sourceTS = rsSource.getTimestamp(columnName);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss");
String targetT = bigQueryData.get(jsonObjectIdx).get(columnName).getAsString();
Date dateParsed = dateFormat.parse(targetT);
Date dateParsed = null;
for (SimpleDateFormat dateTimeFormatter : TIMESTAMP_DATE_FORMATS) {
try {
dateParsed = dateTimeFormatter.parse(targetT);
break;
} catch (ParseException exception) {
// do nothing
}
}
Timestamp targetTs = new java.sql.Timestamp(dateParsed.getTime());
result = String.valueOf(sourceTS).equals(String.valueOf(targetTs));
result = sourceTS.equals(targetTs);
Assert.assertTrue("Different values found for column : %s", result);
break;

case OracleSourceSchemaReader.TIMESTAMP_TZ:
Timestamp sourceTZ = rsSource.getTimestamp(columnName);
SimpleDateFormat dateValue = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
String targetTS = bigQueryData.get(jsonObjectIdx).get(columnName).getAsString();
Date date = dateValue.parse(targetTS);
Timestamp targetTZ = new Timestamp(date.getTime());
Assert.assertTrue("Different columns found for Timestamp", sourceTZ.equals(targetTZ));
ZonedDateTime targetDate = null;
for (DateTimeFormatter dateTimeFormatter : TIMESTAMP_TZ_DATE_FORMATS) {
try {
targetDate = ZonedDateTime.parse(targetTS, dateTimeFormatter);
break;
} catch (DateTimeParseException exception) {
// do nothing
}
}
Assert.assertTrue("Different columns found for Timestamp",
sourceTZ.toLocalDateTime().equals(targetDate.toLocalDateTime()));
break;

case OracleSourceSchemaReader.TIMESTAMP_LTZ:
Timestamp sourceLTZ = rsSource.getTimestamp(columnName);
String targetLTZ = bigQueryData.get(jsonObjectIdx).get(columnName).getAsString();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS");
Assert.assertTrue("Different columns found for Timestamp",
sourceLTZ.toLocalDateTime().equals(LocalDateTime.parse(targetLTZ, formatter)));
break;

case OracleSourceSchemaReader.BINARY_FLOAT:
Expand Down
10 changes: 10 additions & 0 deletions oracle-plugin/src/e2e-test/java/io.cdap.plugin/OracleClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -350,4 +350,14 @@ public static void deleteTable(String schema, String table)
}
}
}

public static void createTargetDateTable(String targetTable, String schema) throws SQLException,
ClassNotFoundException {
try (Connection connect = getOracleConnection(); Statement statement = connect.createStatement()) {
String createTargetTableQuery = "CREATE TABLE " + schema + "." + targetTable +
"(ID varchar2(100),DATE_COL DATE,TIMESTAMP_TZ_COL TIMESTAMP WITH TIME ZONE,TIMESTAMP_LTZ_COL " +
"TIMESTAMP WITH LOCAL TIME ZONE,INTERVAL_YM_COL INTERVAL YEAR TO MONTH,DATE_TYPE DATE)";
statement.executeUpdate(createTargetTableQuery);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -386,4 +386,34 @@ public static void deleteTempSourceBQTableSmallCase() throws IOException, Interr
BeforeActions.scenario.write("BQ source Table " + bqSourceTable + " deleted successfully");
PluginPropertyUtils.removePluginProp("bqSourceTable");
}

@Before(order = 1, value = "@BQ_SOURCE_TEST_DATE")
public static void createTempSourceBQTableWithDateColumns() throws IOException, InterruptedException {
createSourceBQTableWithQueries(PluginPropertyUtils.pluginProp("CreateBQTableQueryFileDate"),
PluginPropertyUtils.pluginProp("InsertBQDataQueryFileDate"));
}

@After(order = 1, value = "@BQ_SOURCE_TEST_DATE")
public static void deleteTempSourceBQTableWithDateColumns() throws IOException, InterruptedException {
String bqSourceTable = PluginPropertyUtils.pluginProp("bqSourceTable");
BigQueryClient.dropBqQuery(bqSourceTable);
BeforeActions.scenario.write("BQ source Table " + bqSourceTable + " deleted successfully");
PluginPropertyUtils.removePluginProp("bqSourceTable");
}

@Before(order = 2, value = "@ORACLE_DATE_TABLE")
public static void createOracleTargetDateTable() throws SQLException, ClassNotFoundException {
OracleClient.createTargetDateTable(PluginPropertyUtils.pluginProp("targetTable"),
PluginPropertyUtils.pluginProp("schema"));
BeforeActions.scenario.write("Oracle Target Table - " + PluginPropertyUtils.pluginProp("targetTable")
+ " created successfully");
}

@After(order = 2, value = "@ORACLE_DATE_TABLE")
public static void dropOracleTargetDateTable() throws SQLException, ClassNotFoundException {
OracleClient.deleteTable(PluginPropertyUtils.pluginProp("schema"),
PluginPropertyUtils.pluginProp("targetTable"));
BeforeActions.scenario.write("Oracle Target Table - " + PluginPropertyUtils.pluginProp("targetTable")
+ " deleted successfully");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ InsertBQDataQueryFile=testdata/BigQuery/BigQueryInsertDataQuery.txt
#bq queries file path for Small Case Schema
CreateBQTableQueryFileSmallCase=testdata/BigQuery/BigQueryCreateTableQuerySmallCase.txt
InsertBQDataQueryFileSmallCase=testdata/BigQuery/BigQueryInsertDataQuerySmallCase.txt
#bq queries file path for various Date time type Fields
CreateBQTableQueryFileDate=testdata/BigQuery/CreateBQTableQueryFileDate.txt
InsertBQDataQueryFileDate=testdata/BigQuery/InsertBQDataQueryFileDate.txt

#ORACLE Datatypes
bigQueryColumns=(COL23 FLOAT(4), COL28 TIMESTAMP, COL29 TIMESTAMP(9), COL30 TIMESTAMP WITH TIME ZONE, \
Expand All @@ -125,3 +128,6 @@ outputDatatypesSchema1=[{"key":"COL23","value":"double"},{"key":"COL28","value":
{"key":"COL29","value":"datetime"},{"key":"COL30","value":"timestamp"},{"key":"COL31","value":"string"},\
{"key":"COL32","value":"string"},{"key":"COL33","value":"datetime"},{"key":"COL34","value":"float"},\
{"key":"COL35","value":"double"}]
outputDatatypesDateTimeSchema=[{"key":"ID","value":"string"},{"key":"DATE_COL","value":"datetime"},\
{"key":"TIMESTAMP_TZ_COL","value":"timestamp"},{"key":"TIMESTAMP_LTZ_COL","value":"datetime"},\
{"key":"INTERVAL_YM_COL","value":"string"},{"key":"DATE_TYPE","value":"date"}]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE TABLE `DATASET.TABLE_NAME` (ID STRING NOT NULL,DATE_COL DATETIME, TIMESTAMP_TZ_COL TIMESTAMP, TIMESTAMP_LTZ_COL DATETIME, INTERVAL_YM_COL STRING,DATE_TYPE DATE);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
INSERT INTO `DATASET.TABLE_NAME` (id, date_col, timestamp_tz_col, timestamp_ltz_col, interval_ym_col,date_type) VALUES('2', '2024-10-11', '2024-10-11 14:30:00.123456+00:00', '2024-10-11 14:30:00.123456','2-6','2024-10-11');
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
String timestampString = Timestamp.valueOf(localDateTime).toString();
Object timestampWithTimeZone = createOracleTimestamp(stmt.getConnection(), timestampString);
stmt.setObject(sqlIndex, timestampWithTimeZone);
} else if (Schema.LogicalType.TIMESTAMP_MICROS.equals(fieldSchema.getLogicalType())) {
// Deprecated: Handle the case when the Timestamp is mapped to CDAP Timestamp type
} else {
// Handle the case when the Timestamp is mapped to CDAP Timestamp type or CDAP Date type.
super.writeNonNullToDB(stmt, fieldSchema, fieldName, fieldIndex);
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

<groupId>io.cdap.plugin</groupId>
<artifactId>database-plugins-parent</artifactId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
<packaging>pom</packaging>
<name>Database Plugins</name>
<description>Collection of database plugins</description>
Expand Down
2 changes: 1 addition & 1 deletion postgresql-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>PostgreSQL plugin</name>
Expand Down
2 changes: 1 addition & 1 deletion saphana-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<name>SAP HANA plugin</name>
Expand Down
2 changes: 1 addition & 1 deletion teradata-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>database-plugins-parent</artifactId>
<groupId>io.cdap.plugin</groupId>
<version>1.11.4-SNAPSHOT</version>
<version>1.11.4</version>
</parent>

<artifactId>teradata-plugin</artifactId>
Expand Down

0 comments on commit bb20d77

Please sign in to comment.