Skip to content

Commit

Permalink
add ability to perform periodic storage checks
Browse files Browse the repository at this point in the history
  • Loading branch information
moscicky committed Aug 2, 2024
1 parent 4292174 commit 35ecd5f
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.MeterRegistry;


public class ConsistencyMetrics {
private final MeterRegistry meterRegistry;

ConsistencyMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}

public <T> void registerStorageConsistencyGauge(boolean isStorageConsistent) {
double value = isStorageConsistent ? 1 : 0;
meterRegistry.gauge("storage.consistency", value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class MetricsFacade {
private final OffsetCommitsMetrics offsetCommitsMetrics;
private final MaxRateMetrics maxRateMetrics;
private final BrokerMetrics brokerMetrics;
private final ConsistencyMetrics consistencyMetrics;

public MetricsFacade(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
Expand All @@ -45,6 +46,7 @@ public MetricsFacade(MeterRegistry meterRegistry) {
this.offsetCommitsMetrics = new OffsetCommitsMetrics(meterRegistry);
this.maxRateMetrics = new MaxRateMetrics(meterRegistry);
this.brokerMetrics = new BrokerMetrics(meterRegistry);
this.consistencyMetrics = new ConsistencyMetrics(meterRegistry);
}

public TopicMetrics topics() {
Expand Down Expand Up @@ -107,6 +109,10 @@ public BrokerMetrics broker() {
return brokerMetrics;
}

public ConsistencyMetrics consistency() {
return consistencyMetrics;
}

public void unregisterAllMetricsRelatedTo(SubscriptionName subscription) {
Collection<Meter> meters = Search.in(meterRegistry)
.tags(subscriptionTags(subscription))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

import org.springframework.boot.context.properties.ConfigurationProperties;

import java.time.Duration;

@ConfigurationProperties(prefix = "consistency-checker")
public class ConsistencyCheckerProperties {

private int threadPoolSize = 2;
private boolean periodicCheckEnabled = false;
private Duration refreshInterval = Duration.ofMinutes(15);

public int getThreadPoolSize() {
return threadPoolSize;
Expand All @@ -14,4 +18,22 @@ public int getThreadPoolSize() {
public void setThreadPoolSize(int threadPoolSize) {
this.threadPoolSize = threadPoolSize;
}


public boolean isPeriodicCheckEnabled() {
return periodicCheckEnabled;
}

public void setPeriodicCheckEnabled(boolean periodicCheckEnabled) {
this.periodicCheckEnabled = periodicCheckEnabled;
}


public Duration getRefreshInterval() {
return refreshInterval;
}

public void setRefreshInterval(Duration refreshInterval) {
this.refreshInterval = refreshInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import jakarta.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import pl.allegro.tech.hermes.api.Group;
import pl.allegro.tech.hermes.api.InconsistentGroup;
Expand All @@ -13,6 +15,7 @@
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.domain.group.GroupNotExistsException;
import pl.allegro.tech.hermes.domain.group.GroupRepository;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
Expand All @@ -31,6 +34,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static java.util.Collections.emptyList;
Expand All @@ -39,32 +44,55 @@

@Component
public class DcConsistencyService {
private static final Logger logger = LoggerFactory.getLogger(DcConsistencyService.class);

private final ExecutorService executor;
private final ScheduledExecutorService scheduler;
private final List<DatacenterBoundRepositoryHolder<GroupRepository>> groupRepositories;
private final List<DatacenterBoundRepositoryHolder<TopicRepository>> topicRepositories;
private final List<DatacenterBoundRepositoryHolder<SubscriptionRepository>> subscriptionRepositories;
private final ObjectMapper objectMapper;
private final MetricsFacade metricsFacade;

public DcConsistencyService(RepositoryManager repositoryManager,
ObjectMapper objectMapper,
ConsistencyCheckerProperties properties) {
ConsistencyCheckerProperties properties,
MetricsFacade metricsFacade) {
this.groupRepositories = repositoryManager.getRepositories(GroupRepository.class);
this.topicRepositories = repositoryManager.getRepositories(TopicRepository.class);
this.subscriptionRepositories = repositoryManager.getRepositories(SubscriptionRepository.class);
this.objectMapper = objectMapper;
this.metricsFacade = metricsFacade;
this.executor = Executors.newFixedThreadPool(
properties.getThreadPoolSize(),
new ThreadFactoryBuilder()
.setNameFormat("consistency-checker-%d")
.build()
);
this.scheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("consistency-checker-scheduler-%d")
.build()
);
if (properties.isPeriodicCheckEnabled()) {
scheduler.scheduleAtFixedRate(this::reportConsistency, 0, properties.getRefreshInterval().getSeconds(), TimeUnit.SECONDS);
}
}

@PreDestroy
public void stop() {
executor.shutdown();
}

private void reportConsistency() {
long start = System.currentTimeMillis();
Set<String> groups = listAllGroupNames();
List<InconsistentGroup> inconsistentGroups = listInconsistentGroups(groups);
long durationSeconds = (System.currentTimeMillis() - start) / 1000;
logger.info("Consistency check finished in {}s, number of inconsistent groups: {}", durationSeconds, inconsistentGroups.size());
metricsFacade.consistency().registerStorageConsistencyGauge(inconsistentGroups.isEmpty());
}

public List<InconsistentGroup> listInconsistentGroups(Set<String> groupNames) {
List<InconsistentGroup> inconsistentGroups = new ArrayList<>();
for (MetadataCopies copies : listCopiesOfGroups(groupNames)) {
Expand Down Expand Up @@ -208,4 +236,6 @@ private <T> T resolveFuture(Future<T> future) {
throw new ConsistencyCheckingException("Fetching metadata failed", e);
}
}


}

0 comments on commit 35ecd5f

Please sign in to comment.