Skip to content

Commit

Permalink
NMS-15859: Inherit basic SSL config props from the producer to the st…
Browse files Browse the repository at this point in the history
…ream client.
  • Loading branch information
j-white authored Aug 2, 2023
1 parent dbd91ec commit bc81514
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ Set this to an empty string to disable forwarding nodes.
Set this to an empty string to disable forwarding topologies.
| nodes


| topologyVertexTopic
| Name of the topic used for topology vertices.
| vertices
Expand Down Expand Up @@ -74,6 +73,11 @@ Set this to `false` to prevent suppressing these alarms.
| startAlarmSyncWithCleanState
| Set this to `true` to force the Kafka Streams client to start with a clean state on every boot.
| false

| alarmSync
| Set this to `false` to disable synchronization of the alarms topics.
This is automatically disabled when alarm forwarding is not enabled.
| true
|===

== Configure filtering
Expand Down Expand Up @@ -169,3 +173,24 @@ admin@opennms()> config:update
----

In the example above, we set the vertex and edge topics to be different by default.

== Configure the alarm sync streams client

When producing alarms to a topic, we also automatically enable a Kafka Stream client to help synchronize the contents of the topic and ensure eventual consistency with the database.

The streams client takes different properties than the producer and requires a separate configuration map.

We automatically pull known properties from the producer configuration, allowing it to work without further configuration in most cases.

If your producer takes a special configuration directive, or you would like to tune the behavior of the stream client, you can set properties in `org.opennms.features.kafka.producer.streams`:

[source, console]
----
$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer.streams
admin@opennms()> config:property-set default.dsl.store rocksDB
admin@opennms()> config:update
----

Any property set in `org.opennms.features.kafka.producer.streams` will override those inherited from `org.opennms.features.kafka.producer.client`.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
Expand Down Expand Up @@ -285,18 +288,25 @@ public synchronized AlarmSyncResults handleAlarmSnapshot(List<OnmsAlarm> alarms)
return results;
}



private Properties loadStreamsProperties() throws IOException {
final Properties streamsProperties = new Properties();
// Default values
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "alarm-datasync");
Path kafkaDir = Paths.get(System.getProperty("karaf.data"), "kafka");
streamsProperties.put(StreamsConfig.STATE_DIR_CONFIG, kafkaDir.toString());
// Copy kafka server info from client properties
// Copy common properties from client configuration, which should save the user from having to configure
// properties for the stream client 99% of time
final Dictionary<String, Object> clientProperties = configAdmin.getConfiguration(OpennmsKafkaProducer.KAFKA_CLIENT_PID).getProperties();
if (clientProperties != null) {
streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, clientProperties.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
copyPropIfNonNull(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clientProperties, streamsProperties);
copyPropIfNonNull(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, clientProperties, streamsProperties);
copyPropIfNonNull(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, clientProperties, streamsProperties);
copyPropIfNonNull(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, clientProperties, streamsProperties);
}
// Add all of the stream properties, overriding the bootstrap servers if set

// Now add all of the stream properties, overriding any of the properties inherited from the producer config
final Dictionary<String, Object> properties = configAdmin.getConfiguration(KAFKA_STREAMS_PID).getProperties();
if (properties != null) {
final Enumeration<String> keys = properties.keys();
Expand All @@ -311,6 +321,13 @@ private Properties loadStreamsProperties() throws IOException {
return streamsProperties;
}

private static void copyPropIfNonNull(String propName, Dictionary<String, Object> sourceMap, Properties targetMap) {
Object propValue = sourceMap.get(propName);
if (propValue != null) {
targetMap.put(propName, propValue);
}
}

public void setAlarmTopic(String alarmTopic) {
this.alarmTopic = alarmTopic;
}
Expand Down

0 comments on commit bc81514

Please sign in to comment.