diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index e8871e7a1391..d530d937d42f 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -70,7 +70,7 @@ func TestIngesterWAL(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -113,7 +113,7 @@ func TestIngesterWAL(t *testing.T) { expectCheckpoint(t, walDir, false, time.Second) // restart the ingester - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger()) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -127,7 +127,7 @@ func TestIngesterWAL(t *testing.T) { require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) // restart the ingester - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger()) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -150,7 +150,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -196,7 +196,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) { require.NoError(t, err) // restart the ingester - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger()) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -253,7 +253,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -274,7 +274,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) { expectCheckpoint(t, walDir, false, time.Second) // restart the ingester, ensuring we replayed from WAL. - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger()) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -295,7 +295,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -316,7 +316,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) { require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) // restart the ingester, ensuring we can replay from the checkpoint as well. - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger()) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -452,7 +452,7 @@ func Test_SeriesIterator(t *testing.T) { limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) for i := 0; i < 3; i++ { - inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil) + inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil) require.Nil(t, err) require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}})) require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}})) @@ -499,7 +499,7 @@ func Benchmark_SeriesIterator(b *testing.B) { limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) for i := range instances { - inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil) + inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil) require.NoError(b, inst.Push(context.Background(), &logproto.PushRequest{ @@ -591,7 +591,7 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -663,7 +663,7 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) { require.NoError(t, err) // restart the ingester - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger()) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index e4fc748f2560..6fd52bafa066 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -278,7 +278,7 @@ func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore, limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) - ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokitlog.NewNopLogger()) + ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokitlog.NewNopLogger(), nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 640c64eee6b6..6d27d349c93f 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logqlmodel/metadata" "github.com/grafana/loki/v3/pkg/storage/types" @@ -242,10 +243,12 @@ type Ingester struct { streamRateCalculator *StreamRateCalculator writeLogManager *writefailures.Manager + + customStreamsTracker push.UsageTracker } // New makes a new Ingester. -func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger) (*Ingester, error) { +func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger, customStreamsTracker push.UsageTracker) (*Ingester, error) { if cfg.ingesterClientFactory == nil { cfg.ingesterClientFactory = client.New } @@ -273,6 +276,7 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con terminateOnShutdown: false, streamRateCalculator: NewStreamRateCalculator(), writeLogManager: writefailures.NewManager(logger, registerer, writeFailuresCfg, configs, "ingester"), + customStreamsTracker: customStreamsTracker, } i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i}) @@ -863,7 +867,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { / inst, ok = i.instances[instanceID] if !ok { var err error - inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter, i.pipelineWrapper, i.extractorWrapper, i.streamRateCalculator, i.writeLogManager) + inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter, i.pipelineWrapper, i.extractorWrapper, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker) if err != nil { return nil, err } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index b31053a5ded1..035a62e5a641 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -57,7 +57,7 @@ func TestPrepareShutdownMarkerPathNotSet(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -80,7 +80,7 @@ func TestPrepareShutdown(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -141,7 +141,7 @@ func TestIngester_GetStreamRates_Correctness(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -173,7 +173,7 @@ func BenchmarkGetStreamRatesAllocs(b *testing.B) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil) require.NoError(b, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -197,7 +197,7 @@ func TestIngester(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -382,7 +382,7 @@ func TestIngesterStreamLimitExceeded(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, overrides, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, store, overrides, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -740,7 +740,7 @@ func Test_InMemoryLabels(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -794,7 +794,7 @@ func TestIngester_GetDetectedLabels(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -857,7 +857,7 @@ func TestIngester_GetDetectedLabelsWithQuery(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -1224,7 +1224,7 @@ func TestStats(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) - i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil) require.NoError(t, err) i.instances["test"] = defaultInstance(t) @@ -1251,7 +1251,7 @@ func TestVolume(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) - i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil) require.NoError(t, err) i.instances["test"] = defaultInstance(t) @@ -1330,7 +1330,7 @@ func createIngesterServer(t *testing.T, ingesterConfig Config) (ingesterClient, limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) - ing, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + ing, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil) require.NoError(t, err) listener := bufconn.Listen(1024 * 1024) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index eb98f8a39b63..a4436b9d4191 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -141,6 +141,7 @@ func newInstance( extractorWrapper log.SampleExtractorWrapper, streamRateCalculator *StreamRateCalculator, writeFailures *writefailures.Manager, + customStreamsTracker push.UsageTracker, ) (*instance, error) { invertedIndex, err := index.NewMultiInvertedIndex(periodConfigs, uint32(cfg.IndexShards)) if err != nil { @@ -174,6 +175,8 @@ func newInstance( writeFailures: writeFailures, schemaconfig: &c, + + customStreamsTracker: customStreamsTracker, } i.mapper = NewFPMapper(i.getLabelsFromFingerprint) return i, err @@ -241,7 +244,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { continue } - _, appendErr = s.Push(ctx, reqStream.Entries, record, 0, false, rateLimitWholeStream) + _, appendErr = s.Push(ctx, reqStream.Entries, record, 0, false, rateLimitWholeStream, i.customStreamsTracker) s.chunkMtx.Unlock() } diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 88b613aa8db2..7f7dc30361d6 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -73,7 +73,7 @@ func TestLabelsCollisions(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) - i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil) + i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil) require.Nil(t, err) // avoid entries from the future. @@ -101,7 +101,7 @@ func TestConcurrentPushes(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) - inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil) + inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil) require.Nil(t, err) const ( @@ -153,7 +153,7 @@ func TestGetStreamRates(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) - inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil) + inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil) require.NoError(t, err) const ( @@ -247,7 +247,7 @@ func TestSyncPeriod(t *testing.T) { minUtil = 0.20 ) - inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil) + inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil) require.Nil(t, err) lbls := makeRandomLabels() @@ -292,7 +292,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) { cfg.SyncMinUtilization = 0.20 cfg.IndexShards = indexShards - instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil) + instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil) require.Nil(t, err) currentTime := time.Now() @@ -501,7 +501,7 @@ func Benchmark_PushInstance(b *testing.B) { require.NoError(b, err) limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) - i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil) + i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil) ctx := context.Background() for n := 0; n < b.N; n++ { @@ -545,7 +545,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) { ctx := context.Background() - inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil) + inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil) expr, err := syntax.ParseLogSelector(`{namespace="foo",pod="bar",instance=~"10.*"}`, true) require.NoError(b, err) t, err := newTailer("foo", expr, nil, 10) @@ -1095,7 +1095,8 @@ func TestStreamShardingUsage(t *testing.T) { }) t.Run("invalid push returns error", func(t *testing.T) { - i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant1, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil) + tracker := &mockUsageTracker{} + i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant1, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, tracker) ctx := context.Background() err = i.Push(ctx, &logproto.PushRequest{ @@ -1111,10 +1112,11 @@ func TestStreamShardingUsage(t *testing.T) { }, }) require.Error(t, err) + require.Equal(t, 3.0, tracker.discardedBytes) }) t.Run("valid push returns no error", func(t *testing.T) { - i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant2, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil) + i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant2, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil) ctx := context.Background() err = i.Push(ctx, &logproto.PushRequest{ @@ -1449,6 +1451,7 @@ func defaultInstance(t *testing.T) *instance { nil, NewStreamRateCalculator(), nil, + nil, ) require.Nil(t, err) insertData(t, instance) @@ -1535,3 +1538,16 @@ func (f fakeQueryServer) Send(res *logproto.QueryResponse) error { return f(res) } func (f fakeQueryServer) Context() context.Context { return context.TODO() } + +type mockUsageTracker struct { + discardedBytes float64 +} + +// DiscardedBytesAdd implements push.UsageTracker. +func (m *mockUsageTracker) DiscardedBytesAdd(_ context.Context, _ string, _ string, _ labels.Labels, value float64) { + m.discardedBytes += value +} + +// ReceivedBytesAdd implements push.UsageTracker. +func (*mockUsageTracker) ReceivedBytesAdd(_ context.Context, _ string, _ time.Duration, _ labels.Labels, _ float64) { +} diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go index a93151e0e6fc..e8b1c244871b 100644 --- a/pkg/ingester/recovery.go +++ b/pkg/ingester/recovery.go @@ -168,7 +168,7 @@ func (r *ingesterRecoverer) Push(userID string, entries wal.RefEntries) error { } // ignore out of order errors here (it's possible for a checkpoint to already have data from the wal segments) - bytesAdded, err := s.(*stream).Push(context.Background(), entries.Entries, nil, entries.Counter, true, false) + bytesAdded, err := s.(*stream).Push(context.Background(), entries.Entries, nil, entries.Counter, true, false, r.ing.customStreamsTracker) r.ing.replayController.Add(int64(bytesAdded)) if err != nil && err == ErrEntriesExist { r.ing.metrics.duplicateEntriesTotal.Add(float64(len(entries.Entries))) diff --git a/pkg/ingester/recovery_test.go b/pkg/ingester/recovery_test.go index fd8f05136d6f..9176ff3c6ad2 100644 --- a/pkg/ingester/recovery_test.go +++ b/pkg/ingester/recovery_test.go @@ -228,7 +228,7 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + i, err := New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil) require.NoError(t, err) mkSample := func(i int) *logproto.PushRequest { @@ -262,7 +262,7 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) { require.Equal(t, false, iter.Next()) // create a new ingester now - i, err = New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + i, err = New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil) require.NoError(t, err) // recover the checkpointed series diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 6bf75dfa1ac5..0aa3c41ea619 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -19,6 +19,7 @@ import ( "github.com/grafana/loki/v3/pkg/distributor/writefailures" "github.com/grafana/loki/v3/pkg/ingester/wal" "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" @@ -181,6 +182,8 @@ func (s *stream) Push( lockChunk bool, // Whether nor not to ingest all at once or not. It is a per-tenant configuration. rateLimitWholeStream bool, + + usageTracker push.UsageTracker, ) (int, error) { if lockChunk { s.chunkMtx.Lock() @@ -199,7 +202,7 @@ func (s *stream) Push( return 0, ErrEntriesExist } - toStore, invalid := s.validateEntries(entries, isReplay, rateLimitWholeStream) + toStore, invalid := s.validateEntries(ctx, entries, isReplay, rateLimitWholeStream, usageTracker) if rateLimitWholeStream && hasRateLimitErr(invalid) { return 0, errorForFailedEntries(s, invalid, len(entries)) } @@ -213,7 +216,7 @@ func (s *stream) Push( s.metrics.chunkCreatedStats.Inc(1) } - bytesAdded, storedEntries, entriesWithErr := s.storeEntries(ctx, toStore) + bytesAdded, storedEntries, entriesWithErr := s.storeEntries(ctx, toStore, usageTracker) s.recordAndSendToTailers(record, storedEntries) if len(s.chunks) != prevNumChunks { @@ -313,7 +316,7 @@ func (s *stream) recordAndSendToTailers(record *wal.Record, entries []logproto.E } } -func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry) (int, []logproto.Entry, []entryWithError) { +func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usageTracker push.UsageTracker) (int, []logproto.Entry, []entryWithError) { if sp := opentracing.SpanFromContext(ctx); sp != nil { sp.LogKV("event", "stream started to store entries", "labels", s.labelsString) defer sp.LogKV("event", "stream finished to store entries") @@ -350,11 +353,12 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry) (in bytesAdded += len(entries[i].Line) storedEntries = append(storedEntries, entries[i]) } - s.reportMetrics(outOfOrderSamples, outOfOrderBytes, 0, 0) + s.reportMetrics(ctx, outOfOrderSamples, outOfOrderBytes, 0, 0, usageTracker) return bytesAdded, storedEntries, invalid } -func (s *stream) validateEntries(entries []logproto.Entry, isReplay, rateLimitWholeStream bool) ([]logproto.Entry, []entryWithError) { +func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, isReplay, rateLimitWholeStream bool, usageTracker push.UsageTracker) ([]logproto.Entry, []entryWithError) { + var ( outOfOrderSamples, outOfOrderBytes int rateLimitedSamples, rateLimitedBytes int @@ -427,11 +431,11 @@ func (s *stream) validateEntries(entries []logproto.Entry, isReplay, rateLimitWh } s.streamRateCalculator.Record(s.tenant, s.labelHash, s.labelHashNoShard, totalBytes) - s.reportMetrics(outOfOrderSamples, outOfOrderBytes, rateLimitedSamples, rateLimitedBytes) + s.reportMetrics(ctx, outOfOrderSamples, outOfOrderBytes, rateLimitedSamples, rateLimitedBytes, usageTracker) return toStore, failedEntriesWithError } -func (s *stream) reportMetrics(outOfOrderSamples, outOfOrderBytes, rateLimitedSamples, rateLimitedBytes int) { +func (s *stream) reportMetrics(ctx context.Context, outOfOrderSamples, outOfOrderBytes, rateLimitedSamples, rateLimitedBytes int, usageTracker push.UsageTracker) { if outOfOrderSamples > 0 { name := validation.OutOfOrder if s.unorderedWrites { @@ -439,10 +443,16 @@ func (s *stream) reportMetrics(outOfOrderSamples, outOfOrderBytes, rateLimitedSa } validation.DiscardedSamples.WithLabelValues(name, s.tenant).Add(float64(outOfOrderSamples)) validation.DiscardedBytes.WithLabelValues(name, s.tenant).Add(float64(outOfOrderBytes)) + if usageTracker != nil { + usageTracker.DiscardedBytesAdd(ctx, s.tenant, name, s.labels, float64(outOfOrderBytes)) + } } if rateLimitedSamples > 0 { validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedSamples)) validation.DiscardedBytes.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedBytes)) + if usageTracker != nil { + usageTracker.DiscardedBytesAdd(ctx, s.tenant, validation.StreamRateLimit, s.labels, float64(rateLimitedBytes)) + } } } diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index af877bf88da9..e4dd4a37ab35 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -73,7 +73,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { _, err := s.Push(context.Background(), []logproto.Entry{ {Timestamp: time.Unix(int64(numLogs), 0), Line: "log"}, - }, recordPool.GetRecord(), 0, true, false) + }, recordPool.GetRecord(), 0, true, false, nil) require.NoError(t, err) newLines := make([]logproto.Entry, numLogs) @@ -94,7 +94,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { fmt.Fprintf(&expected, "user 'fake', total ignored: %d out of %d for stream: {foo=\"bar\"}", numLogs, numLogs) expectErr := httpgrpc.Errorf(http.StatusBadRequest, expected.String()) - _, err = s.Push(context.Background(), newLines, recordPool.GetRecord(), 0, true, false) + _, err = s.Push(context.Background(), newLines, recordPool.GetRecord(), 0, true, false, nil) require.Error(t, err) require.Equal(t, expectErr.Error(), err.Error()) }) @@ -128,7 +128,7 @@ func TestPushDeduplication(t *testing.T) { {Timestamp: time.Unix(1, 0), Line: "test"}, {Timestamp: time.Unix(1, 0), Line: "test"}, {Timestamp: time.Unix(1, 0), Line: "newer, better test"}, - }, recordPool.GetRecord(), 0, true, false) + }, recordPool.GetRecord(), 0, true, false, nil) require.NoError(t, err) require.Len(t, s.chunks, 1) require.Equal(t, s.chunks[0].chunk.Size(), 2, @@ -164,7 +164,7 @@ func TestPushRejectOldCounter(t *testing.T) { {Timestamp: time.Unix(1, 0), Line: "test"}, {Timestamp: time.Unix(1, 0), Line: "test"}, {Timestamp: time.Unix(1, 0), Line: "newer, better test"}, - }, recordPool.GetRecord(), 0, true, false) + }, recordPool.GetRecord(), 0, true, false, nil) require.NoError(t, err) require.Len(t, s.chunks, 1) require.Equal(t, s.chunks[0].chunk.Size(), 2, @@ -173,13 +173,13 @@ func TestPushRejectOldCounter(t *testing.T) { // fail to push with a counter <= the streams internal counter _, err = s.Push(context.Background(), []logproto.Entry{ {Timestamp: time.Unix(1, 0), Line: "test"}, - }, recordPool.GetRecord(), 2, true, false) + }, recordPool.GetRecord(), 2, true, false, nil) require.Equal(t, ErrEntriesExist, err) // succeed with a greater counter _, err = s.Push(context.Background(), []logproto.Entry{ {Timestamp: time.Unix(1, 0), Line: "test"}, - }, recordPool.GetRecord(), 3, true, false) + }, recordPool.GetRecord(), 3, true, false, nil) require.Nil(t, err) } @@ -270,9 +270,12 @@ func TestEntryErrorCorrectlyReported(t *testing.T) { {Line: "observability", Timestamp: time.Now().AddDate(-1 /* year */, 0 /* month */, 0 /* day */)}, {Line: "short", Timestamp: time.Now()}, } - _, failed := s.validateEntries(entries, false, true) + tracker := &mockUsageTracker{} + + _, failed := s.validateEntries(context.Background(), entries, false, true, tracker) require.NotEmpty(t, failed) require.False(t, hasRateLimitErr(failed)) + require.Equal(t, 13.0, tracker.discardedBytes) } func TestUnorderedPush(t *testing.T) { @@ -340,7 +343,7 @@ func TestUnorderedPush(t *testing.T) { if x.cutBefore { _ = s.cutChunk(context.Background()) } - written, err := s.Push(context.Background(), x.entries, recordPool.GetRecord(), 0, true, false) + written, err := s.Push(context.Background(), x.entries, recordPool.GetRecord(), 0, true, false, nil) if x.err { require.NotNil(t, err) } else { @@ -407,9 +410,11 @@ func TestPushRateLimit(t *testing.T) { {Timestamp: time.Unix(1, 0), Line: "aaaaaaaaab"}, } // Counter should be 2 now since the first line will be deduped. - _, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0, true, true) + tracker := &mockUsageTracker{} + _, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0, true, true, tracker) require.Error(t, err) require.Contains(t, err.Error(), (&validation.ErrStreamRateLimit{RateLimit: l.PerStreamRateLimit, Labels: s.labelsString, Bytes: flagext.ByteSize(len(entries[1].Line))}).Error()) + require.Equal(t, 20.0, tracker.discardedBytes) } func TestPushRateLimitAllOrNothing(t *testing.T) { @@ -446,10 +451,12 @@ func TestPushRateLimitAllOrNothing(t *testing.T) { } // Both entries have errors because rate limiting is done all at once - _, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0, true, true) + tracker := &mockUsageTracker{} + _, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0, true, true, tracker) require.Error(t, err) require.Contains(t, err.Error(), (&validation.ErrStreamRateLimit{RateLimit: l.PerStreamRateLimit, Labels: s.labelsString, Bytes: flagext.ByteSize(len(entries[0].Line))}).Error()) require.Contains(t, err.Error(), (&validation.ErrStreamRateLimit{RateLimit: l.PerStreamRateLimit, Labels: s.labelsString, Bytes: flagext.ByteSize(len(entries[1].Line))}).Error()) + require.Equal(t, 20.0, tracker.discardedBytes) } func TestReplayAppendIgnoresValidityWindow(t *testing.T) { @@ -484,7 +491,7 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) { } // Push a first entry (it doesn't matter if we look like we're replaying or not) - _, err = s.Push(context.Background(), entries, nil, 1, true, false) + _, err = s.Push(context.Background(), entries, nil, 1, true, false, nil) require.Nil(t, err) // Create a sample outside the validity window @@ -493,11 +500,11 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) { } // Pretend it's not a replay, ensure we error - _, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0, true, false) + _, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0, true, false, nil) require.NotNil(t, err) // Now pretend it's a replay. The same write should succeed. - _, err = s.Push(context.Background(), entries, nil, 2, true, false) + _, err = s.Push(context.Background(), entries, nil, 2, true, false, nil) require.Nil(t, err) } @@ -542,7 +549,7 @@ func Benchmark_PushStream(b *testing.B) { for n := 0; n < b.N; n++ { rec := recordPool.GetRecord() - _, err := s.Push(ctx, e, rec, 0, true, false) + _, err := s.Push(ctx, e, rec, 0, true, false, nil) require.NoError(b, err) recordPool.PutRecord(rec) } diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 441c688612d9..b39f42957360 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -4,11 +4,11 @@ import ( "encoding/binary" "hash/fnv" "sync" - "sync/atomic" "time" "github.com/go-kit/log/level" "github.com/prometheus/prometheus/model/labels" + "go.uber.org/atomic" "golang.org/x/net/context" "github.com/grafana/loki/v3/pkg/logproto" diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 3561f89a2318..0280bd514d3c 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -583,7 +583,7 @@ func (t *Loki) initIngester() (_ services.Service, err error) { level.Warn(util_log.Logger).Log("msg", "The config setting shutdown marker path is not set. The /ingester/prepare_shutdown endpoint won't work") } - t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Store, t.Overrides, t.tenantConfigs, prometheus.DefaultRegisterer, t.Cfg.Distributor.WriteFailuresLogging, t.Cfg.MetricsNamespace, logger) + t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Store, t.Overrides, t.tenantConfigs, prometheus.DefaultRegisterer, t.Cfg.Distributor.WriteFailuresLogging, t.Cfg.MetricsNamespace, logger, t.UsageTracker) if err != nil { return }