From d01b5aee43ebf679d9ca6c1558c2e18978fd1d94 Mon Sep 17 00:00:00 2001
From: wenmo <32723967+wenmo@users.noreply.github.com>
Date: Sun, 24 Dec 2023 23:39:10 +0800
Subject: [PATCH] [Optimization-2734][cdc] Optimize CDCSOURCE convert type and
add extends path
---
.../main/java/org/dinky/init/SystemInit.java | 3 +-
dinky-assembly/src/main/assembly/package.xml | 13 +++--
dinky-cdc/dinky-cdc-core/pom.xml | 46 ++++++++++++++++++
.../org/dinky/cdc/AbstractSinkBuilder.java | 37 +++++++++-----
.../org/dinky/cdc/sql/SQLSinkBuilder.java | 30 ++++++------
.../java/org/dinky/cdc/SinkBuilderTest.java | 48 +++++++++++++++++++
.../org/dinky/executor/VariableManager.java | 2 +-
script/bin/auto.sh | 2 +-
8 files changed, 144 insertions(+), 37 deletions(-)
create mode 100644 dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/SinkBuilderTest.java
diff --git a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java
index 791ea505f6..5ea552f8c7 100644
--- a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java
+++ b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java
@@ -26,7 +26,6 @@
import org.dinky.daemon.pool.ScheduleThreadPool;
import org.dinky.daemon.task.DaemonTask;
import org.dinky.daemon.task.DaemonTaskConfig;
-import org.dinky.data.exception.BusException;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.Configuration;
import org.dinky.data.model.SystemConfiguration;
@@ -204,7 +203,7 @@ private void aboutDolphinSchedulerInitOperation(Object v) {
}
} catch (Exception e) {
log.error("Error in DolphinScheduler: ", e);
- throw new BusException(
+ log.error(
"get or create DolphinScheduler project failed, please check the config of DolphinScheduler!");
}
}
diff --git a/dinky-assembly/src/main/assembly/package.xml b/dinky-assembly/src/main/assembly/package.xml
index 477e2beba3..3673874523 100644
--- a/dinky-assembly/src/main/assembly/package.xml
+++ b/dinky-assembly/src/main/assembly/package.xml
@@ -41,7 +41,6 @@
**/*.yaml
**/log4j2.xml
**/DinkyFlinkDockerfile
-
@@ -70,7 +69,7 @@
${project.parent.basedir}/build/extends/
- plugins/flink1.14
+ extends/flink1.14/dinky
dinky-catalog-mysql-1.14-${project.version}.jar
dinky-client-1.14-${project.version}.jar
@@ -79,7 +78,7 @@
${project.parent.basedir}/build/extends/
- plugins/flink1.15
+ extends/flink1.15/dinky
dinky-catalog-mysql-1.15-${project.version}.jar
dinky-client-1.15-${project.version}.jar
@@ -87,7 +86,7 @@
${project.parent.basedir}/build/extends/
- plugins/flink1.16
+ extends/flink1.16/dinky
dinky-catalog-mysql-1.16-${project.version}.jar
dinky-client-1.16-${project.version}.jar
@@ -95,7 +94,7 @@
${project.parent.basedir}/build/extends/
- plugins/flink1.17
+ extends/flink1.17/dinky
dinky-catalog-mysql-1.17-${project.version}.jar
dinky-client-1.17-${project.version}.jar
@@ -103,7 +102,7 @@
${project.parent.basedir}/build/extends/
- plugins/flink1.18
+ extends/flink1.18/dinky
dinky-catalog-mysql-1.18-${project.version}.jar
dinky-client-1.18-${project.version}.jar
@@ -111,7 +110,7 @@
${project.parent.basedir}/build/extends/
- plugins
+ extends
dinky-client-base-${project.version}.jar
diff --git a/dinky-cdc/dinky-cdc-core/pom.xml b/dinky-cdc/dinky-cdc-core/pom.xml
index 5948d197c9..fb64032773 100644
--- a/dinky-cdc/dinky-cdc-core/pom.xml
+++ b/dinky-cdc/dinky-cdc-core/pom.xml
@@ -38,5 +38,51 @@
dinky-flink-${dinky.flink.version}
${scope.runtime}
+
+
+ org.junit.vintage
+ junit-vintage-engine
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+ org.mockito
+ mockito-core
+ jar
+ test
+
+
+ org.powermock
+ powermock-module-junit4
+ jar
+ test
+
+
+ org.powermock
+ powermock-api-mockito2
+ jar
+ test
+
+
+ org.mockito
+ mockito-core
+
+
+
+
+ org.hamcrest
+ hamcrest-all
+ jar
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ test
+
diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java
index 5447260fc6..82b5ac7e59 100644
--- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java
+++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java
@@ -40,7 +40,6 @@
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.logical.BigIntType;
@@ -337,7 +336,11 @@ public LogicalType getLogicalType(Column column) {
return new DateType();
case LOCAL_DATETIME:
case TIMESTAMP:
- return new TimestampType();
+ if (column.getLength() != null) {
+ return new TimestampType(column.getLength());
+ } else {
+ return new TimestampType(3);
+ }
case BYTES:
return new VarBinaryType(Integer.MAX_VALUE);
default:
@@ -409,18 +412,28 @@ protected Optional