From afe401d33558aa5e7053cfe36e15d6846cd8bd8d Mon Sep 17 00:00:00 2001 From: zhangbutao Date: Sat, 11 May 2024 09:14:08 +0800 Subject: [PATCH] [feat](nereids) support Iceberg time travel syntax --- .../org/apache/doris/nereids/DorisParser.g4 | 6 +++- .../apache/doris/analysis/TableSnapshot.java | 6 +++- .../datasource/hive/HMSExternalTable.java | 10 ++++++ .../iceberg/source/IcebergHMSSource.java | 10 +++++- .../iceberg/source/IcebergScanNode.java | 6 +++- .../iceberg/source/IcebergSource.java | 5 +++ .../nereids/analyzer/UnboundRelation.java | 34 +++++++++++++------ .../nereids/parser/LogicalPlanBuilder.java | 24 +++++++++++-- .../nereids/rules/analysis/BindRelation.java | 1 + 9 files changed, 85 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 8a279d455c506e5..1c90d08296bcac4 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -524,7 +524,7 @@ optScanParams relationPrimary : multipartIdentifier optScanParams? materializedViewName? specifiedPartition? - tabletList? tableAlias sample? relationHint? lateralView* #tableName + tabletList? tableAlias sample? tableSnapshot? relationHint? lateralView* #tableName | LEFT_PAREN query RIGHT_PAREN tableAlias lateralView* #aliasedQuery | tvfName=identifier LEFT_PAREN (properties=propertyItemList)? @@ -966,6 +966,10 @@ sampleMethod | INTEGER_VALUE ROWS #sampleByRows ; +tableSnapshot + : FOR tableSnapshotType=(TIME | VERSION) AS OF valueExpression + ; + // this rule is used for explicitly capturing wrong identifiers such as test-table, which should actually be `test-table` // replace identifier with errorCapturingIdentifier where the immediate follow symbol is not an expression, otherwise // valid expressions such as "a-b" can be recognized as an identifier diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java index 0a851a9fd665adb..c77c4c38268b9ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java @@ -27,7 +27,7 @@ public enum VersionType { TIME, VERSION } - private final VersionType type; + private VersionType type; private String time; private long version; @@ -51,6 +51,10 @@ public VersionType getType() { return type; } + public void setType(VersionType type) { + this.type = type; + } + public String getTime() { return time; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 68cd9d374a50f1e..817d6ed90ebe81c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -18,6 +18,7 @@ package org.apache.doris.datasource.hive; import org.apache.doris.analysis.TableScanParams; +import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ListPartitionItem; @@ -169,6 +170,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI // for hudi incremental read private TableScanParams scanParams = null; private IncrementalRelation incrementalRelation = null; + private TableSnapshot tableSnapshot = null; public enum DLAType { UNKNOWN, HIVE, HUDI, ICEBERG @@ -353,6 +355,14 @@ public void setScanParams(TableScanParams scanParams) { this.scanParams = scanParams; } + public void setTableSnapshotVersion(TableSnapshot tableSnapshot) { + this.tableSnapshot = tableSnapshot; + } + + public TableSnapshot getTableSnapshotVersion() { + return tableSnapshot; + } + public IncrementalRelation getIncrementalRelation() { return incrementalRelation; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java index 632120e5c452ec3..130e9c67affa0de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.iceberg.source; +import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; @@ -40,9 +41,10 @@ public class IcebergHMSSource implements IcebergSource { private final TupleDescriptor desc; private final Map columnNameToRange; private final org.apache.iceberg.Table icebergTable; + private final TableSnapshot tableSnapshot; public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc, - Map columnNameToRange) { + Map columnNameToRange, TableSnapshot tableSnapshot) { this.hmsTable = hmsTable; this.desc = desc; this.columnNameToRange = columnNameToRange; @@ -50,6 +52,7 @@ public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc, Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache() .getIcebergTable(hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName()); + this.tableSnapshot = tableSnapshot; } @Override @@ -94,4 +97,9 @@ public TFileAttributes getFileAttributes() throws UserException { public ExternalCatalog getCatalog() { return hmsTable.getCatalog(); } + + @Override + public TableSnapshot getTableSnapshot() { + return tableSnapshot; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index ab8b889fdc59d64..c2fd6eb20b246e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -105,7 +105,8 @@ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckCol ExternalTable table = (ExternalTable) desc.getTable(); if (table instanceof HMSExternalTable) { - source = new IcebergHMSSource((HMSExternalTable) table, desc, columnNameToRange); + source = new IcebergHMSSource((HMSExternalTable) table, desc, columnNameToRange, + ((HMSExternalTable) table).getTableSnapshotVersion()); } else if (table instanceof IcebergExternalTable) { String catalogType = ((IcebergExternalTable) table).getIcebergCatalogType(); switch (catalogType) { @@ -277,6 +278,9 @@ private List doGetSplits() throws UserException { public Long getSpecifiedSnapshot() throws UserException { TableSnapshot tableSnapshot = source.getDesc().getRef().getTableSnapshot(); + if (tableSnapshot == null) { + tableSnapshot = source.getTableSnapshot(); + } if (tableSnapshot != null) { TableSnapshot.VersionType type = tableSnapshot.getType(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java index 270a4d4df18f612..75e0f654ee05487 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.iceberg.source; +import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; @@ -41,4 +42,8 @@ public interface IcebergSource { ExternalCatalog getCatalog(); String getFileFormat() throws DdlException, MetaNotFoundException; + + default TableSnapshot getTableSnapshot() { + return null; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundRelation.java index 34217b249fc5108..53dfb899c857790 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundRelation.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids.analyzer; import org.apache.doris.analysis.TableScanParams; +import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.common.Pair; import org.apache.doris.nereids.exceptions.UnboundException; import org.apache.doris.nereids.memo.GroupExpression; @@ -59,41 +60,46 @@ public class UnboundRelation extends LogicalRelation implements Unbound, BlockFu // the start and end position of the sql substring(e.g. "t1", "db1.t1", "ctl1.db1.t1") private final Optional> indexInSqlString; + private final TableSnapshot tableSnapshot; + public UnboundRelation(RelationId id, List nameParts) { this(id, nameParts, Optional.empty(), Optional.empty(), ImmutableList.of(), false, ImmutableList.of(), - ImmutableList.of(), Optional.empty(), Optional.empty(), null, Optional.empty()); + ImmutableList.of(), Optional.empty(), Optional.empty(), null, Optional.empty(), null); } public UnboundRelation(RelationId id, List nameParts, List partNames, boolean isTempPart) { this(id, nameParts, Optional.empty(), Optional.empty(), partNames, isTempPart, ImmutableList.of(), - ImmutableList.of(), Optional.empty(), Optional.empty(), null, Optional.empty()); + ImmutableList.of(), Optional.empty(), Optional.empty(), null, Optional.empty(), null); } public UnboundRelation(RelationId id, List nameParts, List partNames, boolean isTempPart, List tabletIds, List hints, Optional tableSample, Optional indexName) { this(id, nameParts, Optional.empty(), Optional.empty(), - partNames, isTempPart, tabletIds, hints, tableSample, indexName, null, Optional.empty()); + partNames, isTempPart, tabletIds, hints, tableSample, indexName, null, Optional.empty(), null); } public UnboundRelation(RelationId id, List nameParts, List partNames, boolean isTempPart, List tabletIds, List hints, Optional tableSample, Optional indexName, - TableScanParams scanParams) { + TableScanParams scanParams, TableSnapshot tableSnapshot) { this(id, nameParts, Optional.empty(), Optional.empty(), - partNames, isTempPart, tabletIds, hints, tableSample, indexName, scanParams, Optional.empty()); + partNames, isTempPart, tabletIds, hints, tableSample, indexName, scanParams, Optional.empty(), + tableSnapshot); } public UnboundRelation(RelationId id, List nameParts, Optional groupExpression, Optional logicalProperties, List partNames, boolean isTempPart, List tabletIds, List hints, Optional tableSample, Optional indexName) { this(id, nameParts, groupExpression, logicalProperties, partNames, - isTempPart, tabletIds, hints, tableSample, indexName, null, Optional.empty()); + isTempPart, tabletIds, hints, tableSample, indexName, null, Optional.empty(), null); } public UnboundRelation(RelationId id, List nameParts, List partNames, boolean isTempPart, List tabletIds, List hints, Optional tableSample, Optional indexName, - TableScanParams scanParams, Optional> indexInSqlString) { + TableScanParams scanParams, Optional> indexInSqlString, + TableSnapshot tableSnapshot) { this(id, nameParts, Optional.empty(), Optional.empty(), - partNames, isTempPart, tabletIds, hints, tableSample, indexName, scanParams, indexInSqlString); + partNames, isTempPart, tabletIds, hints, tableSample, indexName, scanParams, indexInSqlString, + tableSnapshot); } /** @@ -102,7 +108,8 @@ public UnboundRelation(RelationId id, List nameParts, List partN public UnboundRelation(RelationId id, List nameParts, Optional groupExpression, Optional logicalProperties, List partNames, boolean isTempPart, List tabletIds, List hints, Optional tableSample, Optional indexName, - TableScanParams scanParams, Optional> indexInSqlString) { + TableScanParams scanParams, Optional> indexInSqlString, + TableSnapshot tableSnapshot) { super(id, PlanType.LOGICAL_UNBOUND_RELATION, groupExpression, logicalProperties); this.nameParts = ImmutableList.copyOf(Objects.requireNonNull(nameParts, "nameParts should not null")); this.partNames = ImmutableList.copyOf(Objects.requireNonNull(partNames, "partNames should not null")); @@ -113,6 +120,7 @@ public UnboundRelation(RelationId id, List nameParts, Optional getNameParts() { @@ -133,14 +141,14 @@ public LogicalProperties computeLogicalProperties() { public Plan withGroupExpression(Optional groupExpression) { return new UnboundRelation(relationId, nameParts, groupExpression, Optional.of(getLogicalProperties()), - partNames, isTempPart, tabletIds, hints, tableSample, indexName, null, indexInSqlString); + partNames, isTempPart, tabletIds, hints, tableSample, indexName, null, indexInSqlString, tableSnapshot); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { return new UnboundRelation(relationId, nameParts, groupExpression, logicalProperties, partNames, - isTempPart, tabletIds, hints, tableSample, indexName, null, indexInSqlString); + isTempPart, tabletIds, hints, tableSample, indexName, null, indexInSqlString, tableSnapshot); } @Override @@ -202,4 +210,8 @@ public TableScanParams getScanParams() { public Optional> getIndexInSqlString() { return indexInSqlString; } + + public TableSnapshot getTableSnapshot() { + return tableSnapshot; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 9c7b3ca229ae139..6eef595ead74bc8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -22,6 +22,7 @@ import org.apache.doris.analysis.StorageBackend; import org.apache.doris.analysis.TableName; import org.apache.doris.analysis.TableScanParams; +import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.BuiltinAggregateFunctions; @@ -1362,15 +1363,26 @@ public LogicalPlan visitTableName(TableNameContext ctx) { scanParams = new TableScanParams(ctx.optScanParams().funcName.getText(), map); } + TableSnapshot tableSnapshot = null; + if (ctx.tableSnapshot() != null) { + if (ctx.tableSnapshot().tableSnapshotType.getText().equalsIgnoreCase("time")) { + tableSnapshot = new TableSnapshot(stripQuotes(ctx.tableSnapshot().valueExpression().getText())); + tableSnapshot.setType(TableSnapshot.VersionType.TIME); + } else { + tableSnapshot = new TableSnapshot(Long.parseLong(ctx.tableSnapshot().valueExpression().getText())); + tableSnapshot.setType(TableSnapshot.VersionType.VERSION); + } + } + MultipartIdentifierContext identifier = ctx.multipartIdentifier(); TableSample tableSample = ctx.sample() == null ? null : (TableSample) visit(ctx.sample()); UnboundRelation relation = forCreateView ? new UnboundRelation(StatementScopeIdGenerator.newRelationId(), tableId, partitionNames, isTempPart, tabletIdLists, relationHints, Optional.ofNullable(tableSample), indexName, scanParams, - Optional.of(Pair.of(identifier.start.getStartIndex(), identifier.stop.getStopIndex()))) : + Optional.of(Pair.of(identifier.start.getStartIndex(), identifier.stop.getStopIndex())), tableSnapshot) : new UnboundRelation(StatementScopeIdGenerator.newRelationId(), tableId, partitionNames, isTempPart, tabletIdLists, relationHints, - Optional.ofNullable(tableSample), indexName, scanParams); + Optional.ofNullable(tableSample), indexName, scanParams, tableSnapshot); LogicalPlan checkedRelation = LogicalPlanBuilderAssistant.withCheckPolicy(relation); LogicalPlan plan = withTableAlias(checkedRelation, ctx.tableAlias()); for (LateralViewContext lateralViewContext : ctx.lateralView()) { @@ -1379,6 +1391,14 @@ public LogicalPlan visitTableName(TableNameContext ctx) { return plan; } + public static String stripQuotes(String str) { + if ((str.charAt(0) == '\'' && str.charAt(str.length() - 1) == '\'') + || (str.charAt(0) == '\"' && str.charAt(str.length() - 1) == '\"')) { + str = str.substring(1, str.length() - 1); + } + return str; + } + @Override public LogicalPlan visitAliasedQuery(AliasedQueryContext ctx) { if (ctx.tableAlias().getText().equals("")) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java index 0e6d940891ebce5..4dd8dd032cbda7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java @@ -273,6 +273,7 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio return new LogicalSubQueryAlias<>(tableQualifier, hiveViewPlan); } hmsTable.setScanParams(unboundRelation.getScanParams()); + hmsTable.setTableSnapshotVersion(unboundRelation.getTableSnapshot()); return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table, qualifierWithoutTableName, unboundRelation.getTableSample()); case ICEBERG_EXTERNAL_TABLE: