Skip to content

Commit

Permalink
[feat](nereids) support Iceberg time travel syntax (#34681)
Browse files Browse the repository at this point in the history
#15418
added Iceberg time travel in legacy parser but not added this syntax Neredis.
If we enable nereids and disable fallback to original palnner, time travel won't be available.

This PR added time travel syntas in Neredis.

BTW, we already have nereids time travel regression-test in

https://github.com/apache/doris/blob/master/regression-test/suites/external_table_p2/iceberg/test_external_catalog_icebergv2_nereids.groovy

this regression-test will always fail without this PR.

https://github.com/apache/doris/blob/88530bf9437e60190f198252ab82f53fa53d4c10/regression-test/suites/external_table_p2/iceberg/test_external_catalog_icebergv2_nereids.groovy#L70-L75
  • Loading branch information
zhangbutao authored and dataroaring committed May 26, 2024
1 parent 316f870 commit 480459e
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ optScanParams

relationPrimary
: multipartIdentifier optScanParams? materializedViewName? specifiedPartition?
tabletList? tableAlias sample? relationHint? lateralView* #tableName
tabletList? tableAlias sample? tableSnapshot? relationHint? lateralView* #tableName
| LEFT_PAREN query RIGHT_PAREN tableAlias lateralView* #aliasedQuery
| tvfName=identifier LEFT_PAREN
(properties=propertyItemList)?
Expand Down Expand Up @@ -971,6 +971,11 @@ sampleMethod
| INTEGER_VALUE ROWS #sampleByRows
;

tableSnapshot
: FOR VERSION AS OF version=INTEGER_VALUE
| FOR TIME AS OF time=STRING_LITERAL
;

// this rule is used for explicitly capturing wrong identifiers such as test-table, which should actually be `test-table`
// replace identifier with errorCapturingIdentifier where the immediate follow symbol is not an expression, otherwise
// valid expressions such as "a-b" can be recognized as an identifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
Expand Down Expand Up @@ -96,6 +97,9 @@ public abstract class FileQueryScanNode extends FileScanNode {

protected String brokerName;

@Getter
protected TableSnapshot tableSnapshot;

/**
* External file scan node for Query hms table
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ private List<Split> doGetSplits() throws UserException {

public Long getSpecifiedSnapshot() throws UserException {
TableSnapshot tableSnapshot = source.getDesc().getRef().getTableSnapshot();
if (tableSnapshot == null) {
tableSnapshot = this.tableSnapshot;
}
if (tableSnapshot != null) {
TableSnapshot.VersionType type = tableSnapshot.getType();
try {
Expand Down Expand Up @@ -461,4 +464,8 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
return super.getNodeExplainString(prefix, detailLevel)
+ String.format("%sicebergPredicatePushdown=\n%s\n", prefix, sb);
}

public void setTableSnapshot(TableSnapshot tableSnapshot) {
this.tableSnapshot = tableSnapshot;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.nereids.analyzer;

import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.exceptions.UnboundException;
import org.apache.doris.nereids.memo.GroupExpression;
Expand Down Expand Up @@ -59,41 +60,47 @@ public class UnboundRelation extends LogicalRelation implements Unbound, BlockFu
// the start and end position of the sql substring(e.g. "t1", "db1.t1", "ctl1.db1.t1")
private final Optional<Pair<Integer, Integer>> indexInSqlString;

private final Optional<TableSnapshot> tableSnapshot;

public UnboundRelation(RelationId id, List<String> nameParts) {
this(id, nameParts, Optional.empty(), Optional.empty(), ImmutableList.of(), false, ImmutableList.of(),
ImmutableList.of(), Optional.empty(), Optional.empty(), null, Optional.empty());
ImmutableList.of(), Optional.empty(), Optional.empty(), null, Optional.empty(), Optional.empty());
}

public UnboundRelation(RelationId id, List<String> nameParts, List<String> partNames, boolean isTempPart) {
this(id, nameParts, Optional.empty(), Optional.empty(), partNames, isTempPart, ImmutableList.of(),
ImmutableList.of(), Optional.empty(), Optional.empty(), null, Optional.empty());
ImmutableList.of(), Optional.empty(), Optional.empty(), null, Optional.empty(), Optional.empty());
}

public UnboundRelation(RelationId id, List<String> nameParts, List<String> partNames, boolean isTempPart,
List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName) {
this(id, nameParts, Optional.empty(), Optional.empty(),
partNames, isTempPart, tabletIds, hints, tableSample, indexName, null, Optional.empty());
partNames, isTempPart, tabletIds, hints, tableSample, indexName, null, Optional.empty(),
Optional.empty());
}

public UnboundRelation(RelationId id, List<String> nameParts, List<String> partNames, boolean isTempPart,
List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName,
TableScanParams scanParams) {
TableScanParams scanParams, Optional<TableSnapshot> tableSnapshot) {
this(id, nameParts, Optional.empty(), Optional.empty(),
partNames, isTempPart, tabletIds, hints, tableSample, indexName, scanParams, Optional.empty());
partNames, isTempPart, tabletIds, hints, tableSample, indexName, scanParams, Optional.empty(),
tableSnapshot);
}

public UnboundRelation(RelationId id, List<String> nameParts, Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<String> partNames, boolean isTempPart,
List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName) {
this(id, nameParts, groupExpression, logicalProperties, partNames,
isTempPart, tabletIds, hints, tableSample, indexName, null, Optional.empty());
isTempPart, tabletIds, hints, tableSample, indexName, null, Optional.empty(), Optional.empty());
}

public UnboundRelation(RelationId id, List<String> nameParts, List<String> partNames, boolean isTempPart,
List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName,
TableScanParams scanParams, Optional<Pair<Integer, Integer>> indexInSqlString) {
TableScanParams scanParams, Optional<Pair<Integer, Integer>> indexInSqlString,
Optional<TableSnapshot> tableSnapshot) {
this(id, nameParts, Optional.empty(), Optional.empty(),
partNames, isTempPart, tabletIds, hints, tableSample, indexName, scanParams, indexInSqlString);
partNames, isTempPart, tabletIds, hints, tableSample, indexName, scanParams, indexInSqlString,
tableSnapshot);
}

/**
Expand All @@ -102,7 +109,8 @@ public UnboundRelation(RelationId id, List<String> nameParts, List<String> partN
public UnboundRelation(RelationId id, List<String> nameParts, Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<String> partNames, boolean isTempPart,
List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName,
TableScanParams scanParams, Optional<Pair<Integer, Integer>> indexInSqlString) {
TableScanParams scanParams, Optional<Pair<Integer, Integer>> indexInSqlString,
Optional<TableSnapshot> tableSnapshot) {
super(id, PlanType.LOGICAL_UNBOUND_RELATION, groupExpression, logicalProperties);
this.nameParts = ImmutableList.copyOf(Objects.requireNonNull(nameParts, "nameParts should not null"));
this.partNames = ImmutableList.copyOf(Objects.requireNonNull(partNames, "partNames should not null"));
Expand All @@ -113,6 +121,7 @@ public UnboundRelation(RelationId id, List<String> nameParts, Optional<GroupExpr
this.indexName = indexName;
this.scanParams = scanParams;
this.indexInSqlString = indexInSqlString;
this.tableSnapshot = tableSnapshot;
}

public List<String> getNameParts() {
Expand All @@ -133,14 +142,14 @@ public LogicalProperties computeLogicalProperties() {
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new UnboundRelation(relationId, nameParts,
groupExpression, Optional.of(getLogicalProperties()),
partNames, isTempPart, tabletIds, hints, tableSample, indexName, null, indexInSqlString);
partNames, isTempPart, tabletIds, hints, tableSample, indexName, null, indexInSqlString, tableSnapshot);
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new UnboundRelation(relationId, nameParts, groupExpression, logicalProperties, partNames,
isTempPart, tabletIds, hints, tableSample, indexName, null, indexInSqlString);
isTempPart, tabletIds, hints, tableSample, indexName, null, indexInSqlString, tableSnapshot);
}

@Override
Expand Down Expand Up @@ -207,4 +216,8 @@ public TableScanParams getScanParams() {
public Optional<Pair<Integer, Integer>> getIndexInSqlString() {
return indexInSqlString;
}

public Optional<TableSnapshot> getTableSnapshot() {
return tableSnapshot;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,10 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla
break;
case ICEBERG:
scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
IcebergScanNode icebergScanNode = (IcebergScanNode) scanNode;
if (fileScan.getTableSnapshot().isPresent()) {
icebergScanNode.setTableSnapshot(fileScan.getTableSnapshot().get());
}
break;
case HIVE:
scanNode = new HiveScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.BuiltinAggregateFunctions;
Expand Down Expand Up @@ -1370,15 +1371,25 @@ public LogicalPlan visitTableName(TableNameContext ctx) {
scanParams = new TableScanParams(ctx.optScanParams().funcName.getText(), map);
}

TableSnapshot tableSnapshot = null;
if (ctx.tableSnapshot() != null) {
if (ctx.tableSnapshot().TIME() != null) {
tableSnapshot = new TableSnapshot(stripQuotes(ctx.tableSnapshot().time.getText()));
} else {
tableSnapshot = new TableSnapshot(Long.parseLong(ctx.tableSnapshot().version.getText()));
}
}

MultipartIdentifierContext identifier = ctx.multipartIdentifier();
TableSample tableSample = ctx.sample() == null ? null : (TableSample) visit(ctx.sample());
UnboundRelation relation = forCreateView ? new UnboundRelation(StatementScopeIdGenerator.newRelationId(),
tableId, partitionNames, isTempPart, tabletIdLists, relationHints,
Optional.ofNullable(tableSample), indexName, scanParams,
Optional.of(Pair.of(identifier.start.getStartIndex(), identifier.stop.getStopIndex()))) :
Optional.of(Pair.of(identifier.start.getStartIndex(), identifier.stop.getStopIndex())),
Optional.ofNullable(tableSnapshot)) :
new UnboundRelation(StatementScopeIdGenerator.newRelationId(),
tableId, partitionNames, isTempPart, tabletIdLists, relationHints,
Optional.ofNullable(tableSample), indexName, scanParams);
Optional.ofNullable(tableSample), indexName, scanParams, Optional.ofNullable(tableSnapshot));
LogicalPlan checkedRelation = LogicalPlanBuilderAssistant.withCheckPolicy(relation);
LogicalPlan plan = withTableAlias(checkedRelation, ctx.tableAlias());
for (LateralViewContext lateralViewContext : ctx.lateralView()) {
Expand All @@ -1387,6 +1398,14 @@ public LogicalPlan visitTableName(TableNameContext ctx) {
return plan;
}

public static String stripQuotes(String str) {
if ((str.charAt(0) == '\'' && str.charAt(str.length() - 1) == '\'')
|| (str.charAt(0) == '\"' && str.charAt(str.length() - 1) == '\"')) {
str = str.substring(1, str.length() - 1);
}
return str;
}

@Override
public LogicalPlan visitAliasedQuery(AliasedQueryContext ctx) {
if (ctx.tableAlias().getText().equals("")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,15 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio
}
hmsTable.setScanParams(unboundRelation.getScanParams());
return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table,
qualifierWithoutTableName, unboundRelation.getTableSample());
qualifierWithoutTableName, unboundRelation.getTableSample(),
unboundRelation.getTableSnapshot());
case ICEBERG_EXTERNAL_TABLE:
case PAIMON_EXTERNAL_TABLE:
case MAX_COMPUTE_EXTERNAL_TABLE:
case TRINO_CONNECTOR_EXTERNAL_TABLE:
return new LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table,
qualifierWithoutTableName, unboundRelation.getTableSample());
qualifierWithoutTableName, unboundRelation.getTableSample(),
unboundRelation.getTableSnapshot());
case SCHEMA:
return new LogicalSchemaScan(unboundRelation.getRelationId(), table, qualifierWithoutTableName);
case JDBC_EXTERNAL_TABLE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public Rule build() {
fileScan.getLogicalProperties(),
fileScan.getConjuncts(),
fileScan.getSelectedPartitions(),
fileScan.getTableSample())
fileScan.getTableSample(),
fileScan.getTableSnapshot())
).toRule(RuleType.LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.nereids.trees.plans.logical;

import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.nereids.memo.GroupExpression;
Expand Down Expand Up @@ -46,22 +47,25 @@ public class LogicalFileScan extends LogicalExternalRelation {

private final SelectedPartitions selectedPartitions;
private final Optional<TableSample> tableSample;
private final Optional<TableSnapshot> tableSnapshot;

/**
* Constructor for LogicalFileScan.
*/
public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
Set<Expression> conjuncts, SelectedPartitions selectedPartitions, Optional<TableSample> tableSample) {
Set<Expression> conjuncts, SelectedPartitions selectedPartitions, Optional<TableSample> tableSample,
Optional<TableSnapshot> tableSnapshot) {
super(id, PlanType.LOGICAL_FILE_SCAN, table, qualifier, conjuncts, groupExpression, logicalProperties);
this.selectedPartitions = selectedPartitions;
this.tableSample = tableSample;
this.tableSnapshot = tableSnapshot;
}

public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
Optional<TableSample> tableSample) {
Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) {
this(id, table, qualifier, Optional.empty(), Optional.empty(),
Sets.newHashSet(), SelectedPartitions.NOT_PRUNED, tableSample);
Sets.newHashSet(), SelectedPartitions.NOT_PRUNED, tableSample, tableSnapshot);
}

