From 74ace41dfcd1b0d80940e0ade255aa27a92b5dcb Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 4 Oct 2024 08:58:15 -0400 Subject: [PATCH] validate config when starting distributor --- pkg/loki/modules.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 155466bcdedd..d9c111cc8649 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -322,6 +322,10 @@ func (t *Loki) initTenantConfigs() (_ services.Service, err error) { func (t *Loki) initDistributor() (services.Service, error) { t.Cfg.Distributor.KafkaConfig = t.Cfg.KafkaConfig + if t.Cfg.Distributor.KafkaEnabled && !t.Cfg.Ingester.KafkaIngestion.Enabled { + return nil, errors.New("kafka is enabled in distributor but not in ingester") + } + var err error logger := log.With(util_log.Logger, "component", "distributor") t.distributor, err = distributor.New( @@ -1754,7 +1758,7 @@ func (t *Loki) initAnalytics() (services.Service, error) { // The Ingest Partition Ring is responsible for watching the available ingesters and assigning partitions to incoming requests. func (t *Loki) initPartitionRing() (services.Service, error) { - if !t.Cfg.Ingester.KafkaIngestion.Enabled && !t.Cfg.Distributor.KafkaEnabled { + if !t.Cfg.Ingester.KafkaIngestion.Enabled { return nil, nil }