From 96aaf394a7a47de8dea0b67fddce863d0d42be6f Mon Sep 17 00:00:00 2001 From: "maciej.moscicki" Date: Tue, 30 Jul 2024 15:26:52 +0200 Subject: [PATCH] remove transaction children atomically --- .../zookeeper/ZookeeperBasedRepository.java | 33 ++++++++-- .../ZookeeperMessagePreviewRepository.java | 4 +- .../zookeeper/ZookeeperPaths.java | 8 +++ .../zookeeper/ZookeeperTopicRepository.java | 66 +++++++++++++++++-- .../ZookeeperTopicRepositoryTest.groovy | 28 +++++++- 5 files changed, 122 insertions(+), 17 deletions(-) diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java index 559bf3715e..2578467bfe 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java @@ -5,6 +5,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.ArrayUtils; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; +import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,6 +20,7 @@ import java.util.List; import java.util.Optional; import java.util.function.BiConsumer; +import java.util.stream.Collectors; public abstract class ZookeeperBasedRepository { @@ -75,6 +78,13 @@ protected List childrenOf(String path) { } } + protected List childrenPathsOf(String path) { + List childNodes = childrenOf(path); + return childNodes.stream() + .map(child -> ZKPaths.makePath(path, child)) + .collect(Collectors.toList()); + } + @SuppressWarnings("unchecked") protected byte[] readFrom(String path) { return readWithStatFrom(path, bytes -> bytes, (t, stat) -> {}, false).get(); @@ -156,14 +166,18 @@ protected void createInTransaction(String path, Object value, String childPath) .commit(); } - protected void deleteInTransaction(String path, String childPath) throws Exception { + protected void deleteInTransaction(List paths) throws Exception { + if (paths.isEmpty()) { + throw new InternalProcessingException("Attempting to remove empty set of paths from ZK"); + } ensureConnected(); - zookeeper.inTransaction() - .delete().forPath(childPath) - .and() - .delete().forPath(path) - .and() - .commit(); + CuratorTransactionFinal transaction = zookeeper.inTransaction().delete().forPath(paths.get(0)).and(); + + for (int i = 1; i < paths.size(); i++) { + transaction = transaction.delete().forPath(paths.get(i)).and(); + } + + transaction.commit(); } protected void create(String path, Object value) throws Exception { @@ -182,6 +196,11 @@ protected void touch(String path) throws Exception { zookeeper.setData().forPath(path, oldData); } + protected void removeIfExists(String path) throws Exception { + ensureConnected(); + zookeeper.delete().quietly().guaranteed().deletingChildrenIfNeeded().forPath(path); + } + protected void remove(String path) throws Exception { ensureConnected(); zookeeper.delete().guaranteed().deletingChildrenIfNeeded().forPath(path); diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperMessagePreviewRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperMessagePreviewRepository.java index 0fd938b668..e80b574f02 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperMessagePreviewRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperMessagePreviewRepository.java @@ -28,7 +28,7 @@ public ZookeeperMessagePreviewRepository(CuratorFramework zookeeper, ObjectMappe @Override public List loadPreview(TopicName topicName) { try { - return Optional.of(paths.topicPath(topicName, ZookeeperPaths.PREVIEW_PATH)) + return Optional.of(paths.topicPreviewPath(topicName)) .filter(this::pathExists) .flatMap(p -> readFrom(p, new TypeReference>() {}, true)) .orElseGet(ArrayList::new); @@ -50,7 +50,7 @@ private void persistMessage(TopicName topic, List messages) { logger.debug("Persisting {} messages for preview of topic: {}", messages.size(), topic.qualifiedName()); try { if (pathExists(paths.topicPath(topic))) { - String previewPath = paths.topicPath(topic, ZookeeperPaths.PREVIEW_PATH); + String previewPath = paths.topicPreviewPath(topic); ensurePathExists(previewPath); overwrite(previewPath, messages); } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperPaths.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperPaths.java index 9e04bcc64c..be3cb23393 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperPaths.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperPaths.java @@ -77,6 +77,14 @@ public String topicPath(TopicName topicName, String... tail) { return Joiner.on(URL_SEPARATOR).join(topicsPath(topicName.getGroupName()), topicName.getName(), (Object[]) tail); } + public String topicPreviewPath(TopicName topicName) { + return topicPath(topicName, ZookeeperPaths.PREVIEW_PATH); + } + + public String topicMetricsPath(TopicName topicName) { + return topicPath(topicName, METRICS_PATH); + } + public String subscriptionPath(TopicName topicName, String subscriptionName, String... tail) { return Joiner.on(URL_SEPARATOR).join(subscriptionsPath(topicName), subscriptionName, (Object[]) tail); } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java index b58593f75c..b04122f1ff 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java @@ -13,6 +13,7 @@ import pl.allegro.tech.hermes.domain.topic.TopicNotExistsException; import pl.allegro.tech.hermes.domain.topic.TopicRepository; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -77,18 +78,58 @@ public void createTopic(Topic topic) { } } + /** + * To remove topic node, we must remove topic node and its children. The tree looks like this: + * - topic + * ----- /subscriptions (required) + * ----- /preview (optional) + * ----- /metrics (optional) + * --------------- /volume + * --------------- /published + *

+ * One way to remove the whole tree for topic that would be to use 'deletingChildrenIfNeeded()': + * e.g. zookeeper.delete().deletingChildrenIfNeeded().forPath(topicPath). + * However, deletingChildrenIfNeeded is not atomic. It first tries to remove the node ('topic') + * and upon receiving 'KeeperException.NotEmptyException' it tries to remove children recursively + * and then retries the node removal. This means that there is a potentially large time gap between + * removal of 'topic/subscriptions' node and 'topic' node, especially when topic removal is being done + * in remote DC. It turns out that 'PathChildrenCache' used for 'HierarchicalCacheLevel' in + * consumers and management recreates 'topic/subscriptions' node when deleted. If the recreation is faster + * than the removal of 'topic' node, than the whole removal process must be repeated resulting in a lengthy loop + * that may even result in StackOverflowException. + *

+ * To solve this we must remove 'topic' and 'topic/subscriptions' atomically. However, we must also remove + * other 'topic' children. Transaction API does not allow for 'optional' deletes so we: + * 1. find all children beforehand + * 2. delete all children in one transaction + */ @Override public void removeTopic(TopicName topicName) { ensureTopicExists(topicName); - String topicPath = paths.topicPath(topicName); - logger.info("Removing topic: " + topicName); + + List pathsForRemoval = new ArrayList<>(); + String topicMetricsPath = paths.topicMetricsPath(topicName); + if (pathExists(topicMetricsPath)) { + pathsForRemoval.addAll(childrenPathsOf(topicMetricsPath)); + pathsForRemoval.add(topicMetricsPath); + } + + String topicPreviewPath = paths.topicPreviewPath(topicName); + if (pathExists(topicPreviewPath)) { + pathsForRemoval.add(topicPreviewPath); + } + + pathsForRemoval.add(paths.subscriptionsPath(topicName)); + pathsForRemoval.add(paths.topicPath(topicName)); + try { - deleteInTransaction(topicPath, paths.subscriptionsPath(topicName)); - } catch (Exception e) { - throw new InternalProcessingException(e); + deleteInTransaction(pathsForRemoval); + } catch (Exception ex) { + throw new InternalProcessingException(ex); } } + @Override public void updateTopic(Topic topic) { ensureTopicExists(topic.getName()); @@ -106,6 +147,7 @@ public void touchTopic(TopicName topicName) { ensureTopicExists(topicName); logger.info("Touching topic: " + topicName.qualifiedName()); + removeTopicChildren(topicName); try { touch(paths.topicPath(topicName)); } catch (Exception ex) { @@ -113,6 +155,20 @@ public void touchTopic(TopicName topicName) { } } + private void removeTopicChildren(TopicName topicName) { + try { + removeIfExists(paths.topicPreviewPath(topicName)); + } catch (Exception e) { + throw new InternalProcessingException(e); + } + + try { + removeIfExists(paths.topicMetricsPath(topicName)); + } catch (Exception e) { + throw new InternalProcessingException(e); + } + } + @Override public Topic getTopicDetails(TopicName topicName) { return getTopicDetails(topicName, false).get(); diff --git a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepositoryTest.groovy b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepositoryTest.groovy index 71bcd15bb4..7ac1377790 100644 --- a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepositoryTest.groovy +++ b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepositoryTest.groovy @@ -185,17 +185,16 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest { !repository.topicExists(new TopicName(GROUP, 'remove')) } - def "should remove topic with metrics but without subscriptions"() { + def "should remove topic with metrics and without preview"() { given: def topicName = "topicWithMetrics" repository.createTopic(topic(GROUP, topicName).build()) wait.untilTopicCreated(GROUP, topicName) - def path = pathsCompiler.compile(BASE_ZOOKEEPER_PATH + ZookeeperCounterStorage.SUBSCRIPTION_DELIVERED, pathContext() + def path = pathsCompiler.compile(BASE_ZOOKEEPER_PATH + ZookeeperCounterStorage.TOPIC_VOLUME_COUNTER, pathContext() .withGroup(GROUP) .withTopic(topicName) - .withSubscription("sample") .build()) zookeeper().create().creatingParentsIfNeeded().forPath(path, '1'.bytes) wait.untilZookeeperPathIsCreated(path) @@ -207,6 +206,29 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest { !repository.topicExists(new TopicName(GROUP, topicName)) } + def "should remove topic with metrics and preview"() { + given: "a topic" + Topic topic = topic(GROUP, "topicWithMetricsAndPreview").build() + repository.createTopic(topic) + wait.untilTopicCreated(GROUP, topic.getName().getName()) + + and: "volume metric in zk for that topic" + String metricPath = paths.topicMetricPath(topic.getName(), "volume") + zookeeper().create().creatingParentsIfNeeded().forPath(metricPath, '1'.bytes) + wait.untilZookeeperPathIsCreated(metricPath) + + and: "preview in zk for that topic" + String previewPath = paths.topicPreviewPath(topic.getName()) + zookeeper().create().creatingParentsIfNeeded().forPath(previewPath , '1'.bytes) + wait.untilZookeeperPathIsCreated(previewPath) + + when: + repository.removeTopic(topic.getName()) + + then: + !repository.topicExists(topic.getName()) + } + def "should not throw exception on malformed topic when reading list of all topics"() { given: zookeeper().create().forPath(paths.topicPath(new TopicName(GROUP, 'malformed')), ''.bytes)