Commit

[fix](catalog) fix the broken check for whether an input format is splittable (#35029)

Introduced in #33242.

When checking whether an input format is supported against a Set<String>, we should pass the format's class-name String, not the InputFormat object: Set.contains(Object) accepts either, so the mistake compiles but the lookup always returns false. Besides the signature changes, most hunks below are indentation-only re-alignments, which is why their removed and added lines show identical text.
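To make the failure mode concrete, here is a minimal, self-contained sketch; the set contents and names are illustrative, not taken from the patch:

import java.util.Set;

public class ContainsPitfall {
    // Hypothetical stand-in for the supported-formats set (a Set<String>).
    static final Set<String> SUPPORTED_FORMATS =
            Set.of("org.apache.hadoop.mapred.TextInputFormat");

    public static void main(String[] args) {
        // Stand-in for an InputFormat instance: any non-String object.
        Object inputFormat = new Object();
        // Compiles, because Set.contains takes Object, but is always false:
        System.out.println(SUPPORTED_FORMATS.contains(inputFormat)); // false
        // The fix: look up the class-name String instead.
        System.out.println(SUPPORTED_FORMATS.contains(
                "org.apache.hadoop.mapred.TextInputFormat"));        // true
    }
}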
morningman authored May 18, 2024
1 parent 8eeee84 commit 7200bae
Showing 2 changed files with 25 additions and 25 deletions.
File 1 of 2:
@@ -348,21 +348,21 @@ private Map<PartitionCacheKey, HivePartition> loadPartitions(Iterable<? extends
StorageDescriptor sd = partition.getSd();
ret.put(new PartitionCacheKey(dbName, tblName, partition.getValues()),
new HivePartition(dbName, tblName, false,
- sd.getInputFormat(), sd.getLocation(), partition.getValues()));
+ sd.getInputFormat(), sd.getLocation(), partition.getValues()));
}
return ret;
}

// Get File Status by using FileSystem API.
- private FileCacheValue getFileCache(String location, InputFormat<?, ?> inputFormat,
- JobConf jobConf,
- List<String> partitionValues,
- String bindBrokerName) throws UserException {
+ private FileCacheValue getFileCache(String location, String inputFormat,
+ JobConf jobConf,
+ List<String> partitionValues,
+ String bindBrokerName) throws UserException {
FileCacheValue result = new FileCacheValue();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
- location, bindBrokerName), jobConf, bindBrokerName));
- result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
+ location, bindBrokerName), jobConf, bindBrokerName));
+ result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location));
try {
// For Tez engine, it may generate subdirectories for "union" query.
// So there may be files and directories in the table directory at the same time. e.g.:
@@ -421,18 +421,18 @@ private FileCacheValue loadFiles(FileCacheKey key) {
FileInputFormat.setInputPaths(jobConf, finalLocation.get());
try {
FileCacheValue result;
- InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
// TODO: This is a temp config, will remove it after the HiveSplitter is stable.
if (key.useSelfSplitter) {
- result = getFileCache(finalLocation.get(), inputFormat, jobConf,
- key.getPartitionValues(), key.bindBrokerName);
+ result = getFileCache(finalLocation.get(), key.inputFormat, jobConf,
+ key.getPartitionValues(), key.bindBrokerName);
} else {
+ InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
InputSplit[] splits;
String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
if (!Strings.isNullOrEmpty(remoteUser)) {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
splits = ugi.doAs(
- (PrivilegedExceptionAction<InputSplit[]>) () -> inputFormat.getSplits(jobConf, 0));
+ (PrivilegedExceptionAction<InputSplit[]>) () -> inputFormat.getSplits(jobConf, 0));
} else {
splits = inputFormat.getSplits(jobConf, 0 /* use hdfs block size as default */);
}
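A side effect of this hunk: HiveUtil.getInputFormat(...) now runs only on the else branch, so the self-splitter path no longer constructs an InputFormat it never uses. A generic, self-contained sketch of that deferral pattern (names and types here are illustrative, with no Doris or Hadoop dependencies):

import java.util.function.Supplier;

public class LazyBranch {
    // Build the expensive object only on the branch that actually uses it.
    static String plan(boolean useSelfSplitter, Supplier<String> inputFormatFactory) {
        if (useSelfSplitter) {
            return "self-splitter: no InputFormat needed";
        } else {
            return "hadoop splits via " + inputFormatFactory.get(); // built only here
        }
    }

    public static void main(String[] args) {
        System.out.println(plan(true, () -> {
            throw new AssertionError("never constructed on the self-splitter path");
        }));
    }
}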
@@ -715,7 +715,7 @@ public void addPartitionsCache(String dbName, String tblName, List<String> parti
}

public void dropPartitionsCache(String dbName, String tblName, List<String> partitionNames,
- boolean invalidPartitionCache) {
+ boolean invalidPartitionCache) {
PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, tblName, null);
HivePartitionValues partitionValues = partitionValuesCache.getIfPresent(key);
if (partitionValues == null) {
@@ -839,17 +839,17 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location, bindBrokerName),
- jobConf, bindBrokerName));
+ jobConf, bindBrokerName));
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
if (delta.isDeleteDelta()) {
List<String> deleteDeltaFileNames = locatedFiles.files().stream().map(f -> f.getName()).filter(
name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
- .collect(Collectors.toList());
+ .collect(Collectors.toList());
deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames));
continue;
}
locatedFiles.files().stream().filter(
- f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
+ f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(fileCacheValue::addFile);
}

@@ -859,10 +859,10 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location, bindBrokerName),
- jobConf, bindBrokerName));
+ jobConf, bindBrokerName));
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
locatedFiles.files().stream().filter(
- f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
+ f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(fileCacheValue::addFile);
}
fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas));
@@ -976,8 +976,8 @@ public FileCacheKey(String location, String inputFormat, List<String> partitionV
}

public static FileCacheKey createDummyCacheKey(String dbName, String tblName, String location,
- String inputFormat, boolean useSelfSplitter,
- String bindBrokerName) {
+ String inputFormat, boolean useSelfSplitter,
+ String bindBrokerName) {
FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null, bindBrokerName);
fileCacheKey.dummyKey = dbName + "." + tblName;
fileCacheKey.useSelfSplitter = useSelfSplitter;
@@ -996,7 +996,7 @@ public boolean equals(Object obj) {
return dummyKey.equals(((FileCacheKey) obj).dummyKey);
}
return location.equals(((FileCacheKey) obj).location)
- && Objects.equals(partitionValues, ((FileCacheKey) obj).partitionValues);
+ && Objects.equals(partitionValues, ((FileCacheKey) obj).partitionValues);
}

@Override
@@ -1115,10 +1115,10 @@ public static class HivePartitionValues {
private Map<Long, List<UniqueId>> idToUniqueIdsMap;
private Map<Long, PartitionItem> idToPartitionItem;
private Map<Long, List<String>> partitionValuesMap;
- //multi pair
+ // multi pair
private Map<UniqueId, Range<PartitionKey>> uidToPartitionRange;
private Map<Range<PartitionKey>, UniqueId> rangeToId;
- //single pair
+ // single pair
private RangeMap<ColumnBound, UniqueId> singleColumnRangeMap;
private Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap;

File 2 of 2:
@@ -185,11 +185,11 @@ private static Type convertHiveTypeToiveDoris(TypeInfo hiveTypeInfo) {
}
}

- public static boolean isSplittable(RemoteFileSystem remoteFileSystem, InputFormat<?, ?> inputFormat,
- String location, JobConf jobConf) throws UserException {
+ public static boolean isSplittable(RemoteFileSystem remoteFileSystem, String inputFormat,
+ String location) throws UserException {
if (remoteFileSystem instanceof BrokerFileSystem) {
return ((BrokerFileSystem) remoteFileSystem)
- .isSplittable(location, inputFormat.getClass().getCanonicalName());
+ .isSplittable(location, inputFormat);
}

return HMSExternalTable.SUPPORTED_HIVE_FILE_FORMATS.contains(inputFormat);
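The broker branch previously derived a comparable String via inputFormat.getClass().getCanonicalName(); after this change the caller supplies that name directly, so the broker call and the Set<String> membership test operate on the same String. A tiny illustration of the equivalence, using a JDK class as a stand-in for an InputFormat implementation:

public class CanonicalName {
    public static void main(String[] args) {
        // What the old code did to obtain a name from an instance:
        Object inputFormat = new java.util.ArrayList<String>();
        String name = inputFormat.getClass().getCanonicalName();
        System.out.println(name); // java.util.ArrayList
        // The patch lifts this String one level up the call chain,
        // so isSplittable now receives the name instead of the object.
    }
}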
