From 6cf93cfe814f3cf668a907e09fee0ca695470be8 Mon Sep 17 00:00:00 2001 From: wenmo <32723967+wenmo@users.noreply.github.com> Date: Mon, 20 Nov 2023 23:38:15 +0800 Subject: [PATCH] [Fix-2556] [cdc] Fix Mysql TimestampType convert to '1970-01-01' in CDCSOURCE --- dlink-client/dlink-client-1.11/pom.xml | 5 + .../com/dlink/cdc/AbstractSinkBuilder.java | 44 ++--- .../com/dlink/cdc/sql/SQLSinkBuilder.java | 14 +- .../com/dlink/cdc/sql/SinkBuilderTest.java | 44 +++++ .../com/dlink/cdc/AbstractSinkBuilder.java | 151 ++++++++++-------- .../com/dlink/cdc/sql/SQLSinkBuilder.java | 14 +- .../com/dlink/cdc/sql/SinkBuilderTest.java | 44 +++++ .../com/dlink/cdc/AbstractSinkBuilder.java | 147 +++++++++-------- .../com/dlink/cdc/sql/SQLSinkBuilder.java | 12 +- .../com/dlink/cdc/sql/SinkBuilderTest.java | 44 +++++ .../com/dlink/cdc/AbstractSinkBuilder.java | 47 +++--- .../com/dlink/cdc/sql/SinkBuilderTest.java | 44 +++++ .../com/dlink/cdc/AbstractSinkBuilder.java | 44 ++--- .../com/dlink/cdc/sql/SQLSinkBuilder.java | 12 +- .../com/dlink/cdc/sql/SinkBuilderTest.java | 44 +++++ .../com/dlink/cdc/AbstractSinkBuilder.java | 2 +- .../com/dlink/cdc/sql/SinkBuilderTest.java | 44 +++++ .../com/dlink/cdc/AbstractSinkBuilder.java | 2 +- .../com/dlink/cdc/sql/SinkBuilderTest.java | 44 +++++ 19 files changed, 587 insertions(+), 215 deletions(-) create mode 100644 dlink-client/dlink-client-1.11/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java create mode 100644 dlink-client/dlink-client-1.12/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java create mode 100644 dlink-client/dlink-client-1.13/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java create mode 100644 dlink-client/dlink-client-1.14/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java create mode 100644 dlink-client/dlink-client-1.15/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java create mode 100644 dlink-client/dlink-client-1.16/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java create mode 100644 
dlink-client/dlink-client-1.17/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java diff --git a/dlink-client/dlink-client-1.11/pom.xml b/dlink-client/dlink-client-1.11/pom.xml index d4d54c0ca9..c2dd63d475 100644 --- a/dlink-client/dlink-client-1.11/pom.xml +++ b/dlink-client/dlink-client-1.11/pom.xml @@ -45,6 +45,11 @@ dlink-flink-1.11 provided + + junit + junit + test + diff --git a/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java b/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java index 38bfdb0242..47f5194fd8 100644 --- a/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java +++ b/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java @@ -123,9 +123,9 @@ public Map map(String value) throws Exception { } protected SingleOutputStreamOperator shunt( - SingleOutputStreamOperator mapOperator, - Table table, - String schemaFieldName) { + SingleOutputStreamOperator mapOperator, + Table table, + String schemaFieldName) { final String tableName = table.getName(); final String schemaName = table.getSchema(); return mapOperator.filter(new FilterFunction() { @@ -140,18 +140,18 @@ public boolean filter(Map value) throws Exception { } protected DataStream shunt( - SingleOutputStreamOperator processOperator, - Table table, - OutputTag tag) { + SingleOutputStreamOperator processOperator, + Table table, + OutputTag tag) { return processOperator.getSideOutput(tag); } protected DataStream buildRowData( - SingleOutputStreamOperator filterOperator, - List columnNameList, - List columnTypeList, - String schemaTableName) { + SingleOutputStreamOperator filterOperator, + List columnNameList, + List columnTypeList, + String schemaTableName) { return filterOperator .flatMap(new FlatMapFunction() { @@ -210,18 +210,18 @@ public void flatMap(Map value, Collector out) throws Exception { } public abstract void addSink( - StreamExecutionEnvironment env, - DataStream 
rowDataDataStream, - Table table, - List columnNameList, - List columnTypeList); + StreamExecutionEnvironment env, + DataStream rowDataDataStream, + Table table, + List columnNameList, + List columnTypeList); @Override public DataStreamSource build( - CDCBuilder cdcBuilder, - StreamExecutionEnvironment env, - CustomTableEnvironment customTableEnvironment, - DataStreamSource dataStreamSource) { + CDCBuilder cdcBuilder, + StreamExecutionEnvironment env, + CustomTableEnvironment customTableEnvironment, + DataStreamSource dataStreamSource) { final List schemaList = config.getSchemaList(); final String schemaFieldName = config.getSchemaFieldName(); @@ -237,8 +237,8 @@ public DataStreamSource build( buildColumn(columnNameList, columnTypeList, table.getColumns()); - DataStream rowDataDataStream = - buildRowData(filterOperator, columnNameList, columnTypeList, table.getSchemaTableName()); + DataStream rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList, + table.getSchemaTableName()); addSink(env, rowDataDataStream, table, columnNameList, columnTypeList); } @@ -290,7 +290,7 @@ public LogicalType getLogicalType(Column column) { return new DateType(); case LOCALDATETIME: case TIMESTAMP: - return new TimestampType(); + return new TimestampType(column.getLength()); case BYTES: return new VarBinaryType(Integer.MAX_VALUE); default: diff --git a/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java index d8d6c11876..9a3ab44852 100644 --- a/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java +++ b/dlink-client/dlink-client-1.11/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java @@ -211,7 +211,7 @@ public DataStreamSource build( DataStreamSource dataStreamSource) { final String timeZone = config.getSink().get("timezone"); config.getSink().remove("timezone"); - if(config.getSink().containsKey("fenodes")){ + if 
(config.getSink().containsKey("fenodes")) { config.getSink().remove("url"); } if (Asserts.isNotNullString(timeZone)) { @@ -295,8 +295,18 @@ protected Object convertValue(Object value, LogicalType logicalType) { } else if (logicalType instanceof TimestampType) { if (value instanceof Integer) { return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkTimeZone).toLocalDateTime(); + } else if (value instanceof String) { + return Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime(); } else { - return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime(); + TimestampType logicalType1 = (TimestampType) logicalType; + // 转换为毫秒 + if (logicalType1.getPrecision() == 3) { + return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime(); + } else if (logicalType1.getPrecision() > 3) { + return Instant.ofEpochMilli(((long) value) / (long) Math.pow(10, logicalType1.getPrecision() - 3)) + .atZone(sinkTimeZone).toLocalDateTime(); + } + return Instant.ofEpochSecond(((long) value)).atZone(sinkTimeZone).toLocalDateTime(); } } else if (logicalType instanceof DecimalType) { return new BigDecimal((String) value); diff --git a/dlink-client/dlink-client-1.11/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java b/dlink-client/dlink-client-1.11/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java new file mode 100644 index 0000000000..7cb0973b77 --- /dev/null +++ b/dlink-client/dlink-client-1.11/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.dlink.cdc.sql; + +import org.apache.flink.table.types.logical.TimestampType; + +import org.junit.Assert; +import org.junit.Test; + +/** + * CDCSOURCETest + * + */ +public class SinkBuilderTest { + + @Test + public void convertValueTimestampTest() { + SQLSinkBuilder sqlSinkBuilder = new SQLSinkBuilder(); + Object value0 = sqlSinkBuilder.convertValue(1688946316L, new TimestampType(0)); + Object value3 = sqlSinkBuilder.convertValue(1688946316000L, new TimestampType(3)); + Object value6 = sqlSinkBuilder.convertValue(1688946316000000L, new TimestampType(6)); + String value = "2023-07-09T23:45:16"; + Assert.assertEquals(value, value0.toString()); + Assert.assertEquals(value, value3.toString()); + Assert.assertEquals(value, value6.toString()); + } +} diff --git a/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java b/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java index 84ec633f94..47f5194fd8 100644 --- a/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java +++ b/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java @@ -103,7 +103,8 @@ protected Properties getProperties() { Properties properties = new Properties(); Map sink = config.getSink(); for (Map.Entry entry : sink.entrySet()) { - if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("properties") && Asserts.isNotNullString(entry.getValue())) { + if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("properties") + 
&& Asserts.isNotNullString(entry.getValue())) { properties.setProperty(entry.getKey().replace("properties.", ""), entry.getValue()); } } @@ -112,6 +113,7 @@ protected Properties getProperties() { protected SingleOutputStreamOperator deserialize(DataStreamSource dataStreamSource) { return dataStreamSource.map(new MapFunction() { + @Override public Map map(String value) throws Exception { ObjectMapper objectMapper = new ObjectMapper(); @@ -121,98 +123,105 @@ public Map map(String value) throws Exception { } protected SingleOutputStreamOperator shunt( - SingleOutputStreamOperator mapOperator, - Table table, - String schemaFieldName) { + SingleOutputStreamOperator mapOperator, + Table table, + String schemaFieldName) { final String tableName = table.getName(); final String schemaName = table.getSchema(); return mapOperator.filter(new FilterFunction() { + @Override public boolean filter(Map value) throws Exception { LinkedHashMap source = (LinkedHashMap) value.get("source"); return tableName.equals(source.get("table").toString()) - && schemaName.equals(source.get(schemaFieldName).toString()); + && schemaName.equals(source.get(schemaFieldName).toString()); } }); } protected DataStream shunt( - SingleOutputStreamOperator processOperator, - Table table, - OutputTag tag) { + SingleOutputStreamOperator processOperator, + Table table, + OutputTag tag) { return processOperator.getSideOutput(tag); } protected DataStream buildRowData( - SingleOutputStreamOperator filterOperator, - List columnNameList, - List columnTypeList, - String schemaTableName) { + SingleOutputStreamOperator filterOperator, + List columnNameList, + List columnTypeList, + String schemaTableName) { return filterOperator - .flatMap(new FlatMapFunction() { - @Override - public void flatMap(Map value, Collector out) throws Exception { - try { - switch (value.get("op").toString()) { - case "r": - case "c": - GenericRowData igenericRowData = new GenericRowData(columnNameList.size()); - 
igenericRowData.setRowKind(RowKind.INSERT); - Map idata = (Map) value.get("after"); - for (int i = 0; i < columnNameList.size(); i++) { - igenericRowData.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(igenericRowData); - break; - case "d": - GenericRowData dgenericRowData = new GenericRowData(columnNameList.size()); - dgenericRowData.setRowKind(RowKind.DELETE); - Map ddata = (Map) value.get("before"); - for (int i = 0; i < columnNameList.size(); i++) { - dgenericRowData.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(dgenericRowData); - break; - case "u": - GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size()); - ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE); - Map ubdata = (Map) value.get("before"); - for (int i = 0; i < columnNameList.size(); i++) { - ubgenericRowData.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(ubgenericRowData); - GenericRowData uagenericRowData = new GenericRowData(columnNameList.size()); - uagenericRowData.setRowKind(RowKind.UPDATE_AFTER); - Map uadata = (Map) value.get("after"); - for (int i = 0; i < columnNameList.size(); i++) { - uagenericRowData.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(uagenericRowData); - break; - default: + .flatMap(new FlatMapFunction() { + + @Override + public void flatMap(Map value, Collector out) throws Exception { + try { + switch (value.get("op").toString()) { + case "r": + case "c": + GenericRowData igenericRowData = new GenericRowData(columnNameList.size()); + igenericRowData.setRowKind(RowKind.INSERT); + Map idata = (Map) value.get("after"); + for (int i = 0; i < columnNameList.size(); i++) { + igenericRowData.setField(i, + convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(igenericRowData); + break; + case "d": + 
GenericRowData dgenericRowData = new GenericRowData(columnNameList.size()); + dgenericRowData.setRowKind(RowKind.DELETE); + Map ddata = (Map) value.get("before"); + for (int i = 0; i < columnNameList.size(); i++) { + dgenericRowData.setField(i, + convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(dgenericRowData); + break; + case "u": + GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size()); + ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE); + Map ubdata = (Map) value.get("before"); + for (int i = 0; i < columnNameList.size(); i++) { + ubgenericRowData.setField(i, + convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(ubgenericRowData); + GenericRowData uagenericRowData = new GenericRowData(columnNameList.size()); + uagenericRowData.setRowKind(RowKind.UPDATE_AFTER); + Map uadata = (Map) value.get("after"); + for (int i = 0; i < columnNameList.size(); i++) { + uagenericRowData.setField(i, + convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(uagenericRowData); + break; + default: + } + } catch (Exception e) { + logger.error("SchameTable: {} - Row: {} - Exception:", schemaTableName, + JSONUtil.toJsonString(value), e); + throw e; } - } catch (Exception e) { - logger.error("SchameTable: {} - Row: {} - Exception:", schemaTableName, JSONUtil.toJsonString(value), e); - throw e; } - } - }); + }); } public abstract void addSink( - StreamExecutionEnvironment env, - DataStream rowDataDataStream, - Table table, - List columnNameList, - List columnTypeList); + StreamExecutionEnvironment env, + DataStream rowDataDataStream, + Table table, + List columnNameList, + List columnTypeList); @Override public DataStreamSource build( - CDCBuilder cdcBuilder, - StreamExecutionEnvironment env, - CustomTableEnvironment customTableEnvironment, - DataStreamSource dataStreamSource) { + CDCBuilder cdcBuilder, + StreamExecutionEnvironment env, + 
CustomTableEnvironment customTableEnvironment, + DataStreamSource dataStreamSource) { final List schemaList = config.getSchemaList(); final String schemaFieldName = config.getSchemaFieldName(); @@ -228,7 +237,8 @@ public DataStreamSource build( buildColumn(columnNameList, columnTypeList, table.getColumns()); - DataStream rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList, table.getSchemaTableName()); + DataStream rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList, + table.getSchemaTableName()); addSink(env, rowDataDataStream, table, columnNameList, columnTypeList); } @@ -280,7 +290,7 @@ public LogicalType getLogicalType(Column column) { return new DateType(); case LOCALDATETIME: case TIMESTAMP: - return new TimestampType(); + return new TimestampType(column.getLength()); case BYTES: return new VarBinaryType(Integer.MAX_VALUE); default: @@ -295,7 +305,8 @@ protected Object convertValue(Object value, LogicalType logicalType) { if (logicalType instanceof VarCharType) { return StringData.fromString((String) value); } else if (logicalType instanceof DateType) { - return StringData.fromString(Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate().toString()); + return StringData.fromString( + Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate().toString()); } else if (logicalType instanceof TimestampType) { return TimestampData.fromTimestamp(Timestamp.from(Instant.ofEpochMilli((long) value))); } else if (logicalType instanceof DecimalType) { diff --git a/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java index a52217fcb4..ad71dd626b 100644 --- a/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java +++ b/dlink-client/dlink-client-1.12/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java @@ -244,7 +244,7 @@ public 
DataStreamSource build( DataStreamSource dataStreamSource) { final String timeZone = config.getSink().get("timezone"); config.getSink().remove("timezone"); - if(config.getSink().containsKey("fenodes")){ + if (config.getSink().containsKey("fenodes")) { config.getSink().remove("url"); } if (Asserts.isNotNullString(timeZone)) { @@ -330,8 +330,18 @@ protected Object convertValue(Object value, LogicalType logicalType) { } else if (logicalType instanceof TimestampType) { if (value instanceof Integer) { return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkTimeZone).toLocalDateTime(); + } else if (value instanceof String) { + return Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime(); } else { - return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime(); + TimestampType logicalType1 = (TimestampType) logicalType; + // 转换为毫秒 + if (logicalType1.getPrecision() == 3) { + return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime(); + } else if (logicalType1.getPrecision() > 3) { + return Instant.ofEpochMilli(((long) value) / (long) Math.pow(10, logicalType1.getPrecision() - 3)) + .atZone(sinkTimeZone).toLocalDateTime(); + } + return Instant.ofEpochSecond(((long) value)).atZone(sinkTimeZone).toLocalDateTime(); } } else if (logicalType instanceof DecimalType) { return new BigDecimal((String) value); diff --git a/dlink-client/dlink-client-1.12/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java b/dlink-client/dlink-client-1.12/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java new file mode 100644 index 0000000000..7cb0973b77 --- /dev/null +++ b/dlink-client/dlink-client-1.12/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.dlink.cdc.sql; + +import org.apache.flink.table.types.logical.TimestampType; + +import org.junit.Assert; +import org.junit.Test; + +/** + * CDCSOURCETest + * + */ +public class SinkBuilderTest { + + @Test + public void convertValueTimestampTest() { + SQLSinkBuilder sqlSinkBuilder = new SQLSinkBuilder(); + Object value0 = sqlSinkBuilder.convertValue(1688946316L, new TimestampType(0)); + Object value3 = sqlSinkBuilder.convertValue(1688946316000L, new TimestampType(3)); + Object value6 = sqlSinkBuilder.convertValue(1688946316000000L, new TimestampType(6)); + String value = "2023-07-09T23:45:16"; + Assert.assertEquals(value, value0.toString()); + Assert.assertEquals(value, value3.toString()); + Assert.assertEquals(value, value6.toString()); + } +} diff --git a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java index c2cf3b632d..d125b272fe 100644 --- a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java +++ b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java @@ -103,8 +103,9 @@ protected Properties getProperties() { Properties properties = new Properties(); Map sink = config.getSink(); for (Map.Entry entry : sink.entrySet()) { - if (Asserts.isNotNullString(entry.getKey()) && 
entry.getKey().startsWith("properties") && Asserts.isNotNullString(entry.getValue())) { - properties.setProperty(entry.getKey().replace("properties.",""), entry.getValue()); + if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("properties") + && Asserts.isNotNullString(entry.getValue())) { + properties.setProperty(entry.getKey().replace("properties.", ""), entry.getValue()); } } return properties; @@ -112,6 +113,7 @@ protected Properties getProperties() { protected SingleOutputStreamOperator deserialize(DataStreamSource dataStreamSource) { return dataStreamSource.map(new MapFunction() { + @Override public Map map(String value) throws Exception { ObjectMapper objectMapper = new ObjectMapper(); @@ -121,17 +123,18 @@ public Map map(String value) throws Exception { } protected SingleOutputStreamOperator shunt( - SingleOutputStreamOperator mapOperator, - Table table, - String schemaFieldName) { + SingleOutputStreamOperator mapOperator, + Table table, + String schemaFieldName) { final String tableName = table.getName(); final String schemaName = table.getSchema(); return mapOperator.filter(new FilterFunction() { + @Override public boolean filter(Map value) throws Exception { LinkedHashMap source = (LinkedHashMap) value.get("source"); return tableName.equals(source.get("table").toString()) - && schemaName.equals(source.get(schemaFieldName).toString()); + && schemaName.equals(source.get(schemaFieldName).toString()); } }); } @@ -145,73 +148,79 @@ protected DataStream shunt( } protected DataStream buildRowData( - SingleOutputStreamOperator filterOperator, - List columnNameList, - List columnTypeList, - String schemaTableName) { + SingleOutputStreamOperator filterOperator, + List columnNameList, + List columnTypeList, + String schemaTableName) { return filterOperator - .flatMap(new FlatMapFunction() { - @Override - public void flatMap(Map value, Collector out) throws Exception { - try { - switch (value.get("op").toString()) { - case "r": - case "c": - 
GenericRowData igenericRowData = new GenericRowData(columnNameList.size()); - igenericRowData.setRowKind(RowKind.INSERT); - Map idata = (Map) value.get("after"); - for (int i = 0; i < columnNameList.size(); i++) { - igenericRowData.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(igenericRowData); - break; - case "d": - GenericRowData dgenericRowData = new GenericRowData(columnNameList.size()); - dgenericRowData.setRowKind(RowKind.DELETE); - Map ddata = (Map) value.get("before"); - for (int i = 0; i < columnNameList.size(); i++) { - dgenericRowData.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(dgenericRowData); - break; - case "u": - GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size()); - ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE); - Map ubdata = (Map) value.get("before"); - for (int i = 0; i < columnNameList.size(); i++) { - ubgenericRowData.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(ubgenericRowData); - GenericRowData uagenericRowData = new GenericRowData(columnNameList.size()); - uagenericRowData.setRowKind(RowKind.UPDATE_AFTER); - Map uadata = (Map) value.get("after"); - for (int i = 0; i < columnNameList.size(); i++) { - uagenericRowData.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); - } - out.collect(uagenericRowData); - break; - default: + .flatMap(new FlatMapFunction() { + + @Override + public void flatMap(Map value, Collector out) throws Exception { + try { + switch (value.get("op").toString()) { + case "r": + case "c": + GenericRowData igenericRowData = new GenericRowData(columnNameList.size()); + igenericRowData.setRowKind(RowKind.INSERT); + Map idata = (Map) value.get("after"); + for (int i = 0; i < columnNameList.size(); i++) { + igenericRowData.setField(i, + convertValue(idata.get(columnNameList.get(i)), 
columnTypeList.get(i))); + } + out.collect(igenericRowData); + break; + case "d": + GenericRowData dgenericRowData = new GenericRowData(columnNameList.size()); + dgenericRowData.setRowKind(RowKind.DELETE); + Map ddata = (Map) value.get("before"); + for (int i = 0; i < columnNameList.size(); i++) { + dgenericRowData.setField(i, + convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(dgenericRowData); + break; + case "u": + GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size()); + ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE); + Map ubdata = (Map) value.get("before"); + for (int i = 0; i < columnNameList.size(); i++) { + ubgenericRowData.setField(i, + convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(ubgenericRowData); + GenericRowData uagenericRowData = new GenericRowData(columnNameList.size()); + uagenericRowData.setRowKind(RowKind.UPDATE_AFTER); + Map uadata = (Map) value.get("after"); + for (int i = 0; i < columnNameList.size(); i++) { + uagenericRowData.setField(i, + convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i))); + } + out.collect(uagenericRowData); + break; + default: + } + } catch (Exception e) { + logger.error("SchameTable: {} - Row: {} - Exception:", schemaTableName, + JSONUtil.toJsonString(value), e); + throw e; } - } catch (Exception e) { - logger.error("SchameTable: {} - Row: {} - Exception:", schemaTableName, JSONUtil.toJsonString(value), e); - throw e; } - } - }); + }); } public abstract void addSink( - StreamExecutionEnvironment env, - DataStream rowDataDataStream, - Table table, - List columnNameList, - List columnTypeList); + StreamExecutionEnvironment env, + DataStream rowDataDataStream, + Table table, + List columnNameList, + List columnTypeList); public DataStreamSource build( - CDCBuilder cdcBuilder, - StreamExecutionEnvironment env, - CustomTableEnvironment customTableEnvironment, - DataStreamSource dataStreamSource) { 
+ CDCBuilder cdcBuilder, + StreamExecutionEnvironment env, + CustomTableEnvironment customTableEnvironment, + DataStreamSource dataStreamSource) { final List schemaList = config.getSchemaList(); final String schemaFieldName = config.getSchemaFieldName(); @@ -227,7 +236,8 @@ public DataStreamSource build( buildColumn(columnNameList, columnTypeList, table.getColumns()); - DataStream rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList, table.getSchemaTableName()); + DataStream rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList, + table.getSchemaTableName()); addSink(env, rowDataDataStream, table, columnNameList, columnTypeList); } @@ -279,7 +289,7 @@ public LogicalType getLogicalType(Column column) { return new DateType(); case LOCALDATETIME: case TIMESTAMP: - return new TimestampType(); + return new TimestampType(column.getLength()); case BYTES: return new VarBinaryType(Integer.MAX_VALUE); default: @@ -294,7 +304,8 @@ protected Object convertValue(Object value, LogicalType logicalType) { if (logicalType instanceof VarCharType) { return StringData.fromString((String) value); } else if (logicalType instanceof DateType) { - return StringData.fromString(Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate().toString()); + return StringData.fromString( + Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate().toString()); } else if (logicalType instanceof TimestampType) { return TimestampData.fromTimestamp(Timestamp.from(Instant.ofEpochMilli((long) value))); } else if (logicalType instanceof DecimalType) { diff --git a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java index 63482043e0..84d8d4bbc9 100644 --- a/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java +++ 
b/dlink-client/dlink-client-1.13/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java @@ -244,7 +244,7 @@ public DataStreamSource build( DataStreamSource dataStreamSource) { final String timeZone = config.getSink().get("timezone"); config.getSink().remove("timezone"); - if(config.getSink().containsKey("fenodes")){ + if (config.getSink().containsKey("fenodes")) { config.getSink().remove("url"); } if (Asserts.isNotNullString(timeZone)) { @@ -334,7 +334,15 @@ protected Object convertValue(Object value, LogicalType logicalType) { } else if (value instanceof String) { return Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime(); } else { - return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime(); + TimestampType logicalType1 = (TimestampType) logicalType; + // 转换为毫秒 + if (logicalType1.getPrecision() == 3) { + return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime(); + } else if (logicalType1.getPrecision() > 3) { + return Instant.ofEpochMilli(((long) value) / (long) Math.pow(10, logicalType1.getPrecision() - 3)) + .atZone(sinkTimeZone).toLocalDateTime(); + } + return Instant.ofEpochSecond(((long) value)).atZone(sinkTimeZone).toLocalDateTime(); } } else if (logicalType instanceof DecimalType) { return new BigDecimal((String) value); diff --git a/dlink-client/dlink-client-1.13/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java b/dlink-client/dlink-client-1.13/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java new file mode 100644 index 0000000000..7cb0973b77 --- /dev/null +++ b/dlink-client/dlink-client-1.13/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.dlink.cdc.sql; + +import org.apache.flink.table.types.logical.TimestampType; + +import org.junit.Assert; +import org.junit.Test; + +/** + * CDCSOURCETest + * + */ +public class SinkBuilderTest { + + @Test + public void convertValueTimestampTest() { + SQLSinkBuilder sqlSinkBuilder = new SQLSinkBuilder(); + Object value0 = sqlSinkBuilder.convertValue(1688946316L, new TimestampType(0)); + Object value3 = sqlSinkBuilder.convertValue(1688946316000L, new TimestampType(3)); + Object value6 = sqlSinkBuilder.convertValue(1688946316000000L, new TimestampType(6)); + String value = "2023-07-09T23:45:16"; + Assert.assertEquals(value, value0.toString()); + Assert.assertEquals(value, value3.toString()); + Assert.assertEquals(value, value6.toString()); + } +} diff --git a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java index ad58cc2987..76ab41b994 100644 --- a/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java +++ b/dlink-client/dlink-client-1.14/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java @@ -125,9 +125,9 @@ public Map map(String value) throws Exception { } protected SingleOutputStreamOperator shunt( - SingleOutputStreamOperator mapOperator, - Table table, - String schemaFieldName) { + SingleOutputStreamOperator 
mapOperator, + Table table, + String schemaFieldName) { final String tableName = table.getName(); final String schemaName = table.getSchema(); return mapOperator.filter(new FilterFunction() { @@ -142,18 +142,18 @@ public boolean filter(Map value) throws Exception { } protected DataStream shunt( - SingleOutputStreamOperator processOperator, - Table table, - OutputTag tag) { + SingleOutputStreamOperator processOperator, + Table table, + OutputTag tag) { return processOperator.getSideOutput(tag); } protected DataStream buildRowData( - SingleOutputStreamOperator filterOperator, - List columnNameList, - List columnTypeList, - String schemaTableName) { + SingleOutputStreamOperator filterOperator, + List columnNameList, + List columnTypeList, + String schemaTableName) { return filterOperator .flatMap(new FlatMapFunction() { @@ -212,17 +212,17 @@ public void flatMap(Map value, Collector out) throws Exception { } public abstract void addSink( - StreamExecutionEnvironment env, - DataStream rowDataDataStream, - Table table, - List columnNameList, - List columnTypeList); + StreamExecutionEnvironment env, + DataStream rowDataDataStream, + Table table, + List columnNameList, + List columnTypeList); public DataStreamSource build( - CDCBuilder cdcBuilder, - StreamExecutionEnvironment env, - CustomTableEnvironment customTableEnvironment, - DataStreamSource dataStreamSource) { + CDCBuilder cdcBuilder, + StreamExecutionEnvironment env, + CustomTableEnvironment customTableEnvironment, + DataStreamSource dataStreamSource) { final String timeZone = config.getSink().get("timezone"); config.getSink().remove("timezone"); @@ -244,8 +244,8 @@ public DataStreamSource build( buildColumn(columnNameList, columnTypeList, table.getColumns()); - DataStream rowDataDataStream = - buildRowData(filterOperator, columnNameList, columnTypeList, table.getSchemaTableName()); + DataStream rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList, + table.getSchemaTableName()); 
addSink(env, rowDataDataStream, table, columnNameList, columnTypeList); } @@ -297,7 +297,7 @@ public LogicalType getLogicalType(Column column) { return new DateType(); case LOCALDATETIME: case TIMESTAMP: - return new TimestampType(); + return new TimestampType(column.getLength()); case BYTES: return new VarBinaryType(Integer.MAX_VALUE); default: @@ -313,7 +313,8 @@ protected Object convertValue(Object value, LogicalType logicalType) { return StringData.fromString((String) value); } else if (logicalType instanceof DateType) { return StringData.fromString( - Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate().toString()); } else if (logicalType instanceof TimestampType) { + Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate().toString()); + } else if (logicalType instanceof TimestampType) { if (value instanceof Integer) { return TimestampData.fromLocalDateTime( Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkTimeZone).toLocalDateTime()); diff --git a/dlink-client/dlink-client-1.14/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java b/dlink-client/dlink-client-1.14/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java new file mode 100644 index 0000000000..7cb0973b77 --- /dev/null +++ b/dlink-client/dlink-client-1.14/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.dlink.cdc.sql; + +import org.apache.flink.table.types.logical.TimestampType; + +import org.junit.Assert; +import org.junit.Test; + +/** + * CDCSOURCETest + * + */ +public class SinkBuilderTest { + + @Test + public void convertValueTimestampTest() { + SQLSinkBuilder sqlSinkBuilder = new SQLSinkBuilder(); + Object value0 = sqlSinkBuilder.convertValue(1688946316L, new TimestampType(0)); + Object value3 = sqlSinkBuilder.convertValue(1688946316000L, new TimestampType(3)); + Object value6 = sqlSinkBuilder.convertValue(1688946316000000L, new TimestampType(6)); + String value = "2023-07-09T23:45:16"; + Assert.assertEquals(value, value0.toString()); + Assert.assertEquals(value, value3.toString()); + Assert.assertEquals(value, value6.toString()); + } +} diff --git a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java index 5e729c8b29..0591a826e2 100644 --- a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java +++ b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java @@ -125,9 +125,9 @@ public Map map(String value) throws Exception { } protected SingleOutputStreamOperator shunt( - SingleOutputStreamOperator mapOperator, - Table table, - String schemaFieldName) { + SingleOutputStreamOperator mapOperator, + Table table, + String schemaFieldName) { final String tableName = table.getName(); final String schemaName = table.getSchema(); return mapOperator.filter(new 
FilterFunction() { @@ -142,18 +142,18 @@ public boolean filter(Map value) throws Exception { } protected DataStream shunt( - SingleOutputStreamOperator processOperator, - Table table, - OutputTag tag) { + SingleOutputStreamOperator processOperator, + Table table, + OutputTag tag) { return processOperator.getSideOutput(tag); } protected DataStream buildRowData( - SingleOutputStreamOperator filterOperator, - List columnNameList, - List columnTypeList, - String schemaTableName) { + SingleOutputStreamOperator filterOperator, + List columnNameList, + List columnTypeList, + String schemaTableName) { return filterOperator .flatMap(new FlatMapFunction() { @@ -212,17 +212,17 @@ public void flatMap(Map value, Collector out) throws Exception { } public abstract void addSink( - StreamExecutionEnvironment env, - DataStream rowDataDataStream, - Table table, - List columnNameList, - List columnTypeList); + StreamExecutionEnvironment env, + DataStream rowDataDataStream, + Table table, + List columnNameList, + List columnTypeList); public DataStreamSource build( - CDCBuilder cdcBuilder, - StreamExecutionEnvironment env, - CustomTableEnvironment customTableEnvironment, - DataStreamSource dataStreamSource) { + CDCBuilder cdcBuilder, + StreamExecutionEnvironment env, + CustomTableEnvironment customTableEnvironment, + DataStreamSource dataStreamSource) { final String timeZone = config.getSink().get("timezone"); config.getSink().remove("timezone"); @@ -244,8 +244,8 @@ public DataStreamSource build( buildColumn(columnNameList, columnTypeList, table.getColumns()); - DataStream rowDataDataStream = - buildRowData(filterOperator, columnNameList, columnTypeList, table.getSchemaTableName()); + DataStream rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList, + table.getSchemaTableName()); addSink(env, rowDataDataStream, table, columnNameList, columnTypeList); } @@ -297,7 +297,7 @@ public LogicalType getLogicalType(Column column) { return new DateType(); case 
LOCALDATETIME: case TIMESTAMP: - return new TimestampType(); + return new TimestampType(column.getLength()); case BYTES: return new VarBinaryType(Integer.MAX_VALUE); default: diff --git a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java index e52b46cc9b..dbdc3af74a 100644 --- a/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java +++ b/dlink-client/dlink-client-1.15/src/main/java/com/dlink/cdc/sql/SQLSinkBuilder.java @@ -244,7 +244,7 @@ public DataStreamSource build( DataStreamSource dataStreamSource) { final String timeZone = config.getSink().get("timezone"); config.getSink().remove("timezone"); - if(config.getSink().containsKey("fenodes")){ + if (config.getSink().containsKey("fenodes")) { config.getSink().remove("url"); } if (Asserts.isNotNullString(timeZone)) { @@ -333,7 +333,15 @@ protected Object convertValue(Object value, LogicalType logicalType) { } else if (value instanceof String) { return Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime(); } else { - return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime(); + TimestampType logicalType1 = (TimestampType) logicalType; + // convert to milliseconds + if (logicalType1.getPrecision() == 3) { + return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime(); + } else if (logicalType1.getPrecision() > 3) { + return Instant.ofEpochMilli(((long) value) / (long) Math.pow(10, logicalType1.getPrecision() - 3)) + .atZone(sinkTimeZone).toLocalDateTime(); + } + return Instant.ofEpochSecond(((long) value)).atZone(sinkTimeZone).toLocalDateTime(); } } else if (logicalType instanceof DecimalType) { return new BigDecimal((String) value); diff --git a/dlink-client/dlink-client-1.15/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java b/dlink-client/dlink-client-1.15/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java new file mode 100644
index 0000000000..7cb0973b77 --- /dev/null +++ b/dlink-client/dlink-client-1.15/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.dlink.cdc.sql; + +import org.apache.flink.table.types.logical.TimestampType; + +import org.junit.Assert; +import org.junit.Test; + +/** + * CDCSOURCETest + * + */ +public class SinkBuilderTest { + + @Test + public void convertValueTimestampTest() { + SQLSinkBuilder sqlSinkBuilder = new SQLSinkBuilder(); + Object value0 = sqlSinkBuilder.convertValue(1688946316L, new TimestampType(0)); + Object value3 = sqlSinkBuilder.convertValue(1688946316000L, new TimestampType(3)); + Object value6 = sqlSinkBuilder.convertValue(1688946316000000L, new TimestampType(6)); + String value = "2023-07-09T23:45:16"; + Assert.assertEquals(value, value0.toString()); + Assert.assertEquals(value, value3.toString()); + Assert.assertEquals(value, value6.toString()); + } +} diff --git a/dlink-client/dlink-client-1.16/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java b/dlink-client/dlink-client-1.16/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java index 200dadefbb..a62d433cb3 100644 --- 
a/dlink-client/dlink-client-1.16/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java +++ b/dlink-client/dlink-client-1.16/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java @@ -298,7 +298,7 @@ public LogicalType getLogicalType(Column column) { return new DateType(); case LOCALDATETIME: case TIMESTAMP: - return new TimestampType(); + return new TimestampType(column.getLength()); case BYTES: return new VarBinaryType(Integer.MAX_VALUE); default: diff --git a/dlink-client/dlink-client-1.16/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java b/dlink-client/dlink-client-1.16/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java new file mode 100644 index 0000000000..7cb0973b77 --- /dev/null +++ b/dlink-client/dlink-client-1.16/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package com.dlink.cdc.sql; + +import org.apache.flink.table.types.logical.TimestampType; + +import org.junit.Assert; +import org.junit.Test; + +/** + * CDCSOURCETest + * + */ +public class SinkBuilderTest { + + @Test + public void convertValueTimestampTest() { + SQLSinkBuilder sqlSinkBuilder = new SQLSinkBuilder(); + Object value0 = sqlSinkBuilder.convertValue(1688946316L, new TimestampType(0)); + Object value3 = sqlSinkBuilder.convertValue(1688946316000L, new TimestampType(3)); + Object value6 = sqlSinkBuilder.convertValue(1688946316000000L, new TimestampType(6)); + String value = "2023-07-09T23:45:16"; + Assert.assertEquals(value, value0.toString()); + Assert.assertEquals(value, value3.toString()); + Assert.assertEquals(value, value6.toString()); + } +} diff --git a/dlink-client/dlink-client-1.17/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java b/dlink-client/dlink-client-1.17/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java index 55e418467e..027961cd35 100644 --- a/dlink-client/dlink-client-1.17/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java +++ b/dlink-client/dlink-client-1.17/src/main/java/com/dlink/cdc/AbstractSinkBuilder.java @@ -291,7 +291,7 @@ public LogicalType getLogicalType(Column column) { return new DateType(); case LOCALDATETIME: case TIMESTAMP: - return new TimestampType(); + return new TimestampType(column.getLength()); case BYTES: return new VarBinaryType(Integer.MAX_VALUE); default: diff --git a/dlink-client/dlink-client-1.17/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java b/dlink-client/dlink-client-1.17/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java new file mode 100644 index 0000000000..7cb0973b77 --- /dev/null +++ b/dlink-client/dlink-client-1.17/src/test/java/com/dlink/cdc/sql/SinkBuilderTest.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.dlink.cdc.sql; + +import org.apache.flink.table.types.logical.TimestampType; + +import org.junit.Assert; +import org.junit.Test; + +/** + * CDCSOURCETest + * + */ +public class SinkBuilderTest { + + @Test + public void convertValueTimestampTest() { + SQLSinkBuilder sqlSinkBuilder = new SQLSinkBuilder(); + Object value0 = sqlSinkBuilder.convertValue(1688946316L, new TimestampType(0)); + Object value3 = sqlSinkBuilder.convertValue(1688946316000L, new TimestampType(3)); + Object value6 = sqlSinkBuilder.convertValue(1688946316000000L, new TimestampType(6)); + String value = "2023-07-09T23:45:16"; + Assert.assertEquals(value, value0.toString()); + Assert.assertEquals(value, value3.toString()); + Assert.assertEquals(value, value6.toString()); + } +}