Skip to content

Commit

Permalink
[apache#4812] fix(core): Make sure the schema exists before creatin…
Browse files Browse the repository at this point in the history
…g tables and topics
  • Loading branch information
jerqi committed Aug 30, 2024
1 parent 972edc1 commit 1c36eef
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,70 +143,25 @@ public Table createTable(
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
doWithCatalog(
catalogIdent,
c ->
c.doWithPropertiesMeta(
p -> {
validatePropertyForCreate(p.tablePropertiesMetadata(), properties);
return null;
}),
IllegalArgumentException.class);
long uid = idGenerator.nextId();
// Add StringIdentifier to the properties, the specific catalog will handle this
// StringIdentifier to make sure only when the operation is successful, the related
// TableEntity will be visible.
StringIdentifier stringId = StringIdentifier.fromId(uid);
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);

// we do not retrieve the table again (to obtain some values generated by underlying catalog)
// since some catalogs' API is async and the table may not be created immediately
Table table =
doWithCatalog(
catalogIdent,
c ->
c.doWithTableOps(
t ->
t.createTable(
ident,
columns,
comment,
updatedProperties,
partitions == null ? EMPTY_TRANSFORM : partitions,
distribution == null ? Distributions.NONE : distribution,
sortOrders == null ? new SortOrder[0] : sortOrders,
indexes == null ? Indexes.EMPTY_INDEXES : indexes)),
NoSuchSchemaException.class,
TableAlreadyExistsException.class);

TableEntity tableEntity =
TableEntity.builder()
.withId(uid)
.withName(ident.name())
.withNamespace(ident.namespace())
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
.build();

try {
store.put(tableEntity, true /* overwrite */);
} catch (Exception e) {
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedTable.of(table)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
}

return EntityCombinedTable.of(table, tableEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
// Load the schema to make sure the schema exists.
SchemaDispatcher schemaDispatcher = GravitinoEnv.getInstance().schemaDispatcher();
NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
schemaDispatcher.loadSchema(schemaIdent);

return TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()),
LockType.WRITE,
() ->
internalCreateTable(
ident,
columns,
comment,
properties,
partitions,
distribution,
sortOrders,
indexes));
}

/**
Expand Down Expand Up @@ -462,4 +417,79 @@ private EntityCombinedTable internalLoadTable(NameIdentifier ident) {
table.properties()))
.withImported(tableEntity != null);
}

/**
 * Creates a table through the owning catalog and records a matching {@link TableEntity} in the
 * entity store.
 *
 * <p>The steps are, in order: validate {@code properties} against the catalog's table property
 * metadata, generate a new id and add it to the properties as a {@link StringIdentifier}, ask the
 * catalog's table operations to create the table, and finally put the {@link TableEntity} into
 * the store. A failure of the store write is logged and does not fail the call; in that case the
 * returned table is built from the catalog table alone.
 *
 * @param ident identifier of the table to create
 * @param columns columns of the table
 * @param comment comment of the table
 * @param properties user supplied properties; the generated id is added to a copy of them
 * @param partitions partitioning, {@code null} is replaced by {@code EMPTY_TRANSFORM}
 * @param distribution distribution, {@code null} is replaced by {@code Distributions.NONE}
 * @param sortOrders sort orders, {@code null} is replaced by an empty array
 * @param indexes indexes, {@code null} is replaced by {@code Indexes.EMPTY_INDEXES}
 * @return the created table combined with its entity when the entity could be stored, otherwise
 *     the created table alone; hidden property names are set in both cases
 */
