Skip to content

Commit

Permalink
[#4812] fix(core): Make ensure the schema exists before creating tabl…
Browse files Browse the repository at this point in the history
…es and topics (#4818)

### What changes were proposed in this pull request?
If the schema is not created by Gravitino, the Gravitino will lack the
metadata in the backend storage.
If we create a table in this schema, the storage won't contain the
metadata. So it will fail to set owner. Because the storage won't store
the table. Because the storage won't contain schema id.

This won't bring too much performance cost. Because loadSchema will use
read lock after first loading. If we have cache, we could be more quick.

### Why are the changes needed?

Fix: #4812 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a ut.
  • Loading branch information
jerqi authored and web-flow committed Aug 30, 2024
1 parent 9ebe6ac commit 57c2a17
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,74 +143,25 @@ public Table createTable(
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
doWithCatalog(
catalogIdent,
c ->
c.doWithPropertiesMeta(
p -> {
validatePropertyForCreate(p.tablePropertiesMetadata(), properties);
return null;
}),
IllegalArgumentException.class);
long uid = idGenerator.nextId();
// Add StringIdentifier to the properties, the specific catalog will handle this
// StringIdentifier to make sure only when the operation is successful, the related
// TableEntity will be visible.
StringIdentifier stringId = StringIdentifier.fromId(uid);
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);

doWithCatalog(
catalogIdent,
c ->
c.doWithTableOps(
t ->
t.createTable(
ident,
columns,
comment,
updatedProperties,
partitions == null ? EMPTY_TRANSFORM : partitions,
distribution == null ? Distributions.NONE : distribution,
sortOrders == null ? new SortOrder[0] : sortOrders,
indexes == null ? Indexes.EMPTY_INDEXES : indexes)),
NoSuchSchemaException.class,
TableAlreadyExistsException.class);

// Retrieve the Table again to obtain some values generated by underlying catalog
Table table =
doWithCatalog(
catalogIdent,
c -> c.doWithTableOps(t -> t.loadTable(ident)),
NoSuchTableException.class);

TableEntity tableEntity =
TableEntity.builder()
.withId(uid)
.withName(ident.name())
.withNamespace(ident.namespace())
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
.build();

try {
store.put(tableEntity, true /* overwrite */);
} catch (Exception e) {
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedTable.of(table)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
}

return EntityCombinedTable.of(table, tableEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
// Load the schema to make sure the schema exists.
SchemaDispatcher schemaDispatcher = GravitinoEnv.getInstance().schemaDispatcher();
NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
schemaDispatcher.loadSchema(schemaIdent);

return TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()),
LockType.WRITE,
() ->
internalCreateTable(
ident,
columns,
comment,
properties,
partitions,
distribution,
sortOrders,
indexes));
}

/**
Expand Down Expand Up @@ -476,4 +427,79 @@ private EntityCombinedTable internalLoadTable(NameIdentifier ident) {
table.properties()))
.withImported(tableEntity != null);
}

private Table internalCreateTable(
NameIdentifier ident,
Column[] columns,
String comment,
Map<String, String> properties,
Transform[] partitions,
Distribution distribution,
SortOrder[] sortOrders,
Index[] indexes) {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
doWithCatalog(
catalogIdent,
c ->
c.doWithPropertiesMeta(
p -> {
validatePropertyForCreate(p.tablePropertiesMetadata(), properties);
return null;
}),
IllegalArgumentException.class);
long uid = idGenerator.nextId();
// Add StringIdentifier to the properties, the specific catalog will handle this
// StringIdentifier to make sure only when the operation is successful, the related
// TableEntity will be visible.
StringIdentifier stringId = StringIdentifier.fromId(uid);
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);

// we do not retrieve the table again (to obtain some values generated by underlying catalog)
// since some catalogs' API is async and the table may not be created immediately
Table table =
doWithCatalog(
catalogIdent,
c ->
c.doWithTableOps(
t ->
t.createTable(
ident,
columns,
comment,
updatedProperties,
partitions == null ? EMPTY_TRANSFORM : partitions,
distribution == null ? Distributions.NONE : distribution,
sortOrders == null ? new SortOrder[0] : sortOrders,
indexes == null ? Indexes.EMPTY_INDEXES : indexes)),
NoSuchSchemaException.class,
TableAlreadyExistsException.class);

TableEntity tableEntity =
TableEntity.builder()
.withId(uid)
.withName(ident.name())
.withNamespace(ident.namespace())
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
.build();

try {
store.put(tableEntity, true /* overwrite */);
} catch (Exception e) {
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedTable.of(table)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
}

