[Fix-2556] [cdc] Fix Mysql TimestampType convert to '1970-01-01' in CDCSOURCE
aiwenmo committed Nov 20, 2023
1 parent 5897b4c commit 6cf93cf
Showing 19 changed files with 587 additions and 215 deletions.
5 changes: 5 additions & 0 deletions dlink-client/dlink-client-1.11/pom.xml
@@ -45,6 +45,11 @@
<artifactId>dlink-flink-1.11</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
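The junit dependency is added in test scope to back the new SinkBuilderTest introduced later in this commit (the remaining changed files, not all shown here, presumably mirror these edits across the other dlink-client modules).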
@@ -123,9 +123,9 @@ public Map map(String value) throws Exception {
}

protected SingleOutputStreamOperator<Map> shunt(
SingleOutputStreamOperator<Map> mapOperator,
Table table,
String schemaFieldName) {
SingleOutputStreamOperator<Map> mapOperator,
Table table,
String schemaFieldName) {
final String tableName = table.getName();
final String schemaName = table.getSchema();
return mapOperator.filter(new FilterFunction<Map>() {
@@ -140,18 +140,18 @@ public boolean filter(Map value) throws Exception {
}

protected DataStream<Map> shunt(
SingleOutputStreamOperator<Map> processOperator,
Table table,
OutputTag<Map> tag) {
SingleOutputStreamOperator<Map> processOperator,
Table table,
OutputTag<Map> tag) {

return processOperator.getSideOutput(tag);
}

protected DataStream<RowData> buildRowData(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList,
String schemaTableName) {
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList,
String schemaTableName) {
return filterOperator
.flatMap(new FlatMapFunction<Map, RowData>() {

@@ -210,18 +210,18 @@ public void flatMap(Map value, Collector<RowData> out) throws Exception {
}

public abstract void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList);
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList);

@Override
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {

final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
@@ -237,8 +237,8 @@ public DataStreamSource build(

buildColumn(columnNameList, columnTypeList, table.getColumns());

- DataStream<RowData> rowDataDataStream =
-         buildRowData(filterOperator, columnNameList, columnTypeList, table.getSchemaTableName());
+ DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList,
+         table.getSchemaTableName());

addSink(env, rowDataDataStream, table, columnNameList, columnTypeList);
}
@@ -290,7 +290,7 @@ public LogicalType getLogicalType(Column column) {
return new DateType();
case LOCALDATETIME:
case TIMESTAMP:
- return new TimestampType();
+ return new TimestampType(column.getLength());
case BYTES:
return new VarBinaryType(Integer.MAX_VALUE);
default:
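Note on the getLogicalType change above: Flink's no-arg TimestampType constructor defaults to precision 6, so every MySQL TIMESTAMP/DATETIME column used to map to TIMESTAMP(6) regardless of its declared fractional-second precision. Passing column.getLength() preserves the source precision, which convertValue (below) needs in order to pick the correct epoch unit. A minimal sketch of the difference, assuming column.getLength() carries the MySQL fractional-second precision:

import org.apache.flink.table.types.logical.TimestampType;

public class PrecisionSketch {
    public static void main(String[] args) {
        // Before the fix: default constructor, precision is always 6
        System.out.println(new TimestampType().getPrecision()); // 6
        // After the fix: precision follows the column, e.g. DATETIME(0)
        System.out.println(new TimestampType(0).getPrecision()); // 0
    }
}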
@@ -211,7 +211,7 @@ public DataStreamSource build(
DataStreamSource<String> dataStreamSource) {
final String timeZone = config.getSink().get("timezone");
config.getSink().remove("timezone");
- if(config.getSink().containsKey("fenodes")){
+ if (config.getSink().containsKey("fenodes")) {
config.getSink().remove("url");
}
if (Asserts.isNotNullString(timeZone)) {
@@ -295,8 +295,18 @@ protected Object convertValue(Object value, LogicalType logicalType) {
} else if (logicalType instanceof TimestampType) {
if (value instanceof Integer) {
return Instant.ofEpochMilli(((Integer) value).longValue()).atZone(sinkTimeZone).toLocalDateTime();
+ } else if (value instanceof String) {
+     return Instant.parse((String) value).atZone(sinkTimeZone).toLocalDateTime();
  } else {
-     return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime();
+     TimestampType logicalType1 = (TimestampType) logicalType;
+     // convert to milliseconds
+     if (logicalType1.getPrecision() == 3) {
+         return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime();
+     } else if (logicalType1.getPrecision() > 3) {
+         return Instant.ofEpochMilli(((long) value) / (long) Math.pow(10, logicalType1.getPrecision() - 3))
+                 .atZone(sinkTimeZone).toLocalDateTime();
+     }
+     return Instant.ofEpochSecond(((long) value)).atZone(sinkTimeZone).toLocalDateTime();
}
} else if (logicalType instanceof DecimalType) {
return new BigDecimal((String) value);
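Note on the convertValue change above: the Debezium-sourced epoch value arrives in different units depending on the column's precision, and unconditionally calling Instant.ofEpochMilli on an epoch-seconds value lands in January 1970, which is exactly the '1970-01-01' symptom this commit fixes. A self-contained sketch of the same normalization, with illustrative names and assuming the units the new branches imply (seconds below precision 3, milliseconds at 3, 10^(p-3) sub-millisecond ticks above):

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class EpochNormalizationSketch {

    // Normalize an epoch value whose unit depends on the declared precision.
    static LocalDateTime toLocalDateTime(long value, int precision, ZoneId zone) {
        if (precision == 3) {
            // value is already in milliseconds
            return Instant.ofEpochMilli(value).atZone(zone).toLocalDateTime();
        } else if (precision > 3) {
            // scale micro-/nano-second epochs down to milliseconds
            return Instant.ofEpochMilli(value / (long) Math.pow(10, precision - 3))
                    .atZone(zone).toLocalDateTime();
        }
        // precision 0-2: treat the value as epoch seconds
        return Instant.ofEpochSecond(value).atZone(zone).toLocalDateTime();
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        System.out.println(toLocalDateTime(1688946316L, 0, utc));       // 2023-07-09T23:45:16
        System.out.println(toLocalDateTime(1688946316000L, 3, utc));    // 2023-07-09T23:45:16
        System.out.println(toLocalDateTime(1688946316000000L, 6, utc)); // 2023-07-09T23:45:16
    }
}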
@@ -0,0 +1,44 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.dlink.cdc.sql;

import org.apache.flink.table.types.logical.TimestampType;

import org.junit.Assert;
import org.junit.Test;

/**
* CDCSOURCETest
*
*/
public class SinkBuilderTest {

@Test
public void convertValueTimestampTest() {
SQLSinkBuilder sqlSinkBuilder = new SQLSinkBuilder();
Object value0 = sqlSinkBuilder.convertValue(1688946316L, new TimestampType(0));
Object value3 = sqlSinkBuilder.convertValue(1688946316000L, new TimestampType(3));
Object value6 = sqlSinkBuilder.convertValue(1688946316000000L, new TimestampType(6));
String value = "2023-07-09T23:45:16";
Assert.assertEquals(value, value0.toString());
Assert.assertEquals(value, value3.toString());
Assert.assertEquals(value, value6.toString());
}
}
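The three assertions feed the same wall-clock instant at second, millisecond, and microsecond resolution and expect an identical LocalDateTime string; before this fix the precision-0 and precision-6 cases would have collapsed toward 1970. (This assumes the builder's default sink time zone resolves to UTC, as the expected value suggests.)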
@@ -103,7 +103,8 @@ protected Properties getProperties() {
Properties properties = new Properties();
Map<String, String> sink = config.getSink();
for (Map.Entry<String, String> entry : sink.entrySet()) {
- if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("properties") && Asserts.isNotNullString(entry.getValue())) {
+ if (Asserts.isNotNullString(entry.getKey()) && entry.getKey().startsWith("properties")
+         && Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(entry.getKey().replace("properties.", ""), entry.getValue());
}
}
@@ -112,6 +113,7 @@

protected SingleOutputStreamOperator<Map> deserialize(DataStreamSource<String> dataStreamSource) {
return dataStreamSource.map(new MapFunction<String, Map>() {

@Override
public Map map(String value) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
@@ -121,98 +123,105 @@ public Map map(String value) throws Exception {
}

protected SingleOutputStreamOperator<Map> shunt(
SingleOutputStreamOperator<Map> mapOperator,
Table table,
String schemaFieldName) {
SingleOutputStreamOperator<Map> mapOperator,
Table table,
String schemaFieldName) {
final String tableName = table.getName();
final String schemaName = table.getSchema();
return mapOperator.filter(new FilterFunction<Map>() {

@Override
public boolean filter(Map value) throws Exception {
LinkedHashMap source = (LinkedHashMap) value.get("source");
return tableName.equals(source.get("table").toString())
&& schemaName.equals(source.get(schemaFieldName).toString());
&& schemaName.equals(source.get(schemaFieldName).toString());
}
});
}

protected DataStream<Map> shunt(
SingleOutputStreamOperator<Map> processOperator,
Table table,
OutputTag<Map> tag) {
SingleOutputStreamOperator<Map> processOperator,
Table table,
OutputTag<Map> tag) {

return processOperator.getSideOutput(tag);
}

protected DataStream<RowData> buildRowData(
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList,
String schemaTableName) {
SingleOutputStreamOperator<Map> filterOperator,
List<String> columnNameList,
List<LogicalType> columnTypeList,
String schemaTableName) {
return filterOperator
.flatMap(new FlatMapFunction<Map, RowData>() {
@Override
public void flatMap(Map value, Collector<RowData> out) throws Exception {
try {
switch (value.get("op").toString()) {
case "r":
case "c":
GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
igenericRowData.setRowKind(RowKind.INSERT);
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
igenericRowData.setField(i, convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(igenericRowData);
break;
case "d":
GenericRowData dgenericRowData = new GenericRowData(columnNameList.size());
dgenericRowData.setRowKind(RowKind.DELETE);
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
dgenericRowData.setField(i, convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(dgenericRowData);
break;
case "u":
GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size());
ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE);
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubgenericRowData.setField(i, convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubgenericRowData);
GenericRowData uagenericRowData = new GenericRowData(columnNameList.size());
uagenericRowData.setRowKind(RowKind.UPDATE_AFTER);
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uagenericRowData.setField(i, convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uagenericRowData);
break;
default:
.flatMap(new FlatMapFunction<Map, RowData>() {

@Override
public void flatMap(Map value, Collector<RowData> out) throws Exception {
try {
switch (value.get("op").toString()) {
case "r":
case "c":
GenericRowData igenericRowData = new GenericRowData(columnNameList.size());
igenericRowData.setRowKind(RowKind.INSERT);
Map idata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
igenericRowData.setField(i,
convertValue(idata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(igenericRowData);
break;
case "d":
GenericRowData dgenericRowData = new GenericRowData(columnNameList.size());
dgenericRowData.setRowKind(RowKind.DELETE);
Map ddata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
dgenericRowData.setField(i,
convertValue(ddata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(dgenericRowData);
break;
case "u":
GenericRowData ubgenericRowData = new GenericRowData(columnNameList.size());
ubgenericRowData.setRowKind(RowKind.UPDATE_BEFORE);
Map ubdata = (Map) value.get("before");
for (int i = 0; i < columnNameList.size(); i++) {
ubgenericRowData.setField(i,
convertValue(ubdata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(ubgenericRowData);
GenericRowData uagenericRowData = new GenericRowData(columnNameList.size());
uagenericRowData.setRowKind(RowKind.UPDATE_AFTER);
Map uadata = (Map) value.get("after");
for (int i = 0; i < columnNameList.size(); i++) {
uagenericRowData.setField(i,
convertValue(uadata.get(columnNameList.get(i)), columnTypeList.get(i)));
}
out.collect(uagenericRowData);
break;
default:
}
} catch (Exception e) {
logger.error("SchameTable: {} - Row: {} - Exception:", schemaTableName,
JSONUtil.toJsonString(value), e);
throw e;
}
} catch (Exception e) {
logger.error("SchameTable: {} - Row: {} - Exception:", schemaTableName, JSONUtil.toJsonString(value), e);
throw e;
}
}
});
});
}

public abstract void addSink(
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList);
StreamExecutionEnvironment env,
DataStream<RowData> rowDataDataStream,
Table table,
List<String> columnNameList,
List<LogicalType> columnTypeList);

@Override
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {

final List<Schema> schemaList = config.getSchemaList();
final String schemaFieldName = config.getSchemaFieldName();
@@ -228,7 +237,8 @@ public DataStreamSource build(

buildColumn(columnNameList, columnTypeList, table.getColumns());

- DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList, table.getSchemaTableName());
+ DataStream<RowData> rowDataDataStream = buildRowData(filterOperator, columnNameList, columnTypeList,
+         table.getSchemaTableName());

addSink(env, rowDataDataStream, table, columnNameList, columnTypeList);
}
@@ -280,7 +290,7 @@ public LogicalType getLogicalType(Column column) {
return new DateType();
case LOCALDATETIME:
case TIMESTAMP:
- return new TimestampType();
+ return new TimestampType(column.getLength());
case BYTES:
return new VarBinaryType(Integer.MAX_VALUE);
default:
@@ -295,7 +305,8 @@ protected Object convertValue(Object value, LogicalType logicalType) {
if (logicalType instanceof VarCharType) {
return StringData.fromString((String) value);
} else if (logicalType instanceof DateType) {
- return StringData.fromString(Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate().toString());
+ return StringData.fromString(
+         Instant.ofEpochMilli((long) value).atZone(ZoneId.systemDefault()).toLocalDate().toString());
} else if (logicalType instanceof TimestampType) {
return TimestampData.fromTimestamp(Timestamp.from(Instant.ofEpochMilli((long) value)));
} else if (logicalType instanceof DecimalType) {