Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/foundation-2022' into foundation…
Browse files Browse the repository at this point in the history
…-2023
  • Loading branch information
opennms-bamboo committed Jul 12, 2023
2 parents 8666653 + b008cd4 commit 7668335
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*******************************************************************************
* This file is part of OpenNMS(R).
*
* Copyright (C) 2023 The OpenNMS Group, Inc.
* OpenNMS(R) is Copyright (C) 1999-2023 The OpenNMS Group, Inc.
*
* OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.
*
* OpenNMS(R) is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* OpenNMS(R) is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with OpenNMS(R). If not, see:
* http://www.gnu.org/licenses/
*
* For more information contact:
* OpenNMS(R) Licensing <[email protected]>
* http://www.opennms.org/
* http://www.opennms.com/
*******************************************************************************/

package org.opennms.core.ipc.sink.kafka.server;

import org.opennms.core.ipc.sink.api.MessageConsumerManager;
import org.opennms.core.ipc.sink.common.SinkStrategy;
import org.opennms.core.logging.Logging;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ConfigurationCondition;
import org.springframework.context.annotation.ImportResource;
import org.springframework.core.type.AnnotatedTypeMetadata;

import static org.opennms.core.ipc.common.kafka.KafkaSinkConstants.KAFKA_COMMON_CONFIG_SYS_PROP_PREFIX;
import static org.opennms.core.ipc.common.kafka.KafkaSinkConstants.KAFKA_CONFIG_SYS_PROP_PREFIX;
import static org.opennms.core.ipc.sink.common.SinkStrategy.Strategy.KAFKA;

@Configuration
@Conditional(ConditionalKafkaOffsetContext.Condition.class)
@ImportResource("/META-INF/opennms/applicationContext-ipc-offset-provider.xml")
public class ConditionalKafkaOffsetContext {

private static final Logger LOG = LoggerFactory.getLogger(ConditionalKafkaOffsetContext.class);

static class Condition implements ConfigurationCondition {
@Override
public ConfigurationPhase getConfigurationPhase() {
return ConfigurationPhase.PARSE_CONFIGURATION;
}

@Override
public boolean matches(final ConditionContext context, final AnnotatedTypeMetadata metadata) {
final boolean kafkaSinkEnabled = KAFKA.equals(SinkStrategy.getSinkStrategy());
if (!kafkaSinkEnabled) {
return false;
}
// Default to enabled, and require an explicit value to disable
boolean disabled = Boolean.getBoolean(KAFKA_CONFIG_SYS_PROP_PREFIX + "offset.disabled");
if (!disabled) {
disabled = Boolean.getBoolean(KAFKA_COMMON_CONFIG_SYS_PROP_PREFIX + "offset.disabled");
}
final var offsetEnabled = !disabled;
try (Logging.MDCCloseable mdc = Logging.withPrefixCloseable(MessageConsumerManager.LOG_PREFIX)) {
if (offsetEnabled) {
LOG.debug("Kafka offset provider is enabled.");
} else {
LOG.debug("Kafka offset provider is disabled.");
}
}
return offsetEnabled;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:onmsgi="http://xmlns.opennms.org/xsd/spring/onms-osgi"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd
http://xmlns.opennms.org/xsd/spring/onms-osgi http://xmlns.opennms.org/xsd/spring/onms-osgi.xsd
">

<context:annotation-config />

<bean id="kafkaOffsetProvider" class="org.opennms.core.ipc.sink.kafka.server.offset.KafkaOffsetProvider" init-method="start" destroy-method="stop" >
<property name="metricRegistry" ref="kafkaLagMetricRegistry"/>
</bean>

<!-- Kafka Lag Metrics -->
<bean id="kafkaLagMetricRegistry" class="com.codahale.metrics.MetricRegistry"/>
<onmsgi:service ref="kafkaLagMetricRegistry" interface="com.codahale.metrics.MetricSet" >
<onmsgi:service-properties>
<entry>
<key><value>name</value></key>
<value>Kafka Lag</value>
</entry>
<entry>
<key><value>description</value></key>
<value>Metrics related to Kafka Lag</value>
</entry>
</onmsgi:service-properties>
</onmsgi:service>


</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@
<onmsgi:service ref="kafkaMessageConsumerManager" interface="org.opennms.core.ipc.sink.api.MessageConsumerManager" />


<bean id="kafkaOffsetProvider" class="org.opennms.core.ipc.sink.kafka.server.offset.KafkaOffsetProvider" init-method="start" destroy-method="stop" >
<property name="metricRegistry" ref="kafkaLagMetricRegistry"/>
</bean>

<!-- Sink Metrics -->
<bean id="kafkaSinkMetricRegistry" class="com.codahale.metrics.MetricRegistry"/>
<onmsgi:service ref="kafkaSinkMetricRegistry" interface="com.codahale.metrics.MetricSet" >
Expand All @@ -40,19 +36,6 @@
</onmsgi:service-properties>
</onmsgi:service>

<!-- Kafka Lag Metrics -->
<bean id="kafkaLagMetricRegistry" class="com.codahale.metrics.MetricRegistry"/>
<onmsgi:service ref="kafkaLagMetricRegistry" interface="com.codahale.metrics.MetricSet" >
<onmsgi:service-properties>
<entry>
<key><value>name</value></key>
<value>Kafka Lag</value>
</entry>
<entry>
<key><value>description</value></key>
<value>Metrics related to Kafka Lag</value>
</entry>
</onmsgi:service-properties>
</onmsgi:service>


</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@
<!-- Conditionally load the Kafka Sink -->
<bean class="org.opennms.core.ipc.sink.kafka.server.ConditionalKafkaSinkContext"/>

<!-- Conditionally load the Kafka Offset -->
<bean class="org.opennms.core.ipc.sink.kafka.server.ConditionalKafkaOffsetContext"/>
</beans>
14 changes: 13 additions & 1 deletion docs/modules/reference/pages/configuration/tuning-kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,16 @@ single-topic=false
----
org.opennms.core.ipc.rpc.kafka.single-topic=false
----
Create this file if it does not already exist.
Create this file if it does not already exist.

== Disable the Kafka offset provider

{page-component-title} Core automatically monitors consumer offsets on Kafka to provide lag-related metrics.

To disable the monitoring of these metrics you can set the following property:

.Disable offset provider on `$\{OPENNMS_HOME}/etc/opennms.properties.d/kafka.properties`
[source, properties]
----
org.opennms.core.ipc.kafka.offset.disabled=true
----

0 comments on commit 7668335

Please sign in to comment.