From 4e41e1d797611bcd05370fc24aab69a9f08717fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=A2=E5=81=A5?= Date: Tue, 16 Jan 2024 18:21:40 +0800 Subject: [PATCH] [feat](Nereids) persist constraint in table (#29767) --- .../apache/doris/common/FeMetaVersion.java | 4 +- .../java/org/apache/doris/catalog/Table.java | 20 +- .../apache/doris/catalog/TableAttributes.java | 53 +++ .../org/apache/doris/catalog/TableIf.java | 34 +- .../doris/catalog/constraint/Constraint.java | 31 +- .../constraint/ForeignKeyConstraint.java | 12 +- .../constraint/PrimaryKeyConstraint.java | 9 +- .../catalog/constraint/TableIdentifier.java | 26 +- .../catalog/constraint/UniqueConstraint.java | 8 +- .../doris/catalog/external/ExternalTable.java | 10 + .../doris/datasource/ExternalSchemaCache.java | 5 + .../apache/doris/journal/JournalEntity.java | 7 + .../exceptions/MetaNotFoundException.java | 74 +++++ .../plans/commands/AddConstraintCommand.java | 15 +- .../plans/commands/DropConstraintCommand.java | 9 +- .../commands/ShowConstraintsCommand.java | 2 +- .../doris/persist/AlterConstraintLog.java | 61 ++++ .../org/apache/doris/persist/EditLog.java | 18 ++ .../apache/doris/persist/OperationType.java | 3 + .../apache/doris/persist/gson/GsonUtils.java | 11 + .../constraint/ConstraintPersistTest.java | 305 ++++++++++++++++++ .../doris/utframe/TestWithFeService.java | 11 + 22 files changed, 683 insertions(+), 45 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/catalog/TableAttributes.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/MetaNotFoundException.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/persist/AlterConstraintLog.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/catalog/constraint/ConstraintPersistTest.java diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java index 650a831640bfb1..387ce91e2c68e2 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -72,9 +72,11 @@ public final class FeMetaVersion { public static final int VERSION_125 = 125; // For write/read function nullable mode info public static final int VERSION_126 = 126; + // For constraints + public static final int VERSION_127 = 127; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_126; + public static final int VERSION_CURRENT = VERSION_127; // all logs meta version should >= the minimum version, so that we could remove many if clause, for example // if (FE_METAVERSION < VERSION_94) ... diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index 3395da6b58eeb2..783016e7d80d92 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -22,12 +22,14 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.QueryableReentrantReadWriteLock; import org.apache.doris.common.util.SqlUtils; import org.apache.doris.common.util.Util; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ColumnStatistic; @@ -50,7 +52,6 @@ import java.io.IOException; import java.time.Instant; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -116,8 +117,8 @@ public abstract class Table extends MetaObject implements Writable, TableIf { @SerializedName(value = "comment") protected String comment = ""; - @SerializedName(value = "constraints") - private HashMap constraintsMap = new HashMap<>(); + @SerializedName(value = "ta") + private TableAttributes tableAttributes = new TableAttributes(); // check read lock leaky private Map readLockThreads = null; @@ -334,12 +335,12 @@ public String getQualifiedName() { } public Constraint getConstraint(String name) { - return constraintsMap.get(name); + return getConstraintsMap().get(name); } @Override public Map getConstraintsMapUnsafe() { - return constraintsMap; + return tableAttributes.getConstraintsMap(); } public TableType getType() { @@ -455,9 +456,9 @@ public void write(DataOutput out) throws IOException { for (Column column : fullSchema) { column.write(out); } - Text.writeString(out, comment); - + // write table attributes + Text.writeString(out, GsonUtils.GSON.toJson(tableAttributes)); // write create time out.writeLong(createTime); } @@ -488,7 +489,12 @@ public void readFields(DataInput in) throws IOException { hasCompoundKey = true; } comment = Text.readString(in); + // table attribute only support after version 127 + if (FeMetaVersion.VERSION_127 <= Env.getCurrentEnvJournalVersion()) { + String json = Text.readString(in); + this.tableAttributes = GsonUtils.GSON.fromJson(json, TableAttributes.class); + } // read create time this.createTime = in.readLong(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableAttributes.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableAttributes.java new file mode 100644 index 00000000000000..2ac86fada5146a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableAttributes.java @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import org.apache.doris.catalog.constraint.Constraint; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * TableAttributes contains additional information about all table + */ +public class TableAttributes implements Writable { + @SerializedName(value = "constraints") + private final Map constraintsMap = new HashMap<>(); + + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public TableAttributes read(DataInput in) throws IOException { + return GsonUtils.GSON.fromJson(Text.readString(in), TableAttributes.class); + } + + public Map getConstraintsMap() { + return constraintsMap; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index bb6abcdd5092c0..b59aedfdbf3c1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -183,7 +183,7 @@ default Set getForeignKeyConstraints() { } } - default Map getConstraintMap() { + default Map getConstraintsMap() { readLock(); try { return ImmutableMap.copyOf(getConstraintsMapUnsafe()); @@ -236,25 +236,27 @@ default void checkConstraintNotExistence(String name, Constraint primaryKeyConst } } - default void addUniqueConstraint(String name, ImmutableList columns) { + default Constraint addUniqueConstraint(String name, ImmutableList columns) { writeLock(); try { Map constraintMap = getConstraintsMapUnsafe(); UniqueConstraint uniqueConstraint = new UniqueConstraint(name, ImmutableSet.copyOf(columns)); checkConstraintNotExistence(name, uniqueConstraint, constraintMap); constraintMap.put(name, uniqueConstraint); + return uniqueConstraint; } finally { writeUnlock(); } } - default void addPrimaryKeyConstraint(String name, ImmutableList columns) { + default Constraint addPrimaryKeyConstraint(String name, ImmutableList columns) { writeLock(); try { Map constraintMap = getConstraintsMapUnsafe(); PrimaryKeyConstraint primaryKeyConstraint = new PrimaryKeyConstraint(name, ImmutableSet.copyOf(columns)); checkConstraintNotExistence(name, primaryKeyConstraint, constraintMap); constraintMap.put(name, primaryKeyConstraint); + return primaryKeyConstraint; } finally { writeUnlock(); } @@ -277,7 +279,7 @@ default void updatePrimaryKeyForForeignKey(PrimaryKeyConstraint requirePrimaryKe } } - default void addForeignConstraint(String name, ImmutableList columns, + default Constraint addForeignConstraint(String name, ImmutableList columns, TableIf referencedTable, ImmutableList referencedColumns) { writeLock(); try { @@ -289,12 +291,32 @@ default void addForeignConstraint(String name, ImmutableList columns, foreignKeyConstraint.getReferencedColumnNames()); updatePrimaryKeyForForeignKey(requirePrimaryKey, referencedTable); constraintMap.put(name, foreignKeyConstraint); + return foreignKeyConstraint; } finally { writeUnlock(); } } - default void dropConstraint(String name) { + default void replayAddConstraint(Constraint constraint) { + if (constraint instanceof UniqueConstraint) { + UniqueConstraint uniqueConstraint = (UniqueConstraint) constraint; + this.addUniqueConstraint(constraint.getName(), + ImmutableList.copyOf(uniqueConstraint.getUniqueColumnNames())); + } else if (constraint instanceof PrimaryKeyConstraint) { + PrimaryKeyConstraint primaryKeyConstraint = (PrimaryKeyConstraint) constraint; + this.addPrimaryKeyConstraint(primaryKeyConstraint.getName(), + ImmutableList.copyOf(primaryKeyConstraint.getPrimaryKeyNames())); + } else if (constraint instanceof ForeignKeyConstraint) { + ForeignKeyConstraint foreignKey = (ForeignKeyConstraint) constraint; + this.addForeignConstraint(foreignKey.getName(), + ImmutableList.copyOf(foreignKey.getForeignKeyNames()), + foreignKey.getReferencedTable(), + ImmutableList.copyOf(foreignKey.getReferencedColumnNames())); + } + } + + default Constraint dropConstraint(String name) { + Constraint dropConstraint; writeLock(); try { Map constraintMap = getConstraintsMapUnsafe(); @@ -308,9 +330,11 @@ default void dropConstraint(String name) { ((PrimaryKeyConstraint) constraint).getForeignTables() .forEach(t -> t.dropFKReferringPK(this, (PrimaryKeyConstraint) constraint)); } + dropConstraint = constraint; } finally { writeUnlock(); } + return dropConstraint; } default void dropFKReferringPK(TableIf table, PrimaryKeyConstraint constraint) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/Constraint.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/Constraint.java index 9faa80f8c1e536..01686aadaacb80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/Constraint.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/Constraint.java @@ -17,15 +17,25 @@ package org.apache.doris.catalog.constraint; -public abstract class Constraint { +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public abstract class Constraint implements Writable { public enum ConstraintType { FOREIGN_KEY("FOREIGN KEY"), PRIMARY_KEY("PRIMARY KEY"), UNIQUE("UNIQUE"); - + @SerializedName(value = "tn") private final String name; - private ConstraintType(String stringValue) { + ConstraintType(String stringValue) { this.name = stringValue; } @@ -34,7 +44,9 @@ public String getName() { } } + @SerializedName(value = "n") private final String name; + @SerializedName(value = "ty") private final ConstraintType type; @@ -50,4 +62,17 @@ public String getName() { public ConstraintType getType() { return type; } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + /** + * Read Constraint. + **/ + public static Constraint read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, Constraint.class); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/ForeignKeyConstraint.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/ForeignKeyConstraint.java index 01dc00052b9fa4..66d8e0a3706b97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/ForeignKeyConstraint.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/ForeignKeyConstraint.java @@ -24,13 +24,17 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.ImmutableSet; +import com.google.gson.annotations.SerializedName; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; public class ForeignKeyConstraint extends Constraint { - private final ImmutableMap foreignToReference; + @SerializedName(value = "ftr") + private final Map foreignToReference; + @SerializedName(value = "rt") private final TableIdentifier referencedTable; public ForeignKeyConstraint(String name, List columns, @@ -50,11 +54,11 @@ public ForeignKeyConstraint(String name, List columns, this.foreignToReference = builder.build(); } - public ImmutableSet getForeignKeyNames() { + public Set getForeignKeyNames() { return foreignToReference.keySet(); } - public ImmutableSet getReferencedColumnNames() { + public Set getReferencedColumnNames() { return ImmutableSet.copyOf(foreignToReference.values()); } @@ -62,7 +66,7 @@ public String getReferencedColumnName(String column) { return foreignToReference.get(column); } - public ImmutableMap getForeignToReference() { + public Map getForeignToReference() { return foreignToReference; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/PrimaryKeyConstraint.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/PrimaryKeyConstraint.java index c868d3a84f8395..47c822ad597139 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/PrimaryKeyConstraint.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/PrimaryKeyConstraint.java @@ -23,15 +23,18 @@ import com.google.common.base.Objects; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.gson.annotations.SerializedName; import java.util.HashSet; import java.util.List; import java.util.Set; public class PrimaryKeyConstraint extends Constraint { - private final ImmutableSet columns; + @SerializedName(value = "cols") + private final Set columns; // record the foreign table which references the primary key + @SerializedName(value = "ft") private final Set foreignTables = new HashSet<>(); public PrimaryKeyConstraint(String name, Set columns) { @@ -39,11 +42,11 @@ public PrimaryKeyConstraint(String name, Set columns) { this.columns = ImmutableSet.copyOf(columns); } - public ImmutableSet getPrimaryKeyNames() { + public Set getPrimaryKeyNames() { return columns; } - public ImmutableSet getPrimaryKeys(TableIf table) { + public Set getPrimaryKeys(TableIf table) { return columns.stream().map(table::getColumn).collect(ImmutableSet.toImmutableSet()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/TableIdentifier.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/TableIdentifier.java index c4db65c14e8874..2688fd5784db7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/TableIdentifier.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/TableIdentifier.java @@ -20,30 +20,42 @@ import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.nereids.exceptions.MetaNotFoundException; import com.google.common.base.Preconditions; +import com.google.gson.annotations.SerializedName; import java.util.Objects; -class TableIdentifier { +public class TableIdentifier { + @SerializedName(value = "dbId") private final long databaseId; + @SerializedName(value = "tId") private final long tableId; + @SerializedName(value = "cId") + private final long catalogId; - TableIdentifier(TableIf tableIf) { + public TableIdentifier(TableIf tableIf) { Preconditions.checkArgument(tableIf != null, "Table can not be null in constraint"); - databaseId = tableIf.getDatabase().getId(); tableId = tableIf.getId(); + databaseId = tableIf.getDatabase().getId(); + catalogId = tableIf.getDatabase().getCatalog().getId(); } - TableIf toTableIf() { - DatabaseIf databaseIf = Env.getCurrentEnv().getCurrentCatalog().getDbNullable(databaseId); + public TableIf toTableIf() { + CatalogIf catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId); + if (catalogIf == null) { + throw new MetaNotFoundException(String.format("Can not find catalog %s in constraint", catalogId)); + } + DatabaseIf databaseIf = catalogIf.getDbNullable(databaseId); if (databaseIf == null) { - throw new RuntimeException(String.format("Can not find database %s in constraint", databaseId)); + throw new MetaNotFoundException(String.format("Can not find database %s in constraint", databaseId)); } TableIf tableIf = databaseIf.getTableNullable(tableId); if (tableIf == null) { - throw new RuntimeException(String.format("Can not find table %s in constraint", databaseId)); + throw new MetaNotFoundException(String.format("Can not find table %s in constraint", databaseId)); } return tableIf; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/UniqueConstraint.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/UniqueConstraint.java index 3e8042dc8e02e6..0b54f13bdee721 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/UniqueConstraint.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/UniqueConstraint.java @@ -22,22 +22,24 @@ import com.google.common.base.Objects; import com.google.common.collect.ImmutableSet; +import com.google.gson.annotations.SerializedName; import java.util.Set; public class UniqueConstraint extends Constraint { - private final ImmutableSet columns; + @SerializedName(value = "cols") + private final Set columns; public UniqueConstraint(String name, Set columns) { super(ConstraintType.UNIQUE, name); this.columns = ImmutableSet.copyOf(columns); } - public ImmutableSet getUniqueColumnNames() { + public Set getUniqueColumnNames() { return columns; } - public ImmutableSet getUniqueKeys(TableIf table) { + public Set getUniqueKeys(TableIf table) { return columns.stream().map(table::getColumn).collect(ImmutableSet.toImmutableSet()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index 49bf28589055db..1af013fcbfe721 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -21,7 +21,9 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TableAttributes; import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.constraint.Constraint; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; @@ -77,6 +79,9 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { protected long timestamp; @SerializedName(value = "dbName") protected String dbName; + @SerializedName(value = "ta") + private final TableAttributes tableAttributes = new TableAttributes(); + // this field will be refreshed after reloading schema protected volatile long schemaUpdateTime; @@ -272,6 +277,11 @@ public Column getColumn(String name) { return null; } + @Override + public Map getConstraintsMapUnsafe() { + return tableAttributes.getConstraintsMap(); + } + @Override public String getEngine() { return getType().toEngineName(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java index 3eab6028d4607e..962f9d977c9ae0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java @@ -94,6 +94,11 @@ public List getSchema(String dbName, String tblName) { } } + public void addSchemaForTest(String dbName, String tblName, ImmutableList schema) { + SchemaCacheKey key = new SchemaCacheKey(dbName, tblName); + schemaCache.put(key, schema); + } + public void invalidateTableCache(String dbName, String tblName) { SchemaCacheKey key = new SchemaCacheKey(dbName, tblName); schemaCache.invalidate(key); diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 6f6bc945096ea5..1bedda0a7e9d7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -58,6 +58,7 @@ import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.load.sync.SyncJob; import org.apache.doris.mysql.privilege.UserPropertyInfo; +import org.apache.doris.persist.AlterConstraintLog; import org.apache.doris.persist.AlterDatabasePropertyInfo; import org.apache.doris.persist.AlterLightSchemaChangeInfo; import org.apache.doris.persist.AlterMTMV; @@ -793,6 +794,12 @@ public void readFields(DataInput in) throws IOException { isRead = true; break; } + case OperationType.OP_DROP_CONSTRAINT: + case OperationType.OP_ADD_CONSTRAINT: { + data = AlterConstraintLog.read(in); + isRead = true; + break; + } case OperationType.OP_ALTER_USER: { data = AlterUserOperationLog.read(in); isRead = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/MetaNotFoundException.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/MetaNotFoundException.java new file mode 100644 index 00000000000000..f7d19c3f844ddd --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/MetaNotFoundException.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.exceptions; + +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; + +import java.util.Optional; + +/** Nereids's AnalysisException. */ +public class MetaNotFoundException extends RuntimeException { + private final String message; + private final Optional line; + private final Optional startPosition; + private final Optional plan; + + public MetaNotFoundException(String message, Throwable cause, Optional line, + Optional startPosition, Optional plan) { + super(message, cause); + this.message = message; + this.line = line; + this.startPosition = startPosition; + this.plan = plan; + } + + public MetaNotFoundException(String message, Optional line, + Optional startPosition, Optional plan) { + super(message); + this.message = message; + this.line = line; + this.startPosition = startPosition; + this.plan = plan; + } + + public MetaNotFoundException(String message, Throwable cause) { + this(message, cause, Optional.empty(), Optional.empty(), Optional.empty()); + } + + public MetaNotFoundException(String message) { + this(message, Optional.empty(), Optional.empty(), Optional.empty()); + } + + @Override + public String getMessage() { + String planAnnotation = plan.map(p -> ";\n" + p.treeString()).orElse(""); + return getSimpleMessage() + planAnnotation; + } + + private String getSimpleMessage() { + if (line.isPresent() || startPosition.isPresent()) { + String lineAnnotation = line.map(l -> "line " + l).orElse(""); + String positionAnnotation = startPosition.map(s -> " pos " + s).orElse(""); + return message + ";" + lineAnnotation + positionAnnotation; + } else { + return message; + } + } + + // TODO: support ErrorCode +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AddConstraintCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AddConstraintCommand.java index 4086758e74fe65..8c90bc0f9144b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AddConstraintCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AddConstraintCommand.java @@ -17,7 +17,7 @@ package org.apache.doris.nereids.trees.plans.commands; -import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Pair; import org.apache.doris.nereids.NereidsPlanner; @@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.persist.AlterConstraintLog; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; @@ -46,6 +47,7 @@ public class AddConstraintCommand extends Command implements ForwardWithSync { public static final Logger LOG = LogManager.getLogger(AddConstraintCommand.class); + private final String name; private final Constraint constraint; @@ -61,16 +63,19 @@ public AddConstraintCommand(String name, Constraint constraint) { @Override public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { Pair, TableIf> columnsAndTable = extractColumnsAndTable(ctx, constraint.toProject()); + org.apache.doris.catalog.constraint.Constraint catalogConstraint = null; if (constraint.isForeignKey()) { Pair, TableIf> referencedColumnsAndTable = extractColumnsAndTable(ctx, constraint.toReferenceProject()); - columnsAndTable.second.addForeignConstraint(name, columnsAndTable.first, + catalogConstraint = columnsAndTable.second.addForeignConstraint(name, columnsAndTable.first, referencedColumnsAndTable.second, referencedColumnsAndTable.first); } else if (constraint.isPrimaryKey()) { - columnsAndTable.second.addPrimaryKeyConstraint(name, columnsAndTable.first); + catalogConstraint = columnsAndTable.second.addPrimaryKeyConstraint(name, columnsAndTable.first); } else if (constraint.isUnique()) { - columnsAndTable.second.addUniqueConstraint(name, columnsAndTable.first); + catalogConstraint = columnsAndTable.second.addUniqueConstraint(name, columnsAndTable.first); } + Env.getCurrentEnv().getEditLog().logAddConstraint( + new AlterConstraintLog(catalogConstraint, columnsAndTable.second)); } private Pair, TableIf> extractColumnsAndTable(ConnectContext ctx, LogicalPlan plan) { @@ -82,8 +87,6 @@ private Pair, TableIf> extractColumnsAndTable(ConnectConte throw new AnalysisException("Can not found table in constraint " + constraint.toString()); } LogicalCatalogRelation catalogRelation = logicalCatalogRelationSet.iterator().next(); - Preconditions.checkArgument(catalogRelation.getTable() instanceof Table, - "We only support table now but we meet ", catalogRelation.getTable()); ImmutableList columns = analyzedPlan.getOutput().stream() .map(s -> { Preconditions.checkArgument(s instanceof SlotReference diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropConstraintCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropConstraintCommand.java index 202f53197d2bec..84143f234c33f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropConstraintCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropConstraintCommand.java @@ -17,7 +17,7 @@ package org.apache.doris.nereids.trees.plans.commands; -import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -28,10 +28,10 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.persist.AlterConstraintLog; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; -import com.google.common.base.Preconditions; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -58,7 +58,8 @@ public DropConstraintCommand(String name, LogicalPlan plan) { @Override public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { TableIf table = extractTable(ctx, plan); - table.dropConstraint(name); + org.apache.doris.catalog.constraint.Constraint catalogConstraint = table.dropConstraint(name); + Env.getCurrentEnv().getEditLog().logDropConstraint(new AlterConstraintLog(catalogConstraint, table)); } private TableIf extractTable(ConnectContext ctx, LogicalPlan plan) { @@ -70,8 +71,6 @@ private TableIf extractTable(ConnectContext ctx, LogicalPlan plan) { throw new AnalysisException("Can not found table when dropping constraint"); } LogicalCatalogRelation catalogRelation = logicalCatalogRelationSet.iterator().next(); - Preconditions.checkArgument(catalogRelation.getTable() instanceof Table, - "Don't support table ", catalogRelation.getTable()); return catalogRelation.getTable(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowConstraintsCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowConstraintsCommand.java index 32950308e01885..13d1f0c5f6d2a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowConstraintsCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowConstraintsCommand.java @@ -51,7 +51,7 @@ public ShowConstraintsCommand(List nameParts) { public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { TableIf tableIf = RelationUtil.getDbAndTable( RelationUtil.getQualifierName(ctx, nameParts), ctx.getEnv()).value(); - List> res = tableIf.getConstraintMap().entrySet().stream() + List> res = tableIf.getConstraintsMap().entrySet().stream() .map(e -> Lists.newArrayList(e.getKey(), e.getValue().getType().getName(), e.getValue().toString())) diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterConstraintLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterConstraintLog.java new file mode 100644 index 00000000000000..eef8b2cd2b55bc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterConstraintLog.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.constraint.Constraint; +import org.apache.doris.catalog.constraint.TableIdentifier; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class AlterConstraintLog implements Writable { + @SerializedName("ct") + final Constraint constraint; + @SerializedName("tid") + final TableIdentifier tableIdentifier; + + public AlterConstraintLog(Constraint constraint, TableIf table) { + this.constraint = constraint; + this.tableIdentifier = new TableIdentifier(table); + } + + public TableIf getTableIf() { + return tableIdentifier.toTableIf(); + } + + public Constraint getConstraint() { + return constraint; + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static AlterConstraintLog read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, AlterConstraintLog.class); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 6eae5a688647ba..fb97c02b00ccd8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -973,6 +973,16 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { case OperationType.OP_ALTER_MTMV_STMT: { break; } + case OperationType.OP_ADD_CONSTRAINT: { + final AlterConstraintLog log = (AlterConstraintLog) journal.getData(); + log.getTableIf().replayAddConstraint(log.getConstraint()); + break; + } + case OperationType.OP_DROP_CONSTRAINT: { + final AlterConstraintLog log = (AlterConstraintLog) journal.getData(); + log.getTableIf().dropConstraint(log.getConstraint().getName()); + break; + } case OperationType.OP_ALTER_USER: { final AlterUserOperationLog log = (AlterUserOperationLog) journal.getData(); env.getAuth().replayAlterUser(log); @@ -1969,6 +1979,14 @@ public void logAlterMTMV(AlterMTMV log) { } + public void logAddConstraint(AlterConstraintLog log) { + logEdit(OperationType.OP_ADD_CONSTRAINT, log); + } + + public void logDropConstraint(AlterConstraintLog log) { + logEdit(OperationType.OP_DROP_CONSTRAINT, log); + } + public void logInsertOverwrite(InsertOverwriteLog log) { logEdit(OperationType.OP_INSERT_OVERWRITE, log); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index d26ddc12f4901f..42312297b917d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -285,6 +285,8 @@ public class OperationType { public static final short OP_CHANGE_MTMV_TASK = 342; @Deprecated public static final short OP_ALTER_MTMV_STMT = 345; + public static final short OP_ADD_CONSTRAINT = 346; + public static final short OP_DROP_CONSTRAINT = 347; public static final short OP_DROP_EXTERNAL_TABLE = 350; public static final short OP_DROP_EXTERNAL_DB = 351; @@ -355,6 +357,7 @@ public class OperationType { public static final short OP_INSERT_OVERWRITE = 461; + /** * Get opcode name by op code. **/ diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index db49ecdc9920a1..54c109b61a56eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -44,6 +44,10 @@ import org.apache.doris.catalog.SparkResource; import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.constraint.Constraint; +import org.apache.doris.catalog.constraint.ForeignKeyConstraint; +import org.apache.doris.catalog.constraint.PrimaryKeyConstraint; +import org.apache.doris.catalog.constraint.UniqueConstraint; import org.apache.doris.catalog.external.EsExternalDatabase; import org.apache.doris.catalog.external.EsExternalTable; import org.apache.doris.catalog.external.ExternalDatabase; @@ -202,6 +206,12 @@ public class GsonUtils { Policy.class, "clazz").registerSubtype(RowPolicy.class, RowPolicy.class.getSimpleName()) .registerSubtype(StoragePolicy.class, StoragePolicy.class.getSimpleName()); + private static RuntimeTypeAdapterFactory constraintTypeAdapterFactory = RuntimeTypeAdapterFactory.of( + Constraint.class, "clazz") + .registerSubtype(PrimaryKeyConstraint.class, PrimaryKeyConstraint.class.getSimpleName()) + .registerSubtype(ForeignKeyConstraint.class, ForeignKeyConstraint.class.getSimpleName()) + .registerSubtype(UniqueConstraint.class, UniqueConstraint.class.getSimpleName()); + private static RuntimeTypeAdapterFactory dsTypeAdapterFactory = RuntimeTypeAdapterFactory.of( CatalogIf.class, "clazz") .registerSubtype(InternalCatalog.class, InternalCatalog.class.getSimpleName()) @@ -287,6 +297,7 @@ public class GsonUtils { .registerTypeAdapterFactory(hbResponseTypeAdapterFactory) .registerTypeAdapterFactory(rdsTypeAdapterFactory) .registerTypeAdapterFactory(jobExecutorRuntimeTypeAdapterFactory) + .registerTypeAdapterFactory(constraintTypeAdapterFactory) .registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()) .registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter()) .registerTypeAdapter(PartitionKey.class, new PartitionKey.PartitionKeySerializer()) diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/constraint/ConstraintPersistTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/constraint/ConstraintPersistTest.java new file mode 100644 index 00000000000000..64f3db583ad0c5 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/constraint/ConstraintPersistTest.java @@ -0,0 +1,305 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog.constraint; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.external.EsExternalDatabase; +import org.apache.doris.catalog.external.EsExternalTable; +import org.apache.doris.catalog.external.ExternalTable; +import org.apache.doris.common.Config; +import org.apache.doris.datasource.EsExternalCatalog; +import org.apache.doris.journal.JournalEntity; +import org.apache.doris.nereids.util.PlanPatternMatchSupported; +import org.apache.doris.nereids.util.RelationUtil; +import org.apache.doris.persist.AlterConstraintLog; +import org.apache.doris.persist.EditLog; +import org.apache.doris.persist.OperationType; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.ImmutableList; +import mockit.Mock; +import mockit.MockUp; +import org.apache.hadoop.util.Lists; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +class ConstraintPersistTest extends TestWithFeService implements PlanPatternMatchSupported { + + @Override + public void runBeforeAll() throws Exception { + createDatabase("test"); + connectContext.setDatabase("test"); + createTable("create table t1 (\n" + + " k1 int,\n" + + " k2 int\n" + + ")\n" + + "unique key(k1, k2)\n" + + "distributed by hash(k1) buckets 4\n" + + "properties(\n" + + " \"replication_num\"=\"1\"\n" + + ")"); + createTable("create table t2 (\n" + + " k1 int,\n" + + " k2 int\n" + + ")\n" + + "unique key(k1, k2)\n" + + "distributed by hash(k1) buckets 4\n" + + "properties(\n" + + " \"replication_num\"=\"1\"\n" + + ")"); + } + + @Test + void addConstraintLogPersistTest() throws Exception { + Config.edit_log_type = "local"; + addConstraint("alter table t1 add constraint pk primary key (k1)"); + addConstraint("alter table t2 add constraint pk primary key (k1)"); + addConstraint("alter table t1 add constraint uk unique (k1)"); + addConstraint("alter table t1 add constraint fk foreign key (k1) references t2(k1)"); + TableIf tableIf = RelationUtil.getTable( + RelationUtil.getQualifierName(connectContext, Lists.newArrayList("test", "t1")), + connectContext.getEnv()); + Map constraintMap = tableIf.getConstraintsMap(); + tableIf.getConstraintsMapUnsafe().clear(); + Assertions.assertTrue(tableIf.getConstraintsMap().isEmpty()); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + DataOutput output = new DataOutputStream(outputStream); + for (Constraint value : constraintMap.values()) { + JournalEntity journalEntity = new JournalEntity(); + journalEntity.setData(new AlterConstraintLog(value, tableIf)); + journalEntity.setOpCode(OperationType.OP_ADD_CONSTRAINT); + journalEntity.write(output); + } + InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + DataInput input = new DataInputStream(inputStream); + for (int i = 0; i < constraintMap.values().size(); i++) { + JournalEntity journalEntity = new JournalEntity(); + journalEntity.readFields(input); + EditLog.loadJournal(Env.getCurrentEnv(), 0L, journalEntity); + } + Assertions.assertEquals(tableIf.getConstraintsMap(), constraintMap); + dropConstraint("alter table t1 drop constraint fk"); + dropConstraint("alter table t1 drop constraint pk"); + dropConstraint("alter table t2 drop constraint pk"); + dropConstraint("alter table t1 drop constraint uk"); + } + + @Test + void dropConstraintLogPersistTest() throws Exception { + Config.edit_log_type = "local"; + addConstraint("alter table t1 add constraint pk primary key (k1)"); + addConstraint("alter table t2 add constraint pk primary key (k1)"); + addConstraint("alter table t1 add constraint uk unique (k1)"); + addConstraint("alter table t1 add constraint fk foreign key (k1) references t2(k1)"); + TableIf tableIf = RelationUtil.getTable( + RelationUtil.getQualifierName(connectContext, Lists.newArrayList("test", "t1")), + connectContext.getEnv()); + Map constraintMap = tableIf.getConstraintsMap(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + DataOutput output = new DataOutputStream(outputStream); + for (Constraint value : constraintMap.values()) { + JournalEntity journalEntity = new JournalEntity(); + journalEntity.setData(new AlterConstraintLog(value, tableIf)); + journalEntity.setOpCode(OperationType.OP_DROP_CONSTRAINT); + journalEntity.write(output); + } + InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + DataInput input = new DataInputStream(inputStream); + for (int i = 0; i < constraintMap.values().size(); i++) { + JournalEntity journalEntity = new JournalEntity(); + journalEntity.readFields(input); + EditLog.loadJournal(Env.getCurrentEnv(), 0L, journalEntity); + } + Assertions.assertTrue(tableIf.getConstraintsMap().isEmpty()); + } + + @Test + void constraintWithTablePersistTest() throws Exception { + addConstraint("alter table t1 add constraint pk primary key (k1)"); + addConstraint("alter table t2 add constraint pk primary key (k1)"); + addConstraint("alter table t1 add constraint uk unique (k1)"); + addConstraint("alter table t1 add constraint fk foreign key (k1) references t2(k1)"); + TableIf tableIf = RelationUtil.getTable( + RelationUtil.getQualifierName(connectContext, Lists.newArrayList("test", "t1")), + connectContext.getEnv()); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + DataOutput output = new DataOutputStream(outputStream); + tableIf.write(output); + InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + DataInput input = new DataInputStream(inputStream); + TableIf loadTable = Table.read(input); + Assertions.assertEquals(loadTable.getConstraintsMap(), tableIf.getConstraintsMap()); + dropConstraint("alter table t1 drop constraint fk"); + dropConstraint("alter table t1 drop constraint pk"); + dropConstraint("alter table t2 drop constraint pk"); + dropConstraint("alter table t1 drop constraint uk"); + } + + @Test + void externalTableTest() throws Exception { + ExternalTable externalTable = new ExternalTable(); + externalTable.addPrimaryKeyConstraint("pk", ImmutableList.of("col")); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + DataOutput output = new DataOutputStream(outputStream); + externalTable.write(output); + InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + DataInput input = new DataInputStream(inputStream); + TableIf loadTable = ExternalTable.read(input); + Assertions.assertEquals(1, loadTable.getConstraintsMap().size()); + } + + @Test + void addConstraintLogPersistForExternalTableTest() throws Exception { + Config.edit_log_type = "local"; + createCatalog("create catalog es properties('type' = 'es', 'elasticsearch.hosts' = 'http://192.168.0.1'," + + " 'elasticsearch.username' = 'user1');"); + + Env.getCurrentEnv().changeCatalog(connectContext, "es"); + EsExternalCatalog esCatalog = (EsExternalCatalog) getCatalog("es"); + EsExternalDatabase db = new EsExternalDatabase(esCatalog, 10002, "es_db1"); + EsExternalTable tbl = new EsExternalTable(10003, "es_tbl1", "es_db1", esCatalog); + ImmutableList schema = ImmutableList.of(new Column("k1", PrimitiveType.INT)); + tbl.setNewFullSchema(schema); + db.addTableForTest(tbl); + esCatalog.addDatabaseForTest(db); + Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(esCatalog).addSchemaForTest(db.getFullName(), tbl.getName(), schema); + new MockUp() { + @Mock + public TableIf getTable(List qualifierName, Env env) { + return tbl; + } + }; + + new MockUp() { + @Mock + public DatabaseIf getDatabase() { + return db; + } + }; + + new MockUp() { + @Mock + public TableIf toTableIf() { + return tbl; + } + }; + + addConstraint("alter table es.es_db1.es_tbl1 add constraint pk primary key (k1)"); + addConstraint("alter table es.es_db1.es_tbl1 add constraint uk unique (k1)"); + TableIf tableIf = RelationUtil.getTable( + RelationUtil.getQualifierName(connectContext, Lists.newArrayList("test", "t1")), + connectContext.getEnv()); + Map constraintMap = tableIf.getConstraintsMap(); + tableIf.getConstraintsMapUnsafe().clear(); + Assertions.assertTrue(tableIf.getConstraintsMap().isEmpty()); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + DataOutput output = new DataOutputStream(outputStream); + for (Constraint value : new ArrayList<>(constraintMap.values())) { + JournalEntity journalEntity = new JournalEntity(); + journalEntity.setData(new AlterConstraintLog(value, tableIf)); + journalEntity.setOpCode(OperationType.OP_ADD_CONSTRAINT); + journalEntity.write(output); + } + InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + DataInput input = new DataInputStream(inputStream); + for (int i = 0; i < constraintMap.values().size(); i++) { + JournalEntity journalEntity = new JournalEntity(); + journalEntity.readFields(input); + EditLog.loadJournal(Env.getCurrentEnv(), 0L, journalEntity); + } + Assertions.assertEquals(tableIf.getConstraintsMap(), constraintMap); + Env.getCurrentEnv().changeCatalog(connectContext, "internal"); + } + + @Test + void dropConstraintLogPersistForExternalTest() throws Exception { + Config.edit_log_type = "local"; + createCatalog("create catalog es2 properties('type' = 'es', 'elasticsearch.hosts' = 'http://192.168.0.1'," + + " 'elasticsearch.username' = 'user1');"); + + Env.getCurrentEnv().changeCatalog(connectContext, "es2"); + EsExternalCatalog esCatalog = (EsExternalCatalog) getCatalog("es2"); + EsExternalDatabase db = new EsExternalDatabase(esCatalog, 10002, "es_db1"); + EsExternalTable tbl = new EsExternalTable(10003, "es_tbl1", "es_db1", esCatalog); + ImmutableList schema = ImmutableList.of(new Column("k1", PrimitiveType.INT)); + tbl.setNewFullSchema(schema); + db.addTableForTest(tbl); + esCatalog.addDatabaseForTest(db); + Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(esCatalog).addSchemaForTest(db.getFullName(), tbl.getName(), schema); + new MockUp() { + @Mock + public TableIf getTable(List qualifierName, Env env) { + return tbl; + } + }; + + new MockUp() { + @Mock + public DatabaseIf getDatabase() { + return db; + } + }; + + new MockUp() { + @Mock + public TableIf toTableIf() { + return tbl; + } + }; + addConstraint("alter table es.es_db1.es_tbl1 add constraint pk primary key (k1)"); + addConstraint("alter table es.es_db1.es_tbl1 add constraint uk unique (k1)"); + TableIf tableIf = RelationUtil.getTable( + RelationUtil.getQualifierName(connectContext, Lists.newArrayList("test", "t1")), + connectContext.getEnv()); + Map constraintMap = tableIf.getConstraintsMap(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + DataOutput output = new DataOutputStream(outputStream); + for (Constraint value : constraintMap.values()) { + JournalEntity journalEntity = new JournalEntity(); + journalEntity.setData(new AlterConstraintLog(value, tableIf)); + journalEntity.setOpCode(OperationType.OP_DROP_CONSTRAINT); + journalEntity.write(output); + } + InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + DataInput input = new DataInputStream(inputStream); + for (int i = 0; i < constraintMap.values().size(); i++) { + JournalEntity journalEntity = new JournalEntity(); + journalEntity.readFields(input); + EditLog.loadJournal(Env.getCurrentEnv(), 0L, journalEntity); + } + Assertions.assertTrue(tableIf.getConstraintsMap().isEmpty()); + Env.getCurrentEnv().changeCatalog(connectContext, "internal"); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 85f6e22e6f46c2..1f57040d305b8f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -21,6 +21,7 @@ import org.apache.doris.analysis.AlterSqlBlockRuleStmt; import org.apache.doris.analysis.AlterTableStmt; import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.CreateCatalogStmt; import org.apache.doris.analysis.CreateDbStmt; import org.apache.doris.analysis.CreateFunctionStmt; import org.apache.doris.analysis.CreateMaterializedViewStmt; @@ -55,6 +56,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.util.SqlParserUtils; +import org.apache.doris.datasource.CatalogIf; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.glue.LogicalPlanAdapter; @@ -650,6 +652,15 @@ public void createTableAsSelect(String sql) throws Exception { Env.getCurrentEnv().createTableAsSelect(createTableAsSelectStmt); } + public void createCatalog(String sql) throws Exception { + CreateCatalogStmt stmt = (CreateCatalogStmt) parseAndAnalyzeStmt(sql, connectContext); + Env.getCurrentEnv().getCatalogMgr().createCatalog(stmt); + } + + public CatalogIf getCatalog(String name) throws Exception { + return Env.getCurrentEnv().getCatalogMgr().getCatalog(name); + } + public void createTables(String... sqls) throws Exception { createTables(false, sqls); }