Skip to content

Commit

Permalink
Merge branch 'master' into allow-additional-queries-for-prometheus-me…
Browse files Browse the repository at this point in the history
…trics
  • Loading branch information
faderskd authored Oct 10, 2023
2 parents 7598d81 + a476171 commit 89e57a6
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package pl.allegro.tech.hermes.common.di.factories;

import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
import io.micrometer.core.instrument.config.MeterFilter;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.prometheus.PrometheusConfig;
Expand Down Expand Up @@ -31,6 +35,7 @@ public PrometheusMeterRegistry provide() {
if (parameters.zookeeperReporterEnabled()) {
registerZookeeperReporter(meterRegistry);
}
registerJvmMetrics(meterRegistry);
return meterRegistry;
}

Expand All @@ -55,4 +60,10 @@ private void registerZookeeperReporter(PrometheusMeterRegistry meterRegistry) {
new ZookeeperCounterReporter(meterRegistry, counterStorage, prefix)
.start(parameters.zookeeperReportPeriod().toSeconds(), TimeUnit.SECONDS);
}

private void registerJvmMetrics(MeterRegistry meterRegistry) {
new JvmMemoryMetrics().bindTo(meterRegistry);
new JvmGcMetrics().bindTo(meterRegistry);
new JvmThreadMetrics().bindTo(meterRegistry);
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package pl.allegro.tech.hermes.common.metric.executor;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -14,8 +11,6 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.Executors.newScheduledThreadPool;

public class InstrumentedExecutorServiceFactory {

private final ThreadPoolMetrics threadPoolMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ private Retryer<MessageSendingResult> createRetryer(final MessageBatch batch,
.retryIfRuntimeException()
.retryIfResult(result -> consuming && !result.succeeded() && shouldRetryOnClientError(retryClientErrors, result))
.withWaitStrategy(fixedWait(messageBackoff, MILLISECONDS))
.withStopStrategy(attempt -> attempt.getDelaySinceFirstAttempt() > messageTtlMillis)
.withStopStrategy(attempt -> attempt.getDelaySinceFirstAttempt() > messageTtlMillis
|| Thread.currentThread().isInterrupted())
.withRetryListener(getRetryListener(result -> {
batch.incrementRetryCounter();
markSendingResult(batch, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@

public interface Consumer {

/**
* Consume **must** make sure that interrupted status is restored as it is needed for stopping unhealthy consumers.
* Swallowing the interrupt by consume or any of its dependencies will result in consumer being marked
* as unhealthy and will prevent commits despite messages being sent to subscribers.
*/
void consume(Runnable signalsInterrupt);

void initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ public void consume(Runnable signalsInterrupt) {
} else {
inflightSemaphore.release();
}
} catch (InterruptedException e) {
logger.info("Restoring interrupted status {}", subscription.getQualifiedName(), e);
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error("Consumer loop failed for {}", subscription.getQualifiedName(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import org.apache.http.protocol.HTTP;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.EndpointAddress;
import pl.allegro.tech.hermes.api.EndpointAddressResolverMetadata;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatch;
Expand All @@ -29,6 +31,8 @@

public class JettyMessageBatchSender implements MessageBatchSender {

private static final Logger logger = LoggerFactory.getLogger(JettyMessageBatchSender.class);

private final BatchHttpRequestFactory requestFactory;
private final EndpointAddressResolver resolver;
private final SendingResultHandlers resultHandlers;
Expand Down Expand Up @@ -64,6 +68,10 @@ private MessageSendingResult send(MessageBatch batch, URI address, int requestTi
ContentResponse response = request.send();
return resultHandlers.handleSendingResultForBatch(response);
} catch (TimeoutException | ExecutionException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
logger.info("Restoring interrupted status", e);
}
throw new HttpBatchSenderException("Failed to send message batch", e);
}
}
Expand Down

0 comments on commit 89e57a6

Please sign in to comment.