Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: prevent creating topics just after deletion ( #1739) #1918

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
public enum ErrorCode {
TIMEOUT(REQUEST_TIMEOUT),
TOPIC_ALREADY_EXISTS(BAD_REQUEST),
TOPIC_CREATED_RECENTLY(BAD_REQUEST),
TOPIC_NOT_EXISTS(NOT_FOUND),
GROUP_NOT_EXISTS(NOT_FOUND),
GROUP_NAME_IS_INVALID(BAD_REQUEST),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package pl.allegro.tech.hermes.domain.topic;

import java.time.Instant;
import pl.allegro.tech.hermes.api.ErrorCode;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.exception.HermesException;

public class TopicDeletedRecentlyException extends HermesException {

public TopicDeletedRecentlyException(TopicName topicName, Instant thresholdTime) {
super(
String.format(
"Topic %s cannot created until %s",
topicName.qualifiedName(), thresholdTime.toString()));
}

@Override
public ErrorCode getCode() {
return ErrorCode.TOPIC_CREATED_RECENTLY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public interface TopicRepository {

void createTopic(Topic topic);

void ensureTopicCanBeCreated(TopicName topicName);

void removeTopic(TopicName topicName);

void updateTopic(Topic topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.data.Stat;
Expand Down Expand Up @@ -177,14 +178,30 @@ protected void deleteInTransaction(List<String> paths) throws Exception {
throw new InternalProcessingException("Attempting to remove empty set of paths from ZK");
}
ensureConnected();
CuratorTransactionFinal transaction =
zookeeper.inTransaction().delete().forPath(paths.get(0)).and();
addPathsToDelete(zookeeper.inTransaction(), paths).commit();
}

for (int i = 1; i < paths.size(); i++) {
transaction = transaction.delete().forPath(paths.get(i)).and();
}
protected void deleteAndCreateInTransaction(List<String> paths, String path, Object value)
throws Exception {
ensureConnected();
CuratorTransactionFinal transaction = addPathsToDelete(zookeeper.inTransaction(), paths);
transaction.create().forPath(path, mapper.writeValueAsBytes(value)).and().commit();
}

transaction.commit();
protected void deleteAndCreateInTransaction(
List<String> pathsToDelete, List<String> paths, String path, Object value) throws Exception {
ensureConnected();
CuratorTransactionFinal transaction =
addPathsToDelete(zookeeper.inTransaction(), pathsToDelete);
transaction = addPathsToCreate(transaction, paths);
transaction.create().forPath(path, mapper.writeValueAsBytes(value)).and().commit();
}

protected void deleteAndOverwriteInTransaction(List<String> paths, String path, Object value)
throws Exception {
ensureConnected();
CuratorTransactionFinal transaction = addPathsToDelete(zookeeper.inTransaction(), paths);
transaction.setData().forPath(path, mapper.writeValueAsBytes(value)).and().commit();
}

protected void create(String path, Object value) throws Exception {
Expand All @@ -211,4 +228,24 @@ protected void remove(String path) throws Exception {
private interface ThrowingReader<T> {
T read(byte[] data) throws IOException;
}

private CuratorTransactionFinal addPathsToDelete(
CuratorTransaction transaction, List<String> paths) throws Exception {
CuratorTransactionFinal transactionWithDelete =
transaction.delete().forPath(paths.get(0)).and();
for (int i = 1; i < paths.size(); i++) {
transaction = transaction.delete().forPath(paths.get(i)).and();
}
return transactionWithDelete;
}

private CuratorTransactionFinal addPathsToCreate(
CuratorTransaction transaction, List<String> paths) throws Exception {
CuratorTransactionFinal transactionWithCreate =
transaction.create().forPath(paths.get(0)).and();
for (int i = 1; i < paths.size(); i++) {
transaction = transaction.create().forPath(paths.get(i)).and();
}
return transactionWithCreate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -65,8 +66,8 @@ public void updateGroup(Group group) {
}

/**
* Atomic removal of <code>group</code> and <code>group/topics</code> nodes is required to prevent
* lengthy loop during removal, see: {@link
* Atomic removal of <code>group</code>, <code>group/topics</code> and <code>group/deletion_time
* </code> nodes is required to prevent lengthy loop during removal, see: {@link
* pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperTopicRepository#removeTopic(TopicName)}.
*/
@Override
Expand All @@ -76,6 +77,16 @@ public void removeGroup(String groupName) {

logger.info("Removing group: {}", groupName);
List<String> pathsToDelete = List.of(paths.topicsPath(groupName), paths.groupPath(groupName));
String topicDeletionTimePath = paths.groupTopicDeletionTimePath(groupName);
if (pathExists(topicDeletionTimePath)) {
// topicDeletionTimePath can contain a list of previously deleted topics, need to delete those
// first
List<String> pathsToDeleteWithDeletionTime =
new ArrayList<String>(childrenPathsOf(topicDeletionTimePath));
pathsToDeleteWithDeletionTime.add(topicDeletionTimePath);
pathsToDeleteWithDeletionTime.addAll(pathsToDelete);
pathsToDelete = pathsToDeleteWithDeletionTime;
}
try {
deleteInTransaction(pathsToDelete);
} catch (Exception e) {
Expand All @@ -84,7 +95,8 @@ public void removeGroup(String groupName) {
}

private void ensureGroupIsEmpty(String groupName) {
if (!childrenOf(paths.topicsPath(groupName)).isEmpty()) {
String topicDeletionTimePath = paths.groupTopicDeletionTimePath(groupName);
if (!childrenOf(paths.topicsPath(groupName)).stream().collect(Collectors.toList()).isEmpty()) {
throw new GroupNotEmptyException(groupName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class ZookeeperPaths {
public static final String GROUPS_PATH = "groups";
public static final String SUBSCRIPTIONS_PATH = "subscriptions";
public static final String KAFKA_TOPICS_PATH = "kafka_topics";
public static final String DELETION_TIME_PATH = "deletion_time";
public static final String URL_SEPARATOR = "/";
public static final String CONSUMERS_WORKLOAD_PATH = "consumers-workload";
public static final String CONSUMER_LOAD_PATH = "consumer-load";
Expand Down Expand Up @@ -61,10 +62,19 @@ public String groupPath(String groupName) {
return Joiner.on(URL_SEPARATOR).join(groupsPath(), groupName);
}

public String groupTopicDeletionTimePath(String groupName) {
return Joiner.on(URL_SEPARATOR).join(groupPath(groupName), DELETION_TIME_PATH);
}

public String topicsPath(String groupName) {
return Joiner.on(URL_SEPARATOR).join(groupPath(groupName), TOPICS_PATH);
}

public String topicDeletionTimePath(TopicName topicName) {
return Joiner.on(URL_SEPARATOR)
.join(groupPath(topicName.getGroupName()), DELETION_TIME_PATH, topicName.getName());
}

public String topicMetricPath(TopicName topicName, String metricName) {
return topicPath(topicName, "metrics", metricName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package pl.allegro.tech.hermes.infrastructure.zookeeper;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -15,6 +18,7 @@
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.domain.group.GroupRepository;
import pl.allegro.tech.hermes.domain.topic.TopicAlreadyExistsException;
import pl.allegro.tech.hermes.domain.topic.TopicDeletedRecentlyException;
import pl.allegro.tech.hermes.domain.topic.TopicNotExistsException;
import pl.allegro.tech.hermes.domain.topic.TopicRepository;

Expand Down Expand Up @@ -64,6 +68,7 @@ public List<Topic> listTopics(String groupName) {
@Override
public void createTopic(Topic topic) {
groupRepository.ensureGroupExists(topic.getName().getGroupName());
ensureTopicCanBeCreated(topic.getName());

String topicPath = paths.topicPath(topic.getName());
logger.info("Creating topic for path {}", topicPath);
Expand Down Expand Up @@ -144,8 +149,23 @@ public void removeTopic(TopicName topicName) {
pathsForRemoval.add(paths.subscriptionsPath(topicName));
pathsForRemoval.add(paths.topicPath(topicName));

String topicDeletionTimePath = paths.topicDeletionTimePath(topicName);

try {
deleteInTransaction(pathsForRemoval);
if (pathExists(topicDeletionTimePath)) {
deleteAndOverwriteInTransaction(pathsForRemoval, topicDeletionTimePath, Instant.now());
} else {
String groupDeletionTimePath = paths.groupTopicDeletionTimePath(topicName.getGroupName());
if (!pathExists(groupDeletionTimePath)) {
deleteAndCreateInTransaction(
pathsForRemoval,
List.of(groupDeletionTimePath),
topicDeletionTimePath,
Instant.now());
} else {
deleteAndCreateInTransaction(pathsForRemoval, topicDeletionTimePath, Instant.now());
}
}
} catch (Exception e) {
throw new InternalProcessingException(e);
}
Expand Down Expand Up @@ -192,6 +212,19 @@ private Optional<Topic> getTopicDetails(TopicName topicName, boolean quiet) {
quiet);
}

@Override
public void ensureTopicCanBeCreated(TopicName topicName) {
String topicDeletionTimePath = paths.topicDeletionTimePath(topicName);
if (pathExists(topicDeletionTimePath)) {
Instant deletionTime = readFrom(topicDeletionTimePath, Instant.class);
// TODO: make threshold configurable
Instant thresholdTime = deletionTime.plus(5, ChronoUnit.MINUTES);
if (Duration.between(Instant.now(), thresholdTime).toSeconds() > 0) {
throw new TopicDeletedRecentlyException(topicName, thresholdTime);
}
}
}

@Override
public List<Topic> getTopicsDetails(Collection<TopicName> topicNames) {
return topicNames.stream()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.allegro.tech.hermes.infrastructure.zookeeper

import pl.allegro.tech.hermes.api.Group
import pl.allegro.tech.hermes.api.TopicName
import pl.allegro.tech.hermes.domain.group.GroupNotEmptyException
import pl.allegro.tech.hermes.domain.group.GroupNotExistsException
import pl.allegro.tech.hermes.infrastructure.MalformedDataException
Expand Down Expand Up @@ -83,6 +84,23 @@ class ZookeeperGroupRepositoryTest extends IntegrationTest {
thrown(GroupNotEmptyException)
}

def "should remove group when recently deleted a topic"() {
given:
Group group = group('removeGroup').build()
repository.createGroup(group)
wait.untilGroupCreated('removeGroup')
topicRepository.createTopic(topic('removeGroup', 'remove').build())
wait.untilTopicCreated('removeGroup', 'remove')
topicRepository.removeTopic(new TopicName('removeGroup', 'remove'))
wait.untilTopicRemoved('removeGroup', 'remove')

when:
repository.removeGroup('removeGroup')

then:
!repository.listGroupNames().contains('removeGroup')
}

def "should not throw exception on malformed topic when reading list of all topics"() {
given:
zookeeper().create().forPath(paths.groupPath('malformedGroup'), ''.bytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import pl.allegro.tech.hermes.api.TopicName
import pl.allegro.tech.hermes.common.metric.counter.zookeeper.ZookeeperCounterStorage
import pl.allegro.tech.hermes.domain.group.GroupNotExistsException
import pl.allegro.tech.hermes.domain.topic.TopicAlreadyExistsException
import pl.allegro.tech.hermes.domain.topic.TopicDeletedRecentlyException
import pl.allegro.tech.hermes.domain.topic.TopicNotExistsException
import pl.allegro.tech.hermes.infrastructure.MalformedDataException
import pl.allegro.tech.hermes.test.IntegrationTest
Expand All @@ -24,7 +25,19 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest {
void setup() {
if (!groupRepository.groupExists(GROUP)) {
groupRepository.createGroup(Group.from(GROUP))
} else {
String malformedTopicPath = paths.topicPath(new TopicName(GROUP, 'malformed'));
if (repository.pathExists(malformedTopicPath)) {
zookeeper().delete().forPath(malformedTopicPath);
}
for (name in repository.listTopicNames(GROUP)) {
repository.removeTopic(new TopicName(GROUP, name))
wait.untilTopicRemoved(GROUP, name)
}
groupRepository.removeGroup(GROUP)
groupRepository.createGroup(Group.from(GROUP))
}
wait.untilGroupCreated(GROUP)
}

def "should create topic"() {
Expand Down Expand Up @@ -231,6 +244,7 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest {

def "should not throw exception on malformed topic when reading list of all topics"() {
given:

zookeeper().create().forPath(paths.topicPath(new TopicName(GROUP, 'malformed')), ''.bytes)
wait.untilTopicCreated(GROUP, 'malformed')

Expand All @@ -240,4 +254,19 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest {
then:
notThrown(MalformedDataException)
}
}

def "should throw exception removing a topic and immediately attempting to recreate it"() {
given:
repository.createTopic(topic(GROUP, 'remove').build())
wait.untilTopicCreated(GROUP, 'remove')
repository.removeTopic(new TopicName(GROUP, 'remove'))
wait.untilTopicRemoved(GROUP, 'remove')

when:
repository.createTopic(topic(GROUP, 'remove').build())

then:
thrown(TopicDeletedRecentlyException)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ class RepositoryWaiter extends ZookeeperWaiter {
untilZookeeperPathIsCreated(paths.topicPath(new TopicName(groupName, topicName)))
}

void untilTopicRemoved(String groupName, String topicName) {
untilZookeeperPathIsCreated(paths.topicDeletionTimePath(new TopicName(groupName, topicName)))
}

void untilSubscriptionCreated(TopicName topic, String subscription) {
untilZookeeperPathIsCreated(paths.subscriptionPath(topic, subscription))
}
Expand Down
Loading