Skip to content

Commit

Permalink
Revert changes to the internal kafka component
Browse files Browse the repository at this point in the history
Signed-off-by: joshvanl <[email protected]>
  • Loading branch information
JoshVanL committed Jul 31, 2023
1 parent e6bfe7a commit 53bcf25
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 29 deletions.
14 changes: 0 additions & 14 deletions internal/component/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,21 +284,7 @@ func (k *Kafka) Subscribe(ctx context.Context) error {
running: make(chan struct{}),
}

ctx, cancel := context.WithCancel(ctx)

k.wg.Add(2)
go func() {
defer k.wg.Done()
defer cancel()
select {
case <-ctx.Done():
case <-k.closeCh:
}
}()

go func() {
defer k.wg.Done()

k.logger.Debugf("Subscribed and listening to topics: %s", topics)

for {
Expand Down
18 changes: 4 additions & 14 deletions internal/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package kafka
import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -49,15 +48,10 @@ type Kafka struct {
DefaultConsumeRetryEnabled bool
consumeRetryEnabled bool
consumeRetryInterval time.Duration

wg sync.WaitGroup
closed atomic.Bool
closeCh chan struct{}
}

func NewKafka(logger logger.Logger) *Kafka {
return &Kafka{
closeCh: make(chan struct{}),
logger: logger,
subscribeTopics: make(TopicHandlerConfig),
subscribeLock: sync.Mutex{},
Expand Down Expand Up @@ -140,19 +134,15 @@ func (k *Kafka) Init(_ context.Context, metadata map[string]string) error {
return nil
}

func (k *Kafka) Close() error {
defer k.wg.Wait()
if k.closed.CompareAndSwap(false, true) {
close(k.closeCh)
}

func (k *Kafka) Close() (err error) {
k.closeSubscriptionResources()

if k.producer != nil {
return k.producer.Close()
err = k.producer.Close()
k.producer = nil
}

return nil
return err
}

// EventHandler is the handler used to handle the subscribed event.
Expand Down
2 changes: 1 addition & 1 deletion internal/component/kafka/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
if m.OidcScopes != "" {
m.internalOidcScopes = strings.Split(m.OidcScopes, ",")
} else {
k.logger.Warn("Warning: no OIDC scopes specified, using default 'openid' scope only. This could result in full authorization access for this client.")
k.logger.Warn("Warning: no OIDC scopes specified, using default 'openid' scope only. This is a security risk for token reuse.")
m.internalOidcScopes = []string{"openid"}
}
k.logger.Debug("Configuring SASL token authentication via OIDC.")
Expand Down

0 comments on commit 53bcf25

Please sign in to comment.