Skip to content

Commit

Permalink
remove transaction children atomically
Browse files Browse the repository at this point in the history
  • Loading branch information
moscicky committed Jul 30, 2024
1 parent 0e83625 commit 96aaf39
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,6 +20,7 @@
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public abstract class ZookeeperBasedRepository {

Expand Down Expand Up @@ -75,6 +78,13 @@ protected List<String> childrenOf(String path) {
}
}

protected List<String> childrenPathsOf(String path) {
List<String> childNodes = childrenOf(path);
return childNodes.stream()
.map(child -> ZKPaths.makePath(path, child))
.collect(Collectors.toList());
}

@SuppressWarnings("unchecked")
protected byte[] readFrom(String path) {
return readWithStatFrom(path, bytes -> bytes, (t, stat) -> {}, false).get();
Expand Down Expand Up @@ -156,14 +166,18 @@ protected void createInTransaction(String path, Object value, String childPath)
.commit();
}

protected void deleteInTransaction(String path, String childPath) throws Exception {
protected void deleteInTransaction(List<String> paths) throws Exception {
if (paths.isEmpty()) {
throw new InternalProcessingException("Attempting to remove empty set of paths from ZK");
}
ensureConnected();
zookeeper.inTransaction()
.delete().forPath(childPath)
.and()
.delete().forPath(path)
.and()
.commit();
CuratorTransactionFinal transaction = zookeeper.inTransaction().delete().forPath(paths.get(0)).and();

for (int i = 1; i < paths.size(); i++) {
transaction = transaction.delete().forPath(paths.get(i)).and();
}

transaction.commit();
}

protected void create(String path, Object value) throws Exception {
Expand All @@ -182,6 +196,11 @@ protected void touch(String path) throws Exception {
zookeeper.setData().forPath(path, oldData);
}

protected void removeIfExists(String path) throws Exception {
ensureConnected();
zookeeper.delete().quietly().guaranteed().deletingChildrenIfNeeded().forPath(path);
}

protected void remove(String path) throws Exception {
ensureConnected();
zookeeper.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public ZookeeperMessagePreviewRepository(CuratorFramework zookeeper, ObjectMappe
@Override
public List<MessagePreview> loadPreview(TopicName topicName) {
try {
return Optional.of(paths.topicPath(topicName, ZookeeperPaths.PREVIEW_PATH))
return Optional.of(paths.topicPreviewPath(topicName))
.filter(this::pathExists)
.flatMap(p -> readFrom(p, new TypeReference<List<MessagePreview>>() {}, true))
.orElseGet(ArrayList::new);
Expand All @@ -50,7 +50,7 @@ private void persistMessage(TopicName topic, List<MessagePreview> messages) {
logger.debug("Persisting {} messages for preview of topic: {}", messages.size(), topic.qualifiedName());
try {
if (pathExists(paths.topicPath(topic))) {
String previewPath = paths.topicPath(topic, ZookeeperPaths.PREVIEW_PATH);
String previewPath = paths.topicPreviewPath(topic);
ensurePathExists(previewPath);
overwrite(previewPath, messages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ public String topicPath(TopicName topicName, String... tail) {
return Joiner.on(URL_SEPARATOR).join(topicsPath(topicName.getGroupName()), topicName.getName(), (Object[]) tail);
}

public String topicPreviewPath(TopicName topicName) {
return topicPath(topicName, ZookeeperPaths.PREVIEW_PATH);
}

public String topicMetricsPath(TopicName topicName) {
return topicPath(topicName, METRICS_PATH);
}

public String subscriptionPath(TopicName topicName, String subscriptionName, String... tail) {
return Joiner.on(URL_SEPARATOR).join(subscriptionsPath(topicName), subscriptionName, (Object[]) tail);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pl.allegro.tech.hermes.domain.topic.TopicNotExistsException;
import pl.allegro.tech.hermes.domain.topic.TopicRepository;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -77,18 +78,58 @@ public void createTopic(Topic topic) {
}
}

/**
* To remove topic node, we must remove topic node and its children. The tree looks like this:
* - topic
* ----- /subscriptions (required)
* ----- /preview (optional)
* ----- /metrics (optional)
* --------------- /volume
* --------------- /published
* <p>

Check warning on line 89 in hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java

View workflow job for this annotation

GitHub Actions / checkstyle-hermes-common

[checkstyle-hermes-common] hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java#L89 <com.puppycrawl.tools.checkstyle.checks.javadoc.JavadocParagraphCheck>

<p> tag should be placed immediately before the first word, with no space after.
Raw output
/home/runner/work/hermes/hermes/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java:89:0: warning: <p> tag should be placed immediately before the first word, with no space after. (com.puppycrawl.tools.checkstyle.checks.javadoc.JavadocParagraphCheck)

Check warning on line 89 in hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java

View workflow job for this annotation

GitHub Actions / checkstyle-hermes-common

[checkstyle-hermes-common] hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java#L89 <com.puppycrawl.tools.checkstyle.checks.javadoc.JavadocParagraphCheck>

<p> tag should be preceded with an empty line.
Raw output
/home/runner/work/hermes/hermes/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java:89:0: warning: <p> tag should be preceded with an empty line. (com.puppycrawl.tools.checkstyle.checks.javadoc.JavadocParagraphCheck)
* One way to remove the whole tree for topic that would be to use 'deletingChildrenIfNeeded()':
* e.g. zookeeper.delete().deletingChildrenIfNeeded().forPath(topicPath).
* However, deletingChildrenIfNeeded is not atomic. It first tries to remove the node ('topic')
* and upon receiving 'KeeperException.NotEmptyException' it tries to remove children recursively
* and then retries the node removal. This means that there is a potentially large time gap between
* removal of 'topic/subscriptions' node and 'topic' node, especially when topic removal is being done
* in remote DC. It turns out that 'PathChildrenCache' used for 'HierarchicalCacheLevel' in
* consumers and management recreates 'topic/subscriptions' node when deleted. If the recreation is faster
* than the removal of 'topic' node, than the whole removal process must be repeated resulting in a lengthy loop
* that may even result in StackOverflowException.
* <p>

Check warning on line 100 in hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java

View workflow job for this annotation

GitHub Actions / checkstyle-hermes-common

[checkstyle-hermes-common] hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java#L100 <com.puppycrawl.tools.checkstyle.checks.javadoc.JavadocParagraphCheck>

<p> tag should be placed immediately before the first word, with no space after.
Raw output
/home/runner/work/hermes/hermes/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java:100:0: warning: <p> tag should be placed immediately before the first word, with no space after. (com.puppycrawl.tools.checkstyle.checks.javadoc.JavadocParagraphCheck)

Check warning on line 100 in hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java

View workflow job for this annotation

GitHub Actions / checkstyle-hermes-common

[checkstyle-hermes-common] hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java#L100 <com.puppycrawl.tools.checkstyle.checks.javadoc.JavadocParagraphCheck>

<p> tag should be preceded with an empty line.
Raw output
/home/runner/work/hermes/hermes/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java:100:0: warning: <p> tag should be preceded with an empty line. (com.puppycrawl.tools.checkstyle.checks.javadoc.JavadocParagraphCheck)
* To solve this we must remove 'topic' and 'topic/subscriptions' atomically. However, we must also remove
* other 'topic' children. Transaction API does not allow for 'optional' deletes so we:
* 1. find all children beforehand
* 2. delete all children in one transaction
*/
@Override
public void removeTopic(TopicName topicName) {
ensureTopicExists(topicName);
String topicPath = paths.topicPath(topicName);
logger.info("Removing topic: " + topicName);

List<String> pathsForRemoval = new ArrayList<>();
String topicMetricsPath = paths.topicMetricsPath(topicName);
if (pathExists(topicMetricsPath)) {
pathsForRemoval.addAll(childrenPathsOf(topicMetricsPath));
pathsForRemoval.add(topicMetricsPath);
}

String topicPreviewPath = paths.topicPreviewPath(topicName);
if (pathExists(topicPreviewPath)) {
pathsForRemoval.add(topicPreviewPath);
}

pathsForRemoval.add(paths.subscriptionsPath(topicName));
pathsForRemoval.add(paths.topicPath(topicName));

try {
deleteInTransaction(topicPath, paths.subscriptionsPath(topicName));
} catch (Exception e) {
throw new InternalProcessingException(e);
deleteInTransaction(pathsForRemoval);
} catch (Exception ex) {
throw new InternalProcessingException(ex);
}
}


@Override
public void updateTopic(Topic topic) {
ensureTopicExists(topic.getName());
Expand All @@ -106,13 +147,28 @@ public void touchTopic(TopicName topicName) {
ensureTopicExists(topicName);

logger.info("Touching topic: " + topicName.qualifiedName());
removeTopicChildren(topicName);
try {
touch(paths.topicPath(topicName));
} catch (Exception ex) {
throw new InternalProcessingException(ex);
}
}

private void removeTopicChildren(TopicName topicName) {
try {
removeIfExists(paths.topicPreviewPath(topicName));
} catch (Exception e) {
throw new InternalProcessingException(e);
}

try {
removeIfExists(paths.topicMetricsPath(topicName));
} catch (Exception e) {
throw new InternalProcessingException(e);
}
}

@Override
public Topic getTopicDetails(TopicName topicName) {
return getTopicDetails(topicName, false).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,16 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest {
!repository.topicExists(new TopicName(GROUP, 'remove'))
}

def "should remove topic with metrics but without subscriptions"() {
def "should remove topic with metrics and without preview"() {
given:
def topicName = "topicWithMetrics"

repository.createTopic(topic(GROUP, topicName).build())
wait.untilTopicCreated(GROUP, topicName)

def path = pathsCompiler.compile(BASE_ZOOKEEPER_PATH + ZookeeperCounterStorage.SUBSCRIPTION_DELIVERED, pathContext()
def path = pathsCompiler.compile(BASE_ZOOKEEPER_PATH + ZookeeperCounterStorage.TOPIC_VOLUME_COUNTER, pathContext()
.withGroup(GROUP)
.withTopic(topicName)
.withSubscription("sample")
.build())
zookeeper().create().creatingParentsIfNeeded().forPath(path, '1'.bytes)
wait.untilZookeeperPathIsCreated(path)
Expand All @@ -207,6 +206,29 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest {
!repository.topicExists(new TopicName(GROUP, topicName))
}

def "should remove topic with metrics and preview"() {
given: "a topic"
Topic topic = topic(GROUP, "topicWithMetricsAndPreview").build()
repository.createTopic(topic)
wait.untilTopicCreated(GROUP, topic.getName().getName())

and: "volume metric in zk for that topic"
String metricPath = paths.topicMetricPath(topic.getName(), "volume")
zookeeper().create().creatingParentsIfNeeded().forPath(metricPath, '1'.bytes)
wait.untilZookeeperPathIsCreated(metricPath)

and: "preview in zk for that topic"
String previewPath = paths.topicPreviewPath(topic.getName())
zookeeper().create().creatingParentsIfNeeded().forPath(previewPath , '1'.bytes)
wait.untilZookeeperPathIsCreated(previewPath)

when:
repository.removeTopic(topic.getName())

then:
!repository.topicExists(topic.getName())
}

def "should not throw exception on malformed topic when reading list of all topics"() {
given:
zookeeper().create().forPath(paths.topicPath(new TopicName(GROUP, 'malformed')), ''.bytes)
Expand Down

0 comments on commit 96aaf39

Please sign in to comment.