-
Notifications
You must be signed in to change notification settings - Fork 218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Monitoring consumer groups #1650
base: master
Are you sure you want to change the base?
Changes from all commits
ee01501
21c6592
121295a
55b756c
84b24f7
b298422
8508a28
1d8a41e
1715077
9154378
3c4a6b5
f2bd48e
31bc8e8
69f0588
0040e00
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
# Monitoring | ||
## Ensure all topic partitions are assigned to a consumer group in a subscription | ||
|
||
To use this monitoring you have to enable it in the `hermes-management/.../application.yaml` file. | ||
The subscriptions that have unassigned partitions are available through endpoint `/monitoring/consumer-groups`. | ||
|
||
```yaml | ||
monitoringConsumerGroups: | ||
enabled: true | ||
numberOfThreads: 6 | ||
scanEvery: 120s | ||
``` | ||
|
||
The `numberOfThreads` and `scanEvery` parameters are just examples. For the best performance of this monitoring, you need to configure them yourself. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package pl.allegro.tech.hermes.api; | ||
|
||
import com.fasterxml.jackson.annotation.JsonIgnore; | ||
|
||
public class TopicAndSubscription { | ||
|
||
private final Topic topic; | ||
private final String subscription; | ||
|
||
public TopicAndSubscription(Topic topic, String subscription) { | ||
this.topic = topic; | ||
this.subscription = subscription; | ||
} | ||
|
||
@JsonIgnore | ||
public Topic getTopic() { | ||
return topic; | ||
} | ||
|
||
public String getTopicName() { | ||
return topic.getQualifiedName(); | ||
} | ||
|
||
public String getSubscription() { | ||
return subscription; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package pl.allegro.tech.hermes.api.endpoints; | ||
|
||
import jakarta.ws.rs.GET; | ||
import jakarta.ws.rs.Path; | ||
import jakarta.ws.rs.Produces; | ||
import pl.allegro.tech.hermes.api.TopicAndSubscription; | ||
|
||
import java.util.List; | ||
|
||
import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; | ||
|
||
@Path("monitoring") | ||
public interface MonitoringEndpoint { | ||
|
||
@GET | ||
@Produces(APPLICATION_JSON) | ||
@Path("/consumer-groups") | ||
List<TopicAndSubscription> monitorConsumerGroups(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package pl.allegro.tech.hermes.management.api; | ||
|
||
import jakarta.ws.rs.GET; | ||
import jakarta.ws.rs.Path; | ||
import jakarta.ws.rs.Produces; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import pl.allegro.tech.hermes.api.TopicAndSubscription; | ||
import pl.allegro.tech.hermes.management.infrastructure.monitoring.MonitoringCache; | ||
|
||
import java.util.List; | ||
|
||
import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; | ||
|
||
@Path("monitoring") | ||
public class MonitoringEndpoint { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should create corresponding interface for this endpoint in module |
||
|
||
private final MonitoringCache monitoringCache; | ||
|
||
@Autowired | ||
public MonitoringEndpoint(MonitoringCache monitoringCache) { | ||
this.monitoringCache = monitoringCache; | ||
} | ||
|
||
@GET | ||
@Produces(APPLICATION_JSON) | ||
@Path("/consumer-groups") | ||
public List<TopicAndSubscription> monitorConsumerGroups() { | ||
return monitoringCache.getSubscriptionsWithUnassignedPartitions(); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor: unnecessary blank lines above |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
package pl.allegro.tech.hermes.management.config; | ||
|
||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
import pl.allegro.tech.hermes.management.config.kafka.KafkaClustersProperties; | ||
import pl.allegro.tech.hermes.management.config.kafka.KafkaNamesMappers; | ||
import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionService; | ||
import pl.allegro.tech.hermes.management.domain.topic.TopicService; | ||
import pl.allegro.tech.hermes.management.infrastructure.monitoring.MonitoringCache; | ||
import pl.allegro.tech.hermes.management.infrastructure.monitoring.MonitoringServicesCreator; | ||
|
||
@Configuration | ||
public class MonitoringConfiguration { | ||
|
||
@Bean | ||
MonitoringServicesCreator monitoringServicesCreator(KafkaClustersProperties kafkaClustersProperties, | ||
KafkaNamesMappers kafkaNamesMappers) { | ||
return new MonitoringServicesCreator(kafkaClustersProperties, kafkaNamesMappers); | ||
} | ||
|
||
@Bean | ||
MonitoringCache monitoringCache(MonitoringProperties monitoringProperties, | ||
SubscriptionService subscriptionService, | ||
TopicService topicService, | ||
MonitoringServicesCreator monitoringServicesCreator) { | ||
return new MonitoringCache(monitoringProperties, subscriptionService, topicService, monitoringServicesCreator); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package pl.allegro.tech.hermes.management.config; | ||
|
||
import org.springframework.boot.context.properties.ConfigurationProperties; | ||
|
||
import java.time.Duration; | ||
|
||
@ConfigurationProperties(prefix = "monitoring-consumer-groups") | ||
public class MonitoringProperties { | ||
|
||
private boolean enabled = false; | ||
private int numberOfThreads = 6; | ||
private Duration scanEvery = Duration.ofMinutes(2); | ||
|
||
public boolean isEnabled() { | ||
return enabled; | ||
} | ||
|
||
public void setEnabled(boolean enabled) { | ||
this.enabled = enabled; | ||
} | ||
|
||
public int getNumberOfThreads() { | ||
return numberOfThreads; | ||
} | ||
|
||
public void setNumberOfThreads(int numberOfThreads) { | ||
this.numberOfThreads = numberOfThreads; | ||
} | ||
|
||
public Duration getScanEvery() { | ||
return scanEvery; | ||
} | ||
|
||
public void setScanEvery(Duration scanEvery) { | ||
this.scanEvery = scanEvery; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
package pl.allegro.tech.hermes.management.config.kafka; | ||
|
||
import org.apache.kafka.clients.admin.AdminClient; | ||
|
||
import java.util.Properties; | ||
|
||
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; | ||
import static org.apache.kafka.clients.CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL; | ||
import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; | ||
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; | ||
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG; | ||
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM; | ||
|
||
public class AdminClientFactory { | ||
|
||
public static AdminClient brokerAdminClient(KafkaProperties kafkaProperties) { | ||
Properties props = new Properties(); | ||
props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBrokerList()); | ||
props.put(SECURITY_PROTOCOL_CONFIG, DEFAULT_SECURITY_PROTOCOL); | ||
props.put(REQUEST_TIMEOUT_MS_CONFIG, kafkaProperties.getKafkaServerRequestTimeoutMillis()); | ||
if (kafkaProperties.getAuthentication().isEnabled()) { | ||
props.put(SASL_MECHANISM, kafkaProperties.getAuthentication().getMechanism()); | ||
props.put(SECURITY_PROTOCOL_CONFIG, kafkaProperties.getAuthentication().getProtocol()); | ||
props.put(SASL_JAAS_CONFIG, kafkaProperties.getAuthentication().getJaasConfig()); | ||
} | ||
return AdminClient.create(props); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package pl.allegro.tech.hermes.management.infrastructure.kafka; | ||
|
||
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; | ||
|
||
import java.util.List; | ||
|
||
public class ClustersProvider { | ||
private final List<BrokersClusterService> clusters; | ||
|
||
public ClustersProvider(List<BrokersClusterService> clusters) { | ||
this.clusters = clusters; | ||
} | ||
|
||
public List<BrokersClusterService> getClusters() { | ||
return clusters; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you need this method ? it is not used anywhere