Skip to content

Commit

Permalink
[Enhancement](multi-catalog) Merge hms events every round to speed up…
Browse files Browse the repository at this point in the history
… events processing. (#21589)

Currently we find that MetastoreEventsProcessor can not catch up the event producing rate in our cluster, so we need to merge some hms events every round.
  • Loading branch information
dutyu authored Jul 12, 2023
1 parent 2e3d15b commit 105a162
Show file tree
Hide file tree
Showing 14 changed files with 574 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,18 @@
/**
* MetastoreEvent for ADD_PARTITION event type
*/
public class AddPartitionEvent extends MetastoreTableEvent {
public class AddPartitionEvent extends MetastorePartitionEvent {
private final Table hmsTbl;
private final List<String> partitionNames;

// for test
public AddPartitionEvent(long eventId, String catalogName, String dbName,
String tblName, List<String> partitionNames) {
super(eventId, catalogName, dbName, tblName);
this.partitionNames = partitionNames;
this.hmsTbl = null;
}

private AddPartitionEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,97 @@

package org.apache.doris.datasource.hive.event;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage;

import java.util.List;

/**
* MetastoreEvent for Alter_DATABASE event type
* MetastoreEvent for ALTER_DATABASE event type
*/
public class AlterDatabaseEvent extends MetastoreEvent {

private final Database dbBefore;
private final Database dbAfter;

// true if this alter event was due to a rename operation
private final boolean isRename;

// for test
public AlterDatabaseEvent(long eventId, String catalogName, String dbName, boolean isRename) {
super(eventId, catalogName, dbName, null);
this.isRename = isRename;
this.dbBefore = null;
this.dbAfter = null;
}

private AlterDatabaseEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ALTER_DATABASE));

try {
JSONAlterDatabaseMessage alterDatabaseMessage =
(JSONAlterDatabaseMessage) MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat())
.getAlterDatabaseMessage(event.getMessage());
dbBefore = Preconditions.checkNotNull(alterDatabaseMessage.getDbObjBefore());
dbAfter = Preconditions.checkNotNull(alterDatabaseMessage.getDbObjAfter());
} catch (Exception e) {
throw new MetastoreNotificationException(
debugString("Unable to parse the alter database message"), e);
}
// this is a rename event if either dbName of before and after object changed
isRename = !dbBefore.getName().equalsIgnoreCase(dbAfter.getName());
}

private void processRename() throws DdlException {
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
if (catalog == null) {
throw new DdlException("No catalog found with name: " + catalogName);
}
if (!(catalog instanceof ExternalCatalog)) {
throw new DdlException("Only support ExternalCatalog Databases");
}
if (catalog.getDbNullable(dbAfter.getName()) != null) {
infoLog("AlterExternalDatabase canceled, because dbAfter has exist, "
+ "catalogName:[{}],dbName:[{}]",
catalogName, dbAfter.getName());
return;
}
Env.getCurrentEnv().getCatalogMgr().dropExternalDatabase(dbBefore.getName(), catalogName, true);
Env.getCurrentEnv().getCatalogMgr().createExternalDatabase(dbAfter.getName(), catalogName, true);

}

protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
return Lists.newArrayList(new AlterDatabaseEvent(event, catalogName));
}

public boolean isRename() {
return isRename;
}

