diff --git a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java index 791ea505f6..5ea552f8c7 100644 --- a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java +++ b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java @@ -26,7 +26,6 @@ import org.dinky.daemon.pool.ScheduleThreadPool; import org.dinky.daemon.task.DaemonTask; import org.dinky.daemon.task.DaemonTaskConfig; -import org.dinky.data.exception.BusException; import org.dinky.data.exception.DinkyException; import org.dinky.data.model.Configuration; import org.dinky.data.model.SystemConfiguration; @@ -204,7 +203,7 @@ private void aboutDolphinSchedulerInitOperation(Object v) { } } catch (Exception e) { log.error("Error in DolphinScheduler: ", e); - throw new BusException( + log.error( "get or create DolphinScheduler project failed, please check the config of DolphinScheduler!"); } } diff --git a/dinky-assembly/src/main/assembly/package.xml b/dinky-assembly/src/main/assembly/package.xml index 477e2beba3..3673874523 100644 --- a/dinky-assembly/src/main/assembly/package.xml +++ b/dinky-assembly/src/main/assembly/package.xml @@ -41,7 +41,6 @@ **/*.yaml **/log4j2.xml **/DinkyFlinkDockerfile - @@ -70,7 +69,7 @@ ${project.parent.basedir}/build/extends/ - plugins/flink1.14 + extends/flink1.14/dinky dinky-catalog-mysql-1.14-${project.version}.jar dinky-client-1.14-${project.version}.jar @@ -79,7 +78,7 @@ ${project.parent.basedir}/build/extends/ - plugins/flink1.15 + extends/flink1.15/dinky dinky-catalog-mysql-1.15-${project.version}.jar dinky-client-1.15-${project.version}.jar @@ -87,7 +86,7 @@ ${project.parent.basedir}/build/extends/ - plugins/flink1.16 + extends/flink1.16/dinky dinky-catalog-mysql-1.16-${project.version}.jar dinky-client-1.16-${project.version}.jar @@ -95,7 +94,7 @@ ${project.parent.basedir}/build/extends/ - plugins/flink1.17 + extends/flink1.17/dinky dinky-catalog-mysql-1.17-${project.version}.jar dinky-client-1.17-${project.version}.jar @@ -103,7 +102,7 @@ ${project.parent.basedir}/build/extends/ - plugins/flink1.18 + extends/flink1.18/dinky dinky-catalog-mysql-1.18-${project.version}.jar dinky-client-1.18-${project.version}.jar @@ -111,7 +110,7 @@ ${project.parent.basedir}/build/extends/ - plugins + extends dinky-client-base-${project.version}.jar diff --git a/dinky-cdc/dinky-cdc-core/pom.xml b/dinky-cdc/dinky-cdc-core/pom.xml index 5948d197c9..fb64032773 100644 --- a/dinky-cdc/dinky-cdc-core/pom.xml +++ b/dinky-cdc/dinky-cdc-core/pom.xml @@ -38,5 +38,51 @@ dinky-flink-${dinky.flink.version} ${scope.runtime} + + + org.junit.vintage + junit-vintage-engine + test + + + org.assertj + assertj-core + test + + + org.mockito + mockito-core + jar + test + + + org.powermock + powermock-module-junit4 + jar + test + + + org.powermock + powermock-api-mockito2 + jar + test + + + org.mockito + mockito-core + + + + + org.hamcrest + hamcrest-all + jar + test + + + org.testcontainers + junit-jupiter + test + diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java index 5447260fc6..82b5ac7e59 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java @@ -40,7 +40,6 @@ import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; -import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.types.logical.BigIntType; @@ -337,7 +336,11 @@ public LogicalType getLogicalType(Column column) { return new DateType(); case LOCAL_DATETIME: case TIMESTAMP: - return new TimestampType(); + if (column.getLength() != null) { + return new TimestampType(column.getLength()); + } else { + return new TimestampType(3); + } case BYTES: return new VarBinaryType(Integer.MAX_VALUE); default: @@ -409,18 +412,28 @@ protected Optional convertDecimalType(Object value, LogicalType logicalT protected Optional convertTimestampType(Object value, LogicalType logicalType) { if (logicalType instanceof TimestampType) { if (value instanceof Integer) { - return Optional.of(TimestampData.fromLocalDateTime(Instant.ofEpochMilli(((Integer) value).longValue()) + return Optional.of(Instant.ofEpochMilli(((Integer) value).longValue()) .atZone(sinkTimeZone) - .toLocalDateTime())); - } - - if (value instanceof Long) { - return Optional.of(TimestampData.fromLocalDateTime( - Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime())); + .toLocalDateTime()); + } else if (value instanceof String) { + return Optional.of( + Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime()); + } else { + TimestampType logicalType1 = (TimestampType) logicalType; + if (logicalType1.getPrecision() == 3) { + return Optional.of(Instant.ofEpochMilli((long) value) + .atZone(sinkTimeZone) + .toLocalDateTime()); + } else if (logicalType1.getPrecision() > 3) { + return Optional.of( + Instant.ofEpochMilli(((long) value) / (long) Math.pow(10, logicalType1.getPrecision() - 3)) + .atZone(sinkTimeZone) + .toLocalDateTime()); + } + return Optional.of(Instant.ofEpochSecond(((long) value)) + .atZone(sinkTimeZone) + .toLocalDateTime()); } - - return Optional.of(TimestampData.fromLocalDateTime( - Instant.parse(value.toString()).atZone(sinkTimeZone).toLocalDateTime())); } return Optional.empty(); } diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java index 2629c0f747..2cc48cb2cf 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java @@ -164,23 +164,25 @@ protected Optional convertTimestampType(Object value, LogicalType logica return Optional.of(Instant.ofEpochMilli(((Integer) value).longValue()) .atZone(sinkTimeZone) .toLocalDateTime()); - } - - if (value instanceof String) { + } else if (value instanceof String) { return Optional.of( Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime()); + } else { + TimestampType logicalType1 = (TimestampType) logicalType; + if (logicalType1.getPrecision() == 3) { + return Optional.of(Instant.ofEpochMilli((long) value) + .atZone(sinkTimeZone) + .toLocalDateTime()); + } else if (logicalType1.getPrecision() > 3) { + return Optional.of( + Instant.ofEpochMilli(((long) value) / (long) Math.pow(10, logicalType1.getPrecision() - 3)) + .atZone(sinkTimeZone) + .toLocalDateTime()); + } + return Optional.of(Instant.ofEpochSecond(((long) value)) + .atZone(sinkTimeZone) + .toLocalDateTime()); } - - TimestampType timestampType = (TimestampType) logicalType; - // 转换为毫秒 - if (timestampType.getPrecision() > 3) { - return Optional.of( - Instant.ofEpochMilli(((long) value) / (long) Math.pow(10, timestampType.getPrecision() - 3.0)) - .atZone(sinkTimeZone) - .toLocalDateTime()); - } - return Optional.of( - Instant.ofEpochSecond(((long) value)).atZone(sinkTimeZone).toLocalDateTime()); } return Optional.empty(); } diff --git a/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/SinkBuilderTest.java b/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/SinkBuilderTest.java new file mode 100644 index 0000000000..b609a8f103 --- /dev/null +++ b/dinky-cdc/dinky-cdc-core/src/test/java/org/dinky/cdc/SinkBuilderTest.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.cdc; + +import org.dinky.cdc.sql.SQLSinkBuilder; + +import org.apache.flink.table.types.logical.TimestampType; + +import org.junit.Assert; +import org.junit.Test; + +/** + * CDCSOURCETest + * + */ +public class SinkBuilderTest { + + @Test + public void convertValueTimestampTest() { + SQLSinkBuilder sqlSinkBuilder = new SQLSinkBuilder(); + Object value0 = sqlSinkBuilder.convertValue(1688946316L, new TimestampType(0)); + Object value3 = sqlSinkBuilder.convertValue(1688946316123L, new TimestampType(3)); + Object value6 = sqlSinkBuilder.convertValue(1688946316123456L, new TimestampType(6)); + String target0 = "2023-07-09T23:45:16"; + String target3 = "2023-07-09T23:45:16.123"; + String target6 = "2023-07-09T23:45:16.123"; + Assert.assertEquals(target0, value0.toString()); + Assert.assertEquals(target3, value3.toString()); + Assert.assertEquals(target6, value6.toString()); + } +} diff --git a/dinky-core/src/main/java/org/dinky/executor/VariableManager.java b/dinky-core/src/main/java/org/dinky/executor/VariableManager.java index a783793925..c2bc94b3fd 100644 --- a/dinky-core/src/main/java/org/dinky/executor/VariableManager.java +++ b/dinky-core/src/main/java/org/dinky/executor/VariableManager.java @@ -83,7 +83,7 @@ private static void loadExpressionVariableClass() { log.info("load class : {}", fullClassName); } catch (ClassNotFoundException e) { log.error( - "The class [{}] that needs to be loaded may not be loaded by dinky or there is no jar file of this class under dinky's lib/plugins. Please check, and try again. {}", + "The class [{}] that needs to be loaded may not be loaded by dinky or there is no jar file of this class under dinky's lib/plugins/extends. Please check, and try again. {}", fullClassName, e.getMessage(), e); diff --git a/script/bin/auto.sh b/script/bin/auto.sh index 4570967c6b..4415460cfb 100644 --- a/script/bin/auto.sh +++ b/script/bin/auto.sh @@ -5,7 +5,7 @@ FLINK_VERSION=${2:-1.18} JAR_NAME="dinky-admin" # Use FLINK_HOME: -CLASS_PATH=".:./lib/*:config:./plugins/*:./customJar/*:./plugins/flink${FLINK_VERSION}/dinky/*:./plugins/flink${FLINK_VERSION}/*" +CLASS_PATH=".:./lib/*:config:./plugins/*:./customJar/*:./plugins/flink${FLINK_VERSION}/dinky/*:./plugins/flink${FLINK_VERSION}/*:./extends/flink${FLINK_VERSION}/dinky/*:./extends/flink${FLINK_VERSION}/*" PID_FILE="dinky.pid"