[#4812] fix(core): Make sure the schema exists before creating tables and topics #4823

Closed
@@ -143,74 +143,25 @@ public Table createTable(
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
doWithCatalog(
catalogIdent,
c ->
c.doWithPropertiesMeta(
p -> {
validatePropertyForCreate(p.tablePropertiesMetadata(), properties);
return null;
}),
IllegalArgumentException.class);
long uid = idGenerator.nextId();
// Add StringIdentifier to the properties, the specific catalog will handle this
// StringIdentifier to make sure only when the operation is successful, the related
// TableEntity will be visible.
StringIdentifier stringId = StringIdentifier.fromId(uid);
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);

doWithCatalog(
catalogIdent,
c ->
c.doWithTableOps(
t ->
t.createTable(
ident,
columns,
comment,
updatedProperties,
partitions == null ? EMPTY_TRANSFORM : partitions,
distribution == null ? Distributions.NONE : distribution,
sortOrders == null ? new SortOrder[0] : sortOrders,
indexes == null ? Indexes.EMPTY_INDEXES : indexes)),
NoSuchSchemaException.class,
TableAlreadyExistsException.class);

// Retrieve the Table again to obtain some values generated by underlying catalog
Table table =
doWithCatalog(
catalogIdent,
c -> c.doWithTableOps(t -> t.loadTable(ident)),
NoSuchTableException.class);

TableEntity tableEntity =
TableEntity.builder()
.withId(uid)
.withName(ident.name())
.withNamespace(ident.namespace())
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
.build();

try {
store.put(tableEntity, true /* overwrite */);
} catch (Exception e) {
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedTable.of(table)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
}

return EntityCombinedTable.of(table, tableEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
// Load the schema to make sure the schema exists.
SchemaDispatcher schemaDispatcher = GravitinoEnv.getInstance().schemaDispatcher();
NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
schemaDispatcher.loadSchema(schemaIdent);

return TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()),
LockType.WRITE,
() ->
internalCreateTable(
ident,
columns,
comment,
properties,
partitions,
distribution,
sortOrders,
indexes));
}

/**
@@ -476,4 +427,79 @@ private EntityCombinedTable internalLoadTable(NameIdentifier ident) {
table.properties()))
.withImported(tableEntity != null);
}

private Table internalCreateTable(
NameIdentifier ident,
Column[] columns,
String comment,
Map<String, String> properties,
Transform[] partitions,
Distribution distribution,
SortOrder[] sortOrders,
Index[] indexes) {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
doWithCatalog(
catalogIdent,
c ->
c.doWithPropertiesMeta(
p -> {
validatePropertyForCreate(p.tablePropertiesMetadata(), properties);
return null;
}),
IllegalArgumentException.class);
long uid = idGenerator.nextId();
// Add StringIdentifier to the properties, the specific catalog will handle this
// StringIdentifier to make sure only when the operation is successful, the related
// TableEntity will be visible.
StringIdentifier stringId = StringIdentifier.fromId(uid);
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);

// we do not retrieve the table again (to obtain some values generated by underlying catalog)
// since some catalogs' API is async and the table may not be created immediately
Table table =
doWithCatalog(
catalogIdent,
c ->
c.doWithTableOps(
t ->
t.createTable(
ident,
columns,
comment,
updatedProperties,
partitions == null ? EMPTY_TRANSFORM : partitions,
distribution == null ? Distributions.NONE : distribution,
sortOrders == null ? new SortOrder[0] : sortOrders,
indexes == null ? Indexes.EMPTY_INDEXES : indexes)),
NoSuchSchemaException.class,
TableAlreadyExistsException.class);

TableEntity tableEntity =
TableEntity.builder()
.withId(uid)
.withName(ident.name())
.withNamespace(ident.namespace())
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
.build();

try {
store.put(tableEntity, true /* overwrite */);
} catch (Exception e) {
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedTable.of(table)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
}

return EntityCombinedTable.of(table, tableEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
}
}
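
For reference, a minimal, self-contained sketch of the pattern this diff applies in the create paths: load the parent schema first so a missing schema fails fast, then run the real creation while holding a WRITE lock on the schema node. SchemaLookup, doWithWriteLock, and the exception used here are illustrative stand-ins, not Gravitino's SchemaDispatcher or TreeLockUtils APIs.

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

public class CreateUnderSchemaLockSketch {

  interface SchemaLookup {
    // Throws an exception if the schema does not exist.
    void loadSchema(String schemaName);
  }

  // A single lock standing in for the per-node tree lock on the schema.
  private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();

  static <T> T doWithWriteLock(Supplier<T> action) {
    LOCK.writeLock().lock();
    try {
      return action.get();
    } finally {
      LOCK.writeLock().unlock();
    }
  }

  static String createTable(SchemaLookup schemas, String schemaName, String tableName) {
    // 1. Make sure the schema exists before doing anything else (fail fast).
    schemas.loadSchema(schemaName);
    // 2. Do the actual creation while holding the write lock on the schema node.
    return doWithWriteLock(() -> schemaName + "." + tableName);
  }

  public static void main(String[] args) {
    SchemaLookup schemas = name -> {
      if (!"sales".equals(name)) {
        throw new IllegalStateException("No such schema: " + name);
      }
    };
    System.out.println(createTable(schemas, "sales", "orders")); // prints sales.orders
  }
}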
@@ -128,61 +128,16 @@ public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
public Topic createTopic(
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties)
throws NoSuchSchemaException, TopicAlreadyExistsException {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
doWithCatalog(
catalogIdent,
c ->
c.doWithPropertiesMeta(
p -> {
validatePropertyForCreate(p.topicPropertiesMetadata(), properties);
return null;
}),
IllegalArgumentException.class);
Long uid = idGenerator.nextId();
StringIdentifier stringId = StringIdentifier.fromId(uid);
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);

doWithCatalog(
catalogIdent,
c -> c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout, updatedProperties)),
NoSuchSchemaException.class,
TopicAlreadyExistsException.class);

// Retrieve the Topic again to obtain some values generated by underlying catalog
Topic topic =
doWithCatalog(
catalogIdent,
c -> c.doWithTopicOps(t -> t.loadTopic(ident)),
NoSuchTopicException.class);

TopicEntity topicEntity =
TopicEntity.builder()
.withId(fromProperties(topic.properties()).id())
.withName(ident.name())
.withComment(comment)
.withNamespace(ident.namespace())
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
.build();

try {
store.put(topicEntity, true /* overwrite */);
} catch (Exception e) {
LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedTopic.of(topic)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
}
// Load the schema to make sure the schema exists.
SchemaDispatcher schemaDispatcher = GravitinoEnv.getInstance().schemaDispatcher();
NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
schemaDispatcher.loadSchema(schemaIdent);

return EntityCombinedTopic.of(topic, topicEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
return TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()),
LockType.WRITE,
() -> internalCreateTopic(ident, comment, dataLayout, properties));
}

/**
@@ -374,4 +329,60 @@ private EntityCombinedTopic internalLoadTopic(NameIdentifier ident) {
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()))
.withImported(topicEntity != null);
}

private Topic internalCreateTopic(
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties) {
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
doWithCatalog(
catalogIdent,
c ->
c.doWithPropertiesMeta(
p -> {
validatePropertyForCreate(p.topicPropertiesMetadata(), properties);
return null;
}),
IllegalArgumentException.class);
Long uid = idGenerator.nextId();
StringIdentifier stringId = StringIdentifier.fromId(uid);
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);

// we do not retrieve the topic again (to obtain some values generated by underlying catalog)
// since some catalogs' API is async and the topic may not be created immediately
Topic topic =
doWithCatalog(
catalogIdent,
c ->
c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout, updatedProperties)),
NoSuchSchemaException.class,
TopicAlreadyExistsException.class);

TopicEntity topicEntity =
TopicEntity.builder()
.withId(fromProperties(topic.properties()).id())
.withName(ident.name())
.withComment(comment)
.withNamespace(ident.namespace())
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
.withCreateTime(Instant.now())
.build())
.build();

try {
store.put(topicEntity, true /* overwrite */);
} catch (Exception e) {
LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedTopic.of(topic)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
}

return EntityCombinedTopic.of(topic, topicEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
}
}
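
The internalCreateTable/internalCreateTopic bodies keep the pre-existing best-effort bookkeeping: if the entity-store put fails after the catalog object was created, the failure is logged and the result is returned without the Gravitino entity. A compact, hypothetical sketch of that fallback; Store, CombinedTopic, and the record layout are illustrative, not Gravitino types.

import java.util.Optional;

public class BestEffortStoreSketch {

  interface Store {
    void put(String entity) throws Exception;
  }

  record CombinedTopic(String topic, Optional<String> entity) {}

  static CombinedTopic createTopic(Store store, String topic, String entity) {
    try {
      store.put(entity);
    } catch (Exception e) {
      // The catalog-side topic already exists; return it even though bookkeeping failed.
      System.err.println("store put failed for " + topic + ": " + e.getMessage());
      return new CombinedTopic(topic, Optional.empty());
    }
    return new CombinedTopic(topic, Optional.of(entity));
  }

  public static void main(String[] args) {
    Store flaky = e -> { throw new Exception("backend down"); };
    System.out.println(createTopic(flaky, "events", "events-entity"));
    // prints CombinedTopic[topic=events, entity=Optional.empty]
  }
}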
@@ -39,7 +39,7 @@ public class TestPartitionNormalizeDispatcher extends TestOperationDispatcher {
NameIdentifierUtil.ofTable(metalake, catalog, SCHEMA, "TEST_PARTITION_NORMALIZE_TABLE");

@BeforeAll
public static void initialize() {
public static void initialize() throws IllegalAccessException {
TestPartitionOperationDispatcher.prepareTable();
partitionNormalizeDispatcher =
new PartitionNormalizeDispatcher(
@@ -18,9 +18,19 @@
*/
package org.apache.gravitino.catalog;

import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import com.google.common.collect.Maps;
import java.util.Arrays;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.Config;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.expressions.literals.Literal;
import org.apache.gravitino.rel.expressions.literals.Literals;
@@ -50,7 +60,7 @@ public class TestPartitionOperationDispatcher extends TestOperationDispatcher {
Maps.newHashMap());

@BeforeAll
public static void initialize() {
public static void initialize() throws IllegalAccessException {
prepareTable();
partitionOperationDispatcher.addPartition(TABLE_IDENT, PARTITION);

@@ -66,14 +76,22 @@ public static void initialize() {
"Custom class loader is not used");
}

protected static void prepareTable() {
protected static void prepareTable() throws IllegalAccessException {
schemaOperationDispatcher =
new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator);
tableOperationDispatcher =
new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
partitionOperationDispatcher =
new PartitionOperationDispatcher(catalogManager, entityStore, idGenerator);

Config config = mock(Config.class);
doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new LockManager(config), true);
FieldUtils.writeField(
GravitinoEnv.getInstance(), "schemaDispatcher", schemaOperationDispatcher, true);

NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema(metalake, catalog, SCHEMA);
schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
Column[] columns =
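
The test changes follow a common setup pattern: mock the Config with Mockito, build the real LockManager from it, and push collaborators into GravitinoEnv's private fields with FieldUtils.writeField. A standalone, hypothetical sketch of that injection technique; TestEnv, the field name, and the config key are stand-ins rather than Gravitino's actual names.

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import org.apache.commons.lang3.reflect.FieldUtils;

public class SingletonInjectionSketch {

  interface Config {
    long get(String key);
  }

  static class LockManager {
    final long maxNodes;

    LockManager(Config config) {
      this.maxNodes = config.get("tree.lock.max.nodes");
    }
  }

  static class TestEnv {
    private static final TestEnv INSTANCE = new TestEnv();
    private LockManager lockManager; // injected reflectively in tests

    static TestEnv getInstance() { return INSTANCE; }

    LockManager lockManager() { return lockManager; }
  }

  public static void main(String[] args) throws IllegalAccessException {
    // Mock only the configuration, then build the real collaborator from it.
    Config config = mock(Config.class);
    doReturn(100_000L).when(config).get("tree.lock.max.nodes");

    // Overwrite the singleton's private field, mirroring the FieldUtils.writeField calls above.
    FieldUtils.writeField(TestEnv.getInstance(), "lockManager", new LockManager(config), true);
    System.out.println(TestEnv.getInstance().lockManager().maxNodes); // 100000
  }
}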