Skip to content

Commit

Permalink
[fix](routine load) fix enclose and escape can not set in routine loa…
Browse files Browse the repository at this point in the history
…d job (#38402)
  • Loading branch information
sollhui committed Aug 10, 2024
1 parent bf11d48 commit a2c4976
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public class AlterRoutineLoadStmt extends DdlStmt {
.add(CreateRoutineLoadStmt.PARTIAL_COLUMNS)
.add(LoadStmt.STRICT_MODE)
.add(LoadStmt.TIMEZONE)
.add(LoadStmt.KEY_ENCLOSE)
.add(LoadStmt.KEY_ESCAPE)
.build();

private final LabelName labelName;
Expand Down Expand Up @@ -242,6 +244,12 @@ private void checkJobProperties() throws UserException {
analyzedJobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS,
String.valueOf(isPartialUpdate));
}
if (jobProperties.containsKey(LoadStmt.KEY_ENCLOSE)) {
analyzedJobProperties.put(LoadStmt.KEY_ENCLOSE, jobProperties.get(LoadStmt.KEY_ENCLOSE));
}
if (jobProperties.containsKey(LoadStmt.KEY_ESCAPE)) {
analyzedJobProperties.put(LoadStmt.KEY_ESCAPE, jobProperties.get(LoadStmt.KEY_ESCAPE));
}
}

private void checkDataSourceProperties() throws UserException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
.add(SEND_BATCH_PARALLELISM)
.add(LOAD_TO_SINGLE_TABLET)
.add(PARTIAL_COLUMNS)
.add(LoadStmt.KEY_ENCLOSE)
.add(LoadStmt.KEY_ESCAPE)
.build();

private final LabelName labelName;
Expand Down Expand Up @@ -174,7 +176,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
private boolean stripOuterArray = false;
private boolean numAsString = false;
private boolean fuzzyParse = false;

private byte enclose;
private byte escape;
/**
* support partial columns load(Only Unique Key Columns)
*/
Expand Down Expand Up @@ -302,6 +305,14 @@ public String getJsonPaths() {
return jsonPaths;
}

public byte getEnclose() {
return enclose;
}

public byte getEscape() {
return escape;
}

public String getJsonRoot() {
return jsonRoot;
}
Expand Down Expand Up @@ -486,6 +497,23 @@ private void checkJobProperties() throws UserException {
RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET,
LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean");

String encloseStr = jobProperties.get(LoadStmt.KEY_ENCLOSE);
if (encloseStr != null) {
if (encloseStr.length() != 1) {
throw new AnalysisException("enclose must be single-char");
} else {
enclose = encloseStr.getBytes()[0];
}
}
String escapeStr = jobProperties.get(LoadStmt.KEY_ESCAPE);
if (escapeStr != null) {
if (escapeStr.length() != 1) {
throw new AnalysisException("enclose must be single-char");
} else {
escape = escapeStr.getBytes()[0];
}
}

if (ConnectContext.get() != null) {
timezone = ConnectContext.get().getSessionVariable().getTimeZone();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.logging.log4j.Logger;

import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -862,7 +863,7 @@ private void analyzeMultiLoadColumns() throws AnalysisException {
return;
}
String columnsSQL = "COLUMNS (" + columnDef + ")";
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(columnsSQL)));
SqlParser parser = new SqlParser(new org.apache.doris.analysis.SqlScanner(new StringReader(columnsSQL)));
ImportColumnsStmt columnsStmt;
try {
columnsStmt = (ImportColumnsStmt) SqlParserUtils.getFirstStmt(parser);
Expand Down Expand Up @@ -998,6 +999,22 @@ private void analyzeProperties() throws AnalysisException {
if (analysisMap.containsKey(LoadStmt.KEY_SKIP_LINES)) {
skipLines = Integer.parseInt(analysisMap.get(LoadStmt.KEY_SKIP_LINES));
}
if (analysisMap.containsKey(LoadStmt.KEY_ENCLOSE)) {
String encloseProp = analysisMap.get(LoadStmt.KEY_ENCLOSE);
if (encloseProp.length() == 1) {
enclose = encloseProp.getBytes(StandardCharsets.UTF_8)[0];
} else {
throw new AnalysisException("enclose must be single-char");
}
}
if (analysisMap.containsKey(LoadStmt.KEY_ESCAPE)) {
String escapeProp = analysisMap.get(LoadStmt.KEY_ESCAPE);
if (escapeProp.length() == 1) {
escape = escapeProp.getBytes(StandardCharsets.UTF_8)[0];
} else {
throw new AnalysisException("escape must be single-char");
}
}
}

private void checkLoadPriv(String fullDbName) throws AnalysisException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ public class LoadStmt extends DdlStmt {

public static final String KEY_COMMENT = "comment";

public static final String KEY_ENCLOSE = "enclose";

public static final String KEY_ESCAPE = "escape";

private final LabelName label;
private final List<DataDescription> dataDescriptions;
private final BrokerDesc brokerDesc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
this.isPartialUpdate = true;
}
jobProperties.put(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio));

if (Strings.isNullOrEmpty(stmt.getFormat()) || stmt.getFormat().equals("csv")) {
jobProperties.put(PROPS_FORMAT, "csv");
} else if (stmt.getFormat().equals("json")) {
Expand Down Expand Up @@ -384,6 +383,14 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
} else {
jobProperties.put(PROPS_FUZZY_PARSE, "false");
}
if (String.valueOf(stmt.getEnclose()) != null) {
this.enclose = stmt.getEnclose();
jobProperties.put(LoadStmt.KEY_ENCLOSE, String.valueOf(stmt.getEnclose()));
}
if (String.valueOf(stmt.getEscape()) != null) {
this.escape = stmt.getEscape();
jobProperties.put(LoadStmt.KEY_ESCAPE, String.valueOf(stmt.getEscape()));
}
}

private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
Expand Down

0 comments on commit a2c4976

Please sign in to comment.