Skip to content

Commit

Permalink
mvName
Browse files Browse the repository at this point in the history
  • Loading branch information
zddr committed Oct 8, 2024
1 parent 038d518 commit 318c891
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 13 deletions.
29 changes: 23 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVJobInfo;
import org.apache.doris.mtmv.MTMVJobManager;
import org.apache.doris.mtmv.MTMVPartitionCol;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPartitionUtil;
Expand All @@ -55,6 +57,7 @@

import java.io.DataInput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
Expand Down Expand Up @@ -399,18 +402,24 @@ public Pair<Map<String, Set<String>>, Map<String, String>> calculateDoublyPartit
* @return mvPartitionName ==> relationPartitionNames
* @throws AnalysisException
*/
public Map<String, Set<String>> calculatePartitionMappings() throws AnalysisException {
public Map<String, Map<BaseTableInfo, Set<String>>> calculatePartitionMappings() throws AnalysisException {
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
return Maps.newHashMap();
}
long start = System.currentTimeMillis();
Map<String, Set<String>> res = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
Map<String, Map<BaseTableInfo, Set<String>>> res = Maps.newHashMap();
Map<BaseTableInfo, Map<PartitionKeyDesc, Set<String>>> refreshPartitionDescs = getRefreshPartitionDescs();
List<MTMVPartitionCol> partitionRefreshTables = mvPartitionInfo.getPartitionRefreshTables();

Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
res.put(entry.getKey(),
relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet()));
PartitionKeyDesc mvPartitionKeyDesc = entry.getValue().toPartitionKeyDesc();
Map<BaseTableInfo, Set<String>> tablePartitions = Maps.newHashMap();
for (MTMVPartitionCol partitionCol : partitionRefreshTables) {
tablePartitions.put(partitionCol.getTable(), refreshPartitionDescs.get(partitionCol.getTable())
.getOrDefault(mvPartitionKeyDesc, Sets.newHashSet()));
}
res.put(entry.getKey(), tablePartitions);
}
if (LOG.isDebugEnabled()) {
LOG.debug("calculatePartitionMappings use [{}] mills, mvName is [{}]",
Expand All @@ -419,6 +428,14 @@ public Map<String, Set<String>> calculatePartitionMappings() throws AnalysisExce
return res;
}

private Map<BaseTableInfo, Map<PartitionKeyDesc, Set<String>>> getRefreshPartitionDescs() throws AnalysisException {
// TODO: 2024/10/8 foreach
List<MTMVPartitionCol> partitionRefreshTables = mvPartitionInfo.getPartitionRefreshTables();
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
return null;
}

public ConcurrentLinkedQueue<MTMVTask> getHistoryTasks() {
return jobInfo.getHistoryTasks();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,21 @@

public class MTMVPartitionCol {
@SerializedName("rt")
private BaseTableInfo relatedTable;
private BaseTableInfo table;

@SerializedName("pc")
private String partitionCol;
private String col;

public MTMVPartitionCol(BaseTableInfo table, String col) {
this.table = table;
this.col = col;
}

public BaseTableInfo getTable() {
return table;
}

public String getCol() {
return col;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.CatalogMgr;

import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;

import java.util.ArrayList;
import java.util.List;

/**
Expand All @@ -47,8 +49,8 @@ public enum MTMVPartitionType {
private String partitionCol;
@SerializedName("expr")
private Expr expr;
@SerializedName("oprt")
private List<MTMVPartitionCol> otherPartitionRefreshTables;
@SerializedName("prt")
private List<MTMVPartitionCol> partitionRefreshTablesNotIncludeRelatedTable;

public MTMVPartitionInfo() {
}
Expand Down Expand Up @@ -107,6 +109,17 @@ public void setExpr(Expr expr) {
this.expr = expr;
}

public List<MTMVPartitionCol> getPartitionRefreshTablesNotIncludeRelatedTable() {
return partitionRefreshTablesNotIncludeRelatedTable;
}

public List<MTMVPartitionCol> getPartitionRefreshTables() {
ArrayList<MTMVPartitionCol> partitionRefreshTables = Lists.newArrayList(
partitionRefreshTablesNotIncludeRelatedTable);
partitionRefreshTables.add(new MTMVPartitionCol(this.relatedTable, this.relatedCol));
return partitionRefreshTables;
}

/**
* Get the position of relatedCol in the relatedTable partition column
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static boolean isMTMVPartitionSync(MTMVRefreshContext refreshContext, Str
Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables) throws AnalysisException {
MTMV mtmv = refreshContext.getMtmv();
Set<String> relatedPartitionNames = refreshContext.getPartitionMappings().get(partitionName);
Map<BaseTableInfo, Set<String>> relatedPartitionNames = refreshContext.getPartitionMappings().get(partitionName);
boolean isSyncWithPartition = true;
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

public class MTMVRefreshContext {
private MTMV mtmv;
private Map<String, Set<String>> partitionMappings;
// mvPartitionName ==> baseTableInfo : baseTablePartitionName
private Map<String, Map<BaseTableInfo, Set<String>>> partitionMappings;
private MTMVBaseVersions baseVersions;

public MTMVRefreshContext(MTMV mtmv) {
Expand All @@ -36,7 +37,7 @@ public MTMV getMtmv() {
return mtmv;
}

public Map<String, Set<String>> getPartitionMappings() {
public Map<String, Map<BaseTableInfo, Set<String>>> getPartitionMappings() {
return partitionMappings;
}

Expand Down

0 comments on commit 318c891

Please sign in to comment.