Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitoring consumer groups #1650

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion docker/latest/management/management.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ schema.repository:

console:
configurationLocation: console/config-local.json
configurationType: classpath_resource
configurationType: classpath_resource
15 changes: 15 additions & 0 deletions docs/docs/configuration/monitoring.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Monitoring
## Ensure all topic partitions are assigned to a consumer group in a subscription

To use this monitoring you have to enable it in the `hermes-management/.../application.yaml` file.
The subscriptions that have unassigned partitions are available through endpoint `/monitoring/consumer-groups`.

```yaml
monitoringConsumerGroups:
enabled: true
numberOfThreads: 6
scanEvery: 120s
```

The `numberOfThreads` and `scanEvery` parameters are just examples. For the best performance of this monitoring, you need to configure them yourself.

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package pl.allegro.tech.hermes.api;

import com.fasterxml.jackson.annotation.JsonIgnore;

public class TopicAndSubscription {

private final Topic topic;
private final String subscription;

public TopicAndSubscription(Topic topic, String subscription) {
this.topic = topic;
this.subscription = subscription;
}

@JsonIgnore
public Topic getTopic() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you need this method ? it is not used anywhere

return topic;
}

public String getTopicName() {
return topic.getQualifiedName();
}

public String getSubscription() {
return subscription;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package pl.allegro.tech.hermes.api.endpoints;

import pl.allegro.tech.hermes.api.TopicAndSubscription;

import java.util.List;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;

@Path("monitoring")
public interface MonitoringEndpoint {

@GET
@Produces(APPLICATION_JSON)
@Path("/consumer-groups")
List<TopicAndSubscription> monitorConsumerGroups();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package pl.allegro.tech.hermes.management.api;

import org.springframework.beans.factory.annotation.Autowired;
import pl.allegro.tech.hermes.api.TopicAndSubscription;
import pl.allegro.tech.hermes.management.infrastructure.monitoring.MonitoringCache;

import java.util.List;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;

@Path("monitoring")
public class MonitoringEndpoint {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should create corresponding interface for this endpoint in module hermes-api, full package name is pl.allegro.tech.hermes.api.endpoints, just see the existing ones 😄


private final MonitoringCache monitoringCache;

@Autowired
public MonitoringEndpoint(MonitoringCache monitoringCache) {
this.monitoringCache = monitoringCache;
}

@GET
@Produces(APPLICATION_JSON)
@Path("/consumer-groups")
public List<TopicAndSubscription> monitorConsumerGroups() {
return monitoringCache.getSubscriptionsWithUnassignedPartitions();
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: unnecessary blank lines above

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
TopicProperties.class,
MetricsProperties.class,
HttpClientProperties.class,
ConsistencyCheckerProperties.class})
ConsistencyCheckerProperties.class,
MonitoringProperties.class})
public class ManagementConfiguration {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package pl.allegro.tech.hermes.management.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.management.config.kafka.KafkaClustersProperties;
import pl.allegro.tech.hermes.management.config.kafka.KafkaNamesMappers;
import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionService;
import pl.allegro.tech.hermes.management.domain.topic.TopicService;
import pl.allegro.tech.hermes.management.infrastructure.monitoring.MonitoringCache;
import pl.allegro.tech.hermes.management.infrastructure.monitoring.MonitoringServicesCreator;

@Configuration
public class MonitoringConfiguration {

@Bean
MonitoringServicesCreator monitoringServicesCreator(KafkaClustersProperties kafkaClustersProperties,
KafkaNamesMappers kafkaNamesMappers) {
return new MonitoringServicesCreator(kafkaClustersProperties, kafkaNamesMappers);
}

@Bean
MonitoringCache monitoringCache(MonitoringProperties monitoringProperties,
SubscriptionService subscriptionService,
TopicService topicService,
MonitoringServicesCreator monitoringServicesCreator) {
return new MonitoringCache(monitoringProperties, subscriptionService, topicService, monitoringServicesCreator);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package pl.allegro.tech.hermes.management.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

import java.time.Duration;

@ConfigurationProperties(prefix = "monitoring-consumer-groups")
public class MonitoringProperties {

private boolean enabled = false;
private int numberOfThreads = 6;
private Duration scanEvery = Duration.ofMinutes(2);

public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public int getNumberOfThreads() {
return numberOfThreads;
}

public void setNumberOfThreads(int numberOfThreads) {
this.numberOfThreads = numberOfThreads;
}

public Duration getScanEvery() {
return scanEvery;
}

public void setScanEvery(Duration scanEvery) {
this.scanEvery = scanEvery;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package pl.allegro.tech.hermes.management.config.kafka;

import org.apache.kafka.clients.admin.AdminClient;

import java.util.Properties;

import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;

public class AdminClientFactory {

public static AdminClient brokerAdminClient(KafkaProperties kafkaProperties) {
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapKafkaServer());
props.put(SECURITY_PROTOCOL_CONFIG, DEFAULT_SECURITY_PROTOCOL);
props.put(REQUEST_TIMEOUT_MS_CONFIG, kafkaProperties.getKafkaServerRequestTimeoutMillis());
if (kafkaProperties.getSasl().isEnabled()) {
props.put(SASL_MECHANISM, kafkaProperties.getSasl().getMechanism());
props.put(SECURITY_PROTOCOL_CONFIG, kafkaProperties.getSasl().getProtocol());
props.put(SASL_JAAS_CONFIG, kafkaProperties.getSasl().getJaasConfig());
}
return AdminClient.create(props);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager;
import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement;
import pl.allegro.tech.hermes.management.infrastructure.kafka.ClustersProvider;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaBrokerTopicManagement;
Expand All @@ -36,18 +37,11 @@

import java.time.Clock;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
import static java.util.stream.Collectors.toList;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;

@Configuration
@EnableConfigurationProperties(KafkaClustersProperties.class)
Expand All @@ -72,14 +66,14 @@ public class KafkaConfiguration implements MultipleDcKafkaNamesMappersFactory {
MultiDatacenterRepositoryCommandExecutor multiDcExecutor;

@Bean
MultiDCAwareService multiDCAwareService(KafkaNamesMappers kafkaNamesMappers, SchemaRepository schemaRepository,
Clock clock, JsonAvroConverter jsonAvroConverter) {
ClustersProvider clustersProvider(KafkaNamesMappers kafkaNamesMappers, SchemaRepository schemaRepository,
JsonAvroConverter jsonAvroConverter) {
List<DatacenterBoundRepositoryHolder<SubscriptionOffsetChangeIndicator>> repositories =
zookeeperRepositoryManager.getRepositories(SubscriptionOffsetChangeIndicator.class);

List<BrokersClusterService> clusters = kafkaClustersProperties.getClusters().stream().map(kafkaProperties -> {
KafkaNamesMapper kafkaNamesMapper = kafkaNamesMappers.getMapper(kafkaProperties.getQualifiedClusterName());
AdminClient brokerAdminClient = brokerAdminClient(kafkaProperties);
AdminClient brokerAdminClient = AdminClientFactory.brokerAdminClient(kafkaProperties);
BrokerStorage storage = brokersStorage(brokerAdminClient);
BrokerTopicManagement brokerTopicManagement =
new KafkaBrokerTopicManagement(topicProperties, brokerAdminClient, kafkaNamesMapper);
Expand All @@ -102,8 +96,13 @@ brokerAdminClient, createConsumerGroupManager(kafkaProperties, kafkaNamesMapper)
createKafkaConsumerManager(kafkaProperties, kafkaNamesMapper));
}).collect(toList());

return new ClustersProvider(clusters);
}

@Bean
MultiDCAwareService multiDCAwareService(ClustersProvider clustersProvider, Clock clock) {
return new MultiDCAwareService(
clusters,
clustersProvider,
clock,
ofMillis(subscriptionProperties.getIntervalBetweenCheckinIfOffsetsMovedInMillis()),
ofSeconds(subscriptionProperties.getOffsetsMovedTimeoutInSeconds()),
Expand Down Expand Up @@ -169,17 +168,4 @@ private KafkaConsumerPool kafkaConsumersPool(KafkaProperties kafkaProperties, Br

return new KafkaConsumerPool(config, brokerStorage, configuredBootstrapServers);
}

private AdminClient brokerAdminClient(KafkaProperties kafkaProperties) {
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapKafkaServer());
props.put(SECURITY_PROTOCOL_CONFIG, DEFAULT_SECURITY_PROTOCOL);
props.put(REQUEST_TIMEOUT_MS_CONFIG, kafkaProperties.getKafkaServerRequestTimeoutMillis());
if (kafkaProperties.getSasl().isEnabled()) {
props.put(SASL_MECHANISM, kafkaProperties.getSasl().getMechanism());
props.put(SECURITY_PROTOCOL_CONFIG, kafkaProperties.getSasl().getProtocol());
props.put(SASL_JAAS_CONFIG, kafkaProperties.getSasl().getJaasConfig());
}
return AdminClient.create(props);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package pl.allegro.tech.hermes.management.infrastructure.kafka;

import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;

import java.util.List;

public class ClustersProvider {
private final List<BrokersClusterService> clusters;

public ClustersProvider(List<BrokersClusterService> clusters) {
this.clusters = clusters;
}

public List<BrokersClusterService> getClusters() {
return clusters;
}
}
Loading