From 9b1c97ce33d022e7854e0273f7ab90e889dda514 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 10 Aug 2023 17:41:53 +0800 Subject: [PATCH] [Feature](Routine Load)Support Partial Update (#22785) (cherry picked from commit 221e860cb7c86d79bc99185f34ae2dd2f7a10be8) --- .../Load/ALTER-ROUTINE-LOAD.md | 1 + .../Load/CREATE-ROUTINE-LOAD.md | 3 ++ .../Load/ALTER-ROUTINE-LOAD.md | 1 + .../Load/CREATE-ROUTINE-LOAD.md | 3 ++ .../doris/analysis/AlterRoutineLoadStmt.java | 29 +++++++++++++++++++ .../doris/analysis/CreateRoutineLoadStmt.java | 27 +++++++++++++---- .../load/routineload/KafkaRoutineLoadJob.java | 4 +++ .../load/routineload/RoutineLoadJob.java | 15 +++++++++- 8 files changed, 77 insertions(+), 6 deletions(-) diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md index 3a9d4d79cf199e..0e45d21245c344 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md @@ -69,6 +69,7 @@ FROM data_source 10. `timezone` 11. `num_as_string` 12. `fuzzy_parse` + 13. `partial_columns` 4. `data_source` diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md index a0ef3186c86f30..be08cb6b8c4d24 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md @@ -249,6 +249,9 @@ FROM data_source [data_source_properties] 11. `load_to_single_tablet` Boolean type, True means that one task can only load data to one tablet in the corresponding partition at a time. The default value is false. This parameter can only be set when loading data into the OLAP table with random partition. + 12. `partial_columns` + Boolean type, True means that use partial column update, the default value is false, this parameter is only allowed to be set when the table model is Unique and Merge on Write is used. Multi-table does not support this parameter. + - `FROM data_source [data_source_properties]` The type of data source. Currently supports: diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md index fbf1e57969d6d9..b52ce004d7bbed 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md @@ -69,6 +69,7 @@ FROM data_source 10. `timezone` 11. `num_as_string` 12. `fuzzy_parse` + 13. `partial_columns` 4. `data_source` diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md index cf192fa15375ea..f0c7dc068312cf 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md @@ -248,6 +248,9 @@ FROM data_source [data_source_properties] 布尔类型,为 true 表示支持一个任务只导入数据到对应分区的一个 tablet,默认值为 false,该参数只允许在对带有 random 分区的 olap 表导数的时候设置。 + 12. `partial_columns` + 布尔类型,为 true 表示使用部分列更新,默认值为 false,该参数只允许在表模型为 Unique 且采用 Merge on Write 时设置。一流多表不支持此参数。 + - `FROM data_source [data_source_properties]` 数据源的类型。当前支持: diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java index 8c32ae3cb5e1cf..0e25725cceee75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java @@ -17,7 +17,10 @@ package org.apache.doris.analysis; +import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; @@ -59,6 +62,7 @@ public class AlterRoutineLoadStmt extends DdlStmt { .add(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY) .add(CreateRoutineLoadStmt.NUM_AS_STRING) .add(CreateRoutineLoadStmt.FUZZY_PARSE) + .add(CreateRoutineLoadStmt.PARTIAL_COLUMNS) .add(LoadStmt.STRICT_MODE) .add(LoadStmt.TIMEZONE) .build(); @@ -67,6 +71,8 @@ public class AlterRoutineLoadStmt extends DdlStmt { private final Map jobProperties; private final Map dataSourceMapProperties; + private boolean isPartialUpdate; + // save analyzed job properties. // analyzed data source properties are saved in dataSourceProperties. private Map analyzedJobProperties = Maps.newHashMap(); @@ -76,6 +82,8 @@ public AlterRoutineLoadStmt(LabelName labelName, Map jobProperti this.labelName = labelName; this.jobProperties = jobProperties != null ? jobProperties : Maps.newHashMap(); this.dataSourceMapProperties = dataSourceProperties != null ? dataSourceProperties : Maps.newHashMap(); + this.isPartialUpdate = this.jobProperties.getOrDefault(CreateRoutineLoadStmt.PARTIAL_COLUMNS, "false") + .equalsIgnoreCase("true"); } public String getDbName() { @@ -111,12 +119,29 @@ public void analyze(Analyzer analyzer) throws UserException { checkJobProperties(); // check data source properties checkDataSourceProperties(); + checkPartialUpdate(); if (analyzedJobProperties.isEmpty() && MapUtils.isEmpty(dataSourceMapProperties)) { throw new AnalysisException("No properties are specified"); } } + private void checkPartialUpdate() throws UserException { + if (!isPartialUpdate) { + return; + } + RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager() + .getJob(getDbName(), getLabel()); + if (job.isMultiTable()) { + throw new AnalysisException("load by PARTIAL_COLUMNS is not supported in multi-table load."); + } + Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(job.getDbFullName()); + Table table = db.getTableOrAnalysisException(job.getTableName()); + if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) { + throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW"); + } + } + private void checkJobProperties() throws UserException { Optional optional = jobProperties.keySet().stream().filter( entity -> !CONFIGURABLE_JOB_PROPERTIES_SET.contains(entity)).findFirst(); @@ -203,6 +228,10 @@ private void checkJobProperties() throws UserException { boolean fuzzyParse = Boolean.parseBoolean(jobProperties.get(CreateRoutineLoadStmt.FUZZY_PARSE)); analyzedJobProperties.put(CreateRoutineLoadStmt.FUZZY_PARSE, String.valueOf(fuzzyParse)); } + if (jobProperties.containsKey(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) { + analyzedJobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, + String.valueOf(isPartialUpdate)); + } } private void checkDataSourceProperties() throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index b0e559de154c06..eb65c3b1e25a7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -38,6 +38,7 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; +import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -106,6 +107,8 @@ public class CreateRoutineLoadStmt extends DdlStmt { public static final String NUM_AS_STRING = "num_as_string"; public static final String FUZZY_PARSE = "fuzzy_parse"; + public static final String PARTIAL_COLUMNS = "partial_columns"; + private static final String NAME_TYPE = "ROUTINE LOAD NAME"; public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism"; @@ -131,6 +134,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { .add(EXEC_MEM_LIMIT_PROPERTY) .add(SEND_BATCH_PARALLELISM) .add(LOAD_TO_SINGLE_TABLET) + .add(PARTIAL_COLUMNS) .build(); private final LabelName labelName; @@ -157,8 +161,8 @@ public class CreateRoutineLoadStmt extends DdlStmt { /** * RoutineLoad support json data. * Require Params: - * 1) dataFormat = "json" - * 2) jsonPaths = "$.XXX.xxx" + * 1) dataFormat = "json" + * 2) jsonPaths = "$.XXX.xxx" */ private String format = ""; //default is csv. private String jsonPaths = ""; @@ -167,6 +171,12 @@ public class CreateRoutineLoadStmt extends DdlStmt { private boolean numAsString = false; private boolean fuzzyParse = false; + /** + * support partial columns load(Only Unique Key Columns) + */ + @Getter + private boolean isPartialUpdate = false; + private String comment = ""; private LoadTask.MergeType mergeType; @@ -196,6 +206,7 @@ public CreateRoutineLoadStmt(LabelName labelName, String tableName, List jobProperties, Map copiedJobProperties = Maps.newHashMap(jobProperties); modifyCommonJobProperties(copiedJobProperties); this.jobProperties.putAll(copiedJobProperties); + if (jobProperties.containsKey(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) { + this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadStmt.PARTIAL_COLUMNS)); + } } LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}", this.id, jobProperties, dataSourceProperties); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index b0ae15b3f787ed..da4a2ce7f9201e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -196,6 +196,8 @@ public boolean isFinalState() { protected long maxBatchRows = DEFAULT_MAX_BATCH_ROWS; protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE; + protected boolean isPartialUpdate = false; + protected String sequenceCol; /** @@ -335,6 +337,10 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, String.valueOf(this.execMemLimit)); jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, String.valueOf(this.sendBatchParallelism)); jobProperties.put(LoadStmt.LOAD_TO_SINGLE_TABLET, String.valueOf(this.loadToSingleTablet)); + jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, stmt.isPartialUpdate() ? "true" : "false"); + if (stmt.isPartialUpdate()) { + this.isPartialUpdate = true; + } if (Strings.isNullOrEmpty(stmt.getFormat()) || stmt.getFormat().equals("csv")) { jobProperties.put(PROPS_FORMAT, "csv"); @@ -624,7 +630,7 @@ public List getHiddenColumns() { @Override public boolean isPartialUpdate() { - return false; + return isPartialUpdate; } @Override @@ -1472,6 +1478,9 @@ public String getShowCreateInfo() { appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, maxBatchRows, false); appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes, false); appendProperties(sb, PROPS_FORMAT, getFormat(), false); + if (isPartialUpdate) { + appendProperties(sb, CreateRoutineLoadStmt.PARTIAL_COLUMNS, isPartialUpdate, false); + } appendProperties(sb, PROPS_JSONPATHS, getJsonPaths(), false); appendProperties(sb, PROPS_STRIP_OUTER_ARRAY, isStripOuterArray(), false); appendProperties(sb, PROPS_NUM_AS_STRING, isNumAsString(), false); @@ -1550,6 +1559,7 @@ private String jobPropertiesToJsonString() { jobProperties.put("columnSeparator", columnSeparator == null ? "\t" : columnSeparator.toString()); jobProperties.put("lineDelimiter", lineDelimiter == null ? "\n" : lineDelimiter.toString()); } + jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate)); jobProperties.put("maxErrorNum", String.valueOf(maxErrorNum)); jobProperties.put("maxBatchIntervalS", String.valueOf(maxBatchIntervalS)); jobProperties.put("maxBatchRows", String.valueOf(maxBatchRows)); @@ -1700,6 +1710,9 @@ protected void readFields(DataInput in) throws IOException { String key = Text.readString(in); String value = Text.readString(in); jobProperties.put(key, value); + if (key.equals(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) { + isPartialUpdate = Boolean.parseBoolean(value); + } } size = in.readInt();