Skip to content

Commit

Permalink
Merge branch 'master' into SPAW-942_hermes_contribution
Browse files Browse the repository at this point in the history
  • Loading branch information
bartekdrobczyk authored Aug 7, 2024
2 parents 844d9ba + 0bf4c0c commit 77ef546
Show file tree
Hide file tree
Showing 41 changed files with 258 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,6 +20,7 @@
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public abstract class ZookeeperBasedRepository {

Expand Down Expand Up @@ -75,6 +78,13 @@ protected List<String> childrenOf(String path) {
}
}

protected List<String> childrenPathsOf(String path) {
List<String> childNodes = childrenOf(path);
return childNodes.stream()
.map(child -> ZKPaths.makePath(path, child))
.collect(Collectors.toList());
}

@SuppressWarnings("unchecked")
protected byte[] readFrom(String path) {
return readWithStatFrom(path, bytes -> bytes, (t, stat) -> {}, false).get();
Expand Down Expand Up @@ -156,6 +166,20 @@ protected void createInTransaction(String path, Object value, String childPath)
.commit();
}

protected void deleteInTransaction(List<String> paths) throws Exception {
if (paths.isEmpty()) {
throw new InternalProcessingException("Attempting to remove empty set of paths from ZK");
}
ensureConnected();
CuratorTransactionFinal transaction = zookeeper.inTransaction().delete().forPath(paths.get(0)).and();

for (int i = 1; i < paths.size(); i++) {
transaction = transaction.delete().forPath(paths.get(i)).and();
}

transaction.commit();
}

