Skip to content

Commit

Permalink
rewrite using events instead of time
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive committed Oct 4, 2024
1 parent 39b57ec commit a5c1b70
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 5 deletions.
10 changes: 5 additions & 5 deletions pkg/kafka/partition/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (p *Reader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Contex
attempts := []func() (time.Duration, error){
// First process fetches until at least the max lag is honored.
func() (time.Duration, error) {
return p.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan)
return p.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since)
},

// If the target lag hasn't been reached with the first attempt (which stops once at least the max lag
Expand All @@ -287,13 +287,13 @@ func (p *Reader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Contex
timedCtx, cancel := context.WithTimeoutCause(ctx, maxLag, errWaitTargetLagDeadlineExceeded)
defer cancel()

return p.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger, recordsChan)
return p.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger, recordsChan, time.Since)
},

// If the target lag hasn't been reached with the previous attempt then we'll move on. However,
// we still need to guarantee that in the meanwhile the lag didn't increase and max lag is still honored.
func() (time.Duration, error) {
return p.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan)
return p.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since)
},
}

Expand Down Expand Up @@ -326,7 +326,7 @@ func (p *Reader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Contex
return nil
}

func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag time.Duration, logger log.Logger, recordsChan chan<- []Record) (time.Duration, error) {
func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag time.Duration, logger log.Logger, recordsChan chan<- []Record, timeSince func(time.Time) time.Duration) (time.Duration, error) {
boff := backoff.New(ctx, backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: time.Second,
Expand Down Expand Up @@ -382,7 +382,7 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t

// If it took less than the max desired lag to replay the partition
// then we can stop here, otherwise we'll have to redo it.
if currLag = time.Since(lastProducedOffsetRequestedAt); currLag <= maxLag {
if currLag = timeSince(lastProducedOffsetRequestedAt); currLag <= maxLag {
return currLag, nil
}
}
Expand Down
70 changes: 70 additions & 0 deletions pkg/kafka/partition/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package partition

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kgo"

"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/testkafka"
Expand Down Expand Up @@ -161,3 +163,71 @@ func TestPartitionReader_ProcessCatchUpAtStartup(t *testing.T) {
err = services.StopAndAwaitTerminated(context.Background(), partitionReader)
require.NoError(t, err)
}

func TestPartitionReader_ProcessCommits(t *testing.T) {
_, kafkaCfg := testkafka.CreateCluster(t, 1, "test-topic")
consumer := newMockConsumer()

consumerFactory := func(_ Committer) (Consumer, error) {
return consumer, nil
}

partitionID := int32(0)
partitionReader, err := NewReader(kafkaCfg, partitionID, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
producer, err := kafka.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)

// Init the client: This usually happens in "start" but we want to manage our own lifecycle for this test.
partitionReader.client, err = kafka.NewReaderClient(kafkaCfg, nil, log.NewNopLogger(),
kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
kafkaCfg.Topic: {partitionID: kgo.NewOffset().AtStart()},
}),
)
require.NoError(t, err)

stream := logproto.Stream{
Labels: labels.FromStrings("foo", "bar").String(),
Entries: []logproto.Entry{{Timestamp: time.Now(), Line: "test"}},
}

records, err := kafka.Encode(partitionID, "test-tenant", stream, 10<<20)
require.NoError(t, err)
require.Len(t, records, 1)

ctx, cancel := context.WithDeadlineCause(context.Background(), time.Now().Add(10*time.Second), fmt.Errorf("test unexpectedly deadlocked"))
recordsChan := make(chan []Record)
wait := consumer.Start(ctx, recordsChan)

targetLag := time.Second

i := -1
iterations := 5
producer.ProduceSync(context.Background(), records...)
// timeSince acts as a hook for when we check if we've honoured the lag or not. We modify it to respond "no" initially, to force a re-loop, and then "yes" after `iterations`.
// We also inject a new kafka record each time so there is more to consume.
timeSince := func(t time.Time) time.Duration {

Check warning on line 209 in pkg/kafka/partition/reader_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 't' seems to be unused, consider removing or renaming it as _ (revive)
i++
if i < iterations {
producer.ProduceSync(context.Background(), records...)
return targetLag + 1
}
return targetLag - 1
}

_, err = partitionReader.processNextFetchesUntilLagHonored(ctx, targetLag, log.NewNopLogger(), recordsChan, timeSince)
assert.NoError(t, err)

// Wait to process all the records
cancel()
wait()

close(recordsChan)
close(consumer.recordsChan)
recordsCount := 0
for receivedRecords := range consumer.recordsChan {
recordsCount += len(receivedRecords)
}
// We expect to have processed all the records, including initial + one per iteration.
assert.Equal(t, iterations+1, recordsCount)
}

0 comments on commit a5c1b70

Please sign in to comment.