diff --git a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java index 404af695ac3..1257fafcf56 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java @@ -143,74 +143,25 @@ public Table createTable( SortOrder[] sortOrders, Index[] indexes) throws NoSuchSchemaException, TableAlreadyExistsException { - NameIdentifier catalogIdent = getCatalogIdentifier(ident); - doWithCatalog( - catalogIdent, - c -> - c.doWithPropertiesMeta( - p -> { - validatePropertyForCreate(p.tablePropertiesMetadata(), properties); - return null; - }), - IllegalArgumentException.class); - long uid = idGenerator.nextId(); - // Add StringIdentifier to the properties, the specific catalog will handle this - // StringIdentifier to make sure only when the operation is successful, the related - // TableEntity will be visible. - StringIdentifier stringId = StringIdentifier.fromId(uid); - Map updatedProperties = - StringIdentifier.newPropertiesWithId(stringId, properties); - - doWithCatalog( - catalogIdent, - c -> - c.doWithTableOps( - t -> - t.createTable( - ident, - columns, - comment, - updatedProperties, - partitions == null ? EMPTY_TRANSFORM : partitions, - distribution == null ? Distributions.NONE : distribution, - sortOrders == null ? new SortOrder[0] : sortOrders, - indexes == null ? Indexes.EMPTY_INDEXES : indexes)), - NoSuchSchemaException.class, - TableAlreadyExistsException.class); - - // Retrieve the Table again to obtain some values generated by underlying catalog - Table table = - doWithCatalog( - catalogIdent, - c -> c.doWithTableOps(t -> t.loadTable(ident)), - NoSuchTableException.class); - TableEntity tableEntity = - TableEntity.builder() - .withId(uid) - .withName(ident.name()) - .withNamespace(ident.namespace()) - .withAuditInfo( - AuditInfo.builder() - .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) - .withCreateTime(Instant.now()) - .build()) - .build(); - - try { - store.put(tableEntity, true /* overwrite */); - } catch (Exception e) { - LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e); - return EntityCombinedTable.of(table) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties())); - } - - return EntityCombinedTable.of(table, tableEntity) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties())); + // Load the schema to make sure the schema exists. + SchemaDispatcher schemaDispatcher = GravitinoEnv.getInstance().schemaDispatcher(); + NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels()); + schemaDispatcher.loadSchema(schemaIdent); + + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), + LockType.WRITE, + () -> + internalCreateTable( + ident, + columns, + comment, + properties, + partitions, + distribution, + sortOrders, + indexes)); } /** @@ -476,4 +427,79 @@ private EntityCombinedTable internalLoadTable(NameIdentifier ident) { table.properties())) .withImported(tableEntity != null); } + + private Table internalCreateTable( + NameIdentifier ident, + Column[] columns, + String comment, + Map properties, + Transform[] partitions, + Distribution distribution, + SortOrder[] sortOrders, + Index[] indexes) { + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + doWithCatalog( + catalogIdent, + c -> + c.doWithPropertiesMeta( + p -> { + validatePropertyForCreate(p.tablePropertiesMetadata(), properties); + return null; + }), + IllegalArgumentException.class); + long uid = idGenerator.nextId(); + // Add StringIdentifier to the properties, the specific catalog will handle this + // StringIdentifier to make sure only when the operation is successful, the related + // TableEntity will be visible. + StringIdentifier stringId = StringIdentifier.fromId(uid); + Map updatedProperties = + StringIdentifier.newPropertiesWithId(stringId, properties); + + // we do not retrieve the table again (to obtain some values generated by underlying catalog) + // since some catalogs' API is async and the table may not be created immediately + Table table = + doWithCatalog( + catalogIdent, + c -> + c.doWithTableOps( + t -> + t.createTable( + ident, + columns, + comment, + updatedProperties, + partitions == null ? EMPTY_TRANSFORM : partitions, + distribution == null ? Distributions.NONE : distribution, + sortOrders == null ? new SortOrder[0] : sortOrders, + indexes == null ? Indexes.EMPTY_INDEXES : indexes)), + NoSuchSchemaException.class, + TableAlreadyExistsException.class); + + TableEntity tableEntity = + TableEntity.builder() + .withId(uid) + .withName(ident.name()) + .withNamespace(ident.namespace()) + .withAuditInfo( + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) + .withCreateTime(Instant.now()) + .build()) + .build(); + + try { + store.put(tableEntity, true /* overwrite */); + } catch (Exception e) { + LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e); + return EntityCombinedTable.of(table) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties())); + } + + return EntityCombinedTable.of(table, tableEntity) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties())); + } } diff --git a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java index 981d999f99f..e5f1ea16ccb 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java @@ -128,61 +128,16 @@ public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException { public Topic createTopic( NameIdentifier ident, String comment, DataLayout dataLayout, Map properties) throws NoSuchSchemaException, TopicAlreadyExistsException { - NameIdentifier catalogIdent = getCatalogIdentifier(ident); - doWithCatalog( - catalogIdent, - c -> - c.doWithPropertiesMeta( - p -> { - validatePropertyForCreate(p.topicPropertiesMetadata(), properties); - return null; - }), - IllegalArgumentException.class); - Long uid = idGenerator.nextId(); - StringIdentifier stringId = StringIdentifier.fromId(uid); - Map updatedProperties = - StringIdentifier.newPropertiesWithId(stringId, properties); - - doWithCatalog( - catalogIdent, - c -> c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout, updatedProperties)), - NoSuchSchemaException.class, - TopicAlreadyExistsException.class); - - // Retrieve the Topic again to obtain some values generated by underlying catalog - Topic topic = - doWithCatalog( - catalogIdent, - c -> c.doWithTopicOps(t -> t.loadTopic(ident)), - NoSuchTopicException.class); - - TopicEntity topicEntity = - TopicEntity.builder() - .withId(fromProperties(topic.properties()).id()) - .withName(ident.name()) - .withComment(comment) - .withNamespace(ident.namespace()) - .withAuditInfo( - AuditInfo.builder() - .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) - .withCreateTime(Instant.now()) - .build()) - .build(); - try { - store.put(topicEntity, true /* overwrite */); - } catch (Exception e) { - LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e); - return EntityCombinedTopic.of(topic) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())); - } + // Load the schema to make sure the schema exists. + SchemaDispatcher schemaDispatcher = GravitinoEnv.getInstance().schemaDispatcher(); + NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels()); + schemaDispatcher.loadSchema(schemaIdent); - return EntityCombinedTopic.of(topic, topicEntity) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), + LockType.WRITE, + () -> internalCreateTopic(ident, comment, dataLayout, properties)); } /** @@ -374,4 +329,60 @@ private EntityCombinedTopic internalLoadTopic(NameIdentifier ident) { catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())) .withImported(topicEntity != null); } + + private Topic internalCreateTopic( + NameIdentifier ident, String comment, DataLayout dataLayout, Map properties) { + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + doWithCatalog( + catalogIdent, + c -> + c.doWithPropertiesMeta( + p -> { + validatePropertyForCreate(p.topicPropertiesMetadata(), properties); + return null; + }), + IllegalArgumentException.class); + Long uid = idGenerator.nextId(); + StringIdentifier stringId = StringIdentifier.fromId(uid); + Map updatedProperties = + StringIdentifier.newPropertiesWithId(stringId, properties); + + // we do not retrieve the topic again (to obtain some values generated by underlying catalog) + // since some catalogs' API is async and the table may not be created immediately + Topic topic = + doWithCatalog( + catalogIdent, + c -> + c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout, updatedProperties)), + NoSuchSchemaException.class, + TopicAlreadyExistsException.class); + + TopicEntity topicEntity = + TopicEntity.builder() + .withId(fromProperties(topic.properties()).id()) + .withName(ident.name()) + .withComment(comment) + .withNamespace(ident.namespace()) + .withAuditInfo( + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) + .withCreateTime(Instant.now()) + .build()) + .build(); + + try { + store.put(topicEntity, true /* overwrite */); + } catch (Exception e) { + LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e); + return EntityCombinedTopic.of(topic) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())); + } + + return EntityCombinedTopic.of(topic, topicEntity) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())); + } } diff --git a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java index 2c47d69bc2a..0151dcf2484 100644 --- a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java +++ b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java @@ -39,7 +39,7 @@ public class TestPartitionNormalizeDispatcher extends TestOperationDispatcher { NameIdentifierUtil.ofTable(metalake, catalog, SCHEMA, "TEST_PARTITION_NORMALIZE_TABLE"); @BeforeAll - public static void initialize() { + public static void initialize() throws IllegalAccessException { TestPartitionOperationDispatcher.prepareTable(); partitionNormalizeDispatcher = new PartitionNormalizeDispatcher( diff --git a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java index 3d81ee557ce..9ddc3b1d30e 100644 --- a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java +++ b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java @@ -18,9 +18,19 @@ */ package org.apache.gravitino.catalog; +import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL; +import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY; +import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + import com.google.common.collect.Maps; import java.util.Arrays; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.gravitino.Config; +import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.lock.LockManager; import org.apache.gravitino.rel.Column; import org.apache.gravitino.rel.expressions.literals.Literal; import org.apache.gravitino.rel.expressions.literals.Literals; @@ -50,7 +60,7 @@ public class TestPartitionOperationDispatcher extends TestOperationDispatcher { Maps.newHashMap()); @BeforeAll - public static void initialize() { + public static void initialize() throws IllegalAccessException { prepareTable(); partitionOperationDispatcher.addPartition(TABLE_IDENT, PARTITION); @@ -66,7 +76,7 @@ public static void initialize() { "Custom class loader is not used"); } - protected static void prepareTable() { + protected static void prepareTable() throws IllegalAccessException { schemaOperationDispatcher = new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator); tableOperationDispatcher = @@ -74,6 +84,14 @@ protected static void prepareTable() { partitionOperationDispatcher = new PartitionOperationDispatcher(catalogManager, entityStore, idGenerator); + Config config = mock(Config.class); + doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY); + doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY); + doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL); + FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new LockManager(config), true); + FieldUtils.writeField( + GravitinoEnv.getInstance(), "schemaDispatcher", schemaOperationDispatcher, true); + NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema(metalake, catalog, SCHEMA); schemaOperationDispatcher.createSchema(schemaIdent, "comment", null); Column[] columns = diff --git a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java index f5f69b77af1..1dd31fd33fb 100644 --- a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java +++ b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.time.Instant; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -44,8 +45,10 @@ import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; +import org.apache.gravitino.TestCatalog; import org.apache.gravitino.TestColumn; import org.apache.gravitino.auth.AuthConstants; +import org.apache.gravitino.connector.TestCatalogOperations; import org.apache.gravitino.exceptions.NoSuchEntityException; import org.apache.gravitino.lock.LockManager; import org.apache.gravitino.meta.AuditInfo; @@ -306,6 +309,9 @@ public void testCreateAndDropTable() throws IOException { TestColumn.builder().withName("col2").withType(Types.StringType.get()).build() }; + schemaOperationDispatcher.createSchema( + NameIdentifier.of(tableIdent.namespace().levels()), "comment", props); + tableOperationDispatcher.createTable(tableIdent, columns, "comment", props, new Transform[0]); boolean dropped = tableOperationDispatcher.dropTable(tableIdent); @@ -319,4 +325,24 @@ public void testCreateAndDropTable() throws IOException { Assertions.assertThrows( RuntimeException.class, () -> tableOperationDispatcher.dropTable(tableIdent)); } + + @Test + public void testCreateTableNeedImportingSchema() throws IOException { + Namespace tableNs = Namespace.of(metalake, catalog, "schema181"); + NameIdentifier tableIdent = NameIdentifier.of(tableNs, "topic81"); + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + TestCatalog testCatalog = + (TestCatalog) catalogManager.loadCatalog(NameIdentifier.of(metalake, catalog)); + TestCatalogOperations testCatalogOperations = (TestCatalogOperations) testCatalog.ops(); + testCatalogOperations.createSchema( + NameIdentifier.of(tableNs.levels()), "", Collections.emptyMap()); + Column[] columns = + new Column[] { + TestColumn.builder().withName("col1").withType(Types.StringType.get()).build(), + TestColumn.builder().withName("col2").withType(Types.StringType.get()).build() + }; + tableOperationDispatcher.createTable(tableIdent, columns, "comment", props); + Assertions.assertTrue(entityStore.exists(NameIdentifier.of(tableNs.levels()), SCHEMA)); + Assertions.assertTrue(entityStore.exists(tableIdent, TABLE)); + } } diff --git a/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java b/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java index 27afc50906f..ac694883d59 100644 --- a/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java +++ b/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java @@ -21,6 +21,7 @@ import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL; import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY; import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY; +import static org.apache.gravitino.Entity.EntityType.SCHEMA; import static org.apache.gravitino.StringIdentifier.ID_KEY; import static org.apache.gravitino.TestBasePropertiesMetadata.COMMENT_KEY; import static org.mockito.ArgumentMatchers.any; @@ -33,6 +34,7 @@ import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.time.Instant; +import java.util.Collections; import java.util.Map; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.gravitino.Config; @@ -40,7 +42,9 @@ import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; +import org.apache.gravitino.TestCatalog; import org.apache.gravitino.auth.AuthConstants; +import org.apache.gravitino.connector.TestCatalogOperations; import org.apache.gravitino.exceptions.NoSuchEntityException; import org.apache.gravitino.lock.LockManager; import org.apache.gravitino.messaging.Topic; @@ -122,25 +126,23 @@ public void testCreateAndLoadTopic() throws IOException { // Case 2: Test if the topic entity is not found in the entity store reset(entityStore); entityStore.delete(topicIdent1, Entity.EntityType.TOPIC); - entityStore.delete(NameIdentifier.of(topicNs.levels()), Entity.EntityType.SCHEMA); + entityStore.delete(NameIdentifier.of(topicNs.levels()), SCHEMA); doThrow(new NoSuchEntityException("")).when(entityStore).get(any(), any(), any()); Topic loadedTopic2 = topicOperationDispatcher.loadTopic(topicIdent1); // Succeed to import the topic entity Assertions.assertTrue(entityStore.exists(topicIdent1, Entity.EntityType.TOPIC)); - Assertions.assertTrue( - entityStore.exists(NameIdentifier.of(topicNs.levels()), Entity.EntityType.SCHEMA)); + Assertions.assertTrue(entityStore.exists(NameIdentifier.of(topicNs.levels()), SCHEMA)); // Audit info is gotten from the catalog, not from the entity store Assertions.assertEquals("test", loadedTopic2.auditInfo().creator()); // Case 3: Test if the entity store is failed to get the topic entity reset(entityStore); entityStore.delete(topicIdent1, Entity.EntityType.TOPIC); - entityStore.delete(NameIdentifier.of(topicNs.levels()), Entity.EntityType.SCHEMA); + entityStore.delete(NameIdentifier.of(topicNs.levels()), SCHEMA); doThrow(new IOException()).when(entityStore).get(any(), any(), any()); Topic loadedTopic3 = topicOperationDispatcher.loadTopic(topicIdent1); // Succeed to import the topic entity - Assertions.assertTrue( - entityStore.exists(NameIdentifier.of(topicNs.levels()), Entity.EntityType.SCHEMA)); + Assertions.assertTrue(entityStore.exists(NameIdentifier.of(topicNs.levels()), SCHEMA)); Assertions.assertTrue(entityStore.exists(topicIdent1, Entity.EntityType.TOPIC)); // Audit info is gotten from the catalog, not from the entity store Assertions.assertEquals("test", loadedTopic3.auditInfo().creator()); @@ -250,4 +252,19 @@ public void testCreateAndDropTopic() throws IOException { Assertions.assertThrows( RuntimeException.class, () -> topicOperationDispatcher.dropTopic(topicIdent)); } + + @Test + public void testCreateTopicNeedImportingSchema() throws IOException { + Namespace topicNs = Namespace.of(metalake, catalog, "schema161"); + NameIdentifier topicIdent = NameIdentifier.of(topicNs, "topic61"); + Map props = ImmutableMap.of("k1", "v1", "k2", "v2"); + TestCatalog testCatalog = + (TestCatalog) catalogManager.loadCatalog(NameIdentifier.of(metalake, catalog)); + TestCatalogOperations testCatalogOperations = (TestCatalogOperations) testCatalog.ops(); + testCatalogOperations.createSchema( + NameIdentifier.of(topicNs.levels()), "", Collections.emptyMap()); + topicOperationDispatcher.createTopic(topicIdent, "comment", null, props); + Assertions.assertTrue(entityStore.exists(NameIdentifier.of(topicNs.levels()), SCHEMA)); + Assertions.assertTrue(entityStore.exists(topicIdent, Entity.EntityType.TOPIC)); + } } diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java index e8ad564010d..d5cf1ffc7be 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java @@ -121,19 +121,15 @@ public Response createTable( NameIdentifierUtil.ofTable(metalake, catalog, schema, request.getName()); Table table = - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(metalake, catalog, schema), - LockType.WRITE, - () -> - dispatcher.createTable( - ident, - fromDTOs(request.getColumns()), - request.getComment(), - request.getProperties(), - fromDTOs(request.getPartitioning()), - fromDTO(request.getDistribution()), - fromDTOs(request.getSortOrders()), - fromDTOs(request.getIndexes()))); + dispatcher.createTable( + ident, + fromDTOs(request.getColumns()), + request.getComment(), + request.getProperties(), + fromDTOs(request.getPartitioning()), + fromDTO(request.getDistribution()), + fromDTOs(request.getSortOrders()), + fromDTOs(request.getIndexes())); Response response = Utils.ok(new TableResponse(DTOConverters.toDTO(table))); LOG.info("Table created: {}.{}.{}.{}", metalake, catalog, schema, request.getName()); return response; diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java index 5e32ff6d2c7..4e9bcd55077 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java @@ -119,15 +119,11 @@ public Response createTopic( NameIdentifierUtil.ofTopic(metalake, catalog, schema, request.getName()); Topic topic = - TreeLockUtils.doWithTreeLock( - NameIdentifierUtil.ofSchema(metalake, catalog, schema), - LockType.WRITE, - () -> - dispatcher.createTopic( - ident, - request.getComment(), - null /* dataLayout, always null because it's not supported yet.*/, - request.getProperties())); + dispatcher.createTopic( + ident, + request.getComment(), + null /* dataLayout, always null because it's not supported yet.*/, + request.getProperties()); Response response = Utils.ok(new TopicResponse(DTOConverters.toDTO(topic))); LOG.info("Topic created: {}.{}.{}.{}", metalake, catalog, schema, topic.name()); return response;