[#317] feat(core, catalog): make fields in AuditInfo optional and overwriteable (#330)

### What changes were proposed in this pull request?

This PR proposes to relax the requirements on the `AuditInfo` fields,
making them optional and overwriteable.

### Why are the changes needed?

This is the first change for #250; it addresses two problems:

1. If the `AuditInfo` exists in neither the Graviton store nor the
underlying source, we should support an empty `AuditInfo`, or an
`AuditInfo` with only some of its fields set.
2. If the `AuditInfo` is set in both the Graviton store and the
underlying source, we should support merging the two (a sketch of the
merge semantics is shown below).

Fix: #317
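
For illustration, here is a minimal, hypothetical sketch of the merge semantics the catalog code relies on. The field set mirrors `AuditInfo`, but the class body and the `pick` helper are illustrative, not the actual implementation:

```java
import java.time.Instant;

// Hypothetical, simplified sketch of AuditInfo's merge semantics; the real
// class lives in com.datastrato.graviton.meta and has builder plumbing that
// is omitted here.
public class AuditInfo {
  private String creator;
  private Instant createTime;
  private String lastModifier;
  private Instant lastModifiedTime;

  /**
   * Merges another instance into this one. With overwrite == true, non-null
   * fields of "other" replace this object's fields; with overwrite == false,
   * "other" only fills in fields that are still unset here.
   */
  public AuditInfo merge(AuditInfo other, boolean overwrite) {
    if (other == null) {
      return this;
    }
    this.creator = pick(this.creator, other.creator, overwrite);
    this.createTime = pick(this.createTime, other.createTime, overwrite);
    this.lastModifier = pick(this.lastModifier, other.lastModifier, overwrite);
    this.lastModifiedTime = pick(this.lastModifiedTime, other.lastModifiedTime, overwrite);
    return this;
  }

  // Prefer "theirs" when overwriting; otherwise keep "mine" unless unset.
  private static <T> T pick(T mine, T theirs, boolean overwrite) {
    if (overwrite) {
      return theirs != null ? theirs : mine;
    }
    return mine != null ? mine : theirs;
  }
}
```

Under these semantics, a call such as `hiveSchema.auditInfo().merge(baseSchema.auditInfo(), true /*overwrite*/)` (used in the diff below) lets the audit fields persisted in the Graviton store take precedence, while fields recovered from Hive (e.g. the database owner) survive only where the store has no value.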

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Modified and added UTs to cover the new behavior.
jerryshao authored Sep 6, 2023
1 parent f95d6ae commit d9dfd48
Showing 15 changed files with 167 additions and 98 deletions.
@@ -241,10 +241,12 @@ public HiveSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException
         .withId(baseSchema.getId())
         .withCatalogId(baseSchema.getCatalogId())
         .withNamespace(ident.namespace())
-        .withAuditInfo(baseSchema.auditInfo())
         .withConf(hiveConf);
     HiveSchema hiveSchema = HiveSchema.fromInnerDB(database, builder);
 
+    // Merge audit info from Graviton store
+    hiveSchema.auditInfo().merge(baseSchema.auditInfo(), true /*overwrite*/);
+
     LOG.info("Loaded Hive schema (database) {} from Hive Metastore ", ident.name());
 
     return hiveSchema;
@@ -331,16 +333,18 @@ public HiveSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
         .withId(oldSchema.getId())
         .withCatalogId(oldSchema.getCatalogId())
         .withNamespace(ident.namespace())
-        .withAuditInfo(
-            new AuditInfo.Builder()
-                .withCreator(oldSchema.auditInfo().creator())
-                .withCreateTime(oldSchema.auditInfo().createTime())
-                .withLastModifier(currentUser())
-                .withLastModifiedTime(Instant.now())
-                .build())
         .withConf(hiveConf);
     HiveSchema hiveSchema = HiveSchema.fromInnerDB(alteredDatabase, builder);
 
+    AuditInfo newAudit =
+        new AuditInfo.Builder()
+            .withCreator(oldSchema.auditInfo().creator())
+            .withCreateTime(oldSchema.auditInfo().createTime())
+            .withLastModifier(currentUser())
+            .withLastModifiedTime(Instant.now())
+            .build();
+    hiveSchema.auditInfo().merge(newAudit, true /*overwrite*/);
+
     // To be on the safe side, here uses delete before put (although hive schema does
     // not support rename yet)
     store.delete(ident, SCHEMA);
@@ -552,10 +556,12 @@ public Table loadTable(NameIdentifier tableIdent) throws NoSuchTableException {
         .withId(baseTable.getId())
         .withSchemaId(baseTable.getSchemaId())
         .withName(tableIdent.name())
-        .withNameSpace(tableIdent.namespace())
-        .withAuditInfo(baseTable.auditInfo());
+        .withNameSpace(tableIdent.namespace());
     HiveTable table = HiveTable.fromInnerTable(hiveTable, builder);
 
+    // Merge the audit info from Graviton store.
+    table.auditInfo().merge(baseTable.auditInfo(), true /* overwrite */);
+
     LOG.info("Loaded Hive table {} from Hive Metastore ", tableIdent.name());
 
     return table;
@@ -728,15 +734,19 @@ public Table alterTable(NameIdentifier tableIdent, TableChange... changes)
         .withId(table.getId())
         .withSchemaId(table.getSchemaId())
         .withName(alteredHiveTable.getTableName())
-        .withNameSpace(tableIdent.namespace())
-        .withAuditInfo(
-            new AuditInfo.Builder()
-                .withCreator(table.auditInfo().creator())
-                .withCreateTime(table.auditInfo().createTime())
-                .withLastModifier(currentUser())
-                .withLastModifiedTime(Instant.now())
-                .build());
+        .withNameSpace(tableIdent.namespace());
+
     HiveTable alteredTable = HiveTable.fromInnerTable(alteredHiveTable, builder);
+
+    AuditInfo newAudit =
+        new AuditInfo.Builder()
+            .withCreator(table.auditInfo().creator())
+            .withCreateTime(table.auditInfo().createTime())
+            .withLastModifier(currentUser())
+            .withLastModifiedTime(Instant.now())
+            .build();
+    alteredTable.auditInfo().merge(newAudit, true /* overwrite */);
+
     store.delete(tableIdent, TABLE);
     store.put(alteredTable, false);
     clientPool.run(
@@ -4,6 +4,7 @@
  */
 package com.datastrato.graviton.catalog.hive;
 
+import com.datastrato.graviton.meta.AuditInfo;
 import com.datastrato.graviton.meta.rel.BaseSchema;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -38,10 +39,16 @@ private HiveSchema() {}
   public static HiveSchema fromInnerDB(Database db, Builder builder) {
     Map<String, String> properties = convertToMetadata(db);
 
+    // Get audit info from Hive's Database object. Because Hive's database doesn't store create
+    // time, last modifier and last modified time, we only get creator from Hive's database.
+    AuditInfo.Builder auditInfoBuilder = new AuditInfo.Builder();
+    Optional.ofNullable(db.getOwnerName()).ifPresent(auditInfoBuilder::withCreator);
+
     return builder
         .withName(db.getName())
         .withComment(db.getDescription())
         .withProperties(properties)
+        .withAuditInfo(auditInfoBuilder.build())
         .build();
   }
 
@@ -10,10 +10,13 @@
 import com.datastrato.graviton.NameIdentifier;
 import com.datastrato.graviton.catalog.hive.converter.FromHiveType;
 import com.datastrato.graviton.catalog.hive.converter.ToHiveType;
+import com.datastrato.graviton.meta.AuditInfo;
 import com.datastrato.graviton.meta.rel.BaseTable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import java.time.Instant;
 import java.util.Arrays;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import lombok.Getter;
@@ -47,6 +50,13 @@ private HiveTable() {}
    * @return A new HiveTable instance.
    */
   public static HiveTable fromInnerTable(Table table, Builder builder) {
+    // Get audit info from Hive's Table object. Because Hive's table doesn't store last modifier
+    // and last modified time, we only get creator and create time from Hive's table.
+    AuditInfo.Builder auditInfoBuilder = new AuditInfo.Builder();
+    Optional.ofNullable(table.getOwner()).ifPresent(auditInfoBuilder::withCreator);
+    if (table.isSetCreateTime()) {
+      auditInfoBuilder.withCreateTime(Instant.ofEpochSecond(table.getCreateTime()));
+    }
 
     return builder
         .withComment(table.getParameters().get(HMS_TABLE_COMMENT))
@@ -62,6 +72,7 @@ public static HiveTable fromInnerTable(Table table, Builder builder) {
                     .build())
             .toArray(HiveColumn[]::new))
         .withLocation(table.getSd().getLocation())
+        .withAuditInfo(auditInfoBuilder.build())
         .build();
   }
 
@@ -79,6 +90,11 @@ public Table toInnerTable() {
     hiveTable.setParameters(properties);
     hiveTable.setPartitionKeys(Lists.newArrayList() /* TODO(Minghuang): Add partition support */);
 
+    // Set AuditInfo to Hive's Table object. Hive's Table doesn't support setting last modifier
+    // and last modified time, so we only set creator and create time.
+    hiveTable.setOwner(auditInfo.creator());
+    hiveTable.setCreateTime(Math.toIntExact(auditInfo.createTime().getEpochSecond()));
+
     return hiveTable;
   }
 
@@ -105,6 +105,9 @@ public void testCreateHiveSchema() throws IOException {
     Assertions.assertTrue(Arrays.asList(idents).contains(ident));
     Assertions.assertTrue(store.exists(ident, SCHEMA));
 
+    Schema loadedSchema = hiveCatalog.asSchemas().loadSchema(ident);
+    Assertions.assertEquals(schema.auditInfo(), loadedSchema.auditInfo());
+
     // Test illegal identifier
     NameIdentifier ident1 = NameIdentifier.of("metalake", hiveCatalog.name());
     Throwable exception =
@@ -136,7 +139,7 @@ public void testAlterSchema() {
     properties.put("key2", "val2");
     String comment = "comment";
 
-    hiveCatalog.asSchemas().createSchema(ident, comment, properties);
+    Schema createdSchema = hiveCatalog.asSchemas().createSchema(ident, comment, properties);
     Assertions.assertTrue(hiveCatalog.asSchemas().schemaExists(ident));
 
     Map<String, String> properties1 = hiveCatalog.asSchemas().loadSchema(ident).properties();
@@ -149,19 +152,35 @@
             ident,
             SchemaChange.removeProperty("key1"),
             SchemaChange.setProperty("key2", "val2-alter"));
-    Map<String, String> properties2 = hiveCatalog.asSchemas().loadSchema(ident).properties();
+    Schema alteredSchema = hiveCatalog.asSchemas().loadSchema(ident);
+    Map<String, String> properties2 = alteredSchema.properties();
     Assertions.assertFalse(properties2.containsKey("key1"));
     Assertions.assertEquals("val2-alter", properties2.get("key2"));
 
+    Assertions.assertEquals(
+        createdSchema.auditInfo().creator(), alteredSchema.auditInfo().creator());
+    Assertions.assertEquals(
+        createdSchema.auditInfo().createTime(), alteredSchema.auditInfo().createTime());
+    Assertions.assertNotNull(alteredSchema.auditInfo().lastModifier());
+    Assertions.assertNotNull(alteredSchema.auditInfo().lastModifiedTime());
+
     hiveCatalog
         .asSchemas()
         .alterSchema(
             ident,
             SchemaChange.setProperty("key3", "val3"),
             SchemaChange.setProperty("key4", "val4"));
-    Map<String, String> properties3 = hiveCatalog.asSchemas().loadSchema(ident).properties();
+    Schema alteredSchema1 = hiveCatalog.asSchemas().loadSchema(ident);
+    Map<String, String> properties3 = alteredSchema1.properties();
     Assertions.assertEquals("val3", properties3.get("key3"));
     Assertions.assertEquals("val4", properties3.get("key4"));
 
+    Assertions.assertEquals(
+        createdSchema.auditInfo().creator(), alteredSchema1.auditInfo().creator());
+    Assertions.assertEquals(
+        createdSchema.auditInfo().createTime(), alteredSchema1.auditInfo().createTime());
+    Assertions.assertNotNull(alteredSchema1.auditInfo().lastModifier());
+    Assertions.assertNotNull(alteredSchema1.auditInfo().lastModifiedTime());
   }
 
   @Test
@@ -146,6 +146,9 @@ public void testCreateHiveTable() throws IOException {
     Assertions.assertEquals(HIVE_COMMENT, table.comment());
     Assertions.assertArrayEquals(columns, table.columns());
 
+    Table loadedTable = hiveCatalog.asTableCatalog().loadTable(tableIdentifier);
+    Assertions.assertEquals(table.auditInfo(), loadedTable.auditInfo());
+
     Assertions.assertTrue(hiveCatalog.asTableCatalog().tableExists(tableIdentifier));
     NameIdentifier[] tableIdents =
         hiveCatalog.asTableCatalog().listTables(tableIdentifier.namespace());
@@ -244,7 +247,10 @@ public void testAlterHiveTable() throws IOException {
             .build();
     Column[] columns = new Column[] {col1, col2};
 
-    hiveCatalog.asTableCatalog().createTable(tableIdentifier, columns, HIVE_COMMENT, properties);
+    Table createdTable =
+        hiveCatalog
+            .asTableCatalog()
+            .createTable(tableIdentifier, columns, HIVE_COMMENT, properties);
     Assertions.assertTrue(hiveCatalog.asTableCatalog().tableExists(tableIdentifier));
 
     // test alter
@@ -283,6 +289,12 @@ public void testAlterHiveTable() throws IOException {
     Assertions.assertFalse(store.exists(tableIdentifier, TABLE));
     Assertions.assertTrue(store.exists(((HiveTable) alteredTable).nameIdentifier(), TABLE));
 
+    Assertions.assertEquals(createdTable.auditInfo().creator(), alteredTable.auditInfo().creator());
+    Assertions.assertEquals(
+        createdTable.auditInfo().createTime(), alteredTable.auditInfo().createTime());
+    Assertions.assertNotNull(alteredTable.auditInfo().lastModifier());
+    Assertions.assertNotNull(alteredTable.auditInfo().lastModifiedTime());
+
     Column[] expected =
         new Column[] {
           new HiveColumn.Builder()
@@ -309,12 +321,19 @@
         .alterTable(
             NameIdentifier.of(tableIdentifier.namespace(), "test_hive_table_new"),
             TableChange.deleteColumn(new String[] {"col_1"}, false));
-    alteredTable =
+    Table alteredTable1 =
         hiveCatalog
             .asTableCatalog()
             .loadTable(NameIdentifier.of(tableIdentifier.namespace(), "test_hive_table_new"));
     expected =
         Arrays.stream(expected).filter(c -> !"col_1".equals(c.name())).toArray(Column[]::new);
-    Assertions.assertArrayEquals(expected, alteredTable.columns());
+    Assertions.assertArrayEquals(expected, alteredTable1.columns());
+
+    Assertions.assertEquals(
+        createdTable.auditInfo().creator(), alteredTable1.auditInfo().creator());
+    Assertions.assertEquals(
+        createdTable.auditInfo().createTime(), alteredTable1.auditInfo().createTime());
+    Assertions.assertNotNull(alteredTable1.auditInfo().lastModifier());
+    Assertions.assertNotNull(alteredTable1.auditInfo().lastModifiedTime());
   }
 }
@@ -51,10 +51,5 @@ public void validate() throws IllegalArgumentException {
         StringUtils.isNotBlank(catalog.name()), "catalog 'name' must not be null and empty");
     Preconditions.checkArgument(catalog.type() != null, "catalog 'type' must not be null");
     Preconditions.checkArgument(catalog.auditInfo() != null, "catalog 'audit' must not be null");
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(catalog.auditInfo().creator()),
-        "catalog 'audit.creator' must not be null and empty");
-    Preconditions.checkArgument(
-        catalog.auditInfo().createTime() != null, "catalog 'audit.createTime' must not be null");
   }
 }
@@ -56,12 +56,6 @@ public void validate() throws IllegalArgumentException {
                   "metalake 'name' must not be null and empty");
               Preconditions.checkArgument(
                   metalake.auditInfo() != null, "metalake 'audit' must not be null");
-              Preconditions.checkArgument(
-                  StringUtils.isNotBlank(metalake.auditInfo().creator()),
-                  "metalake 'audit.creator' must not be null and empty");
-              Preconditions.checkArgument(
-                  metalake.auditInfo().createTime() != null,
-                  "metalake 'audit.createTime' must not be null");
             });
   }
 }
@@ -50,10 +50,5 @@ public void validate() throws IllegalArgumentException {
     Preconditions.checkArgument(
         StringUtils.isNotBlank(metalake.name()), "metalake 'name' must not be null and empty");
     Preconditions.checkArgument(metalake.auditInfo() != null, "metalake 'audit' must not be null");
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(metalake.auditInfo().creator()),
-        "metalake 'audit.creator' must not be null and empty");
-    Preconditions.checkArgument(
-        metalake.auditInfo().createTime() != null, "metalake 'audit.createTime' must not be null");
   }
 }
@@ -39,10 +39,5 @@ public void validate() throws IllegalArgumentException {
     Preconditions.checkArgument(
         StringUtils.isNotBlank(schema.name()), "schema 'name' must not be null and empty");
     Preconditions.checkArgument(schema.auditInfo() != null, "schema 'audit' must not be null");
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(schema.auditInfo().creator()),
-        "schema 'audit.creator' must not be null and empty");
-    Preconditions.checkArgument(
-        schema.auditInfo().createTime() != null, "schema 'audit.createTime' must not be null");
   }
 }
@@ -42,10 +42,5 @@ public void validate() throws IllegalArgumentException {
         table.columns() != null && table.columns().length > 0,
         "table 'columns' must not be null and empty");
     Preconditions.checkArgument(table.auditInfo() != null, "table 'audit' must not be null");
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(table.auditInfo().creator()),
-        "table 'audit.creator' must not be null and empty");
-    Preconditions.checkArgument(
-        table.auditInfo().createTime() != null, "table 'audit.createTime' must not be null");
   }
 }
