Skip to content

Commit

Permalink
adjust stats derive by delta row
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly committed Aug 30, 2024
1 parent d580a0a commit 966c06b
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,19 @@ public FilterEstimation(boolean isOnBaseTable) {
/**
* This method will update the stats according to the selectivity.
*/
public Statistics estimate(Expression expression, Statistics statistics) {
// For a comparison predicate, only when it's left side is a slot and right side is a literal, we would
// consider is a valid predicate.
Statistics stats = expression.accept(this, new EstimationContext(statistics));
stats.enforceValid();
return stats;
public Statistics estimate(Expression expression, Statistics inputStats) {
Statistics outputStats = expression.accept(this, new EstimationContext(inputStats));
if (outputStats.getRowCount() == 0 && inputStats.getDeltaRowCount() > 0) {
StatisticsBuilder deltaStats = new StatisticsBuilder();
deltaStats.setDeltaRowCount(0);
deltaStats.setRowCount(inputStats.getDeltaRowCount());
for (Expression expr : inputStats.columnStatistics().keySet()) {
deltaStats.putColumnStatistics(expr, ColumnStatistic.UNKNOWN);
}
outputStats = expression.accept(this, new EstimationContext(deltaStats.build()));
}
outputStats.enforceValid();
return outputStats;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@
import org.apache.doris.nereids.util.PlanUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.ColStatsMeta;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
import org.apache.doris.statistics.Histogram;
Expand Down Expand Up @@ -306,48 +305,16 @@ public Statistics visitLogicalFilter(LogicalFilter<? extends Plan> filter, Void
/**
* returns the sum of deltaRowCount for all selected partitions or for the table.
*/
private long computeDeltaRowCount(OlapScan olapScan, SlotReference slot) {
private long computeDeltaRowCount(OlapScan olapScan) {
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
TableStatsMeta tableMeta = analysisManager.findTableStatsStatus(olapScan.getTable().getId());
long deltaRowCount = 0;
if (tableMeta != null) {
ColStatsMeta colMeta = tableMeta.findColumnStatsMeta(
olapScan.getTable().getIndexNameById(olapScan.getSelectedIndexId()), slot.getName());
if (colMeta != null && colMeta.partitionUpdateRows != null) {
// when fe upgraded from old version, colMeta object may be deserialized from json,
// and colMeta.partitionUpdateRows could be null
if (olapScan.getSelectedPartitionIds().isEmpty()) {
deltaRowCount = tableMeta.updatedRows.get() - colMeta.updatedRows;
} else {
// sum partition delta row
for (long partitionId : olapScan.getSelectedPartitionIds()) {
deltaRowCount += tableMeta.partitionUpdateRows.getOrDefault(partitionId, 0L)
- colMeta.partitionUpdateRows.getOrDefault(partitionId, 0L);
}
}
}
deltaRowCount = tableMeta.getBaseIndexDeltaRowCount(olapScan.getTable());
}
return deltaRowCount;
}

private void adjustColStats(OlapScan olapScan, SlotReference slot,
ColumnStatisticBuilder builder) {
if (builder.getAvgSizeByte() <= 0) {
builder.setAvgSizeByte(slot.getDataType().toCatalogDataType().getSlotSize());
}
long delta = computeDeltaRowCount(olapScan, slot);
if (delta > 0) {
builder.setCount(builder.getCount() + delta);
// clear min-max to avoid error estimation
// for example, after yesterday data loaded, user send query about yesterday immediately.
// since yesterday data are not analyzed, the max date is before yesterday, and hence optimizer
// estimates the filter result is zero
builder.setMinExpr(null).setMinValue(Double.NEGATIVE_INFINITY)
.setMaxExpr(null).setMaxValue(Double.POSITIVE_INFINITY);
}

}

private ColumnStatistic getColumnStatsFromTableCache(CatalogRelation catalogRelation, SlotReference slot) {
long idxId = -1;
if (catalogRelation instanceof OlapScan) {
Expand Down Expand Up @@ -462,6 +429,8 @@ private Statistics computeOlapScan(OlapScan olapScan) {
}

// build Stats for olapScan
double deltaRowCount = computeDeltaRowCount(olapScan);
builder.setDeltaRowCount(deltaRowCount);
// if slot is invisible, use UNKNOWN
List<SlotReference> visibleOutputSlots = new ArrayList<>();
for (Slot slot : ((Plan) olapScan).getOutput()) {
Expand All @@ -484,34 +453,34 @@ private Statistics computeOlapScan(OlapScan olapScan) {
ColumnStatistic cache = getColumnStatsFromPartitionCache(olapScan, slot, selectedPartitionNames);
ColumnStatisticBuilder colStatsBuilder = new ColumnStatisticBuilder(cache);
colStatsBuilder.setCount(selectedPartitionsRowCount);
adjustColStats(olapScan, slot, colStatsBuilder);
colStatsBuilder.normalizeAvgSizeByte(slot);
builder.putColumnStatistics(slot, colStatsBuilder.build());
}
checkIfUnknownStatsUsedAsKey(builder);
builder.setRowCount(selectedPartitionsRowCount);
builder.setRowCount(selectedPartitionsRowCount + deltaRowCount);
} else {
// if partition row count is invalid (-1), fallback to table stats
for (SlotReference slot : visibleOutputSlots) {
ColumnStatistic cache = getColumnStatsFromTableCache((CatalogRelation) olapScan, slot);
ColumnStatisticBuilder colStatsBuilder = new ColumnStatisticBuilder(cache);
colStatsBuilder.setCount(tableRowCount);
adjustColStats(olapScan, slot, colStatsBuilder);
colStatsBuilder.normalizeAvgSizeByte(slot);
builder.putColumnStatistics(slot, colStatsBuilder.build());
}
checkIfUnknownStatsUsedAsKey(builder);
builder.setRowCount(tableRowCount);
builder.setRowCount(tableRowCount + deltaRowCount);
}
} else {
// get table level stats
for (SlotReference slot : visibleOutputSlots) {
ColumnStatistic cache = getColumnStatsFromTableCache((CatalogRelation) olapScan, slot);
ColumnStatisticBuilder colStatsBuilder = new ColumnStatisticBuilder(cache);
colStatsBuilder.setCount(tableRowCount);
adjustColStats(olapScan, slot, colStatsBuilder);
colStatsBuilder.normalizeAvgSizeByte(slot);
builder.putColumnStatistics(slot, colStatsBuilder.build());
}
checkIfUnknownStatsUsedAsKey(builder);
builder.setRowCount(tableRowCount);
builder.setRowCount(tableRowCount + deltaRowCount);
}
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public abstract class CharacterType extends PrimitiveType {

protected final int len;

// When defining SQL schemas, users often tend to set the length of string
// fields much longer than actually needed for storage.
public static final int DEFAULT_SLOT_SIZE = 20;

public CharacterType(int len) {
this.len = len;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.doris.statistics;

import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.types.coercion.CharacterType;

public class ColumnStatisticBuilder {
private double count;
Expand Down Expand Up @@ -170,4 +172,20 @@ public ColumnStatistic build() {
isUnknown, updatedTime);
return colStats;
}

public void normalizeAvgSizeByte(SlotReference slot) {
if (isUnknown) {
return;
}
if (avgSizeByte > 0) {
return;
}
avgSizeByte = slot.getDataType().toCatalogDataType().getSlotSize();
// When defining SQL schemas, users often tend to set the length of string \
// fields much longer than actually needed for storage.
if (slot.getDataType() instanceof CharacterType) {
avgSizeByte = Math.min(avgSizeByte,
CharacterType.DEFAULT_SLOT_SIZE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.types.coercion.CharacterType;

import java.text.DecimalFormat;
import java.util.HashMap;
Expand All @@ -43,15 +44,31 @@ public class Statistics {
// the byte size of one tuple
private double tupleSize;

public Statistics(double rowCount, Map<Expression, ColumnStatistic> expressionToColumnStats) {
this(rowCount, 1, expressionToColumnStats);
private double deltaRowCount = 0.0;

public Statistics(Statistics another) {
this.rowCount = another.rowCount;
this.widthInJoinCluster = another.widthInJoinCluster;
this.expressionToColumnStats = new HashMap<>(another.expressionToColumnStats);
this.tupleSize = another.tupleSize;
this.deltaRowCount = another.getDeltaRowCount();
}

public Statistics(double rowCount, int widthInJoinCluster,
Map<Expression, ColumnStatistic> expressionToColumnStats) {
Map<Expression, ColumnStatistic> expressionToColumnStats, double deltaRowCount) {
this.rowCount = rowCount;
this.widthInJoinCluster = widthInJoinCluster;
this.expressionToColumnStats = expressionToColumnStats;
this.deltaRowCount = deltaRowCount;
}

public Statistics(double rowCount, Map<Expression, ColumnStatistic> expressionToColumnStats) {
this(rowCount, 1, expressionToColumnStats, 0);
}

public Statistics(double rowCount, int widthInJoinCluster,
Map<Expression, ColumnStatistic> expressionToColumnStats) {
this(rowCount, widthInJoinCluster, expressionToColumnStats, 0);
}

public ColumnStatistic findColumnStatistics(Expression expression) {
Expand Down Expand Up @@ -133,7 +150,7 @@ public double computeTupleSize(List<Slot> slots) {
for (Slot slot : slots) {
ColumnStatistic s = expressionToColumnStats.get(slot);
if (s != null) {
tempSize += Math.max(1, Math.min(20, s.avgSizeByte));
tempSize += Math.max(1, Math.min(CharacterType.DEFAULT_SLOT_SIZE, s.avgSizeByte));
}
}
tupleSize = Math.max(1, tempSize);
Expand Down Expand Up @@ -186,7 +203,11 @@ public String toString() {
return "-Infinite";
}
DecimalFormat format = new DecimalFormat("#,###.##");
return format.format(rowCount);
String rows = format.format(rowCount);
if (deltaRowCount > 0) {
rows = rows + "(" + format.format(deltaRowCount) + ")";
}
return rows;
}

public String printColumnStats() {
Expand Down Expand Up @@ -263,4 +284,12 @@ public Statistics normalizeByRatio(double originRowCount) {
}
return builder.build();
}

public double getDeltaRowCount() {
return deltaRowCount;
}

public void setDeltaRowCount(double deltaRowCount) {
this.deltaRowCount = deltaRowCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ public class StatisticsBuilder {
private int widthInJoinCluster = 1;
private final Map<Expression, ColumnStatistic> expressionToColumnStats;

private double deltaRowCount = 0.0;

public StatisticsBuilder() {
expressionToColumnStats = new HashMap<>();
}

public StatisticsBuilder(Statistics statistics) {
this.rowCount = statistics.getRowCount();
this.widthInJoinCluster = statistics.getWidthInJoinCluster();
this.deltaRowCount = statistics.getDeltaRowCount();
expressionToColumnStats = new HashMap<>();
expressionToColumnStats.putAll(statistics.columnStatistics());
}
Expand All @@ -50,6 +53,11 @@ public StatisticsBuilder setWidthInJoinCluster(int widthInJoinCluster) {
return this;
}

public StatisticsBuilder setDeltaRowCount(double deltaRowCount) {
this.deltaRowCount = deltaRowCount;
return this;
}

public StatisticsBuilder putColumnStatistics(
Map<Expression, ColumnStatistic> expressionToColumnStats) {
this.expressionToColumnStats.putAll(expressionToColumnStats);
Expand All @@ -66,6 +74,6 @@ public Set<Map.Entry<Expression, ColumnStatistic>> getExpressionColumnStatsEntri
}

public Statistics build() {
return new Statistics(rowCount, widthInJoinCluster, expressionToColumnStats);
return new Statistics(rowCount, widthInJoinCluster, expressionToColumnStats, deltaRowCount);
}
}
55 changes: 55 additions & 0 deletions regression-test/suites/nereids_p0/delta_row/delta_row.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("delta_row") {
String database = context.config.getDbNameByFile(context.file)
sql """
drop database if exists ${database};
create database ${database};
use ${database};
CREATE TABLE IF NOT EXISTS t (
k int(11) null comment "",
v string replace null comment "",
) engine=olap
DISTRIBUTED BY HASH(k) BUCKETS 5 properties("replication_num" = "1");
insert into t values (1, "a"),(2, "b"),(3, 'c'),(4,'d');
analyze table t with sync;
"""
explain {
sql "physical plan select * from t where k > 6"
contains("stats=0,")
contains("stats=4 ")
// PhysicalResultSink[75] ( outputExprs=[k#0, v#1] )
// +--PhysicalFilter[72]@1 ( stats=0, predicates=(k#0 > 6) )
// +--PhysicalOlapScan[t]@0 ( stats=4 )
}

sql "set global enable_auto_analyze=false;"

sql "insert into t values (10, 'c');"
explain {
sql "physical plan select * from t where k > 6"
contains("stats=0.5,")
contains("stats=5(1)")
notContains("stats=0,")
notContains("stats=4 ")
// PhysicalResultSink[75] ( outputExprs=[k#0, v#1] )
// +--PhysicalFilter[72]@1 ( stats=0.5, predicates=(k#0 > 6) )
// +--PhysicalOlapScan[t]@0 ( stats=5(1) )
}
}

0 comments on commit 966c06b

Please sign in to comment.