diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/TopicDeletedRecentlyException.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/TopicDeletedRecentlyException.java index 8925290c54..7de9cc4397 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/TopicDeletedRecentlyException.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/domain/topic/TopicDeletedRecentlyException.java @@ -1,23 +1,21 @@ package pl.allegro.tech.hermes.domain.topic; import java.time.Instant; - import pl.allegro.tech.hermes.api.ErrorCode; import pl.allegro.tech.hermes.api.TopicName; import pl.allegro.tech.hermes.common.exception.HermesException; - public class TopicDeletedRecentlyException extends HermesException { -public TopicDeletedRecentlyException(TopicName topicName, Instant thresholdTime) { - super(String.format("Topic %s cannot created until %s", topicName.qualifiedName(), thresholdTime.toString())); -} - - -@Override -public ErrorCode getCode() { // TODO Auto-generated method stub - return ErrorCode.TOPIC_CREATED_RECENTLY; -} - + public TopicDeletedRecentlyException(TopicName topicName, Instant thresholdTime) { + super( + String.format( + "Topic %s cannot created until %s", + topicName.qualifiedName(), thresholdTime.toString())); + } + + @Override + public ErrorCode getCode() { + return ErrorCode.TOPIC_CREATED_RECENTLY; + } } - diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepository.java index 94935c7a93..ce93d9dcea 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepository.java @@ -79,10 +79,13 @@ public void removeGroup(String groupName) { List pathsToDelete = List.of(paths.topicsPath(groupName), paths.groupPath(groupName)); String topicDeletionTimePath = paths.groupTopicDeletionTimePath(groupName); if (pathExists(topicDeletionTimePath)) { - // topicDeletionTimePath can contain a list of previously deleted topics - pathsToDelete = new ArrayList(pathsToDelete); - pathsToDelete.addAll(childrenPathsOf(topicDeletionTimePath)); - pathsToDelete.add(topicDeletionTimePath); + // topicDeletionTimePath can contain a list of previously deleted topics, need to delete those + // first + List pathsToDeleteWithDeletionTime = + new ArrayList(childrenPathsOf(topicDeletionTimePath)); + pathsToDeleteWithDeletionTime.add(topicDeletionTimePath); + pathsToDeleteWithDeletionTime.addAll(pathsToDelete); + pathsToDelete = pathsToDeleteWithDeletionTime; } try { deleteInTransaction(pathsToDelete); @@ -93,10 +96,7 @@ public void removeGroup(String groupName) { private void ensureGroupIsEmpty(String groupName) { String topicDeletionTimePath = paths.groupTopicDeletionTimePath(groupName); - if (!childrenOf(paths.topicsPath(groupName)).stream() - .filter(path -> !path.equals(topicDeletionTimePath)) - .collect(Collectors.toList()) - .isEmpty()) { + if (!childrenOf(paths.topicsPath(groupName)).stream().collect(Collectors.toList()).isEmpty()) { throw new GroupNotEmptyException(groupName); } } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java index 194307ee85..698d8dfa23 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepository.java @@ -219,7 +219,7 @@ public void ensureTopicCanBeCreated(TopicName topicName) { Instant deletionTime = readFrom(topicDeletionTimePath, Instant.class); // TODO: make threshold configurable Instant thresholdTime = deletionTime.plus(5, ChronoUnit.MINUTES); - if (Duration.between(thresholdTime, Instant.now()).toSeconds() > 0) { + if (Duration.between(Instant.now(), thresholdTime).toSeconds() > 0) { throw new TopicDeletedRecentlyException(topicName, thresholdTime); } } diff --git a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepositoryTest.groovy b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepositoryTest.groovy index 172144eeb3..5b0f3aefa0 100644 --- a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepositoryTest.groovy +++ b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperGroupRepositoryTest.groovy @@ -1,6 +1,7 @@ package pl.allegro.tech.hermes.infrastructure.zookeeper import pl.allegro.tech.hermes.api.Group +import pl.allegro.tech.hermes.api.TopicName import pl.allegro.tech.hermes.domain.group.GroupNotEmptyException import pl.allegro.tech.hermes.domain.group.GroupNotExistsException import pl.allegro.tech.hermes.infrastructure.MalformedDataException @@ -83,6 +84,23 @@ class ZookeeperGroupRepositoryTest extends IntegrationTest { thrown(GroupNotEmptyException) } + def "should remove group when recently deleted a topic"() { + given: + Group group = group('removeGroup').build() + repository.createGroup(group) + wait.untilGroupCreated('removeGroup') + topicRepository.createTopic(topic('removeGroup', 'remove').build()) + wait.untilTopicCreated('removeGroup', 'remove') + topicRepository.removeTopic(new TopicName('removeGroup', 'remove')) + wait.untilTopicRemoved('removeGroup', 'remove') + + when: + repository.removeGroup('removeGroup') + + then: + !repository.listGroupNames().contains('removeGroup') + } + def "should not throw exception on malformed topic when reading list of all topics"() { given: zookeeper().create().forPath(paths.groupPath('malformedGroup'), ''.bytes) diff --git a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepositoryTest.groovy b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepositoryTest.groovy index 7ac1377790..2fe7ca02f9 100644 --- a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepositoryTest.groovy +++ b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperTopicRepositoryTest.groovy @@ -6,6 +6,7 @@ import pl.allegro.tech.hermes.api.TopicName import pl.allegro.tech.hermes.common.metric.counter.zookeeper.ZookeeperCounterStorage import pl.allegro.tech.hermes.domain.group.GroupNotExistsException import pl.allegro.tech.hermes.domain.topic.TopicAlreadyExistsException +import pl.allegro.tech.hermes.domain.topic.TopicDeletedRecentlyException import pl.allegro.tech.hermes.domain.topic.TopicNotExistsException import pl.allegro.tech.hermes.infrastructure.MalformedDataException import pl.allegro.tech.hermes.test.IntegrationTest @@ -18,13 +19,25 @@ import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topic class ZookeeperTopicRepositoryTest extends IntegrationTest { private static final String GROUP = "topicRepositoryGroup" + private static final String MALFORMED_TOPIC_GROUP = "topicRepositorMalformedTopicGroup" private ZookeeperTopicRepository repository = new ZookeeperTopicRepository(zookeeper(), mapper, paths, groupRepository) void setup() { + if (!groupRepository.groupExists(MALFORMED_TOPIC_GROUP)) { + groupRepository.createGroup(Group.from(MALFORMED_TOPIC_GROUP)) + } if (!groupRepository.groupExists(GROUP)) { groupRepository.createGroup(Group.from(GROUP)) + } else { + for (name in repository.listTopicNames(GROUP)) { + repository.removeTopic(new TopicName(GROUP, name)) + wait.untilTopicRemoved(GROUP, name) + } + groupRepository.removeGroup(GROUP) + groupRepository.createGroup(Group.from(GROUP)) } + wait.untilGroupCreated(GROUP) } def "should create topic"() { @@ -231,13 +244,29 @@ class ZookeeperTopicRepositoryTest extends IntegrationTest { def "should not throw exception on malformed topic when reading list of all topics"() { given: - zookeeper().create().forPath(paths.topicPath(new TopicName(GROUP, 'malformed')), ''.bytes) - wait.untilTopicCreated(GROUP, 'malformed') + + zookeeper().create().forPath(paths.topicPath(new TopicName(MALFORMED_TOPIC_GROUP, 'malformed')), ''.bytes) + wait.untilTopicCreated(MALFORMED_TOPIC_GROUP, 'malformed') when: - repository.listTopics(GROUP) + repository.listTopics(MALFORMED_TOPIC_GROUP) then: notThrown(MalformedDataException) } -} + + def "should throw exception removing a topic and immediately attempting to recreate it"() { + given: + repository.createTopic(topic(GROUP, 'remove').build()) + wait.untilTopicCreated(GROUP, 'remove') + repository.removeTopic(new TopicName(GROUP, 'remove')) + wait.untilTopicRemoved(GROUP, 'remove') + + when: + repository.createTopic(topic(GROUP, 'remove').build()) + + then: + thrown(TopicDeletedRecentlyException) + } + +} \ No newline at end of file diff --git a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/test/RepositoryWaiter.groovy b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/test/RepositoryWaiter.groovy index 6f82f3bb53..c180eab199 100644 --- a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/test/RepositoryWaiter.groovy +++ b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/test/RepositoryWaiter.groovy @@ -27,6 +27,10 @@ class RepositoryWaiter extends ZookeeperWaiter { untilZookeeperPathIsCreated(paths.topicPath(new TopicName(groupName, topicName))) } + void untilTopicRemoved(String groupName, String topicName) { + untilZookeeperPathIsCreated(paths.topicDeletionTimePath(new TopicName(groupName, topicName))) + } + void untilSubscriptionCreated(TopicName topic, String subscription) { untilZookeeperPathIsCreated(paths.subscriptionPath(topic, subscription)) }