From 2af98cb915c40ac39cb324db75575c38e2cbd8f1 Mon Sep 17 00:00:00 2001 From: aiwenmo <32723967+aiwenmo@users.noreply.github.com> Date: Tue, 25 Jul 2023 00:04:28 +0800 Subject: [PATCH] [Fix-2147] [metadata] Missing precision and scale when generating MySql DDL (#2148) Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com> --- .../main/java/com/dlink/utils/SplitUtil.java | 24 ++++ .../java/com/dlink/trans/ddl/CDCSource.java | 131 ++++++++++++------ .../trans/ddl/CreateCDCSourceOperation.java | 17 ++- .../dlink/metadata/driver/MySqlDriver.java | 14 +- 4 files changed, 135 insertions(+), 51 deletions(-) diff --git a/dlink-common/src/main/java/com/dlink/utils/SplitUtil.java b/dlink-common/src/main/java/com/dlink/utils/SplitUtil.java index 3353db055a..5e751e92e2 100644 --- a/dlink-common/src/main/java/com/dlink/utils/SplitUtil.java +++ b/dlink-common/src/main/java/com/dlink/utils/SplitUtil.java @@ -1,5 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package com.dlink.utils; +import com.dlink.assertion.Asserts; + import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -72,6 +93,9 @@ public static String getReValue(String value, Map splitConfig) { } public static boolean isEnabled(Map split) { + if (Asserts.isNullMap(split)) { + return false; + } return Boolean.parseBoolean(split.get("enable")); } diff --git a/dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java b/dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java index 1671c1b566..42231015e2 100644 --- a/dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java +++ b/dlink-executor/src/main/java/com/dlink/trans/ddl/CDCSource.java @@ -19,13 +19,14 @@ package com.dlink.trans.ddl; -import com.dlink.assertion.Asserts; import com.dlink.parser.SingleSqlParserFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -36,6 +37,7 @@ * @since 2022/1/29 23:30 */ public class CDCSource { + private String connector; private String statement; private String name; @@ -56,13 +58,21 @@ public class CDCSource { private Map sink; private List> sinks; - public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode, - Map split, Map debezium, Map source, Map sink, Map jdbc) { - this(connector, statement, name, hostname, port, username, password, checkpoint, parallelism, startupMode, split, debezium, source, sink, null, jdbc); + public CDCSource() { + } + + public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, + String password, Integer checkpoint, Integer parallelism, String startupMode, + Map split, Map debezium, Map source, + Map sink, Map jdbc) { + this(connector, statement, name, hostname, port, username, password, checkpoint, parallelism, startupMode, + split, debezium, source, sink, null, jdbc); } - public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, String password, Integer checkpoint, Integer parallelism, String startupMode, - Map split, Map debezium, Map source, Map sink, List> sinks, Map jdbc) { + public CDCSource(String connector, String statement, String name, String hostname, Integer port, String username, + String password, Integer checkpoint, Integer parallelism, String startupMode, + Map split, Map debezium, Map source, + Map sink, List> sinks, Map jdbc) { this.connector = connector; this.statement = statement; this.name = name; @@ -81,11 +91,63 @@ public CDCSource(String connector, String statement, String name, String hostnam this.sinks = sinks; } - public static CDCSource build(String statement) { + public static CDCSource build(String statement) throws Exception { + Optional.ofNullable(statement) + .orElseThrow(() -> new Exception("Statement can not be null. Please specify a statement.")); Map> map = SingleSqlParserFactory.generateParser(statement); + Optional.ofNullable(map) + .orElseThrow(() -> new Exception("Job configurations can not be null. Please specify configurations.")); + Optional.ofNullable(map.get("WITH")) + .orElseThrow(() -> new Exception("Job configurations can not be null. Please specify configurations.")); + Map config = getKeyValue(map.get("WITH")); + Optional.ofNullable(config) + .orElseThrow(() -> new Exception("Job configurations can not be null. Please specify configurations.")); + + CDCSource cdcSource = new CDCSource(); + + Optional.ofNullable(config.get("connector")) + .orElseThrow(() -> new Exception("Please specify connector in configurations.")); + cdcSource.setConnector(config.get("connector")); + cdcSource.setStatement(config.get("statement")); + + String name = Optional.ofNullable(map.get("CDCSOURCE")).orElseGet(() -> { + return Arrays.asList("dinky_cdcsource_task"); + }).toString(); + cdcSource.setName(name); + + Optional.ofNullable(config.get("hostname")) + .orElseThrow(() -> new Exception("Please specify hostname in configurations.")); + cdcSource.setHostname(config.get("hostname")); + + Optional.ofNullable(config.get("port")) + .orElseThrow(() -> new Exception("Please specify port in configurations.")); + cdcSource.setPort(Integer.valueOf(config.get("port"))); + + cdcSource.setUsername(config.get("username")); + cdcSource.setPassword(config.get("password")); + + Optional.ofNullable(config.get("checkpoint")).ifPresent(s -> { + cdcSource.setCheckpoint(Integer.valueOf(s)); + }); + Optional.ofNullable(config.get("parallelism")).ifPresent(s -> { + cdcSource.setParallelism(Integer.valueOf(s)); + }); + Optional.ofNullable(config.get("scan.startup.mode")).ifPresent(s -> { + cdcSource.setStartupMode(s); + }); + Optional.ofNullable(config.get("database-name")).ifPresent(s -> { + cdcSource.setDatabase(s); + }); + Optional.ofNullable(config.get("schema-name")).ifPresent(s -> { + cdcSource.setSchema(s); + }); + Optional.ofNullable(config.get("table-name")).ifPresent(s -> { + cdcSource.setTable(s); + }); + + // debezium params. (debezium.*) Map debezium = new HashMap<>(); - Map split = new HashMap<>(); for (Map.Entry entry : config.entrySet()) { if (entry.getKey().startsWith("debezium.")) { String key = entry.getKey(); @@ -95,6 +157,10 @@ public static CDCSource build(String statement) { } } } + cdcSource.setDebezium(debezium); + + // partition table params. (split.*) + Map split = new HashMap<>(); for (Map.Entry entry : config.entrySet()) { if (entry.getKey().startsWith("split.")) { String key = entry.getKey(); @@ -104,7 +170,12 @@ public static CDCSource build(String statement) { } } } - splitMapInit(split); + if (split.size() > 0) { + splitMapInit(split); + } + cdcSource.setSplit(split); + + // source custom params. (source.*) Map source = new HashMap<>(); for (Map.Entry entry : config.entrySet()) { if (entry.getKey().startsWith("source.")) { @@ -115,7 +186,9 @@ public static CDCSource build(String statement) { } } } - // jdbc参数(jdbc.properties.*) + cdcSource.setSource(source); + + // jdbc params. (jdbc.properties.*) Map jdbc = new HashMap<>(); for (Map.Entry entry : config.entrySet()) { if (entry.getKey().startsWith("jdbc.properties.")) { @@ -126,6 +199,9 @@ public static CDCSource build(String statement) { } } } + cdcSource.setJdbc(jdbc); + + // sink custom params. (sink.*) Map sink = new HashMap<>(); for (Map.Entry entry : config.entrySet()) { if (entry.getKey().startsWith("sink.")) { @@ -136,9 +212,8 @@ public static CDCSource build(String statement) { } } } - /** - * 支持多目标写入功能, 从0开始顺序写入配置. - */ + + // multiple sinks custom params. (sink[i].*) Map> sinks = new HashMap<>(); final Pattern p = Pattern.compile("sink\\[(?.*)\\]"); for (Map.Entry entry : config.entrySet()) { @@ -163,33 +238,9 @@ public static CDCSource build(String statement) { if (sink.isEmpty() && sinkList.size() > 0) { sink = sinkList.get(0); } - CDCSource cdcSource = new CDCSource( - config.get("connector"), - statement, - map.get("CDCSOURCE").toString(), - config.get("hostname"), - Integer.valueOf(config.get("port")), - config.get("username"), - config.get("password"), - Integer.valueOf(config.get("checkpoint")), - Integer.valueOf(config.get("parallelism")), - config.get("scan.startup.mode"), - split, - debezium, - source, - sink, - sinkList, - jdbc - ); - if (Asserts.isNotNullString(config.get("database-name"))) { - cdcSource.setDatabase(config.get("database-name")); - } - if (Asserts.isNotNullString(config.get("schema-name"))) { - cdcSource.setSchema(config.get("schema-name")); - } - if (Asserts.isNotNullString(config.get("table-name"))) { - cdcSource.setTable(config.get("table-name")); - } + cdcSource.setSink(sink); + cdcSource.setSinks(sinkList); + return cdcSource; } diff --git a/dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java b/dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java index 6b3e55e773..dbc6e4e0c1 100644 --- a/dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java +++ b/dlink-executor/src/main/java/com/dlink/trans/ddl/CreateCDCSourceOperation.java @@ -77,7 +77,12 @@ public Operation create(String statement) { @Override public TableResult build(Executor executor) { logger.info("Start build CDCSOURCE Task..."); - CDCSource cdcSource = CDCSource.build(statement); + CDCSource cdcSource = null; + try { + cdcSource = CDCSource.build(statement); + } catch (Exception e) { + e.printStackTrace(); + } FlinkCDCConfig config = new FlinkCDCConfig(cdcSource.getConnector(), cdcSource.getHostname(), cdcSource.getPort(), cdcSource.getUsername(), cdcSource.getPassword(), cdcSource.getCheckpoint(), cdcSource.getParallelism(), cdcSource.getDatabase(), cdcSource.getSchema(), cdcSource.getTable(), @@ -203,11 +208,11 @@ Driver checkAndCreateSinkSchema(FlinkCDCConfig config, String schemaName) throws if (!Asserts.isEqualsIgnoreCase(autoCreate, "true") || Asserts.isNullString(schemaName)) { return null; } - String url=""; - if("doris".equals(sink.get("connector"))){ - url="jdbc:mysql://"+sink.get("fenodes").trim().split(",")[0].replace("8030","9030"); - }else{ - url = sink.get("url"); + String url = ""; + if ("doris".equals(sink.get("connector"))) { + url = "jdbc:mysql://" + sink.get("fenodes").trim().split(",")[0].replace("8030", "9030"); + } else { + url = sink.get("url"); } String schema = SqlUtil.replaceAllParam(sink.get("sink.db"), "schemaName", schemaName); diff --git a/dlink-metadata/dlink-metadata-mysql/src/main/java/com/dlink/metadata/driver/MySqlDriver.java b/dlink-metadata/dlink-metadata-mysql/src/main/java/com/dlink/metadata/driver/MySqlDriver.java index 2166ba9548..cc736f3b39 100644 --- a/dlink-metadata/dlink-metadata-mysql/src/main/java/com/dlink/metadata/driver/MySqlDriver.java +++ b/dlink-metadata/dlink-metadata-mysql/src/main/java/com/dlink/metadata/driver/MySqlDriver.java @@ -108,9 +108,12 @@ public String genTable(Table table) { } else { sb.append(column.getType()); // 处理浮点类型 - if (column.getPrecision() > 0 && column.getScale() > 0) { + if (Asserts.isNotNullString(column.getType()) + && (column.getType().toLowerCase().contains("numeric") + || column.getType().toLowerCase().contains("decimal") + || column.getType().toLowerCase().contains("double"))) { sb.append("(") - .append(column.getLength()) + .append(column.getPrecision()) .append(",").append(column.getScale()) .append(")"); } else if (null != column.getLength()) { // 处理字符串类型和数值型 @@ -123,10 +126,11 @@ public String genTable(Table table) { sb.append(" DEFAULT ").append("''"); } else { // 数据类型不为 datetime/datetime(x)/timestamp/timestamp(x) 类型,应该使用单引号! - if (!column.getType().toLowerCase().startsWith("datetime") || !column.getType().toLowerCase().startsWith("timestamp")) { - sb.append(" DEFAULT ").append('\'').append(column.getDefaultValue()).append('\''); - } else { + if (column.getType().toLowerCase().startsWith("datetime") + || column.getType().toLowerCase().startsWith("timestamp")) { sb.append(" DEFAULT ").append(column.getDefaultValue()); + } else { + sb.append(" DEFAULT ").append('\'').append(column.getDefaultValue()).append('\''); } } } else {