private Table internalCreateTable(
    NameIdentifier ident,
    Column[] columns,
    String comment,
    Map<String, String> properties,
    Transform[] partitions,
    Distribution distribution,
    SortOrder[] sortOrders,
    Index[] indexes) {
  NameIdentifier catalogIdent = getCatalogIdentifier(ident);

  // Validate the user supplied properties before anything is created.
  doWithCatalog(
      catalogIdent,
      catalogWrapper ->
          catalogWrapper.doWithPropertiesMeta(
              propertiesMeta -> {
                validatePropertyForCreate(propertiesMeta.tablePropertiesMetadata(), properties);
                return null;
              }),
      IllegalArgumentException.class);

  // Add StringIdentifier to the properties, the specific catalog will handle this
  // StringIdentifier to make sure only when the operation is successful, the related
  // TableEntity will be visible.
  long uid = idGenerator.nextId();
  Map<String, String> propertiesWithId =
      StringIdentifier.newPropertiesWithId(StringIdentifier.fromId(uid), properties);

  // Replace absent optional arguments with their empty defaults.
  Transform[] effectivePartitions = partitions == null ? EMPTY_TRANSFORM : partitions;
  Distribution effectiveDistribution = distribution == null ? Distributions.NONE : distribution;
  SortOrder[] effectiveSortOrders = sortOrders == null ? new SortOrder[0] : sortOrders;
  Index[] effectiveIndexes = indexes == null ? Indexes.EMPTY_INDEXES : indexes;

  // The table is not loaded again after creation (to pick up values generated by the underlying
  // catalog) because some catalogs' APIs are async and the table may not be created immediately.
  Table table =
      doWithCatalog(
          catalogIdent,
          catalogWrapper ->
              catalogWrapper.doWithTableOps(
                  tableOps ->
                      tableOps.createTable(
                          ident,
                          columns,
                          comment,
                          propertiesWithId,
                          effectivePartitions,
                          effectiveDistribution,
                          effectiveSortOrders,
                          effectiveIndexes)),
          NoSuchSchemaException.class,
          TableAlreadyExistsException.class);

  AuditInfo auditInfo =
      AuditInfo.builder()
          .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
          .withCreateTime(Instant.now())
          .build();
  TableEntity tableEntity =
      TableEntity.builder()
          .withId(uid)
          .withName(ident.name())
          .withNamespace(ident.namespace())
          .withAuditInfo(auditInfo)
          .build();

  // Storing the entity is best effort: a failure is logged and the catalog table is still
  // returned, just without the entity attached.
  boolean entityStored = true;
  try {
    store.put(tableEntity, true /* overwrite */);
  } catch (Exception e) {
    LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
    entityStored = false;
  }

  EntityCombinedTable combinedTable =
      entityStored ? EntityCombinedTable.of(table, tableEntity) : EntityCombinedTable.of(table);
  return combinedTable.withHiddenPropertiesSet(
      getHiddenPropertyNames(
          catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,58 +128,16 @@ public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
public Topic createTopic(
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties)
throws NoSuchSchemaException, TopicAlreadyExistsException {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
doWithCatalog(
catalogIdent,
c ->
c.doWithPropertiesMeta(
p -> {
validatePropertyForCreate(p.topicPropertiesMetadata(), properties);
return null;
}),
IllegalArgumentException.class);
Long uid = idGenerator.nextId();
StringIdentifier stringId = StringIdentifier.fromId(uid);
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);

// we do not retrieve the topic again (to obtain some values generated by underlying catalog)
// since some catalogs' API is async and the topic may not be created immediately
Topic topic =
doWithCatalog(
catalogIdent,
c ->
c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout, updatedProperties)),
NoSuchSchemaException.class,
TopicAlreadyExistsException.class);

TopicEntity topicEntity =
TopicEntity.builder()
.withId(fromProperties(topic.properties()).id())
.withName(ident.name())
.withComment(comment)
.withNamespace(ident.namespace())
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
.build();

try {
store.put(topicEntity, true /* overwrite */);
} catch (Exception e) {
LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedTopic.of(topic)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
}
// Load the schema to make sure the schema exists.
SchemaDispatcher schemaDispatcher = GravitinoEnv.getInstance().schemaDispatcher();
NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
schemaDispatcher.loadSchema(schemaIdent);

return EntityCombinedTopic.of(topic, topicEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
return TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()),
LockType.WRITE,
() -> internalCreateTopic(ident, comment, dataLayout, properties));
}

/**
Expand Down Expand Up @@ -364,4 +322,60 @@ private EntityCombinedTopic internalLoadTopic(NameIdentifier ident) {
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()))
.withImported(topicEntity != null);
}

/**
 * Creates a topic through the owning catalog and records a matching {@link TopicEntity} in the
 * entity store.
 *
 * <p>The steps are, in order: validate {@code properties} against the catalog's topic property
 * metadata, generate a new id and add it to the properties as a {@link StringIdentifier}, ask the
 * catalog's topic operations to create the topic, and finally put the {@link TopicEntity} into
 * the store. The entity id is read back from the properties of the created topic. A failure of
 * the store write is logged and does not fail the call; in that case the returned topic is built
 * from the catalog topic alone.
 *
 * @param ident identifier of the topic to create
 * @param comment comment of the topic
 * @param dataLayout data layout passed through to the catalog
 * @param properties user supplied properties; the generated id is added to a copy of them
 * @return the created topic combined with its entity when the entity could be stored, otherwise
 *     the created topic alone; hidden property names are set in both cases
 */
