Skip to content

Commit

Permalink
[fix](show) show load warning support load v2 (#22759)
Browse files Browse the repository at this point in the history
  • Loading branch information
TangSiyang2001 authored and xiaokang committed Aug 22, 2023
1 parent f540ce7 commit 4bd7ba6
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 1 deletion.
41 changes: 40 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@
import org.apache.doris.load.Load;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.LoadJob.JobState;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.mtmv.MTMVJobManager;
import org.apache.doris.mtmv.metadata.MTMVJob;
Expand Down Expand Up @@ -1291,14 +1292,18 @@ private void handleShowLoadWarnings() throws AnalysisException {
}

Database db = env.getInternalCatalog().getDbOrAnalysisException(showWarningsStmt.getDbName());
ShowResultSet showResultSet = handleShowLoadWarningV2(showWarningsStmt, db);
if (showResultSet != null) {
resultSet = showResultSet;
return;
}

long dbId = db.getId();
Load load = env.getLoadInstance();
long jobId = 0;
LoadJob job = null;
String label = null;
if (showWarningsStmt.isFindByLabel()) {
label = showWarningsStmt.getLabel();
jobId = load.getLatestJobIdByLabel(dbId, showWarningsStmt.getLabel());
job = load.getLoadJob(jobId);
if (job == null) {
Expand Down Expand Up @@ -1343,6 +1348,40 @@ private void handleShowLoadWarnings() throws AnalysisException {
resultSet = new ShowResultSet(showWarningsStmt.getMetaData(), rows);
}

private ShowResultSet handleShowLoadWarningV2(ShowLoadWarningsStmt showWarningsStmt, Database db)
throws AnalysisException {
LoadManager loadManager = Env.getCurrentEnv().getLoadManager();
if (showWarningsStmt.isFindByLabel()) {
List<List<Comparable>> loadJobInfosByDb = loadManager.getLoadJobInfosByDb(db.getId(),
showWarningsStmt.getLabel(),
true, null);
if (CollectionUtils.isEmpty(loadJobInfosByDb)) {
return null;
}
List<List<String>> infoList = Lists.newArrayListWithCapacity(loadJobInfosByDb.size());
for (List<Comparable> comparables : loadJobInfosByDb) {
List<String> singleInfo = comparables.stream().map(Object::toString).collect(Collectors.toList());
infoList.add(singleInfo);
}
return new ShowResultSet(showWarningsStmt.getMetaData(), infoList);
}
org.apache.doris.load.loadv2.LoadJob loadJob = loadManager.getLoadJob(showWarningsStmt.getJobId());
if (loadJob == null) {
return null;
}
List<String> singleInfo;
try {
singleInfo = loadJob
.getShowInfo()
.stream()
.map(Objects::toString)
.collect(Collectors.toList());
} catch (DdlException e) {
throw new AnalysisException(e.getMessage());
}
return new ShowResultSet(showWarningsStmt.getMetaData(), Lists.newArrayList(Collections.singleton(singleInfo)));
}

private void handleShowLoadWarningsFromURL(ShowLoadWarningsStmt showWarningsStmt, URL url)
throws AnalysisException {
String host = url.getHost();
Expand Down
78 changes: 78 additions & 0 deletions regression-test/suites/load_p0/broker_load/test_etl_failed.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_etl_failed", "load_p0") {
def tableName = "test_etl_failed"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`k1` int(20) NULL,
`k2` bigint(20) NULL,
`v1` tinyint(4) NULL,
`v2` string NULL,
`v3` date NOT NULL,
`v4` datetime NOT NULL
) ENGINE=OLAP
DUPLICATE KEY(`k1`, `k2`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""
String label = "test_etl_failed"
String path = "s3://doris-build-1308700295/regression/load/data/etl_failure/etl-failure.csv"
String format = "CSV"
String ak = getS3AK()
String sk = getS3SK()
sql """
LOAD LABEL ${label} (
DATA INFILE("$path")
INTO TABLE ${tableName}
FORMAT AS ${format}
)
WITH S3 (
"AWS_ACCESS_KEY" = "$ak",
"AWS_SECRET_KEY" = "$sk",
"AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com",
"AWS_REGION" = "ap-beijing"
)
PROPERTIES(
"use_new_load_scan_node" = "true",
"max_filter_ratio" = "0.1"
);
"""

def max_try_milli_secs = 600000
while (max_try_milli_secs > 0) {
String[][] result = sql """ show load where label="$label" order by createtime desc limit 1; """
if (result[0][2].equals("FINISHED")) {
logger.info("Load FINISHED " + label)
assertTrue(1 == 2, "etl should be failed")
break;
}
if (result[0][2].equals("CANCELLED")) {
break;
}
Thread.sleep(1000)
max_try_milli_secs -= 1000
if(max_try_milli_secs <= 0) {
assertTrue(1 == 2, "load Timeout: $label")
}
}
String[][] result = sql """ show load warnings where label="$label" """
assertTrue(result[0].size() > 1, "warning show be not null")
}

0 comments on commit 4bd7ba6

Please sign in to comment.