Skip to content

Commit

Permalink
[feat](nereids) support Iceberg time travel syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangbutao committed May 14, 2024
1 parent 7a77cd7 commit afe401d
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ optScanParams

relationPrimary
: multipartIdentifier optScanParams? materializedViewName? specifiedPartition?
tabletList? tableAlias sample? relationHint? lateralView* #tableName
tabletList? tableAlias sample? tableSnapshot? relationHint? lateralView* #tableName
| LEFT_PAREN query RIGHT_PAREN tableAlias lateralView* #aliasedQuery
| tvfName=identifier LEFT_PAREN
(properties=propertyItemList)?
Expand Down Expand Up @@ -966,6 +966,10 @@ sampleMethod
| INTEGER_VALUE ROWS #sampleByRows
;

tableSnapshot
: FOR tableSnapshotType=(TIME | VERSION) AS OF valueExpression
;

// this rule is used for explicitly capturing wrong identifiers such as test-table, which should actually be `test-table`
// replace identifier with errorCapturingIdentifier where the immediate follow symbol is not an expression, otherwise
// valid expressions such as "a-b" can be recognized as an identifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public enum VersionType {
TIME, VERSION
}

private final VersionType type;
private VersionType type;
private String time;
private long version;

Expand All @@ -51,6 +51,10 @@ public VersionType getType() {
return type;
}

public void setType(VersionType type) {
this.type = type;
}

