From 105a162f94824c28e770a1013c259412b2dec25b Mon Sep 17 00:00:00 2001 From: Xiangyu Wang Date: Wed, 12 Jul 2023 23:41:07 +0800 Subject: [PATCH] [Enhancement](multi-catalog) Merge hms events every round to speed up events processing. (#21589) Currently we find that MetastoreEventsProcessor cannot keep up with the event production rate in our cluster, so we need to merge some HMS events every round. --- .../hive/event/AddPartitionEvent.java | 10 +- .../hive/event/AlterDatabaseEvent.java | 72 ++++++- .../hive/event/AlterPartitionEvent.java | 23 ++- .../hive/event/AlterTableEvent.java | 46 ++++- .../hive/event/CreateDatabaseEvent.java | 5 + .../hive/event/CreateTableEvent.java | 11 + .../hive/event/DropPartitionEvent.java | 2 +- .../datasource/hive/event/DropTableEvent.java | 19 ++ .../datasource/hive/event/InsertEvent.java | 23 +++ .../datasource/hive/event/MetastoreEvent.java | 11 + .../hive/event/MetastoreEventFactory.java | 86 +++++++- .../hive/event/MetastorePartitionEvent.java | 40 ++++ .../hive/event/MetastoreTableEvent.java | 50 +++++ .../hms/MetastoreEventFactoryTest.java | 193 ++++++++++++++++++ 14 files changed, 574 insertions(+), 17 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java index 11d74ed9c2d479..b94333e41b985b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java @@ -37,10 +37,18 @@ /** * MetastoreEvent for ADD_PARTITION event type */ -public class AddPartitionEvent extends MetastoreTableEvent { +public class AddPartitionEvent extends
MetastorePartitionEvent { private final Table hmsTbl; private final List partitionNames; + // for test + public AddPartitionEvent(long eventId, String catalogName, String dbName, + String tblName, List partitionNames) { + super(eventId, catalogName, dbName, tblName); + this.partitionNames = partitionNames; + this.hmsTbl = null; + } + private AddPartitionEvent(NotificationEvent event, String catalogName) { super(event, catalogName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java index 59445a5dc4608d..d56eb52fad56e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java @@ -18,21 +18,74 @@ package org.apache.doris.datasource.hive.event; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.ExternalCatalog; + import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage; import java.util.List; /** - * MetastoreEvent for Alter_DATABASE event type + * MetastoreEvent for ALTER_DATABASE event type */ public class AlterDatabaseEvent extends MetastoreEvent { + private final Database dbBefore; + private final Database dbAfter; + + // true if this alter event was due to a rename operation + private final boolean isRename; + + // for test + public AlterDatabaseEvent(long eventId, String catalogName, String dbName, boolean isRename) { + super(eventId, catalogName, dbName, null); + this.isRename = isRename; + this.dbBefore = null; + this.dbAfter = null; + } + private 
AlterDatabaseEvent(NotificationEvent event, String catalogName) { super(event, catalogName); Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ALTER_DATABASE)); + + try { + JSONAlterDatabaseMessage alterDatabaseMessage = + (JSONAlterDatabaseMessage) MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat()) + .getAlterDatabaseMessage(event.getMessage()); + dbBefore = Preconditions.checkNotNull(alterDatabaseMessage.getDbObjBefore()); + dbAfter = Preconditions.checkNotNull(alterDatabaseMessage.getDbObjAfter()); + } catch (Exception e) { + throw new MetastoreNotificationException( + debugString("Unable to parse the alter database message"), e); + } + // this is a rename event if either dbName of before and after object changed + isRename = !dbBefore.getName().equalsIgnoreCase(dbAfter.getName()); + } + + private void processRename() throws DdlException { + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName); + if (catalog == null) { + throw new DdlException("No catalog found with name: " + catalogName); + } + if (!(catalog instanceof ExternalCatalog)) { + throw new DdlException("Only support ExternalCatalog Databases"); + } + if (catalog.getDbNullable(dbAfter.getName()) != null) { + infoLog("AlterExternalDatabase canceled, because dbAfter has exist, " + + "catalogName:[{}],dbName:[{}]", + catalogName, dbAfter.getName()); + return; + } + Env.getCurrentEnv().getCatalogMgr().dropExternalDatabase(dbBefore.getName(), catalogName, true); + Env.getCurrentEnv().getCatalogMgr().createExternalDatabase(dbAfter.getName(), catalogName, true); + } protected static List getEvents(NotificationEvent event, @@ -40,9 +93,22 @@ protected static List getEvents(NotificationEvent event, return Lists.newArrayList(new AlterDatabaseEvent(event, catalogName)); } + public boolean isRename() { + return isRename; + } + @Override protected void process() throws MetastoreNotificationException { - // only can change properties,we do nothing 
- infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName); + try { + if (isRename) { + processRename(); + return; + } + // only can change properties,we do nothing + infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName); + } catch (Exception e) { + throw new MetastoreNotificationException( + debugString("Failed to process event"), e); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java index 788b79f885abd2..0fc6be375d80ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java @@ -30,12 +30,13 @@ import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; /** * MetastoreEvent for ALTER_PARTITION event type */ -public class AlterPartitionEvent extends MetastoreTableEvent { +public class AlterPartitionEvent extends MetastorePartitionEvent { private final Table hmsTbl; private final org.apache.hadoop.hive.metastore.api.Partition partitionAfter; private final org.apache.hadoop.hive.metastore.api.Partition partitionBefore; @@ -44,6 +45,18 @@ public class AlterPartitionEvent extends MetastoreTableEvent { // true if this alter event was due to a rename operation private final boolean isRename; + // for test + public AlterPartitionEvent(long eventId, String catalogName, String dbName, String tblName, + String partitionNameBefore, String partitionNameAfter) { + super(eventId, catalogName, dbName, tblName); + this.partitionNameBefore = partitionNameBefore; + this.partitionNameAfter = partitionNameAfter; + this.hmsTbl = null; + this.partitionAfter = null; + this.partitionBefore = null; + isRename = !partitionNameBefore.equalsIgnoreCase(partitionNameAfter); + } + private 
AlterPartitionEvent(NotificationEvent event, String catalogName) { super(event, catalogName); @@ -94,4 +107,12 @@ protected void process() throws MetastoreNotificationException { debugString("Failed to process event"), e); } } + + @Override + protected boolean canBeBatched(MetastoreEvent event) { + return isSameTable(event) + && event instanceof AlterPartitionEvent + && Objects.equals(partitionBefore, ((AlterPartitionEvent) event).partitionBefore) + && Objects.equals(partitionAfter, ((AlterPartitionEvent) event).partitionAfter); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java index cbb1ee84781f66..bc09d6ef2c38ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java @@ -41,6 +41,18 @@ public class AlterTableEvent extends MetastoreTableEvent { // true if this alter event was due to a rename operation private final boolean isRename; private final boolean isView; + private final boolean willCreateOrDropTable; + + // for test + public AlterTableEvent(long eventId, String catalogName, String dbName, + String tblName, boolean isRename, boolean isView) { + super(eventId, catalogName, dbName, tblName); + this.isRename = isRename; + this.isView = isView; + this.tableBefore = null; + this.tableAfter = null; + this.willCreateOrDropTable = isRename || isView; + } private AlterTableEvent(NotificationEvent event, String catalogName) { super(event, catalogName); @@ -61,13 +73,19 @@ private AlterTableEvent(NotificationEvent event, String catalogName) { isRename = !tableBefore.getDbName().equalsIgnoreCase(tableAfter.getDbName()) || !tableBefore.getTableName().equalsIgnoreCase(tableAfter.getTableName()); isView = tableBefore.isSetViewExpandedText() || tableBefore.isSetViewOriginalText(); + 
this.willCreateOrDropTable = isRename || isView; } public static List getEvents(NotificationEvent event, - String catalogName) { + String catalogName) { return Lists.newArrayList(new AlterTableEvent(event, catalogName)); } + @Override + protected boolean willCreateOrDropTable() { + return willCreateOrDropTable; + } + private void processRecreateTable() throws DdlException { if (!isView) { return; @@ -97,6 +115,14 @@ private void processRename() throws DdlException { } + public boolean isRename() { + return isRename; + } + + public boolean isView() { + return isView; + } + /** * If the ALTER_TABLE event is due a table rename, this method removes the old table * and creates a new table with the new name. Else, we just refresh table @@ -124,4 +150,22 @@ protected void process() throws MetastoreNotificationException { debugString("Failed to process event"), e); } } + + @Override + protected boolean canBeBatched(MetastoreEvent that) { + if (!isSameTable(that)) { + return false; + } + + // `that` event must not be a rename table event + // so if the process of this event will drop this table, + // it can merge all the table's events before + if (willCreateOrDropTable) { + return true; + } + + // that event must be a MetastoreTableEvent event + // otherwise `isSameTable` will return false + return !((MetastoreTableEvent) that).willCreateOrDropTable(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java index e115f80f5197e6..eb8da00cfea0fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java @@ -32,6 +32,11 @@ */ public class CreateDatabaseEvent extends MetastoreEvent { + // for test + public CreateDatabaseEvent(long eventId, String catalogName, String dbName) { + super(eventId, 
catalogName, dbName, null); + } + private CreateDatabaseEvent(NotificationEvent event, String catalogName) { super(event, catalogName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java index 9ac8fd4e684f18..1ec8cbfde5eb40 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java @@ -35,6 +35,12 @@ public class CreateTableEvent extends MetastoreTableEvent { private final Table hmsTbl; + // for test + public CreateTableEvent(long eventId, String catalogName, String dbName, String tblName) { + super(eventId, catalogName, dbName, tblName); + this.hmsTbl = null; + } + private CreateTableEvent(NotificationEvent event, String catalogName) throws MetastoreNotificationException { super(event, catalogName); Preconditions.checkArgument(MetastoreEventType.CREATE_TABLE.equals(getEventType())); @@ -55,6 +61,11 @@ public static List getEvents(NotificationEvent event, String cat return Lists.newArrayList(new CreateTableEvent(event, catalogName)); } + @Override + protected boolean willCreateOrDropTable() { + return true; + } + @Override protected void process() throws MetastoreNotificationException { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java index a53cf218db522c..7f8ade0819fce7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java @@ -36,7 +36,7 @@ /** * MetastoreEvent for ADD_PARTITION event type */ -public class DropPartitionEvent extends MetastoreTableEvent { +public class DropPartitionEvent extends MetastorePartitionEvent 
{ private final Table hmsTbl; private final List partitionNames; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java index c73a59f1c38b0f..7b43a0966610d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java @@ -34,6 +34,13 @@ public class DropTableEvent extends MetastoreTableEvent { private final String tableName; + // for test + public DropTableEvent(long eventId, String catalogName, String dbName, + String tblName) { + super(eventId, catalogName, dbName, tblName); + this.tableName = tblName; + } + private DropTableEvent(NotificationEvent event, String catalogName) { super(event, catalogName); @@ -55,6 +62,11 @@ public static List getEvents(NotificationEvent event, return Lists.newArrayList(new DropTableEvent(event, catalogName)); } + @Override + protected boolean willCreateOrDropTable() { + return true; + } + @Override protected void process() throws MetastoreNotificationException { try { @@ -65,4 +77,11 @@ protected void process() throws MetastoreNotificationException { debugString("Failed to process event"), e); } } + + @Override + protected boolean canBeBatched(MetastoreEvent that) { + // `that` event must not be a rename table event + // so merge all events which belong to this table before is ok + return isSameTable(that); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java index 27438a4dcb9063..3b5650ade4a778 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java @@ -35,6 +35,13 @@ public class InsertEvent extends MetastoreTableEvent { private 
final Table hmsTbl; + // for test + public InsertEvent(long eventId, String catalogName, String dbName, + String tblName) { + super(eventId, catalogName, dbName, tblName); + this.hmsTbl = null; + } + private InsertEvent(NotificationEvent event, String catalogName) { super(event, catalogName); Preconditions.checkArgument(getEventType().equals(MetastoreEventType.INSERT)); @@ -54,6 +61,11 @@ protected static List getEvents(NotificationEvent event, String return Lists.newArrayList(new InsertEvent(event, catalogName)); } + @Override + protected boolean willCreateOrDropTable() { + return false; + } + @Override protected void process() throws MetastoreNotificationException { try { @@ -72,4 +84,15 @@ protected void process() throws MetastoreNotificationException { debugString("Failed to process event"), e); } } + + @Override + protected boolean canBeBatched(MetastoreEvent that) { + if (!isSameTable(that)) { + return false; + } + + // that event must be a MetastoreTableEvent event + // otherwise `isSameTable` will return false + return !((MetastoreTableEvent) that).willCreateOrDropTable(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java index 9693bb0c4cdf13..08aff93ddaa415 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java @@ -57,6 +57,17 @@ public abstract class MetastoreEvent { protected final String catalogName; + // for test + protected MetastoreEvent(long eventId, String catalogName, String dbName, String tblName) { + this.eventId = eventId; + this.catalogName = catalogName; + this.dbName = dbName; + this.tblName = tblName; + this.eventType = null; + this.metastoreNotificationEvent = null; + this.event = null; + } + protected MetastoreEvent(NotificationEvent event, String catalogName) { this.event 
= event; this.dbName = event.getDbName(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java index 3ab2a7e030eac7..a5bf0d953ced6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java @@ -21,12 +21,17 @@ import org.apache.doris.datasource.HMSExternalCatalog; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; /** * Factory class to create various MetastoreEvents. 
@@ -36,7 +41,7 @@ public class MetastoreEventFactory implements EventFactory { @Override public List transferNotificationEventToMetastoreEvents(NotificationEvent event, - String catalogName) { + String catalogName) { Preconditions.checkNotNull(event.getEventType()); MetastoreEventType metastoreEventType = MetastoreEventType.from(event.getEventType()); switch (metastoreEventType) { @@ -68,19 +73,80 @@ public List transferNotificationEventToMetastoreEvents(Notificat List getMetastoreEvents(List events, HMSExternalCatalog hmsExternalCatalog) { List metastoreEvents = Lists.newArrayList(); + String catalogName = hmsExternalCatalog.getName(); for (NotificationEvent event : events) { - metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event, hmsExternalCatalog.getName())); + metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event, catalogName)); } - return createBatchEvents(metastoreEvents); + return createBatchEvents(catalogName, metastoreEvents); } /** - * Create batch event tasks according to HivePartitionName to facilitate subsequent parallel processing. - * For ADD_PARTITION and DROP_PARTITION, we directly override any events before that partition. - * For a partition, it is meaningless to process any events before the drop partition. - */ - List createBatchEvents(List events) { - // now do nothing - return events; + * Merge events to reduce the time spent on event processing. Currently this mainly handles + * MetastoreTableEvent, because merging MetastoreTableEvent is simple and cost-effective. + * For example, consider the following events: + * + * event1: alter table db1.t1 add partition p1; + * event2: alter table db1.t1 drop partition p2; + * event3: alter table db1.t2 add partition p3; + * event4: alter table db2.t3 rename to t4; + * event5: drop table db1.t1; + * + * Only `event3 event4 event5` will be retained; the other events will be skipped.
+ * */ + public List createBatchEvents(String catalogName, List events) { + List eventsCopy = Lists.newArrayList(events); + Map> indexMap = Maps.newLinkedHashMap(); + for (int i = 0; i < events.size(); i++) { + MetastoreEvent event = events.get(i); + + // if the event is a rename event, just clear indexMap + // to make sure the table references of these events in indexMap will not change + if (event instanceof AlterDatabaseEvent && ((AlterDatabaseEvent) event).isRename()) { + indexMap.clear(); + continue; + } + + // Only check MetastoreTableEvent + if (!(event instanceof MetastoreTableEvent)) { + continue; + } + + // Divide events into multi groups to reduce check count + MetastoreTableEvent.TableKey groupKey = ((MetastoreTableEvent) event).getTableKey(); + if (!indexMap.containsKey(groupKey)) { + List indexList = Lists.newLinkedList(); + indexList.add(i); + indexMap.put(groupKey, indexList); + continue; + } + + List indexList = indexMap.get(groupKey); + for (int j = 0; j < indexList.size(); j++) { + int candidateIndex = indexList.get(j); + if (candidateIndex == -1) { + continue; + } + if (event.canBeBatched(events.get(candidateIndex))) { + eventsCopy.set(candidateIndex, null); + indexList.set(j, -1); + } + } + indexList = indexList.stream().filter(index -> index != -1) + .collect(Collectors.toList()); + indexList.add(i); + indexMap.put(groupKey, indexList); + + // if the event is a rename event, just clear indexMap + // to make sure the table references of these events in indexMap will not change + if (event instanceof AlterTableEvent && ((AlterTableEvent) event).isRename()) { + indexMap.clear(); + } + } + + List filteredEvents = eventsCopy.stream().filter(Objects::nonNull) + .collect(Collectors.toList()); + LOG.info("Event size on catalog [{}] before merge is [{}], after merge is [{}]", + catalogName, events.size(), filteredEvents.size()); + return ImmutableList.copyOf(filteredEvents); } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java new file mode 100644 index 00000000000000..f8bb457ea3c248 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +package org.apache.doris.datasource.hive.event; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +/** + * Base class for all the partition events + */ +public abstract class MetastorePartitionEvent extends MetastoreTableEvent { + + // for test + protected MetastorePartitionEvent(long eventId, String catalogName, String dbName, String tblName) { + super(eventId, catalogName, dbName, tblName); + } + + protected MetastorePartitionEvent(NotificationEvent event, String catalogName) { + super(event, catalogName); + } + + protected boolean willCreateOrDropTable() { + return false; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java index 70f56bdbb06bf7..c797c1c08dac4a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java @@ -23,12 +23,17 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; import java.util.List; +import java.util.Objects; /** * Base class for all the table events */ public abstract class MetastoreTableEvent extends MetastoreEvent { + // for test + protected MetastoreTableEvent(long eventId, String catalogName, String dbName, String tblName) { + super(eventId, catalogName, dbName, tblName); + } protected MetastoreTableEvent(NotificationEvent event, String catalogName) { super(event, catalogName); @@ -47,4 +52,49 @@ protected MetastoreTableEvent(NotificationEvent event, String catalogName) { .add("numFiles") .add("comment") .build(); + + protected boolean isSameTable(MetastoreEvent that) { + if (!(that instanceof MetastoreTableEvent)) { + return false; + } + TableKey thisKey = getTableKey(); + TableKey thatKey = ((MetastoreTableEvent) that).getTableKey(); + return Objects.equals(thisKey, thatKey); + } + + /** + * Returns if the process 
of this event will create or drop this table. + */ + protected abstract boolean willCreateOrDropTable(); + + public TableKey getTableKey() { + return new TableKey(catalogName, dbName, tblName); + } + + static class TableKey { + private final String catalogName; + private final String dbName; + private final String tblName; + + private TableKey(String catalogName, String dbName, String tblName) { + this.catalogName = catalogName; + this.dbName = dbName; + this.tblName = tblName; + } + + @Override + public boolean equals(Object that) { + if (!(that instanceof TableKey)) { + return false; + } + return Objects.equals(catalogName, ((TableKey) that).catalogName) + && Objects.equals(dbName, ((TableKey) that).dbName) + && Objects.equals(tblName, ((TableKey) that).tblName); + } + + @Override + public int hashCode() { + return Objects.hash(catalogName, dbName, tblName); + } + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java new file mode 100644 index 00000000000000..ba18c84bd7e0e1 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java @@ -0,0 +1,193 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.hms; + +import org.apache.doris.datasource.hive.event.AddPartitionEvent; +import org.apache.doris.datasource.hive.event.AlterDatabaseEvent; +import org.apache.doris.datasource.hive.event.AlterPartitionEvent; +import org.apache.doris.datasource.hive.event.AlterTableEvent; +import org.apache.doris.datasource.hive.event.CreateDatabaseEvent; +import org.apache.doris.datasource.hive.event.CreateTableEvent; +import org.apache.doris.datasource.hive.event.DropTableEvent; +import org.apache.doris.datasource.hive.event.InsertEvent; +import org.apache.doris.datasource.hive.event.MetastoreEvent; +import org.apache.doris.datasource.hive.event.MetastoreEventFactory; + +import org.apache.hadoop.util.Lists; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + + +public class MetastoreEventFactoryTest { + private static final MetastoreEventFactory factory = new MetastoreEventFactory(); + + @Test + public void testCreateBatchEvents() { + AlterPartitionEvent e1 = new AlterPartitionEvent(1L, "test_ctl", "test_db", "t1", "p1", "p1"); + AlterPartitionEvent e2 = new AlterPartitionEvent(2L, "test_ctl", "test_db", "t1", "p1", "p1"); + AddPartitionEvent e3 = new AddPartitionEvent(3L, "test_ctl", "test_db", "t1", Arrays.asList("p1")); + AlterTableEvent e4 = new AlterTableEvent(4L, "test_ctl", "test_db", "t1", false, false); + AlterTableEvent e5 = new AlterTableEvent(5L, "test_ctl", "test_db", "t1", true, false); + AlterTableEvent e6 = new AlterTableEvent(6L, "test_ctl", "test_db", "t1", false, true); + DropTableEvent e7 = new DropTableEvent(7L, "test_ctl", "test_db", "t1"); + InsertEvent e8 = new InsertEvent(8L, "test_ctl", "test_db", "t1"); + CreateDatabaseEvent e9 = new CreateDatabaseEvent(9L, "test_ctl", "test_db2"); + AlterPartitionEvent e10 = new 
AlterPartitionEvent(10L, "test_ctl", "test_db", "t2", "p1", "p1"); + AlterTableEvent e11 = new AlterTableEvent(11L, "test_ctl", "test_db", "t1", false, false); + CreateTableEvent e12 = new CreateTableEvent(12L, "test_ctl", "test_db", "t1"); + AlterDatabaseEvent e13 = new AlterDatabaseEvent(13L, "test_ctl", "test_db", true); + AlterDatabaseEvent e14 = new AlterDatabaseEvent(14L, "test_ctl", "test_db", false); + + List mergedEvents; + List testEvents = Lists.newLinkedList(); + + testEvents.add(e1); + testEvents.add(e2); + mergedEvents = factory.createBatchEvents("test_ctl", testEvents); + Assertions.assertTrue(mergedEvents.size() == 1); + Assertions.assertTrue(mergedEvents.get(0).getEventId() == 2L); + + testEvents.clear(); + testEvents.add(e1); + testEvents.add(e2); + testEvents.add(e3); + testEvents.add(e9); + testEvents.add(e10); + testEvents.add(e4); + mergedEvents = factory.createBatchEvents("test_ctl", testEvents); + Assertions.assertTrue(mergedEvents.size() == 3); + Assertions.assertTrue(mergedEvents.get(0).getEventId() == 9L); + Assertions.assertTrue(mergedEvents.get(1).getEventId() == 10L); + Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L); + + // because e5 is a rename event, it will not be merged + testEvents.clear(); + testEvents.add(e1); + testEvents.add(e2); + testEvents.add(e10); + testEvents.add(e5); + testEvents.add(e4); + mergedEvents = factory.createBatchEvents("test_ctl", testEvents); + Assertions.assertTrue(mergedEvents.size() == 3); + Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); + Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L); + Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L); + + testEvents.clear(); + testEvents.add(e1); + testEvents.add(e2); + testEvents.add(e10); + testEvents.add(e6); + testEvents.add(e4); + mergedEvents = factory.createBatchEvents("test_ctl", testEvents); + Assertions.assertTrue(mergedEvents.size() == 3); + Assertions.assertTrue(mergedEvents.get(0).getEventId() 
== 10L); + Assertions.assertTrue(mergedEvents.get(1).getEventId() == 6L); + Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L); + + testEvents.clear(); + testEvents.add(e1); + testEvents.add(e2); + testEvents.add(e10); + testEvents.add(e4); + testEvents.add(e11); + mergedEvents = factory.createBatchEvents("test_ctl", testEvents); + Assertions.assertTrue(mergedEvents.size() == 2); + Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); + Assertions.assertTrue(mergedEvents.get(1).getEventId() == 11L); + + testEvents.clear(); + testEvents.add(e1); + testEvents.add(e2); + testEvents.add(e10); + testEvents.add(e4); + testEvents.add(e8); + mergedEvents = factory.createBatchEvents("test_ctl", testEvents); + Assertions.assertTrue(mergedEvents.size() == 2); + Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); + Assertions.assertTrue(mergedEvents.get(1).getEventId() == 8L); + + // because e5 is a rename event, it will not be merged + testEvents.clear(); + testEvents.add(e1); + testEvents.add(e2); + testEvents.add(e10); + testEvents.add(e5); + testEvents.add(e8); + mergedEvents = factory.createBatchEvents("test_ctl", testEvents); + Assertions.assertTrue(mergedEvents.size() == 3); + Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); + Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L); + Assertions.assertTrue(mergedEvents.get(2).getEventId() == 8L); + + testEvents.clear(); + testEvents.add(e1); + testEvents.add(e2); + testEvents.add(e10); + testEvents.add(e12); + testEvents.add(e4); + testEvents.add(e7); + mergedEvents = factory.createBatchEvents("test_ctl", testEvents); + Assertions.assertTrue(mergedEvents.size() == 2); + Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); + Assertions.assertTrue(mergedEvents.get(1).getEventId() == 7L); + + // because e5 is a rename event, it will not be merged + testEvents.clear(); + testEvents.add(e1); + testEvents.add(e2); + testEvents.add(e10); + testEvents.add(e5); + 
testEvents.add(e7); + mergedEvents = factory.createBatchEvents("test_ctl", testEvents); + Assertions.assertTrue(mergedEvents.size() == 3); + Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); + Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L); + Assertions.assertTrue(mergedEvents.get(2).getEventId() == 7L); + + testEvents.clear(); + testEvents.add(e1); + testEvents.add(e2); + testEvents.add(e10); + testEvents.add(e4); + testEvents.add(e13); + testEvents.add(e7); + mergedEvents = factory.createBatchEvents("test_ctl", testEvents); + Assertions.assertTrue(mergedEvents.size() == 4); + Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); + Assertions.assertTrue(mergedEvents.get(1).getEventId() == 4L); + Assertions.assertTrue(mergedEvents.get(2).getEventId() == 13L); + Assertions.assertTrue(mergedEvents.get(3).getEventId() == 7L); + + testEvents.clear(); + testEvents.add(e1); + testEvents.add(e2); + testEvents.add(e10); + testEvents.add(e4); + testEvents.add(e14); + testEvents.add(e7); + mergedEvents = factory.createBatchEvents("test_ctl", testEvents); + Assertions.assertTrue(mergedEvents.size() == 3); + Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); + Assertions.assertTrue(mergedEvents.get(1).getEventId() == 14L); + Assertions.assertTrue(mergedEvents.get(2).getEventId() == 7L); + } +}