Skip to content

Commit

Permalink
fix: close kafka readers
Browse files Browse the repository at this point in the history
The current code closes the batchPipeline channel when it receive transient
errors to restart the readers. But this does not close Kafka readers so they
continue to read. When this happens, a message will be written to a closed
channel triggering panic in processMessagesIntoBatchPipeline. As that does
not use recover, this will crash the whole process.

This change refactors error handling to ensure that the errors are properly
propagated to the caller while also supporting context cancellation which
allowed to provide extended test coverage. Kafka readers/writers are always
closed on errors.

This change still uses an explicit exit on errors and lets the container
runtime to restart the process.

CLOSE #706
  • Loading branch information
ibukanov committed May 22, 2024
1 parent b66333c commit 25b1c9e
Show file tree
Hide file tree
Showing 3 changed files with 320 additions and 58 deletions.
120 changes: 76 additions & 44 deletions kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ var brokers []string
// Processor is a function that is used to process Kafka messages on
type Processor func(context.Context, kafka.Message, *zerolog.Logger) error

// Subset of kafka.Reader methods that we use. This is used for testing.
// Subset of kafka.Reader methods that we use factored as an interface for unit
// testing support.
type messageReader interface {
FetchMessage(ctx context.Context) (kafka.Message, error)
Stats() kafka.ReaderStats
CommitMessages(ctx context.Context, msgs ...kafka.Message) error
}

// TopicMapping represents a kafka topic, how to process it, and where to emit the result.
Expand All @@ -43,8 +45,20 @@ type MessageContext struct {
msg kafka.Message
}

// StartConsumers reads configuration variables and starts the associated kafka consumers
func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error {
func closeWriter(writer io.Closer, logger *zerolog.Logger) {
err := writer.Close()
if err != nil {
logger.Error().Err(err).Msg("failed to close a writer")
}
}

// RunConsumers reads configuration variables, creates the associated kafka
// readers and writer and run them until an error occurred.
func RunConsumers(
ctx context.Context,
providedServer *server.Server,
logger *zerolog.Logger,
) error {
adsRequestRedeemV1Topic := os.Getenv("REDEEM_CONSUMER_TOPIC")
adsResultRedeemV1Topic := os.Getenv("REDEEM_PRODUCER_TOPIC")
adsRequestSignV1Topic := os.Getenv("SIGN_CONSUMER_TOPIC")
Expand All @@ -59,11 +73,13 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error
Topic: adsResultRedeemV1Topic,
Dialer: getDialer(logger),
})
defer closeWriter(redeemWriter, logger)
signWriter := kafka.NewWriter(kafka.WriterConfig{
Brokers: brokers,
Topic: adsResultSignV1Topic,
Dialer: getDialer(logger),
})
defer closeWriter(signWriter, logger)
topicMappings := []TopicMapping{
{
Topic: adsRequestRedeemV1Topic,
Expand All @@ -86,86 +102,102 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error
}

reader := newConsumer(topics, adsConsumerGroupV1, logger)
defer reader.Close()

// Each message in batchPipeline is associated with goroutine doing
// CPU-intensive cryptography, so limit the channel capacity by CPU cores
// plus some extra buffer to account for IO that a processor may potentially
// do.
batchPipeline := make(chan *MessageContext, runtime.NumCPU()+2)
ctx := context.Background()
go processMessagesIntoBatchPipeline(ctx, topicMappings, reader, batchPipeline, logger)
for {
err := readAndCommitBatchPipelineResults(ctx, reader, batchPipeline, logger)
if err != nil {
// If readAndCommitBatchPipelineResults returns an error.
close(batchPipeline)
return err
}
}
return readAndCommitBatchPipelineResults(ctx, reader, batchPipeline, logger)
}

