Commit de2e81a

[Fix](multi-catalog) Fix read error in mixed partition locations.
kaka11chen committed Jun 30, 2023
1 parent 53f90cb commit de2e81a
Showing 8 changed files with 166 additions and 48 deletions.
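In brief, as the diff below shows: FileQueryScanNode used to resolve the file location type once, from the table-level location and the first split, and reuse it for every scan range. When a table's partitions live on different storage systems (for example, some partitions on HDFS and others on S3), splits from the other location were read with the wrong file type, which is the read error the title refers to. The fix threads each split's own path through a new getLocationType(String location) overload and builds the scan-range params per split. A minimal sketch of the idea in plain Java, with hypothetical names (FileType and resolveFileType are illustrative, not Doris's TFileType API):

import java.util.List;
import java.util.Locale;

public class MixedLocationSketch {
    enum FileType { HDFS, S3 }

    // Resolve the storage type from each split's own path instead of once
    // from the table-level location, so mixed-location partitions work.
    static FileType resolveFileType(String path) {
        String p = path.toLowerCase(Locale.ROOT);
        return (p.startsWith("s3://") || p.startsWith("s3a://")) ? FileType.S3 : FileType.HDFS;
    }

    public static void main(String[] args) {
        List<String> splitPaths = List.of(
                "hdfs://nn:8020/warehouse/t/par=20230101/000000_0",
                "s3://bucket/warehouse/t/par=20230102/000000_0");
        // Before the fix: one type, taken from the first split, covered all splits.
        FileType tableWide = resolveFileType(splitPaths.get(0));
        // After the fix: each split carries its own type.
        for (String path : splitPaths) {
            System.out.printf("%s -> per-split=%s (table-wide would be %s)%n",
                    path, resolveFileType(path), tableWide);
        }
    }
}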
@@ -25,9 +25,11 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
@@ -230,51 +232,42 @@ public void createScanRangeLocations() throws UserException {
if (inputSplits.isEmpty()) {
return;
}
FileSplit inputSplit = (FileSplit) inputSplits.get(0);
TFileType locationType = getLocationType();
params.setFileType(locationType);
TFileFormatType fileFormatType = getFileFormatType();
params.setFormatType(fileFormatType);
TFileCompressType fileCompressType = getFileCompressType(inputSplit);
params.setCompressType(fileCompressType);
boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
if (isCsvOrJson) {
params.setFileAttributes(getFileAttributes());
}

// set hdfs params for hdfs file type.
Map<String, String> locationProperties = getLocationProperties();
if (fileFormatType == TFileFormatType.FORMAT_JNI || locationType == TFileType.FILE_S3) {
params.setProperties(locationProperties);
}
if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
String fsName = getFsName(inputSplit);
THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
tHdfsParams.setFsName(fsName);
params.setHdfsParams(tHdfsParams);

if (locationType == TFileType.FILE_BROKER) {
FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
if (broker == null) {
throw new UserException("No alive broker.");
}
params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
}
}

List<String> pathPartitionKeys = getPathPartitionKeys();
for (Split split : inputSplits) {
TFileScanRangeParams scanRangeParams = new TFileScanRangeParams(params);
FileSplit fileSplit = (FileSplit) split;
TFileType locationType = getLocationType(fileSplit.getPath().toString());
scanRangeParams.setFileType(locationType);
TFileCompressType fileCompressType = getFileCompressType(fileSplit);
scanRangeParams.setCompressType(fileCompressType);
boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
if (isCsvOrJson) {
scanRangeParams.setFileAttributes(getFileAttributes());
}

TFileScanRangeParams scanRangeParams;
if (!isCsvOrJson) {
scanRangeParams = params;
} else {
// If fileFormatType is csv/json, uncompressed files may coexist with compressed files,
// so we need to set compressType separately.
scanRangeParams = new TFileScanRangeParams(params);
scanRangeParams.setCompressType(getFileCompressType(fileSplit));
// set hdfs params for hdfs file type.
Map<String, String> locationProperties = getLocationProperties();
if (fileFormatType == TFileFormatType.FORMAT_JNI) {
scanRangeParams.setProperties(locationProperties);
} else if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
String fsName = getFsName(fileSplit);
THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
tHdfsParams.setFsName(fsName);
scanRangeParams.setHdfsParams(tHdfsParams);

if (locationType == TFileType.FILE_BROKER) {
FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
if (broker == null) {
throw new UserException("No alive broker.");
}
scanRangeParams.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
}
} else if (locationType == TFileType.FILE_S3) {
scanRangeParams.setProperties(locationProperties);
}

TScanRangeLocations curLocations = newLocations(scanRangeParams);

// If fileSplit has partition values, use the values collected from hive partitions.
@@ -288,7 +281,8 @@ public void createScanRangeLocations() throws UserException {
? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID)
: fileSplit.getPartitionValues();

TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys,
locationType);
if (isACID) {
HiveSplit hiveSplit = (HiveSplit) split;
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
@@ -354,7 +348,7 @@ private TScanRangeLocations newLocations(TFileScanRangeParams params) {
}

private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
List<String> columnsFromPathKeys)
List<String> columnsFromPathKeys, TFileType locationType)
throws UserException {
TFileRangeDesc rangeDesc = new TFileRangeDesc();
rangeDesc.setStartOffset(fileSplit.getStart());
@@ -365,11 +359,11 @@ private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
rangeDesc.setColumnsFromPath(columnsFromPath);
rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);

if (getLocationType() == TFileType.FILE_HDFS) {
if (locationType == TFileType.FILE_HDFS) {
rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
} else if (getLocationType() == TFileType.FILE_S3
|| getLocationType() == TFileType.FILE_BROKER
|| getLocationType() == TFileType.FILE_NET) {
} else if (locationType == TFileType.FILE_S3
|| locationType == TFileType.FILE_BROKER
|| locationType == TFileType.FILE_NET) {
// need full path
rangeDesc.setPath(fileSplit.getPath().toString());
}
@@ -379,6 +373,8 @@ private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,

protected abstract TFileType getLocationType() throws UserException;

protected abstract TFileType getLocationType(String location) throws UserException;

protected abstract TFileFormatType getFileFormatType() throws UserException;

protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
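The branch removed above explains the csv/json special case: compressed and uncompressed files can coexist in one table, so the compress type must be derived per file rather than per table, and the new loop now sets it per split for every format. A hedged sketch of suffix-based inference (the suffix mapping is an assumption for illustration; the real rules live in getFileCompressType(FileSplit), which this diff leaves collapsed):

public class CompressTypeSketch {
    // Assumed suffix-to-compression mapping; Doris's actual logic in
    // getFileCompressType(FileSplit) is not shown in this diff.
    static String inferCompressType(String path) {
        if (path.endsWith(".gz")) {
            return "GZ";
        }
        if (path.endsWith(".bz2")) {
            return "BZ2";
        }
        if (path.endsWith(".snappy")) {
            return "SNAPPYBLOCK";
        }
        return "PLAIN"; // uncompressed files fall through to PLAIN
    }

    public static void main(String[] args) {
        // A csv table may legally mix these two, which is why a single
        // table-wide compress type was wrong.
        for (String f : new String[] {"part-0.csv", "part-1.csv.gz"}) {
            System.out.println(f + " -> " + inferCompressType(f));
        }
    }
}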
@@ -21,14 +21,12 @@
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.Util;
@@ -226,7 +224,11 @@ public TableIf getTargetTable() {

@Override
protected TFileType getLocationType() throws UserException {
String location = hmsTable.getRemoteTable().getSd().getLocation();
return getLocationType(hmsTable.getRemoteTable().getSd().getLocation());
}

@Override
protected TFileType getLocationType(String location) throws UserException {
return getTFileType(location).orElseThrow(() ->
new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()));
}
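The same pattern repeats in the scan nodes below: the existing no-argument getLocationType() keeps its contract by delegating to the new getLocationType(String location) overload, and subclasses whose storage type does not depend on the path (MaxCompute, table-valued functions) simply ignore the argument.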
@@ -53,6 +53,11 @@ public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,

@Override
protected TFileType getLocationType() throws UserException {
return getLocationType(null);
}

@Override
protected TFileType getLocationType(String location) throws UserException {
return TFileType.FILE_NET;
}

@@ -84,6 +84,11 @@ protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {

@Override
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
return getLocationType(null);
}

@Override
public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
return tableValuedFunction.getTFileType();
}

@@ -243,6 +243,12 @@ private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask spitTask) {
public TFileType getLocationType() throws UserException {
Table icebergTable = source.getIcebergTable();
String location = icebergTable.location();
return getLocationType(location);
}

@Override
public TFileType getLocationType(String location) throws UserException {
Table icebergTable = source.getIcebergTable();
return getTFileType(location).orElseThrow(() ->
new DdlException("Unknown file location " + location + " for iceberg table " + icebergTable.name()));
}
@@ -141,7 +141,11 @@ public List<Split> getSplits() throws UserException {

@Override
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
String location = ((AbstractFileStoreTable) source.getPaimonTable()).location().toString();
return getLocationType(((AbstractFileStoreTable) source.getPaimonTable()).location().toString());
}

@Override
public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
if (location != null && !location.isEmpty()) {
if (S3Util.isObjStorage(location)) {
return TFileType.FILE_S3;
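PaimonScanNode resolves the type from the location string itself: FILE_S3 when S3Util.isObjStorage(location) matches, otherwise presumably FILE_HDFS (the remainder of the method is collapsed in this view). A rough sketch of that kind of scheme check (the scheme set below is an assumption, not S3Util's exact list):

import java.net.URI;
import java.util.Locale;
import java.util.Set;

public class ObjStorageSketch {
    // Assumed object-storage schemes; Doris's S3Util.isObjStorage may
    // recognize a different set.
    private static final Set<String> OBJ_SCHEMES = Set.of("s3", "s3a", "s3n", "oss", "cos", "obs");

    static boolean isObjStorage(String location) {
        String scheme = URI.create(location).getScheme();
        return scheme != null && OBJ_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(isObjStorage("s3://bucket/warehouse/tbl"));    // true
        System.out.println(isObjStorage("hdfs://nn:8020/warehouse/tbl")); // false
    }
}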
@@ -0,0 +1,37 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !01 --
1 Tom 48 shanghai male 20230101
2 Jerry 35 guangzhou male 20230101
3 Frank 25 hangzhou male 20230101
4 Ada 22 beijing female 20230101
5 Jason 46 shanghai male 20230102
6 Andy 38 guangzhou male 20230102
7 Sam 29 hangzhou male 20230102
8 Chloea 18 beijing female 20230102

-- !02 --
8

-- !03 --
guangzhou 2
hangzhou 2
shanghai 2

-- !01 --
1 Tom 48 shanghai male 20230101
2 Jerry 35 guangzhou male 20230101
3 Frank 25 hangzhou male 20230101
4 Ada 22 beijing female 20230101
5 Jason 46 shanghai male 20230102
6 Andy 38 guangzhou male 20230102
7 Sam 29 hangzhou male 20230102
8 Chloea 18 beijing female 20230102

-- !02 --
8

-- !03 --
guangzhou 2
hangzhou 2
shanghai 2

@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_mixed_par_locations", "p2") {

def formats = ["_parquet", "_orc"]
def q1 = """select * from test_mixed_par_locationsSUFFIX order by id;"""
def q2 = """select count(id) from test_mixed_par_locationsSUFFIX;"""
def q3 = """select city, count(*) from test_mixed_par_locationsSUFFIX where sex = 'male' group by city order by city;"""

String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
try {
String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
String extAk = context.config.otherConfigs.get("extAk");
String extSk = context.config.otherConfigs.get("extSk");
String extS3Endpoint = context.config.otherConfigs.get("extS3Endpoint");
String extS3Region = context.config.otherConfigs.get("extS3Region");
String catalog_name = "test_mixed_par_locations"

sql """drop catalog if exists ${catalog_name};"""
sql """
create catalog if not exists ${catalog_name} properties (
'type'='hms',
'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}',
'AWS_ACCESS_KEY' = "${extAk}",
'AWS_SECRET_KEY' = "${extSk}",
'AWS_ENDPOINT' = "${extS3Endpoint}",
'AWS_REGION' = "${extS3Region}"
);
"""
logger.info("catalog " + catalog_name + " created")
sql """switch ${catalog_name};"""
logger.info("switched to catalog " + catalog_name)
sql """use multi_catalog;"""
logger.info("use multi_catalog")

for (String format in formats) {
logger.info("Process format " + format)
qt_01 q1.replace("SUFFIX", format)
qt_02 q2.replace("SUFFIX", format)
qt_03 q3.replace("SUFFIX", format)
}
sql """drop catalog if exists ${catalog_name}"""
} finally {
}
}
}
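For context: the suite runs the same three checks against test_mixed_par_locations_parquet and test_mixed_par_locations_orc, so the duplicated result blocks in the .out file above are the parquet and orc passes of identical queries. How the mixed partition locations themselves are provisioned (per the commit title, partitions of one table stored at different locations) is part of the external test data, not of this commit.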
