Skip to content

Commit

Permalink
[apache#4812] fix(core): Make ensure the schema exists before creatin…
Browse files Browse the repository at this point in the history
…g tables and topics
  • Loading branch information
jerqi committed Aug 30, 2024
1 parent 972edc1 commit e83f391
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,70 +143,25 @@ public Table createTable(
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
doWithCatalog(
catalogIdent,
c ->
c.doWithPropertiesMeta(
p -> {
validatePropertyForCreate(p.tablePropertiesMetadata(), properties);
return null;
}),
IllegalArgumentException.class);
long uid = idGenerator.nextId();
// Add StringIdentifier to the properties, the specific catalog will handle this
// StringIdentifier to make sure only when the operation is successful, the related
// TableEntity will be visible.
StringIdentifier stringId = StringIdentifier.fromId(uid);
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);

// we do not retrieve the table again (to obtain some values generated by underlying catalog)
// since some catalogs' API is async and the table may not be created immediately
Table table =
doWithCatalog(
catalogIdent,
c ->
c.doWithTableOps(
t ->
t.createTable(
ident,
columns,
comment,
updatedProperties,
partitions == null ? EMPTY_TRANSFORM : partitions,
distribution == null ? Distributions.NONE : distribution,
sortOrders == null ? new SortOrder[0] : sortOrders,
indexes == null ? Indexes.EMPTY_INDEXES : indexes)),
NoSuchSchemaException.class,
TableAlreadyExistsException.class);

TableEntity tableEntity =
TableEntity.builder()
.withId(uid)
.withName(ident.name())
.withNamespace(ident.namespace())
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
.build();

try {
store.put(tableEntity, true /* overwrite */);
} catch (Exception e) {
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedTable.of(table)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
}

return EntityCombinedTable.of(table, tableEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
// Load the schema to make sure the schema exists.
SchemaDispatcher schemaDispatcher = GravitinoEnv.getInstance().schemaDispatcher();
NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
schemaDispatcher.loadSchema(schemaIdent);

return TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()),
LockType.WRITE,
() ->
internalCreateTable(
ident,
columns,
comment,
properties,
partitions,
distribution,
sortOrders,
indexes));
}

/**
Expand Down Expand Up @@ -462,4 +417,79 @@ private EntityCombinedTable internalLoadTable(NameIdentifier ident) {
table.properties()))
.withImported(tableEntity != null);
}

/**
 * Creates a table through the underlying catalog and then records a matching {@link TableEntity}
 * in the entity store.
 *
 * <p>The table properties are validated first. A freshly generated id is then embedded into the
 * properties handed to the catalog, and the same id is used for the stored entity. A failure to
 * write the entity is logged and does not fail the call: the table is then returned without the
 * entity attached.
 *
 * @param ident the identifier of the table to create
 * @param columns the columns of the table
 * @param comment the table comment
 * @param properties the user-supplied table properties
 * @param partitions the partitioning; {@code null} is replaced by {@code EMPTY_TRANSFORM}
 * @param distribution the distribution; {@code null} is replaced by {@code Distributions.NONE}
 * @param sortOrders the sort orders; {@code null} is replaced by an empty array
 * @param indexes the indexes; {@code null} is replaced by {@code Indexes.EMPTY_INDEXES}
 * @return the created table, combined with its stored entity when the store write succeeded
 */
