
[#317] feat(core, catalog): make fields in AuditInfo optional and overwriteable #330

Merged 4 commits on Sep 6, 2023
Changes from 2 commits
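
As a rough illustration of what this change enables (a hedged sketch only; the identifiers and values below are illustrative and assume the AuditInfo.Builder and merge(other, overwrite) signatures that appear in the diff), audit fields recovered from Hive can now be left unset and later overwritten by the record kept in the Graviton store:

import com.datastrato.graviton.meta.AuditInfo;
import java.time.Instant;

public class AuditInfoMergeExample {
  public static void main(String[] args) {
    // Only the creator is recoverable from the Hive Metastore object; with this PR
    // the remaining audit fields may simply stay unset.
    AuditInfo fromHive = new AuditInfo.Builder().withCreator("hive_owner").build();

    // The authoritative audit record kept in the Graviton store.
    AuditInfo fromStore =
        new AuditInfo.Builder()
            .withCreator("graviton_user")
            .withCreateTime(Instant.now())
            .build();

    // With overwrite = true, fields present in the store-side record take precedence.
    fromHive.merge(fromStore, true /* overwrite */);
  }
}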
@@ -241,10 +241,12 @@ public HiveSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException
.withId(baseSchema.getId())
.withCatalogId(baseSchema.getCatalogId())
.withNamespace(ident.namespace())
.withAuditInfo(baseSchema.auditInfo())
.withConf(hiveConf);
HiveSchema hiveSchema = HiveSchema.fromInnerDB(database, builder);

// Merge audit info from Graviton store
hiveSchema.auditInfo().merge(baseSchema.auditInfo(), true /*overwrite*/);

LOG.info("Loaded Hive schema (database) {} from Hive Metastore ", ident.name());

return hiveSchema;
@@ -331,16 +333,18 @@ public HiveSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
.withId(oldSchema.getId())
.withCatalogId(oldSchema.getCatalogId())
.withNamespace(ident.namespace())
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(oldSchema.auditInfo().creator())
.withCreateTime(oldSchema.auditInfo().createTime())
.withLastModifier(currentUser())
.withLastModifiedTime(Instant.now())
.build())
.withConf(hiveConf);
HiveSchema hiveSchema = HiveSchema.fromInnerDB(alteredDatabase, builder);

AuditInfo newAudit =
new AuditInfo.Builder()
.withCreator(oldSchema.auditInfo().creator())
.withCreateTime(oldSchema.auditInfo().createTime())
.withLastModifier(currentUser())
.withLastModifiedTime(Instant.now())
.build();
hiveSchema.auditInfo().merge(newAudit, true /*overwrite*/);

// To be on the safe side, here uses delete before put (although hive schema does
// not support rename yet)
store.delete(ident, SCHEMA);
@@ -552,10 +556,12 @@ public Table loadTable(NameIdentifier tableIdent) throws NoSuchTableException {
.withId(baseTable.getId())
.withSchemaId(baseTable.getSchemaId())
.withName(tableIdent.name())
.withNameSpace(tableIdent.namespace())
.withAuditInfo(baseTable.auditInfo());
.withNameSpace(tableIdent.namespace());
HiveTable table = HiveTable.fromInnerTable(hiveTable, builder);

// Merge the audit info from Graviton store.
table.auditInfo().merge(baseTable.auditInfo(), true /* overwrite */);

LOG.info("Loaded Hive table {} from Hive Metastore ", tableIdent.name());

return table;
@@ -728,15 +734,19 @@ public Table alterTable(NameIdentifier tableIdent, TableChange... changes)
.withId(table.getId())
.withSchemaId(table.getSchemaId())
.withName(alteredHiveTable.getTableName())
.withNameSpace(tableIdent.namespace())
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(table.auditInfo().creator())
.withCreateTime(table.auditInfo().createTime())
.withLastModifier(currentUser())
.withLastModifiedTime(Instant.now())
.build());
.withNameSpace(tableIdent.namespace());

HiveTable alteredTable = HiveTable.fromInnerTable(alteredHiveTable, builder);

AuditInfo newAudit =
new AuditInfo.Builder()
.withCreator(table.auditInfo().creator())
.withCreateTime(table.auditInfo().createTime())
.withLastModifier(currentUser())
.withLastModifiedTime(Instant.now())
.build();
alteredTable.auditInfo().merge(newAudit, true /* overwrite */);
Contributor commented:

I'm confused. Does the code mean that the audit information in Graviton is more recent than that in external storage, and that's what we'll use as the final result?

Contributor Author replied:

Yes, if the audit info also exists in Graviton, the one from Graviton takes precedence.


store.delete(tableIdent, TABLE);
store.put(alteredTable, false);
clientPool.run(
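For context on the merge call discussed in the thread above, here is a minimal, simplified sketch of the field-level semantics being described. This is an assumption drawn from the conversation, not the actual Graviton implementation (the real AuditInfo has a Builder and additional behavior): when overwrite is true, any field present on the incoming audit info replaces the corresponding field on the receiver; otherwise incoming values only fill fields that are still unset.

import java.time.Instant;

// Hypothetical, simplified stand-in used only to illustrate merge/overwrite semantics.
class AuditInfoSketch {
  String creator;
  Instant createTime;
  String lastModifier;
  Instant lastModifiedTime;

  AuditInfoSketch merge(AuditInfoSketch other, boolean overwrite) {
    if (other == null) {
      return this;
    }
    creator = mergeField(creator, other.creator, overwrite);
    createTime = mergeField(createTime, other.createTime, overwrite);
    lastModifier = mergeField(lastModifier, other.lastModifier, overwrite);
    lastModifiedTime = mergeField(lastModifiedTime, other.lastModifiedTime, overwrite);
    return this;
  }

  // Incoming value wins when the current one is unset, or when overwrite is requested.
  private static <T> T mergeField(T current, T incoming, boolean overwrite) {
    if (incoming == null) {
      return current;
    }
    return (current == null || overwrite) ? incoming : current;
  }
}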
@@ -4,6 +4,7 @@
*/
package com.datastrato.graviton.catalog.hive;

import com.datastrato.graviton.meta.AuditInfo;
import com.datastrato.graviton.meta.rel.BaseSchema;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@@ -38,10 +39,16 @@ private HiveSchema() {}
public static HiveSchema fromInnerDB(Database db, Builder builder) {
Map<String, String> properties = convertToMetadata(db);

// Get audit info from Hive's Database object. Because Hive's database doesn't store create
// time, last modifier and last modified time, we only get creator from Hive's database.
AuditInfo.Builder auditInfoBuilder = new AuditInfo.Builder();
Optional.ofNullable(db.getOwnerName()).ifPresent(auditInfoBuilder::withCreator);

return builder
.withName(db.getName())
.withComment(db.getDescription())
.withProperties(properties)
.withAuditInfo(auditInfoBuilder.build())
.build();
}

@@ -10,10 +10,13 @@
import com.datastrato.graviton.NameIdentifier;
import com.datastrato.graviton.catalog.hive.converter.FromHiveType;
import com.datastrato.graviton.catalog.hive.converter.ToHiveType;
import com.datastrato.graviton.meta.AuditInfo;
import com.datastrato.graviton.meta.rel.BaseTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
@@ -47,6 +50,13 @@ private HiveTable() {}
* @return A new HiveTable instance.
*/
public static HiveTable fromInnerTable(Table table, Builder builder) {
// Get audit info from Hive's Table object. Because Hive's table doesn't store last modifier
// and last modified time, we only get creator and create time from Hive's table.
AuditInfo.Builder auditInfoBuilder = new AuditInfo.Builder();
Optional.ofNullable(table.getOwner()).ifPresent(auditInfoBuilder::withCreator);
if (table.isSetCreateTime()) {
auditInfoBuilder.withCreateTime(Instant.ofEpochSecond(table.getCreateTime()));
}

return builder
.withComment(table.getParameters().get(HMS_TABLE_COMMENT))
@@ -62,6 +72,7 @@ public static HiveTable fromInnerTable(Table table, Builder builder) {
.build())
.toArray(HiveColumn[]::new))
.withLocation(table.getSd().getLocation())
.withAuditInfo(auditInfoBuilder.build())
.build();
}

@@ -79,6 +90,11 @@ public Table toInnerTable() {
hiveTable.setParameters(properties);
hiveTable.setPartitionKeys(Lists.newArrayList() /* TODO(Minghuang): Add partition support */);

// Set AuditInfo to Hive's Table object. Hive's Table doesn't support setting last modifier
// and last modified time, so we only set creator and create time.
hiveTable.setOwner(auditInfo.creator());
hiveTable.setCreateTime(Math.toIntExact(auditInfo.createTime().getEpochSecond()));

return hiveTable;
}

@@ -105,6 +105,9 @@ public void testCreateHiveSchema() throws IOException {
Assertions.assertTrue(Arrays.asList(idents).contains(ident));
Assertions.assertTrue(store.exists(ident, SCHEMA));

Schema loadedSchema = hiveCatalog.asSchemas().loadSchema(ident);
Assertions.assertEquals(schema.auditInfo(), loadedSchema.auditInfo());

// Test illegal identifier
NameIdentifier ident1 = NameIdentifier.of("metalake", hiveCatalog.name());
Throwable exception =
@@ -136,7 +139,7 @@ public void testAlterSchema() {
properties.put("key2", "val2");
String comment = "comment";

hiveCatalog.asSchemas().createSchema(ident, comment, properties);
Schema createdSchema = hiveCatalog.asSchemas().createSchema(ident, comment, properties);
Assertions.assertTrue(hiveCatalog.asSchemas().schemaExists(ident));

Map<String, String> properties1 = hiveCatalog.asSchemas().loadSchema(ident).properties();
@@ -149,19 +152,35 @@
ident,
SchemaChange.removeProperty("key1"),
SchemaChange.setProperty("key2", "val2-alter"));
Map<String, String> properties2 = hiveCatalog.asSchemas().loadSchema(ident).properties();
Schema alteredSchema = hiveCatalog.asSchemas().loadSchema(ident);
Map<String, String> properties2 = alteredSchema.properties();
Assertions.assertFalse(properties2.containsKey("key1"));
Assertions.assertEquals("val2-alter", properties2.get("key2"));

Assertions.assertEquals(
createdSchema.auditInfo().creator(), alteredSchema.auditInfo().creator());
Assertions.assertEquals(
createdSchema.auditInfo().createTime(), alteredSchema.auditInfo().createTime());
Assertions.assertNotNull(alteredSchema.auditInfo().lastModifier());
Assertions.assertNotNull(alteredSchema.auditInfo().lastModifiedTime());

hiveCatalog
.asSchemas()
.alterSchema(
ident,
SchemaChange.setProperty("key3", "val3"),
SchemaChange.setProperty("key4", "val4"));
Map<String, String> properties3 = hiveCatalog.asSchemas().loadSchema(ident).properties();
Schema alteredSchema1 = hiveCatalog.asSchemas().loadSchema(ident);
Map<String, String> properties3 = alteredSchema1.properties();
Assertions.assertEquals("val3", properties3.get("key3"));
Assertions.assertEquals("val4", properties3.get("key4"));

Assertions.assertEquals(
createdSchema.auditInfo().creator(), alteredSchema1.auditInfo().creator());
Assertions.assertEquals(
createdSchema.auditInfo().createTime(), alteredSchema1.auditInfo().createTime());
Assertions.assertNotNull(alteredSchema1.auditInfo().lastModifier());
Assertions.assertNotNull(alteredSchema1.auditInfo().lastModifiedTime());
}

@Test
@@ -146,6 +146,9 @@ public void testCreateHiveTable() throws IOException {
Assertions.assertEquals(HIVE_COMMENT, table.comment());
Assertions.assertArrayEquals(columns, table.columns());

Table loadedTable = hiveCatalog.asTableCatalog().loadTable(tableIdentifier);
Assertions.assertEquals(table.auditInfo(), loadedTable.auditInfo());

Assertions.assertTrue(hiveCatalog.asTableCatalog().tableExists(tableIdentifier));
NameIdentifier[] tableIdents =
hiveCatalog.asTableCatalog().listTables(tableIdentifier.namespace());
@@ -244,7 +247,10 @@ public void testAlterHiveTable() throws IOException {
.build();
Column[] columns = new Column[] {col1, col2};

hiveCatalog.asTableCatalog().createTable(tableIdentifier, columns, HIVE_COMMENT, properties);
Table createdTable =
hiveCatalog
.asTableCatalog()
.createTable(tableIdentifier, columns, HIVE_COMMENT, properties);
Assertions.assertTrue(hiveCatalog.asTableCatalog().tableExists(tableIdentifier));

// test alter
@@ -283,6 +289,12 @@
Assertions.assertFalse(store.exists(tableIdentifier, TABLE));
Assertions.assertTrue(store.exists(((HiveTable) alteredTable).nameIdentifier(), TABLE));

Assertions.assertEquals(createdTable.auditInfo().creator(), alteredTable.auditInfo().creator());
Assertions.assertEquals(
createdTable.auditInfo().createTime(), alteredTable.auditInfo().createTime());
Assertions.assertNotNull(alteredTable.auditInfo().lastModifier());
Assertions.assertNotNull(alteredTable.auditInfo().lastModifiedTime());

Column[] expected =
new Column[] {
new HiveColumn.Builder()
@@ -309,12 +321,19 @@
.alterTable(
NameIdentifier.of(tableIdentifier.namespace(), "test_hive_table_new"),
TableChange.deleteColumn(new String[] {"col_1"}, false));
alteredTable =
Table alteredTable1 =
hiveCatalog
.asTableCatalog()
.loadTable(NameIdentifier.of(tableIdentifier.namespace(), "test_hive_table_new"));
expected =
Arrays.stream(expected).filter(c -> !"col_1".equals(c.name())).toArray(Column[]::new);
Assertions.assertArrayEquals(expected, alteredTable.columns());
Assertions.assertArrayEquals(expected, alteredTable1.columns());

Assertions.assertEquals(
createdTable.auditInfo().creator(), alteredTable1.auditInfo().creator());
Assertions.assertEquals(
createdTable.auditInfo().createTime(), alteredTable1.auditInfo().createTime());
Assertions.assertNotNull(alteredTable1.auditInfo().lastModifier());
Assertions.assertNotNull(alteredTable1.auditInfo().lastModifiedTime());
}
}
@@ -51,10 +51,5 @@ public void validate() throws IllegalArgumentException {
StringUtils.isNotBlank(catalog.name()), "catalog 'name' must not be null and empty");
Preconditions.checkArgument(catalog.type() != null, "catalog 'type' must not be null");
Preconditions.checkArgument(catalog.auditInfo() != null, "catalog 'audit' must not be null");
Preconditions.checkArgument(
StringUtils.isNotBlank(catalog.auditInfo().creator()),
"catalog 'audit.creator' must not be null and empty");
Preconditions.checkArgument(
catalog.auditInfo().createTime() != null, "catalog 'audit.createTime' must not be null");
}
}
@@ -56,12 +56,6 @@ public void validate() throws IllegalArgumentException {
"metalake 'name' must not be null and empty");
Preconditions.checkArgument(
metalake.auditInfo() != null, "metalake 'audit' must not be null");
Preconditions.checkArgument(
StringUtils.isNotBlank(metalake.auditInfo().creator()),
"metalake 'audit.creator' must not be null and empty");
Preconditions.checkArgument(
metalake.auditInfo().createTime() != null,
"metalake 'audit.createTime' must not be null");
});
}
}
@@ -50,10 +50,5 @@ public void validate() throws IllegalArgumentException {
Preconditions.checkArgument(
StringUtils.isNotBlank(metalake.name()), "metalake 'name' must not be null and empty");
Preconditions.checkArgument(metalake.auditInfo() != null, "metalake 'audit' must not be null");
Preconditions.checkArgument(
StringUtils.isNotBlank(metalake.auditInfo().creator()),
"metalake 'audit.creator' must not be null and empty");
Preconditions.checkArgument(
metalake.auditInfo().createTime() != null, "metalake 'audit.createTime' must not be null");
}
}
@@ -39,10 +39,5 @@ public void validate() throws IllegalArgumentException {
Preconditions.checkArgument(
StringUtils.isNotBlank(schema.name()), "schema 'name' must not be null and empty");
Preconditions.checkArgument(schema.auditInfo() != null, "schema 'audit' must not be null");
Preconditions.checkArgument(
StringUtils.isNotBlank(schema.auditInfo().creator()),
"schema 'audit.creator' must not be null and empty");
Preconditions.checkArgument(
schema.auditInfo().createTime() != null, "schema 'audit.createTime' must not be null");
}
}
@@ -42,10 +42,5 @@ public void validate() throws IllegalArgumentException {
table.columns() != null && table.columns().length > 0,
"table 'columns' must not be null and empty");
Preconditions.checkArgument(table.auditInfo() != null, "table 'audit' must not be null");
Preconditions.checkArgument(
StringUtils.isNotBlank(table.auditInfo().creator()),
"table 'audit.creator' must not be null and empty");
Preconditions.checkArgument(
table.auditInfo().createTime() != null, "table 'audit.createTime' must not be null");
}
}