protected void create(String path, Object value) throws Exception {
ensureConnected();
zookeeper.create().forPath(path, mapper.writeValueAsBytes(value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Group;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.domain.group.GroupAlreadyExistsException;
import pl.allegro.tech.hermes.domain.group.GroupNotEmptyException;
Expand Down Expand Up @@ -65,14 +66,23 @@ public void updateGroup(Group group) {
}
}

/**
* Atomic removal of <code>group</code> and <code>group/topics</code>
* nodes is required to prevent lengthy loop during removal, see:
* {@link pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperTopicRepository#removeTopic(TopicName)}.
*/
@Override
public void removeGroup(String groupName) {
ensureGroupExists(groupName);
ensureGroupIsEmpty(groupName);

logger.info("Removing group: {}", groupName);
List<String> pathsToDelete = List.of(
paths.topicsPath(groupName),
paths.groupPath(groupName)
);
try {
remove(paths.groupPath(groupName));
deleteInTransaction(pathsToDelete);
} catch (Exception e) {
throw new InternalProcessingException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public ZookeeperMessagePreviewRepository(CuratorFramework zookeeper, ObjectMappe
@Override
public List<MessagePreview> loadPreview(TopicName topicName) {
try {
return Optional.of(paths.topicPath(topicName, ZookeeperPaths.PREVIEW_PATH))
return Optional.of(paths.topicPreviewPath(topicName))
.filter(this::pathExists)
.flatMap(p -> readFrom(p, new TypeReference<List<MessagePreview>>() {}, true))
.orElseGet(ArrayList::new);
Expand All @@ -50,7 +50,7 @@ private void persistMessage(TopicName topic, List<MessagePreview> messages) {
logger.debug("Persisting {} messages for preview of topic: {}", messages.size(), topic.qualifiedName());
try {
if (pathExists(paths.topicPath(topic))) {
String previewPath = paths.topicPath(topic, ZookeeperPaths.PREVIEW_PATH);
String previewPath = paths.topicPreviewPath(topic);
ensurePathExists(previewPath);
overwrite(previewPath, messages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ public String topicPath(TopicName topicName, String... tail) {
return Joiner.on(URL_SEPARATOR).join(topicsPath(topicName.getGroupName()), topicName.getName(), (Object[]) tail);
}

public String topicPreviewPath(TopicName topicName) {
return topicPath(topicName, ZookeeperPaths.PREVIEW_PATH);
}

public String topicMetricsPath(TopicName topicName) {
return topicPath(topicName, METRICS_PATH);
}

public String subscriptionPath(TopicName topicName, String subscriptionName, String... tail) {
return Joiner.on(URL_SEPARATOR).join(subscriptionsPath(topicName), subscriptionName, (Object[]) tail);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pl.allegro.tech.hermes.domain.topic.TopicNotExistsException;
import pl.allegro.tech.hermes.domain.topic.TopicRepository;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -77,12 +78,67 @@ public void createTopic(Topic topic) {
}
}

/**
* To remove topic node, we must remove topic node and its children. The tree looks like this:
* <ul>
* <li>- topic
* <li>----- /subscriptions (required)
* <li>----- /preview (optional)
* <li>----- /metrics (optional)
* <li>--------------- /volume
* <li>--------------- /published
* </ul>
*
* <p>One way to remove the whole tree for topic that would be to use <code>deletingChildrenIfNeeded()</code>:
* e.g. <code>zookeeper.delete().deletingChildrenIfNeeded().forPath(topicPath)</code>.
* However, <code>deletingChildrenIfNeeded</code> is not atomic. It first tries to remove the node <code>topic</code>
* and upon receiving <code>KeeperException.NotEmptyException</code> it tries to remove children recursively
* and then retries the node removal. This means that there is a potentially large time gap between
* removal of <code>topic/subscriptions</code> node and <code>topic</code> node, especially when topic removal is being done
* in remote DC.
*
* <p>It turns out that <code>PathChildrenCache</code> used by <code>HierarchicalCacheLevel</code> in
* Consumers and Frontend listens for <code>topics/subscriptions</code> changes and recreates that node when deleted.
* If the recreation happens between the <code>topic/subscriptions</code> and <code>topic</code> node removal
* than the whole removal process must be repeated resulting in a lengthy loop that may even result in <code>StackOverflowException</code>.
* Example of that scenario would be
* <ol>
* <li> DELETE <code>topic</code> - issued by management, fails with KeeperException.NotEmptyException
* <li> DELETE <code>topic/subscriptions</code> - issued by management, succeeds
* <li> CREATE <code>topic/subscriptions</code> - issued by frontend, succeeds
* <li> DELETE <code>topic</code> - issued by management, fails with KeeperException.NotEmptyException
* <li> [...]
* </ol>
*
* <p>To solve this we must remove <code>topic</code> and <code>topic/subscriptions</code> atomically. However, we must also remove
* other <code>topic</code> children. Transaction API does not allow for optional deletes so we:
* <ol>
* <li> find all children paths
* <li> delete all children in one transaction
* </ol>
*/
@Override
public void removeTopic(TopicName topicName) {
ensureTopicExists(topicName);
logger.info("Removing topic: " + topicName);

List<String> pathsForRemoval = new ArrayList<>();
String topicMetricsPath = paths.topicMetricsPath(topicName);
if (pathExists(topicMetricsPath)) {
pathsForRemoval.addAll(childrenPathsOf(topicMetricsPath));
pathsForRemoval.add(topicMetricsPath);
}

String topicPreviewPath = paths.topicPreviewPath(topicName);
if (pathExists(topicPreviewPath)) {
pathsForRemoval.add(topicPreviewPath);
}

pathsForRemoval.add(paths.subscriptionsPath(topicName));
pathsForRemoval.add(paths.topicPath(topicName));

try {
remove(paths.topicPath(topicName));
deleteInTransaction(pathsForRemoval);
} catch (Exception e) {
throw new InternalProcessingException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,16 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest {
!repository.topicExists(new TopicName(GROUP, 'remove'))
}

def "should remove topic with metrics but without subscriptions"() {
def "should remove topic with metrics and without preview"() {
given:
def topicName = "topicWithMetrics"

repository.createTopic(topic(GROUP, topicName).build())
wait.untilTopicCreated(GROUP, topicName)

def path = pathsCompiler.compile(BASE_ZOOKEEPER_PATH + ZookeeperCounterStorage.SUBSCRIPTION_DELIVERED, pathContext()
def path = pathsCompiler.compile(BASE_ZOOKEEPER_PATH + ZookeeperCounterStorage.TOPIC_VOLUME_COUNTER, pathContext()
.withGroup(GROUP)
.withTopic(topicName)
.withSubscription("sample")
.build())
zookeeper().create().creatingParentsIfNeeded().forPath(path, '1'.bytes)
wait.untilZookeeperPathIsCreated(path)
Expand All @@ -207,6 +206,29 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest {
!repository.topicExists(new TopicName(GROUP, topicName))
}

def "should remove topic with metrics and preview"() {
given: "a topic"
Topic topic = topic(GROUP, "topicWithMetricsAndPreview").build()
repository.createTopic(topic)
wait.untilTopicCreated(GROUP, topic.getName().getName())

and: "volume metric in zk for that topic"
String metricPath = paths.topicMetricPath(topic.getName(), "volume")
zookeeper().create().creatingParentsIfNeeded().forPath(metricPath, '1'.bytes)
wait.untilZookeeperPathIsCreated(metricPath)

and: "preview in zk for that topic"
String previewPath = paths.topicPreviewPath(topic.getName())
zookeeper().create().creatingParentsIfNeeded().forPath(previewPath , '1'.bytes)
wait.untilZookeeperPathIsCreated(previewPath)

when:
repository.removeTopic(topic.getName())

then:
!repository.topicExists(topic.getName())
}

def "should not throw exception on malformed topic when reading list of all topics"() {
given:
zookeeper().create().forPath(paths.topicPath(new TopicName(GROUP, 'malformed')), ''.bytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ MultiDCAwareService multiDCAwareService(KafkaNamesMappers kafkaNamesMappers, Sch
AdminClient brokerAdminClient = brokerAdminClient(kafkaProperties);
BrokerStorage storage = brokersStorage(brokerAdminClient);
BrokerTopicManagement brokerTopicManagement =
new KafkaBrokerTopicManagement(topicProperties, brokerAdminClient, kafkaNamesMapper);
new KafkaBrokerTopicManagement(topicProperties, brokerAdminClient, kafkaNamesMapper, kafkaProperties.getDatacenter());
KafkaConsumerPool consumerPool = kafkaConsumersPool(kafkaProperties, storage, kafkaProperties.getBrokerList());
KafkaRawMessageReader kafkaRawMessageReader =
new KafkaRawMessageReader(consumerPool, kafkaProperties.getKafkaConsumer().getPollTimeoutMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public void execute(DatacenterBoundRepositoryHolder<TopicBlacklistRepository> ho
}

@Override
public void rollback(DatacenterBoundRepositoryHolder<TopicBlacklistRepository> holder) {
public void rollback(DatacenterBoundRepositoryHolder<TopicBlacklistRepository> holder, Exception exception) {
holder.getRepository().remove(qualifiedTopicName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public void execute(DatacenterBoundRepositoryHolder<TopicBlacklistRepository> ho
}

@Override
public void rollback(DatacenterBoundRepositoryHolder<TopicBlacklistRepository> holder) {
public void rollback(DatacenterBoundRepositoryHolder<TopicBlacklistRepository> holder, Exception exception) {
if (exists) {
holder.getRepository().add(qualifiedTopicName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void execute(DatacenterBoundRepositoryHolder<CredentialsRepository> holde
}

@Override
public void rollback(DatacenterBoundRepositoryHolder<CredentialsRepository> holder) {
public void rollback(DatacenterBoundRepositoryHolder<CredentialsRepository> holder, Exception exception) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,31 +46,37 @@ private <T> void execute(RepositoryCommand<T> command, boolean isRollbackEnabled
List<DatacenterBoundRepositoryHolder<T>> executedRepoHolders = new ArrayList<>();

for (DatacenterBoundRepositoryHolder<T> repoHolder : repoHolders) {
long start = System.currentTimeMillis();
try {
executedRepoHolders.add(repoHolder);
logger.info("Executing repository command: {} in ZK dc: {}", command, repoHolder.getDatacenterName());
command.execute(repoHolder);
logger.info("Successfully executed repository command: {} in ZK dc: {} in: {} ms", command, repoHolder.getDatacenterName(), System.currentTimeMillis() - start);
} catch (RepositoryNotAvailableException e) {
logger.warn("Execute failed with an RepositoryNotAvailableException error", e);
if (isRollbackEnabled) {
rollback(executedRepoHolders, command);
rollback(executedRepoHolders, command, e);
}
if (shouldStopExecutionOnFailure) {
throw ExceptionWrapper.wrapInInternalProcessingExceptionIfNeeded(e, command.toString(), repoHolder.getDatacenterName());
}
} catch (Exception e) {
logger.warn("Execute failed with an error", e);
logger.warn("Failed to execute repository command: {} in ZK dc: {} in: {} ms", command, repoHolder.getDatacenterName(), System.currentTimeMillis() - start, e);
if (isRollbackEnabled) {
rollback(executedRepoHolders, command);
rollback(executedRepoHolders, command, e);
}
throw ExceptionWrapper.wrapInInternalProcessingExceptionIfNeeded(e, command.toString(), repoHolder.getDatacenterName());
}
}
}

private <T> void rollback(List<DatacenterBoundRepositoryHolder<T>> repoHolders, RepositoryCommand<T> command) {
private <T> void rollback(List<DatacenterBoundRepositoryHolder<T>> repoHolders, RepositoryCommand<T> command, Exception exception) {
long start = System.currentTimeMillis();
for (DatacenterBoundRepositoryHolder<T> repoHolder : repoHolders) {
logger.info("Executing rollback of repository command: {} in ZK dc: {}", command, repoHolder.getDatacenterName());
try {
command.rollback(repoHolder);
command.rollback(repoHolder, exception);
logger.info("Successfully executed rollback of repository command: {} in ZK dc: {} in: {} ms", command, repoHolder.getDatacenterName(), System.currentTimeMillis() - start);
} catch (Exception e) {
logger.error("Rollback procedure failed for command {} on DC {}", command, repoHolder.getDatacenterName(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public abstract class RepositoryCommand<T> {

public abstract void execute(DatacenterBoundRepositoryHolder<T> holder);

public abstract void rollback(DatacenterBoundRepositoryHolder<T> holder);
public abstract void rollback(DatacenterBoundRepositoryHolder<T> holder, Exception exception);

public abstract Class<T> getRepositoryType();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void execute(DatacenterBoundRepositoryHolder<GroupRepository> holder) {
}

@Override
public void rollback(DatacenterBoundRepositoryHolder<GroupRepository> holder) {
public void rollback(DatacenterBoundRepositoryHolder<GroupRepository> holder, Exception exception) {
if (!exists) {
holder.getRepository().removeGroup(group.getGroupName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void execute(DatacenterBoundRepositoryHolder<GroupRepository> holder) {
}

@Override
public void rollback(DatacenterBoundRepositoryHolder<GroupRepository> holder) {
public void rollback(DatacenterBoundRepositoryHolder<GroupRepository> holder, Exception exception) {
holder.getRepository().createGroup(backup);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void execute(DatacenterBoundRepositoryHolder<GroupRepository> holder) {
}

@Override
public void rollback(DatacenterBoundRepositoryHolder<GroupRepository> holder) {
public void rollback(DatacenterBoundRepositoryHolder<GroupRepository> holder, Exception exception) {
holder.getRepository().updateGroup(backup);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void execute(DatacenterBoundRepositoryHolder<OAuthProviderRepository> hol
}

@Override
public void rollback(DatacenterBoundRepositoryHolder<OAuthProviderRepository> holder) {
public void rollback(DatacenterBoundRepositoryHolder<OAuthProviderRepository> holder, Exception exception) {
holder.getRepository().removeOAuthProvider(provider.getName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void execute(DatacenterBoundRepositoryHolder<OAuthProviderRepository> hol
}

@Override
public void rollback(DatacenterBoundRepositoryHolder<OAuthProviderRepository> holder) {
public void rollback(DatacenterBoundRepositoryHolder<OAuthProviderRepository> holder, Exception exception) {
holder.getRepository().createOAuthProvider(backup);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void execute(DatacenterBoundRepositoryHolder<OAuthProviderRepository> hol
}

@Override
public void rollback(DatacenterBoundRepositoryHolder<OAuthProviderRepository> holder) {
public void rollback(DatacenterBoundRepositoryHolder<OAuthProviderRepository> holder, Exception exception) {
holder.getRepository().updateOAuthProvider(backup);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void execute(DatacenterBoundRepositoryHolder<DatacenterReadinessRepositor
}

@Override
public void rollback(DatacenterBoundRepositoryHolder<DatacenterReadinessRepository> holder) { }
public void rollback(DatacenterBoundRepositoryHolder<DatacenterReadinessRepository> holder, Exception exception) { }

@Override
public Class<DatacenterReadinessRepository> getRepositoryType() {
Expand Down
Loading

0 comments on commit 77ef546

Please sign in to comment.