Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NMS-15859: Inherit basic SSL config props from the producer to the stream client. #6412

Merged
merged 5 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ Set this to an empty string to disable forwarding nodes.
Set this to an empty string to disable forwarding topologies.
| nodes


| topologyVertexTopic
| Name of the topic used for topology vertices.
| vertices
Expand Down Expand Up @@ -74,6 +73,11 @@ Set this to `false` to prevent suppressing these alarms.
| startAlarmSyncWithCleanState
| Set this to `true` to force the Kafka Streams client to start with a clean state on every boot.
| false

| alarmSync
| Set this to `false` to disable synchronization of the alarms topics.
This is automatically disabled when alarm forwarding is not enabled.
| true
|===

== Configure filtering
Expand Down Expand Up @@ -169,3 +173,25 @@ admin@opennms()> config:update
----

In the example above, we set the vertex and edge topics to be different by default.


christianpape marked this conversation as resolved.
Show resolved Hide resolved
== Configure the alarm sync streams client

When producing alarms to a topic, we also automatically enable a Kafka Stream client to help synchronize the contents of the topic and ensure eventual consistency with the database.

The streams client takes different properties than the producer and requires a separate configuration map.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"streams client" or "stream client"? it's singular when mentioned in the previous sentence. (and captialized)


We automatically pull known properties from the producer configuration, allowing it to work without further configuration in most cases.

If your producer takes a special configuration directive, or you would like to tune the behavior of the stream client, you can set properties in `org.opennms.features.kafka.producer.streams`:

[source, console]
----
$ ssh -p 8101 admin@localhost
...
admin@opennms()> config:edit org.opennms.features.kafka.producer.streams
admin@opennms()> config:property-set default.dsl.store rocksDB
admin@opennms()> config:update
----

Any property set in `org.opennms.features.kafka.producer.streams` will override those inherited from `org.opennms.features.kafka.producer.client`.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
Expand Down Expand Up @@ -285,18 +288,25 @@ public synchronized AlarmSyncResults handleAlarmSnapshot(List<OnmsAlarm> alarms)
return results;
}



private Properties loadStreamsProperties() throws IOException {
final Properties streamsProperties = new Properties();
// Default values
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "alarm-datasync");
Path kafkaDir = Paths.get(System.getProperty("karaf.data"), "kafka");
streamsProperties.put(StreamsConfig.STATE_DIR_CONFIG, kafkaDir.toString());
// Copy kafka server info from client properties
// Copy common properties from client configuration, which should save the user from having to configure
// properties for the stream client 99% of time
final Dictionary<String, Object> clientProperties = configAdmin.getConfiguration(OpennmsKafkaProducer.KAFKA_CLIENT_PID).getProperties();
if (clientProperties != null) {
streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, clientProperties.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
copyPropIfNonNull(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clientProperties, streamsProperties);
copyPropIfNonNull(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, clientProperties, streamsProperties);
copyPropIfNonNull(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, clientProperties, streamsProperties);
copyPropIfNonNull(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, clientProperties, streamsProperties);
}
// Add all of the stream properties, overriding the bootstrap servers if set

// Now add all of the stream properties, overriding any of the properties inherited from the producer config
final Dictionary<String, Object> properties = configAdmin.getConfiguration(KAFKA_STREAMS_PID).getProperties();
if (properties != null) {
final Enumeration<String> keys = properties.keys();
Expand All @@ -311,6 +321,13 @@ private Properties loadStreamsProperties() throws IOException {
return streamsProperties;
}

private static void copyPropIfNonNull(String propName, Dictionary<String, Object> sourceMap, Properties targetMap) {
Object propValue = sourceMap.get(propName);
if (propValue != null) {
targetMap.put(propName, propValue);
}
}

public void setAlarmTopic(String alarmTopic) {
this.alarmTopic = alarmTopic;
}
Expand Down
Loading