@Override
protected void process() throws MetastoreNotificationException {
// only can change properties,we do nothing
infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
try {
if (isRename) {
processRename();
return;
}
// only can change properties,we do nothing
infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
} catch (Exception e) {
throw new MetastoreNotificationException(
debugString("Failed to process event"), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* MetastoreEvent for ALTER_PARTITION event type
*/
public class AlterPartitionEvent extends MetastoreTableEvent {
public class AlterPartitionEvent extends MetastorePartitionEvent {
private final Table hmsTbl;
private final org.apache.hadoop.hive.metastore.api.Partition partitionAfter;
private final org.apache.hadoop.hive.metastore.api.Partition partitionBefore;
Expand All @@ -44,6 +45,18 @@ public class AlterPartitionEvent extends MetastoreTableEvent {
// true if this alter event was due to a rename operation
private final boolean isRename;

// for test
public AlterPartitionEvent(long eventId, String catalogName, String dbName, String tblName,
String partitionNameBefore, String partitionNameAfter) {
super(eventId, catalogName, dbName, tblName);
this.partitionNameBefore = partitionNameBefore;
this.partitionNameAfter = partitionNameAfter;
this.hmsTbl = null;
this.partitionAfter = null;
this.partitionBefore = null;
isRename = !partitionNameBefore.equalsIgnoreCase(partitionNameAfter);
}

private AlterPartitionEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
Expand Down Expand Up @@ -94,4 +107,12 @@ protected void process() throws MetastoreNotificationException {
debugString("Failed to process event"), e);
}
}

@Override
protected boolean canBeBatched(MetastoreEvent event) {
return isSameTable(event)
&& event instanceof AlterPartitionEvent
&& Objects.equals(partitionBefore, ((AlterPartitionEvent) event).partitionBefore)
&& Objects.equals(partitionAfter, ((AlterPartitionEvent) event).partitionAfter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ public class AlterTableEvent extends MetastoreTableEvent {
// true if this alter event was due to a rename operation
private final boolean isRename;
private final boolean isView;
private final boolean willCreateOrDropTable;

// for test
public AlterTableEvent(long eventId, String catalogName, String dbName,
String tblName, boolean isRename, boolean isView) {
super(eventId, catalogName, dbName, tblName);
this.isRename = isRename;
this.isView = isView;
this.tableBefore = null;
this.tableAfter = null;
this.willCreateOrDropTable = isRename || isView;
}

private AlterTableEvent(NotificationEvent event, String catalogName) {
super(event, catalogName);
Expand All @@ -61,13 +73,19 @@ private AlterTableEvent(NotificationEvent event, String catalogName) {
isRename = !tableBefore.getDbName().equalsIgnoreCase(tableAfter.getDbName())
|| !tableBefore.getTableName().equalsIgnoreCase(tableAfter.getTableName());
isView = tableBefore.isSetViewExpandedText() || tableBefore.isSetViewOriginalText();
this.willCreateOrDropTable = isRename || isView;
}

public static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
String catalogName) {
return Lists.newArrayList(new AlterTableEvent(event, catalogName));
}

@Override
protected boolean willCreateOrDropTable() {
return willCreateOrDropTable;
}

private void processRecreateTable() throws DdlException {
if (!isView) {
return;
Expand Down Expand Up @@ -97,6 +115,14 @@ private void processRename() throws DdlException {

}

public boolean isRename() {
return isRename;
}

public boolean isView() {
return isView;
}

/**
* If the ALTER_TABLE event is due a table rename, this method removes the old table
* and creates a new table with the new name. Else, we just refresh table
Expand Down Expand Up @@ -124,4 +150,22 @@ protected void process() throws MetastoreNotificationException {
debugString("Failed to process event"), e);
}
}

@Override
protected boolean canBeBatched(MetastoreEvent that) {
if (!isSameTable(that)) {
return false;
}

// `that` event must not be a rename table event
// so if the process of this event will drop this table,
// it can merge all the table's events before
if (willCreateOrDropTable) {
return true;
}

// that event must be a MetastoreTableEvent event
// otherwise `isSameTable` will return false
return !((MetastoreTableEvent) that).willCreateOrDropTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
*/
public class CreateDatabaseEvent extends MetastoreEvent {

// for test
public CreateDatabaseEvent(long eventId, String catalogName, String dbName) {
super(eventId, catalogName, dbName, null);
}

private CreateDatabaseEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
public class CreateTableEvent extends MetastoreTableEvent {
private final Table hmsTbl;

// for test
public CreateTableEvent(long eventId, String catalogName, String dbName, String tblName) {
super(eventId, catalogName, dbName, tblName);
this.hmsTbl = null;
}

private CreateTableEvent(NotificationEvent event, String catalogName) throws MetastoreNotificationException {
super(event, catalogName);
Preconditions.checkArgument(MetastoreEventType.CREATE_TABLE.equals(getEventType()));
Expand All @@ -55,6 +61,11 @@ public static List<MetastoreEvent> getEvents(NotificationEvent event, String cat
return Lists.newArrayList(new CreateTableEvent(event, catalogName));
}

@Override
protected boolean willCreateOrDropTable() {
return true;
}

@Override
protected void process() throws MetastoreNotificationException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
/**
* MetastoreEvent for ADD_PARTITION event type
*/
public class DropPartitionEvent extends MetastoreTableEvent {
public class DropPartitionEvent extends MetastorePartitionEvent {
private final Table hmsTbl;
private final List<String> partitionNames;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@
public class DropTableEvent extends MetastoreTableEvent {
private final String tableName;

// for test
public DropTableEvent(long eventId, String catalogName, String dbName,
String tblName) {
super(eventId, catalogName, dbName, tblName);
this.tableName = tblName;
}

private DropTableEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
Expand All @@ -55,6 +62,11 @@ public static List<MetastoreEvent> getEvents(NotificationEvent event,
return Lists.newArrayList(new DropTableEvent(event, catalogName));
}

@Override
protected boolean willCreateOrDropTable() {
return true;
}

@Override
protected void process() throws MetastoreNotificationException {
try {
Expand All @@ -65,4 +77,11 @@ protected void process() throws MetastoreNotificationException {
debugString("Failed to process event"), e);
}
}

@Override
protected boolean canBeBatched(MetastoreEvent that) {
// `that` event must not be a rename table event
// so merge all events which belong to this table before is ok
return isSameTable(that);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@
public class InsertEvent extends MetastoreTableEvent {
private final Table hmsTbl;

// for test
public InsertEvent(long eventId, String catalogName, String dbName,
String tblName) {
super(eventId, catalogName, dbName, tblName);
this.hmsTbl = null;
}

private InsertEvent(NotificationEvent event, String catalogName) {
super(event, catalogName);
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.INSERT));
Expand All @@ -54,6 +61,11 @@ protected static List<MetastoreEvent> getEvents(NotificationEvent event, String
return Lists.newArrayList(new InsertEvent(event, catalogName));
}

@Override
protected boolean willCreateOrDropTable() {
return false;
}

@Override
protected void process() throws MetastoreNotificationException {
try {
Expand All @@ -72,4 +84,15 @@ protected void process() throws MetastoreNotificationException {
debugString("Failed to process event"), e);
}
}

@Override
protected boolean canBeBatched(MetastoreEvent that) {
if (!isSameTable(that)) {
return false;
}

// that event must be a MetastoreTableEvent event
// otherwise `isSameTable` will return false
return !((MetastoreTableEvent) that).willCreateOrDropTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ public abstract class MetastoreEvent {

protected final String catalogName;

// for test
protected MetastoreEvent(long eventId, String catalogName, String dbName, String tblName) {
this.eventId = eventId;
this.catalogName = catalogName;
this.dbName = dbName;
this.tblName = tblName;
this.eventType = null;
this.metastoreNotificationEvent = null;
this.event = null;
}

protected MetastoreEvent(NotificationEvent event, String catalogName) {
this.event = event;
this.dbName = event.getDbName();
Expand Down
Loading

0 comments on commit 105a162

Please sign in to comment.