private Topic internalCreateTopic(
    NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties) {
  NameIdentifier catalogIdent = getCatalogIdentifier(ident);

  // Validate the user supplied properties before anything is created.
  doWithCatalog(
      catalogIdent,
      catalogWrapper ->
          catalogWrapper.doWithPropertiesMeta(
              propertiesMeta -> {
                validatePropertyForCreate(propertiesMeta.topicPropertiesMetadata(), properties);
                return null;
              }),
      IllegalArgumentException.class);

  // Hand the generated id to the catalog through the properties.
  long uid = idGenerator.nextId();
  Map<String, String> propertiesWithId =
      StringIdentifier.newPropertiesWithId(StringIdentifier.fromId(uid), properties);

  // The topic is not loaded again after creation (to pick up values generated by the underlying
  // catalog) because some catalogs' APIs are async and the topic may not be created immediately.
  Topic topic =
      doWithCatalog(
          catalogIdent,
          catalogWrapper ->
              catalogWrapper.doWithTopicOps(
                  topicOps -> topicOps.createTopic(ident, comment, dataLayout, propertiesWithId)),
          NoSuchSchemaException.class,
          TopicAlreadyExistsException.class);

  AuditInfo auditInfo =
      AuditInfo.builder()
          .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
          .withCreateTime(Instant.now())
          .build();
  TopicEntity topicEntity =
      TopicEntity.builder()
          .withId(fromProperties(topic.properties()).id())
          .withName(ident.name())
          .withComment(comment)
          .withNamespace(ident.namespace())
          .withAuditInfo(auditInfo)
          .build();

  // Storing the entity is best effort: a failure is logged and the catalog topic is still
  // returned, just without the entity attached.
  boolean entityStored = true;
  try {
    store.put(topicEntity, true /* overwrite */);
  } catch (Exception e) {
    LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
    entityStored = false;
  }

  EntityCombinedTopic combinedTopic =
      entityStored ? EntityCombinedTopic.of(topic, topicEntity) : EntityCombinedTopic.of(topic);
  return combinedTopic.withHiddenPropertiesSet(
      getHiddenPropertyNames(
          catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand All @@ -44,8 +45,10 @@
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.TestCatalog;
import org.apache.gravitino.TestColumn;
import org.apache.gravitino.auth.AuthConstants;
import org.apache.gravitino.connector.TestCatalogOperations;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.meta.AuditInfo;
Expand Down Expand Up @@ -306,6 +309,9 @@ public void testCreateAndDropTable() throws IOException {
TestColumn.builder().withName("col2").withType(Types.StringType.get()).build()
};

schemaOperationDispatcher.createSchema(
NameIdentifier.of(tableIdent.namespace().levels()), "comment", props);

tableOperationDispatcher.createTable(tableIdent, columns, "comment", props, new Transform[0]);

boolean dropped = tableOperationDispatcher.dropTable(tableIdent);
Expand All @@ -319,4 +325,24 @@ public void testCreateAndDropTable() throws IOException {
Assertions.assertThrows(
RuntimeException.class, () -> tableOperationDispatcher.dropTable(tableIdent));
}

/**
 * Creates a schema directly through the test catalog's operations, then creates a table in it
 * through the table dispatcher and checks that both the schema and the table exist in the entity
 * store afterwards.
 */
@Test
public void testCreateTableNeedImportingSchema() throws IOException {
  Namespace schemaNamespace = Namespace.of(metalake, catalog, "schema181");
  NameIdentifier schemaIdent = NameIdentifier.of(schemaNamespace.levels());
  NameIdentifier tableIdent = NameIdentifier.of(schemaNamespace, "topic81");

  // Create the schema on the catalog operations directly instead of going through the schema
  // dispatcher -- presumably so that no schema entity has been stored before the table is
  // created (TODO confirm against the dispatcher implementation).
  TestCatalog loadedCatalog =
      (TestCatalog) catalogManager.loadCatalog(NameIdentifier.of(metalake, catalog));
  TestCatalogOperations catalogOperations = (TestCatalogOperations) loadedCatalog.ops();
  catalogOperations.createSchema(schemaIdent, "", Collections.emptyMap());

  Map<String, String> tableProperties = ImmutableMap.of("k1", "v1", "k2", "v2");
  Column[] tableColumns = {
    TestColumn.builder().withName("col1").withType(Types.StringType.get()).build(),
    TestColumn.builder().withName("col2").withType(Types.StringType.get()).build()
  };
  tableOperationDispatcher.createTable(tableIdent, tableColumns, "comment", tableProperties);

  // Both the schema and the table must be present in the entity store after the table creation.
  Assertions.assertTrue(entityStore.exists(schemaIdent, SCHEMA));
  Assertions.assertTrue(entityStore.exists(tableIdent, TABLE));
}
}
Loading

0 comments on commit 1c36eef

Please sign in to comment.