// readAndCommitBatchPipelineResults does a blocking read of the batchPipeline channel and
// then does a blocking read of the done field in the MessageContext in the batchPipeline.
// When an error appears it means that the channel was closed or a temporary error was
// encountered. In the case of a temporary error, the application returns an error without
// committing so that the next reader gets the same message to try again.
// readAndCommitBatchPipelineResults receives messages from the batchPipeline
// channel and commits them until ctx is cancelled, batchPipeline is closed or a
// message error is received.
func readAndCommitBatchPipelineResults(
ctx context.Context,
reader *kafka.Reader,
reader messageReader,
batchPipeline chan *MessageContext,
logger *zerolog.Logger,
) error {
msgCtx := <-batchPipeline
<-msgCtx.done
for {
var msgCtx *MessageContext
select {
case <-ctx.Done():
return ctx.Err()
case msgCtx = <-batchPipeline:
break
}
if msgCtx == nil {
// processMessagesIntoBatchPipeline closes the channel, signal to
// the caller that readAndCommitBatchPipelineResults completed with
// an error so the the reading restarts.
return io.EOF
}
select {
case <-ctx.Done():
return ctx.Err()
case <-msgCtx.done:
break
}

if msgCtx.err != nil {
logger.Error().Err(msgCtx.err).Msg("temporary failure encountered")
return fmt.Errorf("temporary failure encountered: %w", msgCtx.err)
}
logger.Info().Msgf("Committing offset %d", msgCtx.msg.Offset)
if err := reader.CommitMessages(ctx, msgCtx.msg); err != nil {
logger.Error().Err(err).Msg("failed to commit")
return errors.New("failed to commit")
if msgCtx.err != nil {
return fmt.Errorf("temporary failure encountered: %w", msgCtx.err)
}
logger.Info().Msgf("Committing offset %d", msgCtx.msg.Offset)
if err := reader.CommitMessages(ctx, msgCtx.msg); err != nil {
return fmt.Errorf("failed to commit - %w", err)
}
}
return nil
}

// processMessagesIntoBatchPipeline fetches messages from Kafka indefinitely,
// pushes a MessageContext into the batchPipeline to maintain message order, and
// then spawns a goroutine that will process the message and push to errorResult
// of the MessageContext when the processing completes.
// processMessagesIntoBatchPipeline fetches messages from Kafka, pushes a
// MessageContext into the batchPipeline to maintain message order, and then
// spawns a goroutine that will process the message and closes the done channel
// of the MessageContext when the processing completes. This returns when the
// reader is closed or ctx is cancelled.
func processMessagesIntoBatchPipeline(ctx context.Context,
topicMappings []TopicMapping,
reader messageReader,
batchPipeline chan *MessageContext,
logger *zerolog.Logger,
) {
// Loop forever
// Signal to runMessageProcessor() that processing stopped.
defer close(batchPipeline)
for {
msg, err := reader.FetchMessage(ctx)
if err != nil {
// Indicates batch has no more messages. End the loop for
// this batch and fetch another.
if ctxErr := ctx.Err(); ctxErr != nil {
// cancelled context, log error if it is not related to the
// cancellation.
if !errors.Is(err, ctxErr) {
logger.Error().Err(err).Msg("FetchMessage error")
}
return
}
if err == io.EOF {
logger.Info().Msg("Batch complete")
} else if errors.Is(err, context.DeadlineExceeded) {
logger.Error().Err(err).Msg("batch item error")
panic("failed to fetch kafka messages and closed channel")
logger.Info().Msg("Kafka reader closed")
return
}
// There are other possible errors, but the underlying consumer
// group handler handle retryable failures well. If further
// investigation is needed you can review the handler here:
// https://github.com/segmentio/kafka-go/blob/main/consumergroup.go#L729
logger.Error().Err(err).Msg("FetchMessage error")
continue
}
msgCtx := &MessageContext{
done: make(chan struct{}),
msg: msg,
}
// If batchPipeline has been closed by an error in readAndCommitBatchPipelineResults,
// this write will panic, which is desired behavior, as the rest of the context
// will also have died and will be restarted from kafka/main.go
batchPipeline <- msgCtx
select {
case <-ctx.Done():
return
case batchPipeline <- msgCtx:
break
}
logger.Debug().Msgf("Processing message for topic %s at offset %d", msg.Topic, msg.Offset)
logger.Debug().Msgf("Reader Stats: %#v", reader.Stats())
logger.Debug().Msgf("topicMappings: %+v", topicMappings)
Expand Down
Loading

0 comments on commit 25b1c9e

Please sign in to comment.