Skip to content

Commit

Permalink
[enhance](mtmv)Optimize the speed of obtaining the last update time o…
Browse files Browse the repository at this point in the history
…f Hive (#40169)

Previously, to obtain the last update time of a hive table, it was
necessary to obtain the last update time of all partitions under the
table, which required generating a large map.
  • Loading branch information
zddr authored Sep 3, 2024
1 parent ff24a4f commit 5ea6e4b
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
Expand Down Expand Up @@ -741,64 +742,68 @@ public Map<String, PartitionItem> getAndCopyPartitionItems() {
return res;
}

private HiveMetaStoreCache.HivePartitionValues getHivePartitionValues() {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
return cache.getPartitionValues(
getDbName(), getName(), getPartitionColumnTypes());
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
throws AnalysisException {
long partitionLastModifyTime = getPartitionLastModifyTime(partitionName);
return new MTMVTimestampSnapshot(partitionLastModifyTime);
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
getDbName(), getName(), getPartitionColumnTypes());
Long partitionId = getPartitionIdByNameOrAnalysisException(partitionName, hivePartitionValues);
HivePartition hivePartition = getHivePartitionByIdOrAnalysisException(partitionId,
hivePartitionValues, cache);
return new MTMVTimestampSnapshot(hivePartition.getLastModifiedTime());
}

@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException {
if (getPartitionType() == PartitionType.UNPARTITIONED) {
return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime());
}
String partitionName = "";
Long maxPartitionId = 0L;
long maxVersionTime = 0L;
long visibleVersionTime;
for (Entry<String, PartitionItem> entry : getAndCopyPartitionItems().entrySet()) {
visibleVersionTime = getPartitionLastModifyTime(entry.getKey());
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
getDbName(), getName(), getPartitionColumnTypes());
BiMap<Long, String> idToName = hivePartitionValues.getPartitionNameToIdMap().inverse();
if (MapUtils.isEmpty(idToName)) {
throw new AnalysisException("partitions is empty for : " + getName());
}
for (Long partitionId : idToName.keySet()) {
visibleVersionTime = getHivePartitionByIdOrAnalysisException(partitionId, hivePartitionValues,
cache).getLastModifiedTime();
if (visibleVersionTime > maxVersionTime) {
maxVersionTime = visibleVersionTime;
partitionName = entry.getKey();
maxPartitionId = partitionId;
}
}
return new MTMVMaxTimestampSnapshot(partitionName, maxVersionTime);
return new MTMVMaxTimestampSnapshot(idToName.get(maxPartitionId), maxVersionTime);
}

private long getPartitionLastModifyTime(String partitionName) throws AnalysisException {
return getPartitionByName(partitionName).getLastModifiedTime();
}

private HivePartition getPartitionByName(String partitionName) throws AnalysisException {
PartitionItem item = getAndCopyPartitionItems().get(partitionName);
List<List<String>> partitionValuesList = transferPartitionItemToPartitionValues(item);
List<HivePartition> partitions = getPartitionsByPartitionValues(partitionValuesList);
if (partitions.size() != 1) {
throw new AnalysisException("partition not normal, size: " + partitions.size());
private Long getPartitionIdByNameOrAnalysisException(String partitionName,
HiveMetaStoreCache.HivePartitionValues hivePartitionValues)
throws AnalysisException {
Long partitionId = hivePartitionValues.getPartitionNameToIdMap().get(partitionName);
if (partitionId == null) {
throw new AnalysisException("can not find partition: " + partitionName);
}
return partitions.get(0);
return partitionId;
}

private List<HivePartition> getPartitionsByPartitionValues(List<List<String>> partitionValuesList) {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
return cache.getAllPartitionsWithCache(getDbName(), getName(),
partitionValuesList);
}

private List<List<String>> transferPartitionItemToPartitionValues(PartitionItem item) {
List<List<String>> partitionValuesList = Lists.newArrayListWithCapacity(1);
partitionValuesList.add(
((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringListForHive());
return partitionValuesList;
private HivePartition getHivePartitionByIdOrAnalysisException(Long partitionId,
HiveMetaStoreCache.HivePartitionValues hivePartitionValues,
HiveMetaStoreCache cache) throws AnalysisException {
List<String> partitionValues = hivePartitionValues.getPartitionValuesMap().get(partitionId);
if (CollectionUtils.isEmpty(partitionValues)) {
throw new AnalysisException("can not find partitionValues: " + partitionId);
}
HivePartition partition = cache.getHivePartition(getDbName(), getName(), partitionValues);
if (partition == null) {
throw new AnalysisException("can not find partition: " + partitionId);
}
return partition;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,10 @@ public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
return fileLists;
}

public HivePartition getHivePartition(String dbName, String name, List<String> partitionValues) {
return partitionCache.get(new PartitionCacheKey(dbName, name, partitionValues));
}

public List<HivePartition> getAllPartitionsWithCache(String dbName, String name,
List<List<String>> partitionValuesList) {
return getAllPartitions(dbName, name, partitionValuesList, true);
Expand Down

0 comments on commit 5ea6e4b

Please sign in to comment.