diff --git a/internal/component/kafka/consumer.go b/internal/component/kafka/consumer.go index 2238c96d73..1601fd7418 100644 --- a/internal/component/kafka/consumer.go +++ b/internal/component/kafka/consumer.go @@ -284,21 +284,7 @@ func (k *Kafka) Subscribe(ctx context.Context) error { running: make(chan struct{}), } - ctx, cancel := context.WithCancel(ctx) - - k.wg.Add(2) - go func() { - defer k.wg.Done() - defer cancel() - select { - case <-ctx.Done(): - case <-k.closeCh: - } - }() - go func() { - defer k.wg.Done() - k.logger.Debugf("Subscribed and listening to topics: %s", topics) for { diff --git a/internal/component/kafka/kafka.go b/internal/component/kafka/kafka.go index 35ba783f90..517eb46531 100644 --- a/internal/component/kafka/kafka.go +++ b/internal/component/kafka/kafka.go @@ -16,7 +16,6 @@ package kafka import ( "context" "sync" - "sync/atomic" "time" "github.com/Shopify/sarama" @@ -49,15 +48,10 @@ type Kafka struct { DefaultConsumeRetryEnabled bool consumeRetryEnabled bool consumeRetryInterval time.Duration - - wg sync.WaitGroup - closed atomic.Bool - closeCh chan struct{} } func NewKafka(logger logger.Logger) *Kafka { return &Kafka{ - closeCh: make(chan struct{}), logger: logger, subscribeTopics: make(TopicHandlerConfig), subscribeLock: sync.Mutex{}, @@ -140,19 +134,15 @@ func (k *Kafka) Init(_ context.Context, metadata map[string]string) error { return nil } -func (k *Kafka) Close() error { - defer k.wg.Wait() - if k.closed.CompareAndSwap(false, true) { - close(k.closeCh) - } - +func (k *Kafka) Close() (err error) { k.closeSubscriptionResources() if k.producer != nil { - return k.producer.Close() + err = k.producer.Close() + k.producer = nil } - return nil + return err } // EventHandler is the handler used to handle the subscribed event. diff --git a/internal/component/kafka/metadata.go b/internal/component/kafka/metadata.go index 7c6049855c..0a9cf96fa5 100644 --- a/internal/component/kafka/metadata.go +++ b/internal/component/kafka/metadata.go @@ -177,7 +177,7 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error) if m.OidcScopes != "" { m.internalOidcScopes = strings.Split(m.OidcScopes, ",") } else { - k.logger.Warn("Warning: no OIDC scopes specified, using default 'openid' scope only. This could result in full authorization access for this client.") + k.logger.Warn("Warning: no OIDC scopes specified, using default 'openid' scope only. This is a security risk for token reuse.") m.internalOidcScopes = []string{"openid"} } k.logger.Debug("Configuring SASL token authentication via OIDC.")