private Table internalCreateTable(
    NameIdentifier ident,
    Column[] columns,
    String comment,
    Map<String, String> properties,
    Transform[] partitions,
    Distribution distribution,
    SortOrder[] sortOrders,
    Index[] indexes) {
  NameIdentifier catalogIdent = getCatalogIdentifier(ident);

  // Validate the supplied properties against the catalog's table property metadata before
  // anything is created.
  doWithCatalog(
      catalogIdent,
      catalog ->
          catalog.doWithPropertiesMeta(
              propertiesMeta -> {
                validatePropertyForCreate(propertiesMeta.tablePropertiesMetadata(), properties);
                return null;
              }),
      IllegalArgumentException.class);

  // The StringIdentifier travels inside the properties; the specific catalog handles it so that
  // the related TableEntity only becomes visible when the operation is successful.
  long uid = idGenerator.nextId();
  Map<String, String> propertiesWithId =
      StringIdentifier.newPropertiesWithId(StringIdentifier.fromId(uid), properties);

  // Substitute defaults for the optional arguments.
  Transform[] effectivePartitions = partitions == null ? EMPTY_TRANSFORM : partitions;
  Distribution effectiveDistribution = distribution == null ? Distributions.NONE : distribution;
  SortOrder[] effectiveSortOrders = sortOrders == null ? new SortOrder[0] : sortOrders;
  Index[] effectiveIndexes = indexes == null ? Indexes.EMPTY_INDEXES : indexes;

  // The table is deliberately not loaded again (to obtain values generated by the underlying
  // catalog): some catalogs' API is async and the table may not be created immediately.
  Table table =
      doWithCatalog(
          catalogIdent,
          catalog ->
              catalog.doWithTableOps(
                  tableOps ->
                      tableOps.createTable(
                          ident,
                          columns,
                          comment,
                          propertiesWithId,
                          effectivePartitions,
                          effectiveDistribution,
                          effectiveSortOrders,
                          effectiveIndexes)),
          NoSuchSchemaException.class,
          TableAlreadyExistsException.class);

  AuditInfo auditInfo =
      AuditInfo.builder()
          .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
          .withCreateTime(Instant.now())
          .build();
  TableEntity tableEntity =
      TableEntity.builder()
          .withId(uid)
          .withName(ident.name())
          .withNamespace(ident.namespace())
          .withAuditInfo(auditInfo)
          .build();

  // Best effort: a store failure is logged and the table is returned without its entity.
  boolean stored = true;
  try {
    store.put(tableEntity, true /* overwrite */);
  } catch (Exception e) {
    LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
    stored = false;
  }

  EntityCombinedTable combinedTable =
      stored ? EntityCombinedTable.of(table, tableEntity) : EntityCombinedTable.of(table);
  return combinedTable.withHiddenPropertiesSet(
      getHiddenPropertyNames(
          catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,58 +128,16 @@ public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
public Topic createTopic(
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties)
throws NoSuchSchemaException, TopicAlreadyExistsException {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
doWithCatalog(
catalogIdent,
c ->
c.doWithPropertiesMeta(
p -> {
validatePropertyForCreate(p.topicPropertiesMetadata(), properties);
return null;
}),
IllegalArgumentException.class);
Long uid = idGenerator.nextId();
StringIdentifier stringId = StringIdentifier.fromId(uid);
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);

// we do not retrieve the topic again (to obtain some values generated by underlying catalog)
// since some catalogs' API is async and the table may not be created immediately
Topic topic =
doWithCatalog(
catalogIdent,
c ->
c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout, updatedProperties)),
NoSuchSchemaException.class,
TopicAlreadyExistsException.class);

TopicEntity topicEntity =
TopicEntity.builder()
.withId(fromProperties(topic.properties()).id())
.withName(ident.name())
.withComment(comment)
.withNamespace(ident.namespace())
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
.build();

try {
store.put(topicEntity, true /* overwrite */);
} catch (Exception e) {
LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedTopic.of(topic)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
}
// Load the schema to make sure the schema exists.
SchemaDispatcher schemaDispatcher = GravitinoEnv.getInstance().schemaDispatcher();
NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
schemaDispatcher.loadSchema(schemaIdent);

return EntityCombinedTopic.of(topic, topicEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
return TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()),
LockType.WRITE,
() -> internalCreateTopic(ident, comment, dataLayout, properties));
}

/**
Expand Down Expand Up @@ -364,4 +322,60 @@ private EntityCombinedTopic internalLoadTopic(NameIdentifier ident) {
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()))
.withImported(topicEntity != null);
}

/**
 * Creates a topic through the underlying catalog and then records a matching {@link TopicEntity}
 * in the entity store.
 *
 * <p>The topic properties are validated first. A freshly generated id is then embedded into the
 * properties handed to the catalog, and the entity id is read back from the properties of the
 * topic the catalog returns. A failure to write the entity is logged and does not fail the call:
 * the topic is then returned without the entity attached.
 *
 * @param ident the identifier of the topic to create
 * @param comment the topic comment
 * @param dataLayout the data layout of the topic
 * @param properties the user-supplied topic properties
 * @return the created topic, combined with its stored entity when the store write succeeded
 */
