From f80df20b6ffe254f42ebc8e4575a1b8fe278155b Mon Sep 17 00:00:00 2001
From: Qi Chen
Date: Mon, 3 Jul 2023 15:14:17 +0800
Subject: [PATCH] [Fix](multi-catalog) Fix read error in mixed partition
 locations. (#21399)

Issue Number: close #20948

Fix read errors when a table's partitions live in mixed locations (for
example, some partition locations are on S3 while others are on HDFS) by
resolving `getLocationType` at the file-split level instead of at the
table level.
---
 .../planner/external/FileQueryScanNode.java   | 82 +++++++++----------
 .../doris/planner/external/HiveScanNode.java  |  6 +-
 .../planner/external/MaxComputeScanNode.java  |  5 ++
 .../doris/planner/external/TVFScanNode.java   |  5 ++
 .../external/iceberg/IcebergScanNode.java     |  6 ++
 .../external/paimon/PaimonScanNode.java       |  6 +-
 .../hive/test_mixed_par_locations.out         | 37 +++++++++
 .../hive/test_mixed_par_locations.groovy      | 63 ++++++++++++++
 8 files changed, 164 insertions(+), 46 deletions(-)
 create mode 100644 regression-test/data/external_table_emr_p2/hive/test_mixed_par_locations.out
 create mode 100644 regression-test/suites/external_table_emr_p2/hive/test_mixed_par_locations.groovy

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index ed004ff17015e2..39727a6f048e38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -230,51 +230,42 @@ public void createScanRangeLocations() throws UserException {
         if (inputSplits.isEmpty()) {
             return;
         }
-        FileSplit inputSplit = (FileSplit) inputSplits.get(0);
-        TFileType locationType = getLocationType();
-        params.setFileType(locationType);
         TFileFormatType fileFormatType = getFileFormatType();
         params.setFormatType(fileFormatType);
-        TFileCompressType fileCompressType = getFileCompressType(inputSplit);
-        params.setCompressType(fileCompressType);
-        boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
-        if (isCsvOrJson) {
-            params.setFileAttributes(getFileAttributes());
-        }
-
-        // set hdfs params for hdfs file type.
-        Map<String, String> locationProperties = getLocationProperties();
-        if (fileFormatType == TFileFormatType.FORMAT_JNI || locationType == TFileType.FILE_S3) {
-            params.setProperties(locationProperties);
-        }
-        if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
-            String fsName = getFsName(inputSplit);
-            THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
-            tHdfsParams.setFsName(fsName);
-            params.setHdfsParams(tHdfsParams);
-
-            if (locationType == TFileType.FILE_BROKER) {
-                FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
-                if (broker == null) {
-                    throw new UserException("No alive broker.");
-                }
-                params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
-            }
-        }
-
         List<String> pathPartitionKeys = getPathPartitionKeys();
         for (Split split : inputSplits) {
+            TFileScanRangeParams scanRangeParams = new TFileScanRangeParams(params);
             FileSplit fileSplit = (FileSplit) split;
+            TFileType locationType = getLocationType(fileSplit.getPath().toString());
+            scanRangeParams.setFileType(locationType);
+            TFileCompressType fileCompressType = getFileCompressType(fileSplit);
+            scanRangeParams.setCompressType(fileCompressType);
+            boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
+            if (isCsvOrJson) {
+                scanRangeParams.setFileAttributes(getFileAttributes());
+            }
 
-            TFileScanRangeParams scanRangeParams;
-            if (!isCsvOrJson) {
-                scanRangeParams = params;
-            } else {
-                // If fileFormatType is csv/json format, uncompressed files may coexist with compressed files,
-                // so we need to set compressType separately.
-                scanRangeParams = new TFileScanRangeParams(params);
-                scanRangeParams.setCompressType(getFileCompressType(fileSplit));
+            // set hdfs params for hdfs file type.
+            Map<String, String> locationProperties = getLocationProperties();
+            if (fileFormatType == TFileFormatType.FORMAT_JNI) {
+                scanRangeParams.setProperties(locationProperties);
+            } else if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
+                String fsName = getFsName(fileSplit);
+                THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
+                tHdfsParams.setFsName(fsName);
+                scanRangeParams.setHdfsParams(tHdfsParams);
+
+                if (locationType == TFileType.FILE_BROKER) {
+                    FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+                    if (broker == null) {
+                        throw new UserException("No alive broker.");
+                    }
+                    scanRangeParams.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
+                }
+            } else if (locationType == TFileType.FILE_S3) {
+                scanRangeParams.setProperties(locationProperties);
             }
+
             TScanRangeLocations curLocations = newLocations(scanRangeParams);
 
             // If fileSplit has partition values, use the values collected from hive partitions.
@@ -288,7 +279,8 @@ public void createScanRangeLocations() throws UserException {
                     ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID)
                     : fileSplit.getPartitionValues();
-            TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
+            TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys,
+                    locationType);
             if (isACID) {
                 HiveSplit hiveSplit = (HiveSplit) split;
                 hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
@@ -354,7 +346,7 @@ private TScanRangeLocations newLocations(TFileScanRangeParams params) {
     }
 
     private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
-            List<String> columnsFromPathKeys)
+            List<String> columnsFromPathKeys, TFileType locationType)
             throws UserException {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
         rangeDesc.setStartOffset(fileSplit.getStart());
@@ -365,11 +357,11 @@ private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> col
         rangeDesc.setColumnsFromPath(columnsFromPath);
         rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
 
-        if (getLocationType() == TFileType.FILE_HDFS) {
+        if (locationType == TFileType.FILE_HDFS) {
             rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
-        } else if (getLocationType() == TFileType.FILE_S3
-                || getLocationType() == TFileType.FILE_BROKER
-                || getLocationType() == TFileType.FILE_NET) {
+        } else if (locationType == TFileType.FILE_S3
+                || locationType == TFileType.FILE_BROKER
+                || locationType == TFileType.FILE_NET) {
             // need full path
             rangeDesc.setPath(fileSplit.getPath().toString());
         }
@@ -379,6 +371,8 @@ private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> col
 
     protected abstract TFileType getLocationType() throws UserException;
 
+    protected abstract TFileType getLocationType(String location) throws UserException;
+
     protected abstract TFileFormatType getFileFormatType() throws UserException;
 
     protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 710fed10ddcec4..ec85f8eb251adb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -230,7 +230,11 @@ public TableIf getTargetTable() {
 
     @Override
     protected TFileType getLocationType() throws UserException {
-        String location = hmsTable.getRemoteTable().getSd().getLocation();
+        return getLocationType(hmsTable.getRemoteTable().getSd().getLocation());
+    }
+
+    @Override
+    protected TFileType getLocationType(String location) throws UserException {
         return getTFileType(location).orElseThrow(() ->
                 new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()));
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
index 102292e4c078c7..11e9bafd86f581 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
@@ -53,6 +53,11 @@ public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeNa
 
     @Override
     protected TFileType getLocationType() throws UserException {
+        return getLocationType(null);
+    }
+
+    @Override
+    protected TFileType getLocationType(String location) throws UserException {
         return TFileType.FILE_NET;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
index 0dfb78abedd0bd..476e16b098b452 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
@@ -84,6 +84,11 @@ protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws User
 
     @Override
     public TFileType getLocationType() throws DdlException, MetaNotFoundException {
+        return getLocationType(null);
+    }
+
+    @Override
+    public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
         return tableValuedFunction.getTFileType();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index bc23cec0929627..2de2f8291c2db5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -244,6 +244,12 @@ private List getDeleteFileFilters(FileScanTask spitTask
     public TFileType getLocationType() throws UserException {
         Table icebergTable = source.getIcebergTable();
         String location = icebergTable.location();
+        return getLocationType(location);
+    }
+
+    @Override
+    public TFileType getLocationType(String location) throws UserException {
+        Table icebergTable = source.getIcebergTable();
         return getTFileType(location).orElseThrow(() ->
                 new DdlException("Unknown file location " + location + " for iceberg table " + icebergTable.name()));
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
index d8fcca48acf0e8..a31ba1340bc2df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
@@ -141,7 +141,11 @@ public List<Split> getSplits() throws UserException {
 
     @Override
     public TFileType getLocationType() throws DdlException, MetaNotFoundException {
-        String location = ((AbstractFileStoreTable) source.getPaimonTable()).location().toString();
+        return getLocationType(((AbstractFileStoreTable) source.getPaimonTable()).location().toString());
+    }
+
+    @Override
+    public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
         if (location != null && !location.isEmpty()) {
             if (S3Util.isObjStorage(location)) {
                 return TFileType.FILE_S3;
diff --git a/regression-test/data/external_table_emr_p2/hive/test_mixed_par_locations.out b/regression-test/data/external_table_emr_p2/hive/test_mixed_par_locations.out
new file mode 100644
index 00000000000000..e4344d897fd4b4
--- /dev/null
+++ b/regression-test/data/external_table_emr_p2/hive/test_mixed_par_locations.out
@@ -0,0 +1,37 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !01 --
+1 Tom 48 shanghai male 20230101
+2 Jerry 35 guangzhou male 20230101
+3 Frank 25 hangzhou male 20230101
+4 Ada 22 beijing female 20230101
+5 Jason 46 shanghai male 20230102
+6 Andy 38 guangzhou male 20230102
+7 Sam 29 hangzhou male 20230102
+8 Chloea 18 beijing female 20230102
+
+-- !02 --
+8
+
+-- !03 --
+guangzhou 2
+hangzhou 2
+shanghai 2
+
+-- !01 --
+1 Tom 48 shanghai male 20230101
+2 Jerry 35 guangzhou male 20230101
+3 Frank 25 hangzhou male 20230101
+4 Ada 22 beijing female 20230101
+5 Jason 46 shanghai male 20230102
+6 Andy 38 guangzhou male 20230102
+7 Sam 29 hangzhou male 20230102
+8 Chloea 18 beijing female 20230102
+
+-- !02 --
+8
+
+-- !03 --
+guangzhou 2
+hangzhou 2
+shanghai 2
+
diff --git a/regression-test/suites/external_table_emr_p2/hive/test_mixed_par_locations.groovy b/regression-test/suites/external_table_emr_p2/hive/test_mixed_par_locations.groovy
new file mode 100644
index 00000000000000..ec092f99e76d0d
--- /dev/null
+++ b/regression-test/suites/external_table_emr_p2/hive/test_mixed_par_locations.groovy
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mixed_par_locations", "p2") {
+
+    def formats = ["_parquet", "_orc"]
+    def q1 = """select * from test_mixed_par_locationsSUFFIX order by id;"""
+    def q2 = """select count(id) from test_mixed_par_locationsSUFFIX;"""
+    def q3 = """select city, count(*) from test_mixed_par_locationsSUFFIX where sex = 'male' group by city order by city;"""
+
+    String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        try {
+            String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
+            String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
+            String extAk = context.config.otherConfigs.get("extAk");
+            String extSk = context.config.otherConfigs.get("extSk");
+            String extS3Endpoint = context.config.otherConfigs.get("extS3Endpoint");
+            String extS3Region = context.config.otherConfigs.get("extS3Region");
+            String catalog_name = "test_mixed_par_locations"
+
+            sql """drop catalog if exists ${catalog_name};"""
+            sql """
+                create catalog if not exists ${catalog_name} properties (
+                    'type'='hms',
+                    'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}',
+                    'AWS_ACCESS_KEY' = "${extAk}",
+                    'AWS_SECRET_KEY' = "${extSk}",
+                    'AWS_ENDPOINT' = "${extS3Endpoint}",
+                    'AWS_REGION' = "${extS3Region}"
+                );
+            """
+            logger.info("catalog " + catalog_name + " created")
+            sql """switch ${catalog_name};"""
+            logger.info("switched to catalog " + catalog_name)
+            sql """use multi_catalog;"""
+            logger.info("use multi_catalog")
+
+            for (String format in formats) {
+                logger.info("Process format " + format)
+                qt_01 q1.replace("SUFFIX", format)
+                qt_02 q2.replace("SUFFIX", format)
+                qt_03 q3.replace("SUFFIX", format)
+            }
+            sql """drop catalog if exists ${catalog_name}"""
+        } finally {
+        }
+    }
+}
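
The core of this patch is a change of granularity: the storage type used to be derived once, from the table root location, and applied to every split, whereas now each split's own path decides. The sketch below is a minimal, self-contained Java illustration of that per-split dispatch. The FileType enum and the scheme table are simplified, hypothetical stand-ins for Doris's TFileType and the scan nodes' getLocationType(String location) overrides (HiveScanNode, for instance, delegates to a getTFileType(location) lookup); treat it as an illustration of the idea, not the actual Doris API.

import java.net.URI;
import java.util.List;

// Minimal sketch of the per-split location dispatch introduced by this patch.
// FileType and the scheme mapping are hypothetical stand-ins, not Doris code.
public class PerSplitLocationTypeSketch {

    enum FileType { HDFS, S3 }

    // Classify a single split by its own path, not by the table root.
    static FileType locationTypeOf(String path) {
        String scheme = URI.create(path).getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("Unknown file location " + path);
        }
        switch (scheme.toLowerCase()) {
            case "hdfs":
                return FileType.HDFS;
            case "s3":
            case "s3a":
            case "s3n":
                return FileType.S3;
            default:
                throw new IllegalArgumentException("Unknown file location " + path);
        }
    }

    public static void main(String[] args) {
        // A table whose partitions live on different storage systems, as in
        // issue #20948: one partition on HDFS, another on S3 (paths invented
        // for illustration).
        List<String> splitPaths = List.of(
                "hdfs://nameservice1/warehouse/t/par=20230101/000000_0",
                "s3a://my-bucket/warehouse/t/par=20230102/000000_0");
        // Before the fix, the table-level type was applied to every split;
        // after the fix, each split is classified independently.
        for (String path : splitPaths) {
            System.out.println(path + " -> " + locationTypeOf(path));
        }
    }
}

A related consequence visible in the diff: TFileScanRangeParams is now copied per split (new TFileScanRangeParams(params)) instead of being shared across the scan, because fileType, compressType, and the HDFS/S3 properties can differ from one split to the next once mixed locations are allowed.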