From dab0fdab2f4cd88909bf39e524a0476d4f15c887 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Wed, 21 Aug 2024 17:28:34 +0200 Subject: [PATCH] Keep track of HTTPServer instances (#40) Signed-off-by: Mickael Maison --- .../io/strimzi/kafka/metrics/HttpServers.java | 91 +++++++++++++++++++ .../KafkaPrometheusMetricsReporter.java | 8 +- .../io/strimzi/kafka/metrics/Listener.java | 74 +++++++++++++++ .../PrometheusMetricsReporterConfig.java | 76 ++-------------- .../kafka/metrics/HttpServersTest.java | 62 +++++++++++++ .../strimzi/kafka/metrics/ListenerTest.java | 54 +++++++++++ .../PrometheusMetricsReporterConfigTest.java | 67 +++++--------- .../YammerPrometheusMetricsReporterTest.java | 9 +- 8 files changed, 321 insertions(+), 120 deletions(-) create mode 100644 src/main/java/io/strimzi/kafka/metrics/HttpServers.java create mode 100644 src/main/java/io/strimzi/kafka/metrics/Listener.java create mode 100644 src/test/java/io/strimzi/kafka/metrics/HttpServersTest.java create mode 100644 src/test/java/io/strimzi/kafka/metrics/ListenerTest.java diff --git a/src/main/java/io/strimzi/kafka/metrics/HttpServers.java b/src/main/java/io/strimzi/kafka/metrics/HttpServers.java new file mode 100644 index 0000000..ce218f1 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/metrics/HttpServers.java @@ -0,0 +1,91 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.metrics; + +import io.prometheus.metrics.exporter.httpserver.HTTPServer; +import io.prometheus.metrics.model.registry.PrometheusRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Class to keep track of all the HTTP servers started by all the Kafka components in a JVM. + */ +public class HttpServers { + + private final static Logger LOG = LoggerFactory.getLogger(HttpServers.class); + private static final Map SERVERS = new HashMap<>(); + + /** + * Get or create a new HTTP server if there isn't an existing instance for the specified listener. + * @param listener The host and port + * @param registry The Prometheus registry to expose + * @return A ServerCounter instance + * @throws IOException if the HTTP server does not exist and cannot be started + */ + public synchronized static ServerCounter getOrCreate(Listener listener, PrometheusRegistry registry) throws IOException { + ServerCounter serverCounter = SERVERS.get(listener); + if (serverCounter == null) { + serverCounter = new ServerCounter(listener, registry); + SERVERS.put(listener, serverCounter); + } + serverCounter.count.incrementAndGet(); + return serverCounter; + } + + /** + * Release an HTTP server instance. If no other components hold this instance, it is closed. + * @param serverCounter The HTTP server instance to release + */ + public synchronized static void release(ServerCounter serverCounter) { + if (serverCounter.close()) { + SERVERS.remove(serverCounter.listener); + } + } + + /** + * Class used to keep track of the HTTP server started on a listener. + */ + public static class ServerCounter { + + private final AtomicInteger count; + private final HTTPServer server; + private final Listener listener; + + private ServerCounter(Listener listener, PrometheusRegistry registry) throws IOException { + this.count = new AtomicInteger(); + this.server = HTTPServer.builder() + .hostname(listener.host) + .port(listener.port) + .registry(registry) + .buildAndStart(); + LOG.debug("Started HTTP server on http://{}:{}", listener.host, server.getPort()); + this.listener = listener; + } + + /** + * The port this HTTP server instance is listening on. If the listener port is 0, this returns the actual port + * that is used. + * @return The port number + */ + public int port() { + return server.getPort(); + } + + private synchronized boolean close() { + int remaining = count.decrementAndGet(); + if (remaining == 0) { + server.close(); + LOG.debug("Stopped HTTP server on http://{}:{}", listener.host, server.getPort()); + return true; + } + return false; + } + } +} diff --git a/src/main/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporter.java b/src/main/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporter.java index 1879717..f6a1134 100644 --- a/src/main/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporter.java +++ b/src/main/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporter.java @@ -5,7 +5,6 @@ package io.strimzi.kafka.metrics; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import io.prometheus.metrics.exporter.httpserver.HTTPServer; import io.prometheus.metrics.instrumentation.jvm.JvmMetrics; import io.prometheus.metrics.model.registry.PrometheusRegistry; import io.prometheus.metrics.model.snapshots.PrometheusNaming; @@ -36,7 +35,7 @@ public class KafkaPrometheusMetricsReporter implements MetricsReporter { @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method private PrometheusMetricsReporterConfig config; @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method - private Optional httpServer; + private Optional httpServer; @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the contextChange method private String prefix; @@ -57,7 +56,7 @@ public void configure(Map map) { config = new PrometheusMetricsReporterConfig(map, registry); collector = new KafkaMetricsCollector(); // Add JVM metrics - JvmMetrics.builder().register(registry); + JvmMetrics.builder().register(); httpServer = config.startHttpServer(); LOG.debug("KafkaPrometheusMetricsReporter configured with {}", config); } @@ -88,6 +87,7 @@ public void metricRemoval(KafkaMetric metric) { @Override public void close() { registry.unregister(collector); + httpServer.ifPresent(HttpServers::release); } @Override @@ -111,6 +111,6 @@ public void contextChange(MetricsContext metricsContext) { // for testing Optional getPort() { - return Optional.ofNullable(httpServer.isPresent() ? httpServer.get().getPort() : null); + return Optional.ofNullable(httpServer.isPresent() ? httpServer.get().port() : null); } } diff --git a/src/main/java/io/strimzi/kafka/metrics/Listener.java b/src/main/java/io/strimzi/kafka/metrics/Listener.java new file mode 100644 index 0000000..b43fa09 --- /dev/null +++ b/src/main/java/io/strimzi/kafka/metrics/Listener.java @@ -0,0 +1,74 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.metrics; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static io.strimzi.kafka.metrics.PrometheusMetricsReporterConfig.LISTENER_CONFIG; + +/** + * Class parsing and handling the listener specified via {@link PrometheusMetricsReporterConfig#LISTENER_CONFIG} for + * the HTTP server used to expose the metrics. + */ +public class Listener { + + private static final Pattern PATTERN = Pattern.compile("http://\\[?([0-9a-zA-Z\\-%._:]*)]?:([0-9]+)"); + + final String host; + final int port; + + /* test */ Listener(String host, int port) { + this.host = host; + this.port = port; + } + + static Listener parseListener(String listener) { + Matcher matcher = PATTERN.matcher(listener); + if (matcher.matches()) { + String host = matcher.group(1); + int port = Integer.parseInt(matcher.group(2)); + return new Listener(host, port); + } else { + throw new ConfigException(LISTENER_CONFIG, listener, "Listener must be of format http://[host]:[port]"); + } + } + + @Override + public String toString() { + return "http://" + host + ":" + port; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Listener listener = (Listener) o; + return port == listener.port && Objects.equals(host, listener.host); + } + + @Override + public int hashCode() { + return Objects.hash(host, port); + } + + /** + * Validator to check the user provided listener configuration + */ + static class ListenerValidator implements ConfigDef.Validator { + + @Override + public void ensureValid(String name, Object value) { + Matcher matcher = PATTERN.matcher(String.valueOf(value)); + if (!matcher.matches()) { + throw new ConfigException(name, value, "Listener must be of format http://[host]:[port]"); + } + } + } +} diff --git a/src/main/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfig.java b/src/main/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfig.java index baa7d53..351810d 100644 --- a/src/main/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfig.java +++ b/src/main/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfig.java @@ -4,7 +4,6 @@ */ package io.strimzi.kafka.metrics; -import io.prometheus.metrics.exporter.httpserver.HTTPServer; import io.prometheus.metrics.model.registry.PrometheusRegistry; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -13,12 +12,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.BindException; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; -import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; import java.util.stream.Collectors; @@ -65,7 +61,7 @@ public class PrometheusMetricsReporterConfig extends AbstractConfig { private static final String ALLOWLIST_CONFIG_DOC = "A comma separated list of regex patterns to specify the metrics to collect."; private static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(LISTENER_CONFIG, ConfigDef.Type.STRING, LISTENER_CONFIG_DEFAULT, new ListenerValidator(), ConfigDef.Importance.HIGH, LISTENER_CONFIG_DOC) + .define(LISTENER_CONFIG, ConfigDef.Type.STRING, LISTENER_CONFIG_DEFAULT, new Listener.ListenerValidator(), ConfigDef.Importance.HIGH, LISTENER_CONFIG_DOC) .define(ALLOWLIST_CONFIG, ConfigDef.Type.LIST, ALLOWLIST_CONFIG_DEFAULT, ConfigDef.Importance.HIGH, ALLOWLIST_CONFIG_DOC) .define(LISTENER_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, LISTENER_ENABLE_CONFIG_DEFAULT, ConfigDef.Importance.HIGH, LISTENER_ENABLE_CONFIG_DOC); @@ -140,80 +136,20 @@ public String toString() { /** * Start the HTTP server for exposing metrics. * - * @return An optional HTTPServer instance if started successfully, otherwise empty. + * @return An optional ServerCounter instance if {@link #LISTENER_ENABLE_CONFIG} is enabled, otherwise empty. */ - public synchronized Optional startHttpServer() { + public synchronized Optional startHttpServer() { if (!listenerEnabled) { LOG.info("HTTP server listener not enabled"); return Optional.empty(); } try { - HTTPServer httpServer = HTTPServer.builder() - .hostname(listener.host) - .port(listener.port) - .registry(registry) - .buildAndStart(); - LOG.info("HTTP server started on listener http://{}:{}", listener.host, httpServer.getPort()); - return Optional.of(httpServer); - } catch (BindException be) { - LOG.info("HTTP server already started"); - return Optional.empty(); + HttpServers.ServerCounter server = HttpServers.getOrCreate(listener, registry); + LOG.info("HTTP server listening on http://{}:{}", listener.host, server.port()); + return Optional.of(server); } catch (IOException ioe) { LOG.error("Failed starting HTTP server", ioe); throw new RuntimeException(ioe); } } - - static class Listener { - - private static final Pattern PATTERN = Pattern.compile("http://\\[?([0-9a-zA-Z\\-%._:]*)]?:([0-9]+)"); - - final String host; - final int port; - - Listener(String host, int port) { - this.host = host; - this.port = port; - } - - static Listener parseListener(String listener) { - Matcher matcher = PATTERN.matcher(listener); - if (matcher.matches()) { - String host = matcher.group(1); - int port = Integer.parseInt(matcher.group(2)); - return new Listener(host, port); - } else { - throw new ConfigException(LISTENER_CONFIG, listener, "Listener must be of format http://[host]:[port]"); - } - } - - @Override - public String toString() { - return "http://" + host + ":" + port; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Listener listener = (Listener) o; - return port == listener.port && Objects.equals(host, listener.host); - } - - @Override - public int hashCode() { - return Objects.hash(host, port); - } - } - - static class ListenerValidator implements ConfigDef.Validator { - - @Override - public void ensureValid(String name, Object value) { - Matcher matcher = Listener.PATTERN.matcher(String.valueOf(value)); - if (!matcher.matches()) { - throw new ConfigException(name, value, "Listener must be of format http://[host]:[port]"); - } - } - } } diff --git a/src/test/java/io/strimzi/kafka/metrics/HttpServersTest.java b/src/test/java/io/strimzi/kafka/metrics/HttpServersTest.java new file mode 100644 index 0000000..23b436e --- /dev/null +++ b/src/test/java/io/strimzi/kafka/metrics/HttpServersTest.java @@ -0,0 +1,62 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.metrics; + +import io.prometheus.metrics.model.registry.PrometheusRegistry; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class HttpServersTest { + + private final PrometheusRegistry registry = new PrometheusRegistry(); + + @Test + public void testLifecycle() throws IOException { + Listener listener1 = Listener.parseListener("http://localhost:0"); + HttpServers.ServerCounter server1 = HttpServers.getOrCreate(listener1, registry); + assertTrue(listenerStarted(listener1.host, server1.port())); + + Listener listener2 = Listener.parseListener("http://localhost:0"); + HttpServers.ServerCounter server2 = HttpServers.getOrCreate(listener2, registry); + assertTrue(listenerStarted(listener2.host, server2.port())); + assertSame(server1, server2); + + Listener listener3 = Listener.parseListener("http://127.0.0.1:0"); + HttpServers.ServerCounter server3 = HttpServers.getOrCreate(listener3, registry); + assertTrue(listenerStarted(listener3.host, server3.port())); + + HttpServers.release(server1); + assertTrue(listenerStarted(listener1.host, server1.port())); + assertTrue(listenerStarted(listener2.host, server2.port())); + assertTrue(listenerStarted(listener3.host, server3.port())); + + HttpServers.release(server2); + assertFalse(listenerStarted(listener1.host, server1.port())); + assertFalse(listenerStarted(listener2.host, server2.port())); + assertTrue(listenerStarted(listener3.host, server3.port())); + + HttpServers.release(server3); + assertFalse(listenerStarted(listener3.host, server3.port())); + } + + private boolean listenerStarted(String host, int port) { + try { + URL url = new URL("http://" + host + ":" + port + "/metrics"); + HttpURLConnection con = (HttpURLConnection) url.openConnection(); + con.setRequestMethod("HEAD"); + con.connect(); + return con.getResponseCode() == HttpURLConnection.HTTP_OK; + } catch (IOException ioe) { + return false; + } + } +} diff --git a/src/test/java/io/strimzi/kafka/metrics/ListenerTest.java b/src/test/java/io/strimzi/kafka/metrics/ListenerTest.java new file mode 100644 index 0000000..eaf461c --- /dev/null +++ b/src/test/java/io/strimzi/kafka/metrics/ListenerTest.java @@ -0,0 +1,54 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.metrics; + +import org.apache.kafka.common.config.ConfigException; +import org.junit.jupiter.api.Test; + +import static io.strimzi.kafka.metrics.PrometheusMetricsReporterConfig.LISTENER_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ListenerTest { + + @Test + public void testListenerParseListener() { + assertEquals(new Listener("", 8080), Listener.parseListener("http://:8080")); + assertEquals(new Listener("123", 8080), Listener.parseListener("http://123:8080")); + assertEquals(new Listener("::1", 8080), Listener.parseListener("http://::1:8080")); + assertEquals(new Listener("::1", 8080), Listener.parseListener("http://[::1]:8080")); + assertEquals(new Listener("random", 8080), Listener.parseListener("http://random:8080")); + + assertThrows(ConfigException.class, () -> Listener.parseListener("http")); + assertThrows(ConfigException.class, () -> Listener.parseListener("http://")); + assertThrows(ConfigException.class, () -> Listener.parseListener("http://random")); + assertThrows(ConfigException.class, () -> Listener.parseListener("http://random:")); + assertThrows(ConfigException.class, () -> Listener.parseListener("http://:-8080")); + assertThrows(ConfigException.class, () -> Listener.parseListener("http://random:-8080")); + assertThrows(ConfigException.class, () -> Listener.parseListener("http://:8080random")); + assertThrows(ConfigException.class, () -> Listener.parseListener("randomhttp://:8080random")); + assertThrows(ConfigException.class, () -> Listener.parseListener("randomhttp://:8080")); + } + + @Test + public void testValidator() { + Listener.ListenerValidator validator = new Listener.ListenerValidator(); + validator.ensureValid(LISTENER_CONFIG, "http://:0"); + validator.ensureValid(LISTENER_CONFIG, "http://123:8080"); + validator.ensureValid(LISTENER_CONFIG, "http://::1:8080"); + validator.ensureValid(LISTENER_CONFIG, "http://[::1]:8080"); + validator.ensureValid(LISTENER_CONFIG, "http://random:8080"); + + assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "http")); + assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "http://")); + assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "http://random")); + assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "http://random:")); + assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "http://:-8080")); + assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "http://random:-8080")); + assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "http://:8080random")); + assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "randomhttp://:8080random")); + assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "randomhttp://:8080")); + } +} diff --git a/src/test/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfigTest.java b/src/test/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfigTest.java index dd3b9b5..52e0044 100644 --- a/src/test/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfigTest.java +++ b/src/test/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfigTest.java @@ -4,11 +4,11 @@ */ package io.strimzi.kafka.metrics; -import io.prometheus.metrics.exporter.httpserver.HTTPServer; import io.prometheus.metrics.model.registry.PrometheusRegistry; import org.apache.kafka.common.config.ConfigException; import org.junit.jupiter.api.Test; +import java.net.BindException; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -20,6 +20,7 @@ import static java.util.Collections.singletonMap; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -59,66 +60,46 @@ public void testAllowList() { () -> new PrometheusMetricsReporterConfig(singletonMap(ALLOWLIST_CONFIG, "hello\\,world"), null)); } - @Test - public void testListenerParseListener() { - assertEquals(new PrometheusMetricsReporterConfig.Listener("", 8080), PrometheusMetricsReporterConfig.Listener.parseListener("http://:8080")); - assertEquals(new PrometheusMetricsReporterConfig.Listener("123", 8080), PrometheusMetricsReporterConfig.Listener.parseListener("http://123:8080")); - assertEquals(new PrometheusMetricsReporterConfig.Listener("::1", 8080), PrometheusMetricsReporterConfig.Listener.parseListener("http://::1:8080")); - assertEquals(new PrometheusMetricsReporterConfig.Listener("::1", 8080), PrometheusMetricsReporterConfig.Listener.parseListener("http://[::1]:8080")); - assertEquals(new PrometheusMetricsReporterConfig.Listener("random", 8080), PrometheusMetricsReporterConfig.Listener.parseListener("http://random:8080")); - - assertThrows(ConfigException.class, () -> PrometheusMetricsReporterConfig.Listener.parseListener("http")); - assertThrows(ConfigException.class, () -> PrometheusMetricsReporterConfig.Listener.parseListener("http://")); - assertThrows(ConfigException.class, () -> PrometheusMetricsReporterConfig.Listener.parseListener("http://random")); - assertThrows(ConfigException.class, () -> PrometheusMetricsReporterConfig.Listener.parseListener("http://random:")); - assertThrows(ConfigException.class, () -> PrometheusMetricsReporterConfig.Listener.parseListener("http://:-8080")); - assertThrows(ConfigException.class, () -> PrometheusMetricsReporterConfig.Listener.parseListener("http://random:-8080")); - assertThrows(ConfigException.class, () -> PrometheusMetricsReporterConfig.Listener.parseListener("http://:8080random")); - assertThrows(ConfigException.class, () -> PrometheusMetricsReporterConfig.Listener.parseListener("randomhttp://:8080random")); - assertThrows(ConfigException.class, () -> PrometheusMetricsReporterConfig.Listener.parseListener("randomhttp://:8080")); - } - - @Test - public void testValidator() { - PrometheusMetricsReporterConfig.ListenerValidator validator = new PrometheusMetricsReporterConfig.ListenerValidator(); - validator.ensureValid(LISTENER_CONFIG, "http://:0"); - validator.ensureValid(LISTENER_CONFIG, "http://123:8080"); - validator.ensureValid(LISTENER_CONFIG, "http://::1:8080"); - validator.ensureValid(LISTENER_CONFIG, "http://[::1]:8080"); - validator.ensureValid(LISTENER_CONFIG, "http://random:8080"); - - assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "http")); - assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "http://")); - assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "http://random")); - assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "http://random:")); - assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "http://:-8080")); - assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "http://random:-8080")); - assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "http://:8080random")); - assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "randomhttp://:8080random")); - assertThrows(ConfigException.class, () -> validator.ensureValid(LISTENER_CONFIG, "randomhttp://:8080")); - } - @Test public void testIsListenerEnabled() { Map props = new HashMap<>(); props.put(LISTENER_ENABLE_CONFIG, "true"); props.put(LISTENER_CONFIG, "http://:0"); PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(props, new PrometheusRegistry()); - Optional httpServerOptional = config.startHttpServer(); + Optional httpServerOptional = config.startHttpServer(); assertTrue(config.isListenerEnabled()); assertTrue(httpServerOptional.isPresent()); - httpServerOptional.ifPresent(HTTPServer::close); + HttpServers.release(httpServerOptional.get()); } @Test public void testIsListenerDisabled() { Map props = singletonMap(LISTENER_ENABLE_CONFIG, false); PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(props, new PrometheusRegistry()); - Optional httpServerOptional = config.startHttpServer(); + Optional httpServerOptional = config.startHttpServer(); assertTrue(httpServerOptional.isEmpty()); assertFalse(config.isListenerEnabled()); } + + @Test + public void testStartHttpServer() { + Map props = new HashMap<>(); + props.put(LISTENER_CONFIG, "http://:0"); + PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(props, new PrometheusRegistry()); + Optional httpServerOptional = config.startHttpServer(); + assertTrue(httpServerOptional.isPresent()); + + PrometheusMetricsReporterConfig config2 = new PrometheusMetricsReporterConfig(props, new PrometheusRegistry()); + Optional httpServerOptional2 = config2.startHttpServer(); + assertTrue(httpServerOptional2.isPresent()); + + props = new HashMap<>(); + props.put(LISTENER_CONFIG, "http://:" + httpServerOptional.get().port()); + PrometheusMetricsReporterConfig config3 = new PrometheusMetricsReporterConfig(props, new PrometheusRegistry()); + Exception exc = assertThrows(RuntimeException.class, config3::startHttpServer); + assertInstanceOf(BindException.class, exc.getCause()); + } } diff --git a/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java b/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java index 852c327..7844dae 100644 --- a/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java +++ b/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java @@ -9,7 +9,6 @@ import com.yammer.metrics.core.Metric; import com.yammer.metrics.core.MetricName; import com.yammer.metrics.core.MetricsRegistry; -import io.prometheus.metrics.exporter.httpserver.HTTPServer; import io.prometheus.metrics.model.registry.PrometheusRegistry; import kafka.utils.VerifiableProperties; import org.junit.jupiter.api.BeforeEach; @@ -45,8 +44,10 @@ public void testLifeCycle() throws Exception { configs.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server_group_type.*"); reporter.init(new VerifiableProperties(configs)); - try (HTTPServer httpServer = reporter.config.startHttpServer().orElseThrow()) { - int port = httpServer.getPort(); + HttpServers.ServerCounter httpServer = null; + try { + httpServer = reporter.config.startHttpServer().orElseThrow(); + int port = httpServer.port(); assertEquals(0, getMetrics(port).size()); // Adding a metric not matching the allowlist does nothing @@ -64,6 +65,8 @@ public void testLifeCycle() throws Exception { removeMetric("group", "type", "name"); metrics = getMetrics(port); assertEquals(0, metrics.size()); + } finally { + if (httpServer != null) HttpServers.release(httpServer); } }