[Fix-2147] [metadata] Missing precision and scale when generating MySQL DDL (DataLinkDC#2148)

Co-authored-by: wenmo <[email protected]>
aiwenmo authored Jul 24, 2023
1 parent 68c8c5a commit 2af98cb
Showing 4 changed files with 135 additions and 51 deletions.
24 changes: 24 additions & 0 deletions dlink-common/src/main/java/com/dlink/utils/SplitUtil.java
@@ -1,5 +1,26 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.dlink.utils;

import com.dlink.assertion.Asserts;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -72,6 +93,9 @@ public static String getReValue(String value, Map<String, String> splitConfig) {
}

public static boolean isEnabled(Map<String, String> split) {
if (Asserts.isNullMap(split)) {
return false;
}
return Boolean.parseBoolean(split.get("enable"));
}

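The added null guard in SplitUtil.isEnabled means a missing split configuration now simply reads as "disabled" instead of failing on split.get("enable"). A minimal sketch of the new behavior, assuming SplitUtil is on the classpath; the map contents below are hypothetical:

import com.dlink.utils.SplitUtil;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SplitUtilDemo {
    public static void main(String[] args) {
        // Before the guard, a null map threw a NullPointerException at split.get("enable").
        System.out.println(SplitUtil.isEnabled(null));                   // false
        System.out.println(SplitUtil.isEnabled(Collections.emptyMap())); // false

        Map<String, String> split = new HashMap<>();
        split.put("enable", "true"); // hypothetical split.* configuration
        System.out.println(SplitUtil.isEnabled(split));                  // true
    }
}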
131 changes: 91 additions & 40 deletions dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java
@@ -19,13 +19,14 @@

package com.dlink.trans.ddl;

import com.dlink.assertion.Asserts;
import com.dlink.parser.SingleSqlParserFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -36,6 +37,7 @@
* @since 2022/1/29 23:30
*/
public class CDCSource {

private String connector;
private String statement;
private String name;
@@ -56,13 +58,21 @@ public class CDCSource {
private Map<String, String> sink;
private List<Map<String, String>> sinks;

public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode,
Map<String, String> split, Map<String, String> debezium, Map<String, String> source, Map<String, String> sink, Map<String, String> jdbc) {
this(connector, statement, name, hostname, port, username, password, checkpoint, parallelism, startupMode, split, debezium, source, sink, null, jdbc);
public CDCSource() {
}

public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username,
String password, Integer checkpoint, Integer parallelism, String startupMode,
Map<String, String> split, Map<String, String> debezium, Map<String, String> source,
Map<String, String> sink, Map<String, String> jdbc) {
this(connector, statement, name, hostname, port, username, password, checkpoint, parallelism, startupMode,
split, debezium, source, sink, null, jdbc);
}

public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode,
Map<String, String> split, Map<String, String> debezium, Map<String, String> source, Map<String, String> sink, List<Map<String, String>> sinks, Map<String, String> jdbc) {
public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username,
String password, Integer checkpoint, Integer parallelism, String startupMode,
Map<String, String> split, Map<String, String> debezium, Map<String, String> source,
Map<String, String> sink, List<Map<String, String>> sinks, Map<String, String> jdbc) {
this.connector = connector;
this.statement = statement;
this.name = name;
@@ -81,11 +91,63 @@ public CDCSource(String connector, String statement, String name, String hostnam
this.sinks = sinks;
}

public static CDCSource build(String statement) {
public static CDCSource build(String statement) throws Exception {
Optional.ofNullable(statement)
.orElseThrow(() -> new Exception("Statement can not be null. Please specify a statement."));
Map<String, List<String>> map = SingleSqlParserFactory.generateParser(statement);
Optional.ofNullable(map)
.orElseThrow(() -> new Exception("Job configurations can not be null. Please specify configurations."));
Optional.ofNullable(map.get("WITH"))
.orElseThrow(() -> new Exception("Job configurations can not be null. Please specify configurations."));

Map<String, String> config = getKeyValue(map.get("WITH"));
Optional.ofNullable(config)
.orElseThrow(() -> new Exception("Job configurations can not be null. Please specify configurations."));

CDCSource cdcSource = new CDCSource();

Optional.ofNullable(config.get("connector"))
.orElseThrow(() -> new Exception("Please specify connector in configurations."));
cdcSource.setConnector(config.get("connector"));
cdcSource.setStatement(config.get("statement"));

String name = Optional.ofNullable(map.get("CDCSOURCE")).orElseGet(() -> {
return Arrays.asList("dinky_cdcsource_task");
}).toString();
cdcSource.setName(name);

Optional.ofNullable(config.get("hostname"))
.orElseThrow(() -> new Exception("Please specify hostname in configurations."));
cdcSource.setHostname(config.get("hostname"));

Optional.ofNullable(config.get("port"))
.orElseThrow(() -> new Exception("Please specify port in configurations."));
cdcSource.setPort(Integer.valueOf(config.get("port")));

cdcSource.setUsername(config.get("username"));
cdcSource.setPassword(config.get("password"));

Optional.ofNullable(config.get("checkpoint")).ifPresent(s -> {
cdcSource.setCheckpoint(Integer.valueOf(s));
});
Optional.ofNullable(config.get("parallelism")).ifPresent(s -> {
cdcSource.setParallelism(Integer.valueOf(s));
});
Optional.ofNullable(config.get("scan.startup.mode")).ifPresent(s -> {
cdcSource.setStartupMode(s);
});
Optional.ofNullable(config.get("database-name")).ifPresent(s -> {
cdcSource.setDatabase(s);
});
Optional.ofNullable(config.get("schema-name")).ifPresent(s -> {
cdcSource.setSchema(s);
});
Optional.ofNullable(config.get("table-name")).ifPresent(s -> {
cdcSource.setTable(s);
});

// debezium params. (debezium.*)
Map<String, String> debezium = new HashMap<>();
Map<String, String> split = new HashMap<>();
for (Map.Entry<String, String> entry : config.entrySet()) {
if (entry.getKey().startsWith("debezium.")) {
String key = entry.getKey();
@@ -95,6 +157,10 @@ public static CDCSource build(String statement) {
}
}
}
cdcSource.setDebezium(debezium);

// partition table params. (split.*)
Map<String, String> split = new HashMap<>();
for (Map.Entry<String, String> entry : config.entrySet()) {
if (entry.getKey().startsWith("split.")) {
String key = entry.getKey();
@@ -104,7 +170,12 @@ public static CDCSource build(String statement) {
}
}
}
splitMapInit(split);
if (split.size() > 0) {
splitMapInit(split);
}
cdcSource.setSplit(split);

// source custom params. (source.*)
Map<String, String> source = new HashMap<>();
for (Map.Entry<String, String> entry : config.entrySet()) {
if (entry.getKey().startsWith("source.")) {
@@ -115,7 +186,9 @@ public static CDCSource build(String statement) {
}
}
}
// jdbc parameters (jdbc.properties.*)
cdcSource.setSource(source);

// jdbc params. (jdbc.properties.*)
Map<String, String> jdbc = new HashMap<>();
for (Map.Entry<String, String> entry : config.entrySet()) {
if (entry.getKey().startsWith("jdbc.properties.")) {
@@ -126,6 +199,9 @@ public static CDCSource build(String statement) {
}
}
}
cdcSource.setJdbc(jdbc);

// sink custom params. (sink.*)
Map<String, String> sink = new HashMap<>();
for (Map.Entry<String, String> entry : config.entrySet()) {
if (entry.getKey().startsWith("sink.")) {
@@ -136,9 +212,8 @@ public static CDCSource build(String statement) {
}
}
}
/**
* Supports writing to multiple sinks; configurations are applied in order starting from index 0.
*/

// multiple sinks custom params. (sink[i].*)
Map<String, Map<String, String>> sinks = new HashMap<>();
final Pattern p = Pattern.compile("sink\\[(?<index>.*)\\]");
for (Map.Entry<String, String> entry : config.entrySet()) {
@@ -163,33 +238,9 @@ public static CDCSource build(String statement) {
if (sink.isEmpty() && sinkList.size() > 0) {
sink = sinkList.get(0);
}
CDCSource cdcSource = new CDCSource(
config.get("connector"),
statement,
map.get("CDCSOURCE").toString(),
config.get("hostname"),
Integer.valueOf(config.get("port")),
config.get("username"),
config.get("password"),
Integer.valueOf(config.get("checkpoint")),
Integer.valueOf(config.get("parallelism")),
config.get("scan.startup.mode"),
split,
debezium,
source,
sink,
sinkList,
jdbc
);
if (Asserts.isNotNullString(config.get("database-name"))) {
cdcSource.setDatabase(config.get("database-name"));
}
if (Asserts.isNotNullString(config.get("schema-name"))) {
cdcSource.setSchema(config.get("schema-name"));
}
if (Asserts.isNotNullString(config.get("table-name"))) {
cdcSource.setTable(config.get("table-name"));
}
cdcSource.setSink(sink);
cdcSource.setSinks(sinkList);

return cdcSource;
}

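With the refactor, CDCSource.build validates the parsed WITH configuration up front and throws a descriptive exception when the statement, the WITH block, or a required key (connector, hostname, port) is missing, instead of failing later with a NullPointerException. A rough sketch of how a caller might exercise it; the EXECUTE CDCSOURCE statement below is illustrative only, with option names taken from the keys that build reads:

import com.dlink.trans.ddl.CDCSource;

public class CDCSourceBuildDemo {
    public static void main(String[] args) {
        // Illustrative statement; the WITH keys mirror those consumed in CDCSource.build.
        String statement = "EXECUTE CDCSOURCE demo_job WITH (\n"
                + "  'connector' = 'mysql-cdc',\n"
                + "  'hostname' = '127.0.0.1',\n"
                + "  'port' = '3306',\n"
                + "  'username' = 'root',\n"
                + "  'password' = '123456',\n"
                + "  'checkpoint' = '3000',\n"
                + "  'parallelism' = '1',\n"
                + "  'table-name' = 'test.student',\n"
                + "  'sink.connector' = 'print'\n"
                + ")";
        try {
            CDCSource cdcSource = CDCSource.build(statement);
            System.out.println(cdcSource.getConnector()); // mysql-cdc
        } catch (Exception e) {
            // A missing connector, hostname, or port now surfaces here with a clear message.
            System.err.println(e.getMessage());
        }
    }
}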
@@ -77,7 +77,12 @@ public Operation create(String statement) {
@Override
public TableResult build(Executor executor) {
logger.info("Start build CDCSOURCE Task...");
CDCSource cdcSource = CDCSource.build(statement);
CDCSource cdcSource = null;
try {
cdcSource = CDCSource.build(statement);
} catch (Exception e) {
e.printStackTrace();
}
FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getConnector(), cdcSource.getHostname(),
cdcSource.getPort(), cdcSource.getUsername(), cdcSource.getPassword(), cdcSource.getCheckpoint(),
cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema(), cdcSource.getTable(),
@@ -203,11 +208,11 @@ Driver checkAndCreateSinkSchema(FlinkCDCConfig config, String schemaName) throws
if (!Asserts.isEqualsIgnoreCase(autoCreate, "true") || Asserts.isNullString(schemaName)) {
return null;
}
String url="";
if("doris".equals(sink.get("connector"))){
url="jdbc:mysql://"+sink.get("fenodes").trim().split(",")[0].replace("8030","9030");
}else{
url = sink.get("url");
String url = "";
if ("doris".equals(sink.get("connector"))) {
url = "jdbc:mysql://" + sink.get("fenodes").trim().split(",")[0].replace("8030", "9030");
} else {
url = sink.get("url");
}

String schema = SqlUtil.replaceAllParam(sink.get("sink.db"), "schemaName", schemaName);
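For a Doris sink, checkAndCreateSinkSchema builds the JDBC URL it uses for schema auto-creation from the first address in the sink's fenodes option, swapping the HTTP port 8030 for the MySQL-protocol port 9030. A standalone sketch of that derivation, with hypothetical host names:

public class DorisUrlDemo {
    public static void main(String[] args) {
        // Mirrors the URL derivation in checkAndCreateSinkSchema for a Doris sink.
        String fenodes = "doris-fe-1:8030,doris-fe-2:8030"; // hypothetical sink 'fenodes' value
        String url = "jdbc:mysql://" + fenodes.trim().split(",")[0].replace("8030", "9030");
        System.out.println(url); // jdbc:mysql://doris-fe-1:9030
    }
}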
@@ -108,9 +108,12 @@ public String genTable(Table table) {
} else {
sb.append(column.getType());
// Handle floating-point and decimal types
if (column.getPrecision() > 0 && column.getScale() > 0) {
if (Asserts.isNotNullString(column.getType())
&& (column.getType().toLowerCase().contains("numeric")
|| column.getType().toLowerCase().contains("decimal")
|| column.getType().toLowerCase().contains("double"))) {
sb.append("(")
.append(column.getLength())
.append(column.getPrecision())
.append(",").append(column.getScale())
.append(")");
} else if (null != column.getLength()) { // Handle string and numeric types
@@ -123,10 +126,11 @@ public String genTable(Table table) {
sb.append(" DEFAULT ").append("''");
} else {
// If the data type is not datetime/datetime(x)/timestamp/timestamp(x), the default value must be wrapped in single quotes!
if (!column.getType().toLowerCase().startsWith("datetime") || !column.getType().toLowerCase().startsWith("timestamp")) {
sb.append(" DEFAULT ").append('\'').append(column.getDefaultValue()).append('\'');
} else {
if (column.getType().toLowerCase().startsWith("datetime")
|| column.getType().toLowerCase().startsWith("timestamp")) {
sb.append(" DEFAULT ").append(column.getDefaultValue());
} else {
sb.append(" DEFAULT ").append('\'').append(column.getDefaultValue()).append('\'');
}
}
} else {
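The core of the fix: numeric, decimal, and double columns are now rendered as type(precision,scale) from the column's precision and scale, where the old branch keyed off precision > 0 && scale > 0 and then appended the character length; datetime/timestamp defaults also keep their unquoted form. A hedged sketch of the resulting DDL fragment, using hypothetical column metadata rather than the project's Column type:

public class DecimalDdlDemo {
    public static void main(String[] args) {
        // Hypothetical column metadata: a DECIMAL column with precision 10 and scale 2.
        String type = "decimal";
        Integer precision = 10;
        Integer scale = 2;

        StringBuilder sb = new StringBuilder("`amount` ").append(type);
        // Mirrors the new branch in genTable: decimal-like types use precision and scale,
        // where the old branch appended the character length instead of the precision.
        if (type.toLowerCase().contains("numeric")
                || type.toLowerCase().contains("decimal")
                || type.toLowerCase().contains("double")) {
            sb.append("(").append(precision).append(",").append(scale).append(")");
        }
        System.out.println(sb); // `amount` decimal(10,2)
    }
}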