private Topic internalCreateTopic(
    NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties) {
  NameIdentifier catalogIdent = getCatalogIdentifier(ident);

  // Validate the supplied properties against the catalog's topic property metadata before
  // anything is created.
  doWithCatalog(
      catalogIdent,
      c ->
          c.doWithPropertiesMeta(
              p -> {
                validatePropertyForCreate(p.topicPropertiesMetadata(), properties);
                return null;
              }),
      IllegalArgumentException.class);

  // Primitive long: the id is only passed on to StringIdentifier, so boxing it is unnecessary.
  long uid = idGenerator.nextId();
  StringIdentifier stringId = StringIdentifier.fromId(uid);
  Map<String, String> updatedProperties =
      StringIdentifier.newPropertiesWithId(stringId, properties);

  // we do not retrieve the topic again (to obtain some values generated by underlying catalog)
  // since some catalogs' API is async and the topic may not be created immediately
  Topic topic =
      doWithCatalog(
          catalogIdent,
          c ->
              c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout, updatedProperties)),
          NoSuchSchemaException.class,
          TopicAlreadyExistsException.class);

  TopicEntity topicEntity =
      TopicEntity.builder()
          // The id is taken from the properties returned by the catalog.
          .withId(fromProperties(topic.properties()).id())
          .withName(ident.name())
          .withComment(comment)
          .withNamespace(ident.namespace())
          .withAuditInfo(
              AuditInfo.builder()
                  .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
                  .withCreateTime(Instant.now())
                  .build())
          .build();

  // Best effort: a store failure is logged and the topic is returned without its entity.
  boolean stored = true;
  try {
    store.put(topicEntity, true /* overwrite */);
  } catch (Exception e) {
    LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
    stored = false;
  }

  EntityCombinedTopic combinedTopic =
      stored ? EntityCombinedTopic.of(topic, topicEntity) : EntityCombinedTopic.of(topic);
  return combinedTopic.withHiddenPropertiesSet(
      getHiddenPropertyNames(
          catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class TestPartitionNormalizeDispatcher extends TestOperationDispatcher {
NameIdentifierUtil.ofTable(metalake, catalog, SCHEMA, "TEST_PARTITION_NORMALIZE_TABLE");

@BeforeAll
public static void initialize() {
public static void initialize() throws IllegalAccessException {
TestPartitionOperationDispatcher.prepareTable();
partitionNormalizeDispatcher =
new PartitionNormalizeDispatcher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,19 @@
*/
package org.apache.gravitino.catalog;

import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import com.google.common.collect.Maps;
import java.util.Arrays;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.Config;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.expressions.literals.Literal;
import org.apache.gravitino.rel.expressions.literals.Literals;
Expand Down Expand Up @@ -50,7 +60,7 @@ public class TestPartitionOperationDispatcher extends TestOperationDispatcher {
Maps.newHashMap());

@BeforeAll
public static void initialize() {
public static void initialize() throws IllegalAccessException {
prepareTable();
partitionOperationDispatcher.addPartition(TABLE_IDENT, PARTITION);

Expand All @@ -66,14 +76,22 @@ public static void initialize() {
"Custom class loader is not used");
}

protected static void prepareTable() {
protected static void prepareTable() throws IllegalAccessException {
schemaOperationDispatcher =
new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator);
tableOperationDispatcher =
new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
partitionOperationDispatcher =
new PartitionOperationDispatcher(catalogManager, entityStore, idGenerator);

Config config = mock(Config.class);
doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new LockManager(config), true);
FieldUtils.writeField(
GravitinoEnv.getInstance(), "schemaDispatcher", schemaOperationDispatcher, true);

NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema(metalake, catalog, SCHEMA);
schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
Column[] columns =
Expand Down
Loading

0 comments on commit e83f391

Please sign in to comment.