public String getTime() {
return time;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.datasource.hive;

import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ListPartitionItem;
Expand Down Expand Up @@ -169,6 +170,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
// for hudi incremental read
private TableScanParams scanParams = null;
private IncrementalRelation incrementalRelation = null;
private TableSnapshot tableSnapshot = null;

public enum DLAType {
UNKNOWN, HIVE, HUDI, ICEBERG
Expand Down Expand Up @@ -353,6 +355,14 @@ public void setScanParams(TableScanParams scanParams) {
this.scanParams = scanParams;
}

public void setTableSnapshotVersion(TableSnapshot tableSnapshot) {
this.tableSnapshot = tableSnapshot;
}

public TableSnapshot getTableSnapshotVersion() {
return tableSnapshot;
}

public IncrementalRelation getIncrementalRelation() {
return incrementalRelation;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.datasource.iceberg.source;

import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
Expand All @@ -40,16 +41,18 @@ public class IcebergHMSSource implements IcebergSource {
private final TupleDescriptor desc;
private final Map<String, ColumnRange> columnNameToRange;
private final org.apache.iceberg.Table icebergTable;
private final TableSnapshot tableSnapshot;

public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc,
Map<String, ColumnRange> columnNameToRange) {
Map<String, ColumnRange> columnNameToRange, TableSnapshot tableSnapshot) {
this.hmsTable = hmsTable;
this.desc = desc;
this.columnNameToRange = columnNameToRange;
this.icebergTable =
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
.getIcebergTable(hmsTable.getCatalog(),
hmsTable.getDbName(), hmsTable.getName());
this.tableSnapshot = tableSnapshot;
}

@Override
Expand Down Expand Up @@ -94,4 +97,9 @@ public TFileAttributes getFileAttributes() throws UserException {
public ExternalCatalog getCatalog() {
return hmsTable.getCatalog();
}

@Override
public TableSnapshot getTableSnapshot() {
return tableSnapshot;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckCol

ExternalTable table = (ExternalTable) desc.getTable();
if (table instanceof HMSExternalTable) {
source = new IcebergHMSSource((HMSExternalTable) table, desc, columnNameToRange);
source = new IcebergHMSSource((HMSExternalTable) table, desc, columnNameToRange,
((HMSExternalTable) table).getTableSnapshotVersion());
} else if (table instanceof IcebergExternalTable) {
String catalogType = ((IcebergExternalTable) table).getIcebergCatalogType();
switch (catalogType) {
Expand Down Expand Up @@ -277,6 +278,9 @@ private List<Split> doGetSplits() throws UserException {

public Long getSpecifiedSnapshot() throws UserException {
TableSnapshot tableSnapshot = source.getDesc().getRef().getTableSnapshot();
if (tableSnapshot == null) {
tableSnapshot = source.getTableSnapshot();
}
if (tableSnapshot != null) {
TableSnapshot.VersionType type = tableSnapshot.getType();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.datasource.iceberg.source;

import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
Expand All @@ -41,4 +42,8 @@ public interface IcebergSource {
ExternalCatalog getCatalog();

String getFileFormat() throws DdlException, MetaNotFoundException;

default TableSnapshot getTableSnapshot() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.nereids.analyzer;

import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.exceptions.UnboundException;
import org.apache.doris.nereids.memo.GroupExpression;
Expand Down Expand Up @@ -59,41 +60,46 @@ public class UnboundRelation extends LogicalRelation implements Unbound, BlockFu
// the start and end position of the sql substring(e.g. "t1", "db1.t1", "ctl1.db1.t1")
private final Optional<Pair<Integer, Integer>> indexInSqlString;

private final TableSnapshot tableSnapshot;

public UnboundRelation(RelationId id, List<String> nameParts) {
this(id, nameParts, Optional.empty(), Optional.empty(), ImmutableList.of(), false, ImmutableList.of(),
ImmutableList.of(), Optional.empty(), Optional.empty(), null, Optional.empty());
ImmutableList.of(), Optional.empty(), Optional.empty(), null, Optional.empty(), null);
}

public UnboundRelation(RelationId id, List<String> nameParts, List<String> partNames, boolean isTempPart) {
this(id, nameParts, Optional.empty(), Optional.empty(), partNames, isTempPart, ImmutableList.of(),
ImmutableList.of(), Optional.empty(), Optional.empty(), null, Optional.empty());
ImmutableList.of(), Optional.empty(), Optional.empty(), null, Optional.empty(), null);
}

public UnboundRelation(RelationId id, List<String> nameParts, List<String> partNames, boolean isTempPart,
List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName) {
this(id, nameParts, Optional.empty(), Optional.empty(),
partNames, isTempPart, tabletIds, hints, tableSample, indexName, null, Optional.empty());
partNames, isTempPart, tabletIds, hints, tableSample, indexName, null, Optional.empty(), null);
}

public UnboundRelation(RelationId id, List<String> nameParts, List<String> partNames, boolean isTempPart,
List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName,
TableScanParams scanParams) {
TableScanParams scanParams, TableSnapshot tableSnapshot) {
this(id, nameParts, Optional.empty(), Optional.empty(),
partNames, isTempPart, tabletIds, hints, tableSample, indexName, scanParams, Optional.empty());
partNames, isTempPart, tabletIds, hints, tableSample, indexName, scanParams, Optional.empty(),
tableSnapshot);
}

public UnboundRelation(RelationId id, List<String> nameParts, Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<String> partNames, boolean isTempPart,
List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName) {
this(id, nameParts, groupExpression, logicalProperties, partNames,
isTempPart, tabletIds, hints, tableSample, indexName, null, Optional.empty());
isTempPart, tabletIds, hints, tableSample, indexName, null, Optional.empty(), null);
}

public UnboundRelation(RelationId id, List<String> nameParts, List<String> partNames, boolean isTempPart,
List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName,
TableScanParams scanParams, Optional<Pair<Integer, Integer>> indexInSqlString) {
TableScanParams scanParams, Optional<Pair<Integer, Integer>> indexInSqlString,
TableSnapshot tableSnapshot) {
this(id, nameParts, Optional.empty(), Optional.empty(),
partNames, isTempPart, tabletIds, hints, tableSample, indexName, scanParams, indexInSqlString);
partNames, isTempPart, tabletIds, hints, tableSample, indexName, scanParams, indexInSqlString,
tableSnapshot);
}

/**
Expand All @@ -102,7 +108,8 @@ public UnboundRelation(RelationId id, List<String> nameParts, List<String> partN
public UnboundRelation(RelationId id, List<String> nameParts, Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<String> partNames, boolean isTempPart,
List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName,
TableScanParams scanParams, Optional<Pair<Integer, Integer>> indexInSqlString) {
TableScanParams scanParams, Optional<Pair<Integer, Integer>> indexInSqlString,
TableSnapshot tableSnapshot) {
super(id, PlanType.LOGICAL_UNBOUND_RELATION, groupExpression, logicalProperties);
this.nameParts = ImmutableList.copyOf(Objects.requireNonNull(nameParts, "nameParts should not null"));
this.partNames = ImmutableList.copyOf(Objects.requireNonNull(partNames, "partNames should not null"));
Expand All @@ -113,6 +120,7 @@ public UnboundRelation(RelationId id, List<String> nameParts, Optional<GroupExpr
this.indexName = indexName;
this.scanParams = scanParams;
this.indexInSqlString = indexInSqlString;
this.tableSnapshot = tableSnapshot;
}

public List<String> getNameParts() {
Expand All @@ -133,14 +141,14 @@ public LogicalProperties computeLogicalProperties() {
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new UnboundRelation(relationId, nameParts,
groupExpression, Optional.of(getLogicalProperties()),
partNames, isTempPart, tabletIds, hints, tableSample, indexName, null, indexInSqlString);
partNames, isTempPart, tabletIds, hints, tableSample, indexName, null, indexInSqlString, tableSnapshot);
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new UnboundRelation(relationId, nameParts, groupExpression, logicalProperties, partNames,
isTempPart, tabletIds, hints, tableSample, indexName, null, indexInSqlString);
isTempPart, tabletIds, hints, tableSample, indexName, null, indexInSqlString, tableSnapshot);
}

@Override
Expand Down Expand Up @@ -202,4 +210,8 @@ public TableScanParams getScanParams() {
public Optional<Pair<Integer, Integer>> getIndexInSqlString() {
return indexInSqlString;
}

public TableSnapshot getTableSnapshot() {
return tableSnapshot;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.BuiltinAggregateFunctions;
Expand Down Expand Up @@ -1362,15 +1363,26 @@ public LogicalPlan visitTableName(TableNameContext ctx) {
scanParams = new TableScanParams(ctx.optScanParams().funcName.getText(), map);
}

TableSnapshot tableSnapshot = null;
if (ctx.tableSnapshot() != null) {
if (ctx.tableSnapshot().tableSnapshotType.getText().equalsIgnoreCase("time")) {
tableSnapshot = new TableSnapshot(stripQuotes(ctx.tableSnapshot().valueExpression().getText()));
tableSnapshot.setType(TableSnapshot.VersionType.TIME);
} else {
tableSnapshot = new TableSnapshot(Long.parseLong(ctx.tableSnapshot().valueExpression().getText()));
tableSnapshot.setType(TableSnapshot.VersionType.VERSION);
}
}

MultipartIdentifierContext identifier = ctx.multipartIdentifier();
TableSample tableSample = ctx.sample() == null ? null : (TableSample) visit(ctx.sample());
UnboundRelation relation = forCreateView ? new UnboundRelation(StatementScopeIdGenerator.newRelationId(),
tableId, partitionNames, isTempPart, tabletIdLists, relationHints,
Optional.ofNullable(tableSample), indexName, scanParams,
Optional.of(Pair.of(identifier.start.getStartIndex(), identifier.stop.getStopIndex()))) :
Optional.of(Pair.of(identifier.start.getStartIndex(), identifier.stop.getStopIndex())), tableSnapshot) :
new UnboundRelation(StatementScopeIdGenerator.newRelationId(),
tableId, partitionNames, isTempPart, tabletIdLists, relationHints,
Optional.ofNullable(tableSample), indexName, scanParams);
Optional.ofNullable(tableSample), indexName, scanParams, tableSnapshot);
LogicalPlan checkedRelation = LogicalPlanBuilderAssistant.withCheckPolicy(relation);
LogicalPlan plan = withTableAlias(checkedRelation, ctx.tableAlias());
for (LateralViewContext lateralViewContext : ctx.lateralView()) {
Expand All @@ -1379,6 +1391,14 @@ public LogicalPlan visitTableName(TableNameContext ctx) {
return plan;
}

public static String stripQuotes(String str) {
if ((str.charAt(0) == '\'' && str.charAt(str.length() - 1) == '\'')
|| (str.charAt(0) == '\"' && str.charAt(str.length() - 1) == '\"')) {
str = str.substring(1, str.length() - 1);
}
return str;
}

@Override
public LogicalPlan visitAliasedQuery(AliasedQueryContext ctx) {
if (ctx.tableAlias().getText().equals("")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio
return new LogicalSubQueryAlias<>(tableQualifier, hiveViewPlan);
}
hmsTable.setScanParams(unboundRelation.getScanParams());
hmsTable.setTableSnapshotVersion(unboundRelation.getTableSnapshot());
return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table,
qualifierWithoutTableName, unboundRelation.getTableSample());
case ICEBERG_EXTERNAL_TABLE:
Expand Down

0 comments on commit afe401d

Please sign in to comment.