[Feature](Routine Load) Support Partial Update (apache#22785)
CalvinKirs authored and morningman committed Aug 30, 2023
1 parent 4ac5c59 commit 771867c
Showing 8 changed files with 77 additions and 6 deletions.
@@ -69,6 +69,7 @@ FROM data_source
10. `timezone`
11. `num_as_string`
12. `fuzzy_parse`
13. `partial_columns`


4. `data_source`
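For illustration of the `partial_columns` property added above, a minimal sketch of enabling it on an existing job (the database and job names are hypothetical; per the description, the target must be a Unique table with Merge-on-Write):

```sql
-- Hypothetical job; routine load jobs must be PAUSED before they can be altered.
ALTER ROUTINE LOAD FOR example_db.example_job
PROPERTIES
(
    "partial_columns" = "true"
);
```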
@@ -249,6 +249,9 @@ FROM data_source [data_source_properties]
11. `load_to_single_tablet`
Boolean type. True means that one task can only load data into one tablet of the corresponding partition at a time. The default value is false. This parameter can only be set when loading data into an OLAP table with random partition.
12. `partial_columns`
Boolean type. True means to use partial column update; the default value is false. This parameter can only be set when the table model is Unique and uses Merge-on-Write. Multi-table load does not support this parameter (see the sketch after this section).
- `FROM data_source [data_source_properties]`
The type of data source. Currently supports:
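As referenced in item 12, a minimal sketch of creating a job with partial column update enabled; the table, broker, and topic names are hypothetical:

```sql
-- example_tbl is assumed to be a Unique table with Merge-on-Write enabled.
-- Only the key column k1 and value column v1 are written; other columns
-- of matched rows keep their existing values.
CREATE ROUTINE LOAD example_db.example_job ON example_tbl
COLUMNS (k1, v1)
PROPERTIES
(
    "partial_columns" = "true"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092",
    "kafka_topic" = "example_topic"
);
```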
@@ -69,6 +69,7 @@ FROM data_source
10. `timezone`
11. `num_as_string`
12. `fuzzy_parse`
13. `partial_columns`


4. `data_source`
@@ -248,6 +248,9 @@ FROM data_source [data_source_properties]

Boolean type. True means that one task only loads data into a single tablet of the corresponding partition; the default value is false. This parameter can only be set when loading data into an OLAP table with random partition.

12. `partial_columns`
Boolean type. True means to use partial column update; the default value is false. This parameter can only be set when the table model is Unique and uses Merge-on-Write. Single-stream multi-table load does not support this parameter.

- `FROM data_source [data_source_properties]`

The type of data source. Currently supports:
@@ -17,7 +17,10 @@

package org.apache.doris.analysis;

import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
@@ -59,6 +62,7 @@ public class AlterRoutineLoadStmt extends DdlStmt {
.add(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY)
.add(CreateRoutineLoadStmt.NUM_AS_STRING)
.add(CreateRoutineLoadStmt.FUZZY_PARSE)
.add(CreateRoutineLoadStmt.PARTIAL_COLUMNS)
.add(LoadStmt.STRICT_MODE)
.add(LoadStmt.TIMEZONE)
.build();
@@ -67,6 +71,8 @@ public class AlterRoutineLoadStmt extends DdlStmt {
private final Map<String, String> jobProperties;
private final Map<String, String> dataSourceMapProperties;

private boolean isPartialUpdate;

// save analyzed job properties.
// analyzed data source properties are saved in dataSourceProperties.
private Map<String, String> analyzedJobProperties = Maps.newHashMap();
Expand All @@ -76,6 +82,8 @@ public AlterRoutineLoadStmt(LabelName labelName, Map<String, String> jobProperti
this.labelName = labelName;
this.jobProperties = jobProperties != null ? jobProperties : Maps.newHashMap();
this.dataSourceMapProperties = dataSourceProperties != null ? dataSourceProperties : Maps.newHashMap();
this.isPartialUpdate = this.jobProperties.getOrDefault(CreateRoutineLoadStmt.PARTIAL_COLUMNS, "false")
.equalsIgnoreCase("true");
}

public String getDbName() {
@@ -111,12 +119,29 @@ public void analyze(Analyzer analyzer) throws UserException {
checkJobProperties();
// check data source properties
checkDataSourceProperties();
checkPartialUpdate();

if (analyzedJobProperties.isEmpty() && MapUtils.isEmpty(dataSourceMapProperties)) {
throw new AnalysisException("No properties are specified");
}
}

private void checkPartialUpdate() throws UserException {
if (!isPartialUpdate) {
return;
}
RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager()
.getJob(getDbName(), getLabel());
if (job.isMultiTable()) {
throw new AnalysisException("load by PARTIAL_COLUMNS is not supported in multi-table load.");
}
Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(job.getDbFullName());
Table table = db.getTableOrAnalysisException(job.getTableName());
if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW");
}
}
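A sketch of the case the first check rejects, reusing the hypothetical names from the doc examples: altering a single-stream multi-table job (one created without an `ON` clause) would raise the AnalysisException above:

```sql
-- Hypothetical multi-table job; this fails with
-- "load by PARTIAL_COLUMNS is not supported in multi-table load."
ALTER ROUTINE LOAD FOR example_db.example_multi_table_job
PROPERTIES ("partial_columns" = "true");
```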

private void checkJobProperties() throws UserException {
Optional<String> optional = jobProperties.keySet().stream().filter(
entity -> !CONFIGURABLE_JOB_PROPERTIES_SET.contains(entity)).findFirst();
@@ -203,6 +228,10 @@ private void checkJobProperties() throws UserException {
boolean fuzzyParse = Boolean.parseBoolean(jobProperties.get(CreateRoutineLoadStmt.FUZZY_PARSE));
analyzedJobProperties.put(CreateRoutineLoadStmt.FUZZY_PARSE, String.valueOf(fuzzyParse));
}
if (jobProperties.containsKey(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) {
analyzedJobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS,
String.valueOf(isPartialUpdate));
}
}

private void checkDataSourceProperties() throws UserException {
@@ -38,6 +38,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -106,6 +107,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
public static final String NUM_AS_STRING = "num_as_string";
public static final String FUZZY_PARSE = "fuzzy_parse";

public static final String PARTIAL_COLUMNS = "partial_columns";

private static final String NAME_TYPE = "ROUTINE LOAD NAME";
public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
@@ -131,6 +134,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
.add(EXEC_MEM_LIMIT_PROPERTY)
.add(SEND_BATCH_PARALLELISM)
.add(LOAD_TO_SINGLE_TABLET)
.add(PARTIAL_COLUMNS)
.build();

private final LabelName labelName;
@@ -157,8 +161,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
/**
* RoutineLoad support json data.
* Require Params:
* 1) dataFormat = "json"
* 2) jsonPaths = "$.XXX.xxx"
*/
private String format = ""; //default is csv.
private String jsonPaths = "";
@@ -167,6 +171,12 @@
private boolean numAsString = false;
private boolean fuzzyParse = false;

/**
* support partial columns load (Unique Key tables only)
*/
@Getter
private boolean isPartialUpdate = false;

private String comment = "";

private LoadTask.MergeType mergeType;
@@ -196,6 +206,7 @@ public CreateRoutineLoadStmt(LabelName labelName, String tableName, List<ParseNo
this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
.createDataSource(typeName, dataSourceProperties, this.isMultiTable);
this.mergeType = mergeType;
this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
if (comment != null) {
this.comment = comment;
}
@@ -323,6 +334,9 @@ public void checkDBTable(Analyzer analyzer) throws AnalysisException {
dbName = labelName.getDbName();
name = labelName.getLabelName();
Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
if (isPartialUpdate && isMultiTable) {
throw new AnalysisException("Partial update is not supported in multi-table load.");
}
if (isMultiTable) {
return;
}
@@ -339,6 +353,9 @@ public void checkDBTable(Analyzer analyzer) throws AnalysisException {
&& !(table.getType() == Table.TableType.OLAP && ((OlapTable) table).hasDeleteSign())) {
throw new AnalysisException("load by MERGE or DELETE need to upgrade table to support batch delete.");
}
if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW");
}
}
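The `getEnableUniqueKeyMergeOnWrite()` check corresponds to the `enable_unique_key_merge_on_write` table property; a minimal sketch (hypothetical schema) of a table that passes this validation:

```sql
-- Hypothetical table; partial_columns requires UNIQUE KEY plus Merge-on-Write.
CREATE TABLE example_db.example_tbl
(
    k1 INT,
    v1 INT,
    v2 INT
)
UNIQUE KEY (k1)
DISTRIBUTED BY HASH (k1) BUCKETS 1
PROPERTIES
(
    "replication_num" = "1",
    "enable_unique_key_merge_on_write" = "true"
);
```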

public void checkLoadProperties() throws UserException {
@@ -409,9 +426,9 @@ public void checkLoadProperties() throws UserException {
}
}
routineLoadDesc = new RoutineLoadDesc(columnSeparator, lineDelimiter, importColumnsStmt,
precedingImportWhereStmt, importWhereStmt,
partitionNames, importDeleteOnStmt == null ? null : importDeleteOnStmt.getExpr(), mergeType,
importSequenceStmt == null ? null : importSequenceStmt.getSequenceColName());
}

private void checkJobProperties() throws UserException {
@@ -53,6 +53,7 @@
import com.google.gson.GsonBuilder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -683,6 +684,9 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
Map<String, String> copiedJobProperties = Maps.newHashMap(jobProperties);
modifyCommonJobProperties(copiedJobProperties);
this.jobProperties.putAll(copiedJobProperties);
if (jobProperties.containsKey(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) {
this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadStmt.PARTIAL_COLUMNS));
}
}
LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}",
this.id, jobProperties, dataSourceProperties);
@@ -196,6 +196,8 @@ public boolean isFinalState() {
protected long maxBatchRows = DEFAULT_MAX_BATCH_ROWS;
protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE;

protected boolean isPartialUpdate = false;

protected String sequenceCol;

/**
@@ -339,6 +341,10 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, String.valueOf(this.execMemLimit));
jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, String.valueOf(this.sendBatchParallelism));
jobProperties.put(LoadStmt.LOAD_TO_SINGLE_TABLET, String.valueOf(this.loadToSingleTablet));
jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, stmt.isPartialUpdate() ? "true" : "false");
if (stmt.isPartialUpdate()) {
this.isPartialUpdate = true;
}

if (Strings.isNullOrEmpty(stmt.getFormat()) || stmt.getFormat().equals("csv")) {
jobProperties.put(PROPS_FORMAT, "csv");
@@ -636,7 +642,7 @@ public List<String> getHiddenColumns() {

@Override
public boolean isPartialUpdate() {
return false;
return isPartialUpdate;
}

@Override
@@ -1484,6 +1490,9 @@ public String getShowCreateInfo() {
appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, maxBatchRows, false);
appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes, false);
appendProperties(sb, PROPS_FORMAT, getFormat(), false);
if (isPartialUpdate) {
appendProperties(sb, CreateRoutineLoadStmt.PARTIAL_COLUMNS, isPartialUpdate, false);
}
appendProperties(sb, PROPS_JSONPATHS, getJsonPaths(), false);
appendProperties(sb, PROPS_STRIP_OUTER_ARRAY, isStripOuterArray(), false);
appendProperties(sb, PROPS_NUM_AS_STRING, isNumAsString(), false);
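Since `getShowCreateInfo()` appends `partial_columns` only when it is enabled, the setting is visible in the reconstructed statement; a sketch using the hypothetical job name from the earlier examples:

```sql
-- The output should contain "partial_columns" = "true" for this job.
SHOW CREATE ROUTINE LOAD FOR example_db.example_job;
```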
@@ -1562,6 +1571,7 @@ private String jobPropertiesToJsonString() {
jobProperties.put("columnSeparator", columnSeparator == null ? "\t" : columnSeparator.toString());
jobProperties.put("lineDelimiter", lineDelimiter == null ? "\n" : lineDelimiter.toString());
}
jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
jobProperties.put("maxErrorNum", String.valueOf(maxErrorNum));
jobProperties.put("maxBatchIntervalS", String.valueOf(maxBatchIntervalS));
jobProperties.put("maxBatchRows", String.valueOf(maxBatchRows));
@@ -1712,6 +1722,9 @@ protected void readFields(DataInput in) throws IOException {
String key = Text.readString(in);
String value = Text.readString(in);
jobProperties.put(key, value);
if (key.equals(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) {
isPartialUpdate = Boolean.parseBoolean(value);
}
}

size = in.readInt();