return EntityCombinedTable.of(table, tableEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,61 +128,16 @@ public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
public Topic createTopic(
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties)
throws NoSuchSchemaException, TopicAlreadyExistsException {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
doWithCatalog(
catalogIdent,
c ->
c.doWithPropertiesMeta(
p -> {
validatePropertyForCreate(p.topicPropertiesMetadata(), properties);
return null;
}),
IllegalArgumentException.class);
Long uid = idGenerator.nextId();
StringIdentifier stringId = StringIdentifier.fromId(uid);
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);

doWithCatalog(
catalogIdent,
c -> c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout, updatedProperties)),
NoSuchSchemaException.class,
TopicAlreadyExistsException.class);

// Retrieve the Topic again to obtain some values generated by underlying catalog
Topic topic =
doWithCatalog(
catalogIdent,
c -> c.doWithTopicOps(t -> t.loadTopic(ident)),
NoSuchTopicException.class);

TopicEntity topicEntity =
TopicEntity.builder()
.withId(fromProperties(topic.properties()).id())
.withName(ident.name())
.withComment(comment)
.withNamespace(ident.namespace())
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
.build();

try {
store.put(topicEntity, true /* overwrite */);
} catch (Exception e) {
LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedTopic.of(topic)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
}
// Load the schema to make sure the schema exists.
SchemaDispatcher schemaDispatcher = GravitinoEnv.getInstance().schemaDispatcher();
NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
schemaDispatcher.loadSchema(schemaIdent);

return EntityCombinedTopic.of(topic, topicEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
return TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()),
LockType.WRITE,
() -> internalCreateTopic(ident, comment, dataLayout, properties));
}

/**
Expand Down Expand Up @@ -374,4 +329,60 @@ private EntityCombinedTopic internalLoadTopic(NameIdentifier ident) {
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()))
.withImported(topicEntity != null);
}

private Topic internalCreateTopic(
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties) {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
doWithCatalog(
catalogIdent,
c ->
c.doWithPropertiesMeta(
p -> {
validatePropertyForCreate(p.topicPropertiesMetadata(), properties);
return null;
}),
IllegalArgumentException.class);
Long uid = idGenerator.nextId();
StringIdentifier stringId = StringIdentifier.fromId(uid);
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);

// we do not retrieve the topic again (to obtain some values generated by underlying catalog)
// since some catalogs' API is async and the table may not be created immediately
Topic topic =
doWithCatalog(
catalogIdent,
c ->
c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout, updatedProperties)),
NoSuchSchemaException.class,
TopicAlreadyExistsException.class);

TopicEntity topicEntity =
TopicEntity.builder()
.withId(fromProperties(topic.properties()).id())
.withName(ident.name())
.withComment(comment)
.withNamespace(ident.namespace())
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
.build();

try {
store.put(topicEntity, true /* overwrite */);
} catch (Exception e) {
LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedTopic.of(topic)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
}

return EntityCombinedTopic.of(topic, topicEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class TestPartitionNormalizeDispatcher extends TestOperationDispatcher {
NameIdentifierUtil.ofTable(metalake, catalog, SCHEMA, "TEST_PARTITION_NORMALIZE_TABLE");

@BeforeAll
public static void initialize() {
public static void initialize() throws IllegalAccessException {
TestPartitionOperationDispatcher.prepareTable();
partitionNormalizeDispatcher =
new PartitionNormalizeDispatcher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,19 @@
*/
package org.apache.gravitino.catalog;

import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import com.google.common.collect.Maps;
import java.util.Arrays;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.Config;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.expressions.literals.Literal;
import org.apache.gravitino.rel.expressions.literals.Literals;
Expand Down Expand Up @@ -50,7 +60,7 @@ public class TestPartitionOperationDispatcher extends TestOperationDispatcher {
Maps.newHashMap());

@BeforeAll
public static void initialize() {
public static void initialize() throws IllegalAccessException {
prepareTable();
partitionOperationDispatcher.addPartition(TABLE_IDENT, PARTITION);

Expand All @@ -66,14 +76,22 @@ public static void initialize() {
"Custom class loader is not used");
}

protected static void prepareTable() {
protected static void prepareTable() throws IllegalAccessException {
schemaOperationDispatcher =
new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator);
tableOperationDispatcher =
new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
partitionOperationDispatcher =
new PartitionOperationDispatcher(catalogManager, entityStore, idGenerator);

Config config = mock(Config.class);
doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new LockManager(config), true);
FieldUtils.writeField(
GravitinoEnv.getInstance(), "schemaDispatcher", schemaOperationDispatcher, true);

NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema(metalake, catalog, SCHEMA);
schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
Column[] columns =
Expand Down
Loading

0 comments on commit 57c2a17

Please sign in to comment.