public SelectedPartitions getSelectedPartitions() {
Expand All @@ -72,6 +76,10 @@ public Optional<TableSample> getTableSample() {
return tableSample;
}

public Optional<TableSnapshot> getTableSnapshot() {
return tableSnapshot;
}

@Override
public ExternalTable getTable() {
Preconditions.checkArgument(table instanceof ExternalTable,
Expand All @@ -90,31 +98,31 @@ public String toString() {
@Override
public LogicalFileScan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalFileScan(relationId, (ExternalTable) table, qualifier, groupExpression,
Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample);
Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample, tableSnapshot);
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new LogicalFileScan(relationId, (ExternalTable) table, qualifier,
groupExpression, logicalProperties, conjuncts, selectedPartitions, tableSample);
groupExpression, logicalProperties, conjuncts, selectedPartitions, tableSample, tableSnapshot);
}

@Override
public LogicalFileScan withConjuncts(Set<Expression> conjuncts) {
return new LogicalFileScan(relationId, (ExternalTable) table, qualifier, Optional.empty(),
Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample);
Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample, tableSnapshot);
}

public LogicalFileScan withSelectedPartitions(SelectedPartitions selectedPartitions) {
return new LogicalFileScan(relationId, (ExternalTable) table, qualifier, Optional.empty(),
Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample);
Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample, tableSnapshot);
}

@Override
public LogicalFileScan withRelationId(RelationId relationId) {
return new LogicalFileScan(relationId, (ExternalTable) table, qualifier, Optional.empty(),
Optional.empty(), conjuncts, selectedPartitions, tableSample);
Optional.empty(), conjuncts, selectedPartitions, tableSample, tableSnapshot);
}

@Override
Expand Down
Loading

0 comments on commit 480459e

Please sign in to comment.