Record partition update rows for each column.
Jibing-Li committed May 22, 2024
1 parent 8517b8f commit ab4f348
Showing 9 changed files with 84 additions and 25 deletions.
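The patch threads a snapshot of per-partition updated-row counts (partition id -> updated rows) from TableStatsMeta through AnalysisInfo and AnalysisInfoBuilder into each column's ColStatsMeta, and surfaces it as the update_rows column when partition-level column statistics are shown. The stand-alone sketch below is not part of the commit; the class and method names are hypothetical simplifications that only illustrate the copy-on-construct and replace-on-refresh bookkeeping the diff applies.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified stand-in for the per-column statistics metadata touched by
// this commit; it only demonstrates the partition id -> updated rows bookkeeping.
public class PartitionUpdateRowsSketch {

    // Mirrors the new map serialized as "pur" in ColStatsMeta.
    private final ConcurrentMap<Long, Long> partitionUpdateRows = new ConcurrentHashMap<>();

    // Null-guarded defensive copy, like the new constructor parameter in ColStatsMeta.
    public PartitionUpdateRowsSketch(Map<Long, Long> snapshot) {
        if (snapshot != null) {
            partitionUpdateRows.putAll(snapshot);
        }
    }

    // When an analyze job finishes, the old counters are dropped and replaced by the
    // job's snapshot, roughly what TableStatsMeta.update() now does per column.
    public void refresh(Map<Long, Long> snapshot) {
        partitionUpdateRows.clear();
        if (snapshot != null) {
            partitionUpdateRows.putAll(snapshot);
        }
    }

    // Lookup used when rendering one partition row; falls back to "N/A",
    // matching constructPartitionResultSet below.
    public String updateRowsFor(long partitionId) {
        Long rows = partitionUpdateRows.get(partitionId);
        return rows == null ? "N/A" : rows.toString();
    }

    public static void main(String[] args) {
        PartitionUpdateRowsSketch meta =
                new PartitionUpdateRowsSketch(Map.of(10001L, 500L, 10002L, 120L));
        System.out.println(meta.updateRowsFor(10001L)); // 500
        System.out.println(meta.updateRowsFor(99999L)); // N/A
        meta.refresh(Map.of(10001L, 0L));
        System.out.println(meta.updateRowsFor(10001L)); // 0
    }
}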
@@ -20,6 +20,8 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
@@ -35,6 +37,7 @@
import org.apache.doris.statistics.ColStatsMeta;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ResultRow;
+import org.apache.doris.statistics.TableStatsMeta;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -186,22 +189,39 @@ public ShowResultSet constructResultSet(List<Pair<Pair<String, String>, ColumnSt

public ShowResultSet constructPartitionResultSet(List<ResultRow> resultRows, TableIf tableIf) {
List<List<String>> result = Lists.newArrayList();
-resultRows.forEach(r -> {
+for (ResultRow r : resultRows) {
List<String> row = Lists.newArrayList();
-row.add(r.get(0));
-row.add(r.get(1));
-row.add(r.get(2)); // TODO: Get index name.
-row.add(r.get(3));
-row.add(r.get(4));
-row.add(r.get(5));
-row.add(r.get(6));
-row.add(r.get(7));
-row.add(r.get(8));
-row.add(r.get(9));
-row.add("N/A");
-row.add("Manual");
+row.add(r.get(0)); // column_name
+row.add(r.get(1)); // partition_name
+long indexId = Long.parseLong(r.get(2));
+String indexName = indexId == -1 ? tableIf.getName() : ((OlapTable) tableIf).getIndexNameById(indexId);
+row.add(indexName); // index_name.
+row.add(r.get(3)); // count
+row.add(r.get(4)); // ndv
+row.add(r.get(5)); // num_null
+row.add(r.get(6)); // min
+row.add(r.get(7)); // max
+row.add(r.get(8)); // data_size
+row.add(r.get(9)); // updated_time
+String updateRows = "N/A";
+TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(tableIf.getId());
+if (tableStats != null && tableIf instanceof OlapTable) {
+OlapTable olapTable = (OlapTable) tableIf;
+ColStatsMeta columnStatsMeta = tableStats.findColumnStatsMeta(indexName, r.get(0));
+if (columnStatsMeta != null && columnStatsMeta.partitionUpdateRows != null) {
+Partition partition = olapTable.getPartition(r.get(1));
+if (partition != null) {
+Long rows = columnStatsMeta.partitionUpdateRows.get(partition.getId());
+if (rows != null) {
+updateRows = rows.toString();
+}
+}
+}
+}
+row.add(updateRows); // update_rows
+row.add("Manual"); // trigger. Manual or System
result.add(row);
-});
+}
return new ShowResultSet(getMetaData(), result);
}

@@ -35,7 +35,9 @@
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;

@@ -188,6 +190,8 @@ public enum ScheduleType {

@SerializedName("updateRows")
public final long updateRows;
+
+public final Map<Long, Long> partitionUpdateRows = new HashMap<>();
/**
*
* Used to store the newest partition version of tbl when creating this job.
@@ -209,7 +213,7 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
boolean partitionOnly, boolean samplingPartition,
boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull,
boolean usingSqlForExternalTable, long tblUpdateTime, long rowCount, boolean userInject,
-long updateRows, JobPriority priority) {
+long updateRows, JobPriority priority, Map<Long, Long> partitionUpdateRows) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
@@ -248,6 +252,9 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
this.userInject = userInject;
this.updateRows = updateRows;
this.priority = priority;
+if (partitionUpdateRows != null) {
+this.partitionUpdateRows.putAll(partitionUpdateRows);
+}
}

@Override
@@ -27,6 +27,7 @@
import org.apache.logging.log4j.core.util.CronExpression;

import java.util.List;
+import java.util.Map;
import java.util.Set;

public class AnalysisInfoBuilder {
@@ -65,6 +66,7 @@ public class AnalysisInfoBuilder {
private boolean userInject;
private long updateRows;
private JobPriority priority;
+private Map<Long, Long> partitionUpdateRows;

public AnalysisInfoBuilder() {
}
@@ -105,6 +107,7 @@ public AnalysisInfoBuilder(AnalysisInfo info) {
userInject = info.userInject;
updateRows = info.updateRows;
priority = info.priority;
+partitionUpdateRows = info.partitionUpdateRows;
}

public AnalysisInfoBuilder setJobId(long jobId) {
@@ -282,13 +285,18 @@ public AnalysisInfoBuilder setPriority(JobPriority priority) {
return this;
}

+public AnalysisInfoBuilder setPartitionUpdateRows(Map<Long, Long> partitionUpdateRows) {
+this.partitionUpdateRows = partitionUpdateRows;
+return this;
+}
+
public AnalysisInfo build() {
return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, tblId, jobColumns, partitionNames,
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
partitionOnly, samplingPartition, isAllPartition, partitionCount,
cronExpression, forceFull, usingSqlForExternalTable, tblUpdateTime, rowCount, userInject, updateRows,
-priority);
+priority, partitionUpdateRows);
}

}
@@ -381,6 +381,7 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) {
TableStatsMeta tableStatsStatus = findTableStatsStatus(table.getId());
infoBuilder.setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get());
infoBuilder.setPriority(JobPriority.MANUAL);
+infoBuilder.setPartitionUpdateRows(tableStatsStatus == null ? null : tableStatsStatus.partitionUpdateRows);
return infoBuilder.build();
}

@@ -514,6 +515,9 @@ public void updateTableStats(AnalysisInfo jobInfo) {
if (jobInfo.partitionNames != null) {
jobInfo.partitionNames.clear();
}
+if (jobInfo.partitionUpdateRows != null) {
+jobInfo.partitionUpdateRows.clear();
+}
}

@VisibleForTesting
@@ -23,6 +23,9 @@

import com.google.gson.annotations.SerializedName;

+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

public class ColStatsMeta {
@@ -49,14 +52,20 @@ public class ColStatsMeta {
@SerializedName("rowCount")
public long rowCount;

+@SerializedName("pur")
+public ConcurrentMap<Long, Long> partitionUpdateRows = new ConcurrentHashMap<>();
+
public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod, AnalysisType analysisType, JobType jobType,
-long queriedTimes, long rowCount, long updatedRows) {
+long queriedTimes, long rowCount, long updatedRows, Map<Long, Long> partitionUpdateRows) {
this.updatedTime = updatedTime;
this.analysisMethod = analysisMethod;
this.analysisType = analysisType;
this.jobType = jobType;
this.queriedTimes.addAndGet(queriedTimes);
this.updatedRows = updatedRows;
this.rowCount = rowCount;
+if (partitionUpdateRows != null) {
+this.partitionUpdateRows.putAll(partitionUpdateRows);
+}
}
}
@@ -210,6 +210,7 @@ protected AnalysisInfo createAnalyzeJobForTbl(
.setRowCount(rowCount)
.setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get())
.setPriority(priority)
+.setPartitionUpdateRows(tableStatsStatus == null ? null : tableStatsStatus.partitionUpdateRows)
.build();
}

@@ -34,6 +34,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -122,6 +123,9 @@ public static ColumnStatistic queryColumnStatisticsByName(

public static List<ResultRow> queryColumnStatisticsByPartitions(TableIf table, Set<String> columnNames,
List<String> partitionNames) {
+if (!table.isPartitionedTable()) {
+return new ArrayList<>();
+}
long ctlId = table.getDatabase().getCatalog().getId();
long dbId = table.getDatabase().getId();
Map<String, String> params = new HashMap<>();
@@ -134,16 +134,22 @@ public void update(AnalysisInfo analyzedJob, TableIf tableIf) {
for (Pair<String, String> colPair : analyzedJob.jobColumns) {
ColStatsMeta colStatsMeta = colToColStatsMeta.get(colPair);
if (colStatsMeta == null) {
-colToColStatsMeta.put(colPair, new ColStatsMeta(updatedTime, analyzedJob.analysisMethod,
+colToColStatsMeta.put(colPair, new ColStatsMeta(analyzedJob.createTime, analyzedJob.analysisMethod,
analyzedJob.analysisType, analyzedJob.jobType, 0, analyzedJob.rowCount,
-analyzedJob.updateRows));
+analyzedJob.updateRows, analyzedJob.partitionUpdateRows));
} else {
-colStatsMeta.updatedTime = updatedTime;
+colStatsMeta.updatedTime = analyzedJob.startTime;
colStatsMeta.analysisType = analyzedJob.analysisType;
colStatsMeta.analysisMethod = analyzedJob.analysisMethod;
colStatsMeta.jobType = analyzedJob.jobType;
colStatsMeta.updatedRows = analyzedJob.updateRows;
colStatsMeta.rowCount = analyzedJob.rowCount;
+if (colStatsMeta.partitionUpdateRows == null) {
+colStatsMeta.partitionUpdateRows = new ConcurrentHashMap<>();
+} else {
+colStatsMeta.partitionUpdateRows.clear();
+}
+colStatsMeta.partitionUpdateRows.putAll(analyzedJob.partitionUpdateRows);
}
}
jobType = analyzedJob.jobType;
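
The null guard on colStatsMeta.partitionUpdateRows above may look redundant given the field initializer in ColStatsMeta, but the class (as shown in this diff) declares only a parameterized constructor, and Gson falls back to allocating such classes without running constructors or field initializers, so a ColStatsMeta deserialized from a metadata image written before this commit comes back with no "pur" entry and a null map. The self-contained sketch below uses a hypothetical stand-in class, not Doris code, to illustrate that behavior.

import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in that mimics ColStatsMeta's shape: a "pur" map with an
// initializer, plus a parameterized constructor and no no-arg constructor.
public class PurDeserializationSketch {

    static class MetaLikeColStatsMeta {
        @SerializedName("pur")
        ConcurrentMap<Long, Long> partitionUpdateRows = new ConcurrentHashMap<>();

        @SerializedName("rowCount")
        long rowCount;

        MetaLikeColStatsMeta(long rowCount) {
            this.rowCount = rowCount;
        }
    }

    public static void main(String[] args) {
        Gson gson = new Gson();
        // New-format metadata: the "pur" map is populated during deserialization.
        MetaLikeColStatsMeta fresh =
                gson.fromJson("{\"rowCount\":100,\"pur\":{\"10001\":500}}", MetaLikeColStatsMeta.class);
        // Old-format metadata: no "pur" key and no initializer run, so the field stays null.
        MetaLikeColStatsMeta legacy =
                gson.fromJson("{\"rowCount\":100}", MetaLikeColStatsMeta.class);
        System.out.println(fresh.partitionUpdateRows);  // {10001=500}
        System.out.println(legacy.partitionUpdateRows); // null, hence the guard above
    }
}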
@@ -196,7 +196,7 @@ public TableStatsMeta findTableStatsStatus(long tblId) {
new MockUp<TableStatsMeta>() {
@Mock
public ColStatsMeta findColumnStatsMeta(String indexName, String colName) {
-return new ColStatsMeta(0, null, null, null, 0, 0, 0);
+return new ColStatsMeta(0, null, null, null, 0, 0, 0, null);
}
};

Expand Down Expand Up @@ -244,7 +244,7 @@ public long getRowCount() {
new MockUp<TableStatsMeta>() {
@Mock
public ColStatsMeta findColumnStatsMeta(String indexName, String colName) {
-return new ColStatsMeta(0, null, null, null, 0, 100, 0);
+return new ColStatsMeta(0, null, null, null, 0, 100, 0, null);
}
};
tableMeta.newPartitionLoaded.set(false);
@@ -254,7 +254,7 @@ public ColStatsMeta findColumnStatsMeta(String indexName, String colName) {
new MockUp<TableStatsMeta>() {
@Mock
public ColStatsMeta findColumnStatsMeta(String indexName, String colName) {
-return new ColStatsMeta(0, null, null, null, 0, 0, 0);
+return new ColStatsMeta(0, null, null, null, 0, 0, 0, null);
}
};
tableMeta.newPartitionLoaded.set(false);
@@ -270,7 +270,7 @@ public long getRowCount() {
new MockUp<TableStatsMeta>() {
@Mock
public ColStatsMeta findColumnStatsMeta(String indexName, String colName) {
-return new ColStatsMeta(0, null, null, null, 0, 500, 0);
+return new ColStatsMeta(0, null, null, null, 0, 500, 0, null);
}
};
tableMeta.newPartitionLoaded.set(false);
@@ -286,7 +286,7 @@ public long getRowCount() {
new MockUp<TableStatsMeta>() {
@Mock
public ColStatsMeta findColumnStatsMeta(String indexName, String colName) {
-return new ColStatsMeta(0, null, null, null, 0, 100, 80);
+return new ColStatsMeta(0, null, null, null, 0, 100, 80, null);
}
};
tableMeta.newPartitionLoaded.set(false);
