[fix](hudi) catch exception when getting hudi partition (#35027)
Hudi uses a thread pool to list the files of each partition,
and a countdown latch to wait for all worker threads to finish.
But if a worker thread throws an exception, the countdown latch is never
counted down, and the waiting thread blocks forever.
morningman committed May 21, 2024
1 parent 98f8eb5 commit e308157
Showing 3 changed files with 46 additions and 36 deletions.
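The fix applies a standard pattern for surfacing worker failures across a CountDownLatch: each task counts the latch down in a finally block so the waiting thread can never hang, records any Throwable in an AtomicReference, and the caller re-throws it after await() returns. Below is a minimal, self-contained sketch of that pattern, independent of the Doris code; listPartition, listAllPartitions, and the pool size are illustrative, not part of this commit.

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class LatchFailureSketch {

    // Illustrative stand-in for listing the files of one partition; may throw.
    private static void listPartition(String partition) {
        if (partition.isEmpty()) {
            throw new RuntimeException("failed to list partition");
        }
    }

    static void listAllPartitions(List<String> partitions) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(partitions.size());
        AtomicReference<Throwable> failure = new AtomicReference<>();
        try {
            partitions.forEach(p -> executor.execute(() -> {
                try {
                    listPartition(p);
                } catch (Throwable t) {
                    // Record the failure instead of letting it die inside the pool.
                    failure.set(t);
                } finally {
                    // Count down unconditionally so await() always returns.
                    latch.countDown();
                }
            }));
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e.getMessage(), e);
        } finally {
            executor.shutdown();
        }
        // Re-throw any worker failure on the calling thread.
        if (failure.get() != null) {
            throw new RuntimeException(failure.get().getMessage(), failure.get());
        }
    }

    public static void main(String[] args) {
        listAllPartitions(List.of("p1", "p2", "p3"));
        System.out.println("all partitions listed");
    }
}

If countDown() stays on the normal code path, as in the old code below, a task that throws before reaching it leaves the latch above zero and await() blocks forever; the AtomicReference keeps only the last recorded failure, which is enough to fail the whole operation.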
@@ -355,7 +355,7 @@ private FileCacheValue getFileCache(String location, String inputFormat,
        RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
                    location, bindBrokerName), properties, bindBrokerName));
-        result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
+        result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location));
        // For Tez engine, it may generate subdirectoies for "union" query.
        // So there may be files and directories in the table directory at the same time. eg:
        // /user/hive/warehouse/region_tmp_union_all2/000000_0
@@ -110,7 +110,7 @@ private HiveUtil() {
    }

    public static boolean isSplittable(RemoteFileSystem remoteFileSystem, String inputFormat,
-            String location, JobConf jobConf) throws UserException {
+            String location) throws UserException {
        if (remoteFileSystem instanceof BrokerFileSystem) {
            return ((BrokerFileSystem) remoteFileSystem).isSplittable(location, inputFormat);
        }
@@ -77,6 +77,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

public class HudiScanNode extends HiveScanNode {
@@ -329,49 +330,58 @@ private List<Split> getIncrementalSplits() {
    private void getPartitionSplits(List<HivePartition> partitions, List<Split> splits) {
        Executor executor = Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor();
        CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
+        AtomicReference<Throwable> throwable = new AtomicReference<>();
        partitions.forEach(partition -> executor.execute(() -> {
-            String globPath;
-            String partitionName = "";
-            if (partition.isDummyPartition()) {
-                globPath = hudiClient.getBasePathV2().toString() + "/*";
-            } else {
-                partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
-                        new Path(partition.getPath()));
-                globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
-            }
-            List<FileStatus> statuses;
-            try {
-                statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
-                        new Path(globPath));
-            } catch (IOException e) {
-                throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
-            }
-            HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
-                    timeline, statuses.toArray(new FileStatus[0]));
-
-            if (isCowOrRoTable) {
-                fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
-                    noLogsSplitNum.incrementAndGet();
-                    String filePath = baseFile.getPath();
-                    long fileSize = baseFile.getFileSize();
-                    // Need add hdfs host to location
-                    LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
-                    Path splitFilePath = locationPath.toStorageLocation();
-                    splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
-                            new String[0], partition.getPartitionValues()));
-                });
-            } else {
-                fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
-                        .forEach(fileSlice -> splits.add(
-                                generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant)));
+            try {
+                String globPath;
+                String partitionName = "";
+                if (partition.isDummyPartition()) {
+                    globPath = hudiClient.getBasePathV2().toString() + "/*";
+                } else {
+                    partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
+                            new Path(partition.getPath()));
+                    globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
+                }
+                List<FileStatus> statuses;
+                try {
+                    statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
+                            new Path(globPath));
+                } catch (IOException e) {
+                    throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
+                }
+                HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
+                        timeline, statuses.toArray(new FileStatus[0]));
+
+                if (isCowOrRoTable) {
+                    fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
+                        noLogsSplitNum.incrementAndGet();
+                        String filePath = baseFile.getPath();
+                        long fileSize = baseFile.getFileSize();
+                        // Need add hdfs host to location
+                        LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
+                        Path splitFilePath = locationPath.toStorageLocation();
+                        splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
+                                new String[0], partition.getPartitionValues()));
+                    });
+                } else {
+                    fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
+                            .forEach(fileSlice -> splits.add(
+                                    generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant)));
+                }
+            } catch (Throwable t) {
+                throwable.set(t);
+            } finally {
+                countDownLatch.countDown();
            }
-            countDownLatch.countDown();
        }));
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
+        if (throwable.get() != null) {
+            throw new RuntimeException(throwable.get().getMessage(), throwable.get());
+        }
    }

@Override
