Skip to content

Commit

Permalink
[improvement](mtmv)mtmv support partition by hms table (#29989)
Browse files Browse the repository at this point in the history
  • Loading branch information
zddr authored Jan 29, 2024
1 parent 6d6179a commit 26c6ac6
Show file tree
Hide file tree
Showing 26 changed files with 516 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,22 @@ update orc_full_acid_par set value = 'BB' where id = 2;
alter table orc_full_acid_par PARTITION(part_col=20230101) compact 'major';
alter table orc_full_acid_par PARTITION(part_col=20230102) compact 'major';

create table mtmv_base1 (id INT, value STRING)
PARTITIONED BY (part_col INT)
CLUSTERED BY (id) INTO 3 BUCKETS
STORED AS ORC;

insert into mtmv_base1 PARTITION(part_col=20230101) values
(1, 'A'),
(2, 'B'),
(3, 'C');

insert into mtmv_base1 PARTITION(part_col=20230102) values
(4, 'D'),
(5, 'E'),
(6, 'F');


CREATE TABLE `test_different_column_orders_orc`(
`name` string,
`id` int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@

package org.apache.doris.catalog;

import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.PartitionValue;

import com.google.common.collect.Lists;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class ListPartitionItem extends PartitionItem {
public static ListPartitionItem DUMMY_ITEM = new ListPartitionItem(Lists.newArrayList());
Expand Down Expand Up @@ -69,6 +73,13 @@ public PartitionItem getIntersect(PartitionItem newItem) {
return null;
}

@Override
public PartitionKeyDesc toPartitionKeyDesc() {
List<List<PartitionValue>> inValues = partitionKeys.stream().map(PartitionInfo::toPartitionValue)
.collect(Collectors.toList());
return PartitionKeyDesc.createIn(inValues);
}

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(partitionKeys.size());
Expand Down
37 changes: 36 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.resource.Tag;
Expand Down Expand Up @@ -100,7 +101,7 @@
* Internal representation of tableFamilyGroup-related metadata. A OlaptableFamilyGroup contains several tableFamily.
* Note: when you add a new olap table property, you should modify TableProperty class
*/
public class OlapTable extends Table {
public class OlapTable extends Table implements MTMVRelatedTableIf {
private static final Logger LOG = LogManager.getLogger(OlapTable.class);

public enum OlapTableState {
Expand Down Expand Up @@ -772,6 +773,7 @@ public PartitionInfo getPartitionInfo() {
return partitionInfo;
}

@Override
public Set<String> getPartitionColumnNames() throws DdlException {
Set<String> partitionColumnNames = Sets.newHashSet();
if (partitionInfo instanceof SinglePartitionInfo) {
Expand Down Expand Up @@ -2535,4 +2537,37 @@ public List<Tablet> getAllTablets() throws AnalysisException {
}
return tablets;
}

@Override
public PartitionType getPartitionType() {
return partitionInfo.getType();
}

@Override
public Map<Long, PartitionItem> getPartitionItems() {
return getPartitionInfo().getIdToItem(false);
}

@Override
public long getPartitionLastModifyTime(long partitionId, PartitionItem item) throws AnalysisException {
return getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit();
}

@Override
public long getLastModifyTime() {
long result = 0L;
long visibleVersionTime;
for (Partition partition : getAllPartitions()) {
visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit();
if (visibleVersionTime > result) {
result = visibleVersionTime;
}
}
return result;
}

@Override
public List<Column> getPartitionColumns() {
return getPartitionInfo().getPartitionColumns();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ public PartitionDesc toPartitionDesc(OlapTable olapTable) throws AnalysisExcepti
throw new RuntimeException("Should implement it in derived classes.");
}

static List<PartitionValue> toPartitionValue(PartitionKey partitionKey) {
public static List<PartitionValue> toPartitionValue(PartitionKey partitionKey) {
return partitionKey.getKeys().stream().map(expr -> {
if (expr == MaxLiteral.MAX_VALUE) {
return PartitionValue.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.catalog;

import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.common.io.Writable;

import java.util.Comparator;
Expand All @@ -34,4 +35,5 @@ public boolean isDefaultPartition() {
return false;
}

public abstract PartitionKeyDesc toPartitionKeyDesc();
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ public static PartitionKey createListPartitionKeyWithTypes(List<PartitionValue>
partitionKey.originHiveKeys.add(values.get(i).getStringValue());
}
partitionKey.types.add(types.get(i).getPrimitiveType());
//If there is one default value, set `isDefaultListPartitionKey` to true
if (values.get(i).isHiveDefaultPartition()) {
partitionKey.setDefaultListPartition(true);
}
}
if (values.isEmpty()) {
for (int i = 0; i < types.size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.catalog;

import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.common.util.RangeUtils;

import com.google.common.collect.Range;
Expand Down Expand Up @@ -45,6 +46,13 @@ public boolean isDefaultPartition() {
return false;
}

@Override
public PartitionKeyDesc toPartitionKeyDesc() {
return PartitionKeyDesc.createFixed(
PartitionInfo.toPartitionValue(partitionKeyRange.lowerEndpoint()),
PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint()));
}

@Override
public void write(DataOutput out) throws IOException {
RangeUtils.writeRange(out, partitionKeyRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.HudiUtils;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSCachedClient;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
Expand Down Expand Up @@ -71,14 +76,15 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Hive metastore external table.
*/
public class HMSExternalTable extends ExternalTable {
public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableIf {
private static final Logger LOG = LogManager.getLogger(HMSExternalTable.class);

private static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
Expand Down Expand Up @@ -257,6 +263,7 @@ public List<Type> getPartitionColumnTypes() {
return partitionColumns.stream().map(c -> c.getType()).collect(Collectors.toList());
}

@Override
public List<Column> getPartitionColumns() {
makeSureInitialized();
getFullSchema();
Expand Down Expand Up @@ -778,6 +785,58 @@ public Set<String> getDistributionColumnNames() {
return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase)
.collect(Collectors.toSet());
}

@Override
public PartitionType getPartitionType() {
return getPartitionColumns().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED;
}

@Override
public Set<String> getPartitionColumnNames() {
return getPartitionColumns().stream()
.map(c -> c.getName().toLowerCase()).collect(Collectors.toSet());
}

@Override
public Map<Long, PartitionItem> getPartitionItems() {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
getDbName(), getName(), getPartitionColumnTypes());

return hivePartitionValues.getIdToPartitionItem().entrySet().stream()
.filter(entry -> !entry.getValue().isDefaultPartition())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
public long getPartitionLastModifyTime(long partitionId, PartitionItem item) throws AnalysisException {
List<List<String>> partitionValuesList = Lists.newArrayListWithCapacity(1);
partitionValuesList.add(
((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringListForHive());
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
List<HivePartition> resPartitions = cache.getAllPartitionsWithCache(getDbName(), getName(),
partitionValuesList);
if (resPartitions.size() != 1) {
throw new AnalysisException("partition not normal, size: " + resPartitions.size());
}
return resPartitions.get(0).getLastModifiedTimeIgnoreInit();
}

@Override
public long getLastModifyTime() throws AnalysisException {

long result = 0L;
long visibleVersionTime;
for (Entry<Long, PartitionItem> entry : getPartitionItems().entrySet()) {
visibleVersionTime = getPartitionLastModifyTime(entry.getKey(), entry.getValue());
if (visibleVersionTime > result) {
result = visibleVersionTime;
}
}
return result;
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,10 @@ public ListPartitionItem toListPartitionItem(String partitionName, List<Type> ty
}
try {
PartitionKey key = PartitionKey.createListPartitionKeyWithTypes(values, types, true);
return new ListPartitionItem(Lists.newArrayList(key));
ListPartitionItem listPartitionItem = new ListPartitionItem(Lists.newArrayList(key));
// if `PartitionKey` is default, set `PartitionItem` to default
listPartitionItem.setDefaultPartition(key.isHiveDefaultPartition());
return listPartitionItem;
} catch (AnalysisException e) {
throw new CacheException("failed to convert hive partition %s to list partition in catalog %s",
e, partitionName, catalog.getName());
Expand All @@ -315,7 +318,8 @@ private HivePartition loadPartition(PartitionCacheKey key) {
sd.getInputFormat(), sd.getLocation(), key, catalog.getName());
}
// TODO: more info?
return new HivePartition(key.dbName, key.tblName, false, sd.getInputFormat(), sd.getLocation(), key.values);
return new HivePartition(key.dbName, key.tblName, false, sd.getInputFormat(), sd.getLocation(), key.values,
partition.getParameters());
}

private Map<PartitionCacheKey, HivePartition> loadPartitions(Iterable<? extends PartitionCacheKey> keys) {
Expand Down Expand Up @@ -348,7 +352,7 @@ private Map<PartitionCacheKey, HivePartition> loadPartitions(Iterable<? extends
StorageDescriptor sd = partition.getSd();
ret.put(new PartitionCacheKey(dbName, tblName, partition.getValues()),
new HivePartition(dbName, tblName, false,
sd.getInputFormat(), sd.getLocation(), partition.getValues()));
sd.getInputFormat(), sd.getLocation(), partition.getValues(), partition.getParameters()));
}
return ret;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,23 @@
import lombok.Data;

import java.util.List;
import java.util.Map;

@Data
public class HivePartition {
public static final String LAST_MODIFY_TIME_KEY = "transient_lastDdlTime";
public static final String FILE_NUM_KEY = "numFiles";

private String dbName;
private String tblName;
private String inputFormat;
private String path;
private List<String> partitionValues;
private boolean isDummyPartition;
private Map<String, String> parameters;

public HivePartition(String dbName, String tblName, boolean isDummyPartition,
String inputFormat, String path, List<String> partitionValues) {
String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters) {
this.dbName = dbName;
this.tblName = tblName;
this.isDummyPartition = isDummyPartition;
Expand All @@ -44,6 +49,7 @@ public HivePartition(String dbName, String tblName, boolean isDummyPartition,
this.path = path;
// eg: cn, beijing
this.partitionValues = partitionValues;
this.parameters = parameters;
}

// return partition name like: nation=cn/city=beijing
Expand All @@ -63,6 +69,31 @@ public boolean isDummyPartition() {
return this.isDummyPartition;
}

public long getLastModifiedTime() {
if (parameters == null || !parameters.containsKey(LAST_MODIFY_TIME_KEY)) {
return 0L;
}
return Long.parseLong(parameters.get(LAST_MODIFY_TIME_KEY)) * 1000;
}

/**
* If there are no files, it proves that there is no data under the partition, we return 0
* @return
*/
public long getLastModifiedTimeIgnoreInit() {
if (getFileNum() == 0) {
return 0L;
}
return getLastModifiedTime();
}

public long getFileNum() {
if (parameters == null || !parameters.containsKey(FILE_NUM_KEY)) {
return 0L;
}
return Long.parseLong(parameters.get(FILE_NUM_KEY));
}

@Override
public String toString() {
return "HivePartition{"
Expand Down
Loading

0 comments on commit 26c6ac6

Please sign in to comment.