diff --git a/docs/modules/operation/pages/deep-dive/kafka-producer/configure-kafka.adoc b/docs/modules/operation/pages/deep-dive/kafka-producer/configure-kafka.adoc index 81f88cd2ae23..14bd4f369258 100644 --- a/docs/modules/operation/pages/deep-dive/kafka-producer/configure-kafka.adoc +++ b/docs/modules/operation/pages/deep-dive/kafka-producer/configure-kafka.adoc @@ -30,7 +30,6 @@ Set this to an empty string to disable forwarding nodes. Set this to an empty string to disable forwarding topologies. | nodes - | topologyVertexTopic | Name of the topic used for topology vertices. | vertices @@ -74,6 +73,11 @@ Set this to `false` to prevent suppressing these alarms. | startAlarmSyncWithCleanState | Set this to `true` to force the Kafka Streams client to start with a clean state on every boot. | false + +| alarmSync +| Set this to `false` to disable synchronization of the alarms topics. +This is automatically disabled when alarm forwarding is not enabled. +| true |=== == Configure filtering @@ -169,3 +173,24 @@ admin@opennms()> config:update ---- In the example above, we set the vertex and edge topics to be different by default. + +== Configure the alarm sync streams client + +When producing alarms to a topic, we also automatically enable a Kafka Stream client to help synchronize the contents of the topic and ensure eventual consistency with the database. + +The streams client takes different properties than the producer and requires a separate configuration map. + +We automatically pull known properties from the producer configuration, allowing it to work without further configuration in most cases. + +If your producer takes a special configuration directive, or you would like to tune the behavior of the stream client, you can set properties in `org.opennms.features.kafka.producer.streams`: + +[source, console] +---- +$ ssh -p 8101 admin@localhost +... +admin@opennms()> config:edit org.opennms.features.kafka.producer.streams +admin@opennms()> config:property-set default.dsl.store rocksDB +admin@opennms()> config:update +---- + +Any property set in `org.opennms.features.kafka.producer.streams` will override those inherited from `org.opennms.features.kafka.producer.client`. diff --git a/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/datasync/KafkaAlarmDataSync.java b/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/datasync/KafkaAlarmDataSync.java index 38f5633c514f..09925891cacc 100644 --- a/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/datasync/KafkaAlarmDataSync.java +++ b/features/kafka/producer/src/main/java/org/opennms/features/kafka/producer/datasync/KafkaAlarmDataSync.java @@ -47,6 +47,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SecurityConfig; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StoreQueryParameters; @@ -285,18 +288,25 @@ public synchronized AlarmSyncResults handleAlarmSnapshot(List alarms) return results; } + + private Properties loadStreamsProperties() throws IOException { final Properties streamsProperties = new Properties(); // Default values streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "alarm-datasync"); Path kafkaDir = Paths.get(System.getProperty("karaf.data"), "kafka"); streamsProperties.put(StreamsConfig.STATE_DIR_CONFIG, kafkaDir.toString()); - // Copy kafka server info from client properties + // Copy common properties from client configuration, which should save the user from having to configure + // properties for the stream client 99% of time final Dictionary clientProperties = configAdmin.getConfiguration(OpennmsKafkaProducer.KAFKA_CLIENT_PID).getProperties(); if (clientProperties != null) { - streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, clientProperties.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)); + copyPropIfNonNull(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clientProperties, streamsProperties); + copyPropIfNonNull(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, clientProperties, streamsProperties); + copyPropIfNonNull(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, clientProperties, streamsProperties); + copyPropIfNonNull(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, clientProperties, streamsProperties); } - // Add all of the stream properties, overriding the bootstrap servers if set + + // Now add all of the stream properties, overriding any of the properties inherited from the producer config final Dictionary properties = configAdmin.getConfiguration(KAFKA_STREAMS_PID).getProperties(); if (properties != null) { final Enumeration keys = properties.keys(); @@ -311,6 +321,13 @@ private Properties loadStreamsProperties() throws IOException { return streamsProperties; } + private static void copyPropIfNonNull(String propName, Dictionary sourceMap, Properties targetMap) { + Object propValue = sourceMap.get(propName); + if (propValue != null) { + targetMap.put(propName, propValue); + } + } + public void setAlarmTopic(String alarmTopic) { this.alarmTopic = alarmTopic; }