Skip to content

Commit

Permalink
Merge branch 'main' into use-correct-release-please-actions-url
Browse files Browse the repository at this point in the history
  • Loading branch information
sebmellen authored May 18, 2024
2 parents edc369a + 88c6711 commit da3bec4
Show file tree
Hide file tree
Showing 339 changed files with 46,040 additions and 641 deletions.
2 changes: 1 addition & 1 deletion docs/sources/get-started/components.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ This page describes the responsibilities of each of these components.
## Distributor

The **distributor** service is responsible for handling incoming push requests from
clients. It's the first stop in the write path for log data. Once the
clients. It's the first step in the write path for log data. Once the
distributor receives a set of streams in an HTTP request, each stream is validated for correctness
and to ensure that it is within the configured tenant (or global) limits. Each valid stream
is then sent to `n` [ingesters](#ingester) in parallel, where `n` is the [replication factor](#replication-factor) for data.
Expand Down
1 change: 1 addition & 0 deletions docs/sources/send-data/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ These third-party clients also enable sending logs to Loki:
- [push-to-loki.py](https://github.com/sleleko/devops-kb/blob/master/python/push-to-loki.py) (Python 3)
- [python-logging-loki](https://pypi.org/project/python-logging-loki/) (Python 3)
- [nextlog](https://pypi.org/project/nextlog/) (Python 3)
- [Rails Loki Exporter](https://github.com/planninghow/rails-loki-exporter) (Rails)
- [Serilog-Sinks-Loki](https://github.com/JosephWoodward/Serilog-Sinks-Loki) (C#)
- [Vector Loki Sink](https://vector.dev/docs/reference/configuration/sinks/loki/)
- [winston-loki](https://github.com/JaniAnttonen/winston-loki) (JS)
12 changes: 11 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ require (
github.com/prometheus/alertmanager v0.27.0
github.com/prometheus/common/sigv4 v0.1.0
github.com/richardartoul/molecule v1.0.0
github.com/shirou/gopsutil/v4 v4.24.0-alpha.1
github.com/thanos-io/objstore v0.0.0-20230829152104-1b257a36f9a3
github.com/willf/bloom v2.0.3+incompatible
go.opentelemetry.io/collector/pdata v1.3.0
Expand All @@ -147,7 +148,16 @@ require (
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
)

require github.com/dlclark/regexp2 v1.4.0 // indirect
require (
github.com/dlclark/regexp2 v1.4.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
)

require (
cloud.google.com/go v0.112.0 // indirect
Expand Down
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,7 @@ github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ4
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-openapi/analysis v0.0.0-20180825180245-b006789cd277/go.mod h1:k70tL6pCuVxPJOHXQ+wIac1FUrvNkHolPie/cLEU6hI=
github.com/go-openapi/analysis v0.17.0/go.mod h1:IowGgpVeD0vNm45So8nr+IcQ3pxVtpRoBWb8PVZO0ik=
Expand Down Expand Up @@ -1318,6 +1319,7 @@ github.com/linode/linodego v1.29.0 h1:gDSQWAbKMAQX8db9FDCXHhodQPrJmLcmthjx6m+PyV
github.com/linode/linodego v1.29.0/go.mod h1:3k6WvCM10gillgYcnoLqIL23ST27BD9HhMsCJWb3Bpk=
github.com/liquidweb/liquidweb-go v1.6.0/go.mod h1:UDcVnAMDkZxpw4Y7NOHkqoeiGacVLEIG/i5J9cyixzQ=
github.com/lucas-clemente/quic-go v0.13.1/go.mod h1:Vn3/Fb0/77b02SGhQk36KzOUmXgVpFfizUfW5WMaqyU=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand Down Expand Up @@ -1551,6 +1553,7 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35/go.mod h1:prYjPmNq4d1NPVmpShWobRqXY3q7Vp+80DqgxxUrUIA=
github.com/prometheus/alertmanager v0.27.0 h1:V6nTa2J5V4s8TG4C4HtrBP/WNSebCCTYGGv4qecA/+I=
Expand Down Expand Up @@ -1658,6 +1661,12 @@ github.com/sercand/kuberesolver/v5 v5.1.1/go.mod h1:Fs1KbKhVRnB2aDWN12NjKCB+RgYM
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil v2.20.9+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil/v3 v3.22.8/go.mod h1:s648gW4IywYzUfE/KjXxUsqrqx/T2xO5VqOXxONeRfI=
github.com/shirou/gopsutil/v4 v4.24.0-alpha.1 h1:lLPAdP4TpfgJ5byoc3EFwNSKZj8kCnDFHtuWTktWl0s=
github.com/shirou/gopsutil/v4 v4.24.0-alpha.1/go.mod h1:GVpYUxBee6CTWux2/JslZ7fYPwqkQ8YDJSXmGAryYy4=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shopspring/decimal v0.0.0-20200105231215-408a2507e114/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
Expand Down Expand Up @@ -1748,7 +1757,11 @@ github.com/timewasted/linode v0.0.0-20160829202747-37e84520dcf7/go.mod h1:imsgLp
github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448 h1:hbyjqt5UnyKeOT3rFVxLxi7iTI6XqR2p4TkwEAQdUiw=
Expand Down Expand Up @@ -1814,6 +1827,8 @@ github.com/yuin/gopher-lua v0.0.0-20180630135845-46796da1b0b4/go.mod h1:aEV29Xrm
github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=
github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.einride.tech/aip v0.66.0 h1:XfV+NQX6L7EOYK11yoHHFtndeaWh3KbD9/cN/6iWEt8=
go.einride.tech/aip v0.66.0/go.mod h1:qAhMsfT7plxBX+Oy7Huol6YUvZ0ZzdUz26yZsQwfl1M=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
Expand Down Expand Up @@ -2240,6 +2255,8 @@ golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
Expand Down
4 changes: 4 additions & 0 deletions integration/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ limits_config:
attributes_config:
- action: index_label
attributes: ["service.name"]
log_attributes:
- action: drop
attributes: [email]
storage_config:
named_stores:
Expand Down Expand Up @@ -430,6 +433,7 @@ func (c *Component) run() error {
return err
}

config.LimitsConfig.SetGlobalOTLPConfig(config.Distributor.OTLPConfig)
var err error
c.loki, err = loki.New(config.Config)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion integration/loki_micro_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ func TestOTLPLogsIngestQuery(t *testing.T) {

t.Run("ingest-logs", func(t *testing.T) {
// ingest some log lines
require.NoError(t, cliDistributor.PushOTLPLogLine("lineA", now.Add(-45*time.Minute), map[string]any{"trace_id": 1, "user_id": "2"}))
require.NoError(t, cliDistributor.PushOTLPLogLine("lineA", now.Add(-45*time.Minute), map[string]any{"trace_id": 1, "user_id": "2", "email": "[email protected]"}))
require.NoError(t, cliDistributor.PushOTLPLogLine("lineB", now.Add(-45*time.Minute), nil))

require.NoError(t, cliDistributor.PushOTLPLogLine("lineC", now, map[string]any{"order.ids": []any{5, 6}}))
Expand Down
2 changes: 1 addition & 1 deletion nix/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ To build the repo (including running tests), from the root of the repo you can r

Nix is supported on Linux, MacOS, and Windows (WSL2). Check [here](https://nixos.org/download.html#download-nix) for installation instructions for your specific platform.

You will also need to enable the Flakes feature to use Nix with this repo. See this [wiki](https://nixos.wiki/wiki/Flakes) for instructions on enabling Flakes.
You will also need to enable the Flakes feature to use Nix with this repo. See this [wiki](https://wiki.nixos.org/wiki/Flakes) for instructions on enabling Flakes.
37 changes: 37 additions & 0 deletions pkg/analytics/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"flag"
"io"
"math"
"os"
"time"

"github.com/go-kit/log"
Expand All @@ -20,6 +21,8 @@ import (

"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/util/build"

"github.com/shirou/gopsutil/v4/process"
)

const (
Expand Down Expand Up @@ -259,6 +262,7 @@ func (rep *Reporter) running(ctx context.Context) error {
}
return nil
}
rep.startCPUPercentCollection(ctx)
// check every minute if we should report.
ticker := time.NewTicker(reportCheckInterval)
defer ticker.Stop()
Expand Down Expand Up @@ -313,6 +317,39 @@ func (rep *Reporter) reportUsage(ctx context.Context, interval time.Time) error
return errs.Err()
}

var (
cpuUsageKey = "cpu_usage"
cpuUsage = NewFloat(cpuUsageKey)
cpuCollectionInterval = time.Minute
)

func (rep *Reporter) startCPUPercentCollection(ctx context.Context) {
proc, err := process.NewProcess(int32(os.Getpid()))
if err != nil {
level.Debug(rep.logger).Log("msg", "failed to get process", "err", err)
return
}
go func() {
for {
select {
case <-ctx.Done():
return
default:
percent, err := proc.CPUPercentWithContext(ctx)
if err != nil {
level.Debug(rep.logger).Log("msg", "failed to get cpu percent", "err", err)
} else {
if cpuUsage.Value() < percent {
cpuUsage.Set(percent)
}
}

}
time.Sleep(cpuCollectionInterval)
}
}()
}

// nextReport compute the next report time based on the interval.
// The interval is based off the creation of the cluster seed to avoid all cluster reporting at the same time.
func nextReport(interval time.Duration, createdAt, now time.Time) time.Time {
Expand Down
14 changes: 14 additions & 0 deletions pkg/analytics/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,17 @@ func TestWrongKV(t *testing.T) {
}()
require.Equal(t, nil, r.running(ctx))
}

func TestStartCPUCollection(t *testing.T) {
cpuCollectionInterval = 1 * time.Second
r, err := NewReporter(Config{Leader: true, Enabled: true}, kv.Config{
Store: "inmemory",
}, nil, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry())
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
r.startCPUPercentCollection(ctx)
require.Eventually(t, func() bool {
return cpuUsage.Value() > 0
}, 5*time.Second, 1*time.Second)
}
6 changes: 5 additions & 1 deletion pkg/analytics/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,13 @@ func buildMetrics() map[string]interface{} {
"memstats": memstats(),
"num_cpu": runtime.NumCPU(),
"num_goroutine": runtime.NumGoroutine(),
// the highest recorded cpu usage over the interval
"cpu_usage": cpuUsage.Value(),
}
// reset cpu usage
cpuUsage.Set(0)
expvar.Do(func(kv expvar.KeyValue) {
if !strings.HasPrefix(kv.Key, statsPrefix) || kv.Key == statsPrefix+targetKey || kv.Key == statsPrefix+editionKey {
if !strings.HasPrefix(kv.Key, statsPrefix) || kv.Key == statsPrefix+targetKey || kv.Key == statsPrefix+editionKey || kv.Key == statsPrefix+cpuUsageKey {
return
}
var value interface{}
Expand Down
39 changes: 37 additions & 2 deletions pkg/bloomgateway/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bloomgateway

import (
"context"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/querier/plan"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
Expand Down Expand Up @@ -61,19 +63,26 @@ func newQuerierMetrics(registerer prometheus.Registerer, namespace, subsystem st
}
}

type QuerierConfig struct {
// MinTableOffset is derived from the compactor's MinTableOffset
MinTableOffset int
}

// BloomQuerier is a store-level abstraction on top of Client
// It is used by the index gateway to filter ChunkRefs based on given line fiter expression.
type BloomQuerier struct {
c Client
cfg QuerierConfig
logger log.Logger
metrics *querierMetrics
limits Limits
blockResolver BlockResolver
}

func NewQuerier(c Client, limits Limits, resolver BlockResolver, r prometheus.Registerer, logger log.Logger) *BloomQuerier {
func NewQuerier(c Client, cfg QuerierConfig, limits Limits, resolver BlockResolver, r prometheus.Registerer, logger log.Logger) *BloomQuerier {
return &BloomQuerier{
c: c,
cfg: cfg,
logger: logger,
metrics: newQuerierMetrics(r, constants.Loki, querierMetricsSubsystem),
limits: limits,
Expand Down Expand Up @@ -101,6 +110,33 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
preFilterChunks := len(chunkRefs)
preFilterSeries := len(grouped)

// Do not attempt to filter chunks for which there are no blooms
if bq.cfg.MinTableOffset > 0 {
minAge := truncateDay(model.Now()).Add(-1 * config.ObjectStorageIndexRequiredPeriod * time.Duration(bq.cfg.MinTableOffset-1))
if through.After(minAge) {
level.Debug(logger).Log(
"msg", "skip too recent chunks",
"tenant", tenant,
"from", from.Time(),
"through", through.Time(),
"responses", 0,
"preFilterChunks", preFilterChunks,
"postFilterChunks", preFilterChunks,
"filteredChunks", 0,
"preFilterSeries", preFilterSeries,
"postFilterSeries", preFilterSeries,
"filteredSeries", 0,
)

bq.metrics.chunksTotal.Add(float64(preFilterChunks))
bq.metrics.chunksFiltered.Add(0)
bq.metrics.seriesTotal.Add(float64(preFilterSeries))
bq.metrics.seriesFiltered.Add(0)

return chunkRefs, nil
}
}

responses := make([][]*logproto.GroupedChunkRefs, 0, 2)
// We can perform requests sequentially, because most of the time the request
// only covers a single day, and if not, it's at most two days.
Expand Down Expand Up @@ -153,7 +189,6 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
"preFilterSeries", preFilterSeries,
"postFilterSeries", postFilterSeries,
"filteredSeries", preFilterSeries-postFilterSeries,
"operation", "bloomquerier.FilterChunkRefs",
)

bq.metrics.chunksTotal.Add(float64(preFilterChunks))
Expand Down
9 changes: 5 additions & 4 deletions pkg/bloomgateway/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,13 @@ var _ BlockResolver = &mockBlockResolver{}
func TestBloomQuerier(t *testing.T) {
logger := log.NewNopLogger()
limits := newLimits()
cfg := QuerierConfig{}
resolver := &mockBlockResolver{}
tenant := "fake"

t.Run("client not called when filters are empty", func(t *testing.T) {
c := &noopClient{}
bq := NewQuerier(c, limits, resolver, nil, logger)
bq := NewQuerier(c, cfg, limits, resolver, nil, logger)

ctx := context.Background()
through := model.Now()
Expand All @@ -86,7 +87,7 @@ func TestBloomQuerier(t *testing.T) {

t.Run("client not called when chunkRefs are empty", func(t *testing.T) {
c := &noopClient{}
bq := NewQuerier(c, limits, resolver, nil, logger)
bq := NewQuerier(c, cfg, limits, resolver, nil, logger)

ctx := context.Background()
through := model.Now()
Expand All @@ -102,7 +103,7 @@ func TestBloomQuerier(t *testing.T) {

t.Run("querier propagates error from client", func(t *testing.T) {
c := &noopClient{err: errors.New("something went wrong")}
bq := NewQuerier(c, limits, resolver, nil, logger)
bq := NewQuerier(c, cfg, limits, resolver, nil, logger)

ctx := context.Background()
through := model.Now()
Expand All @@ -121,7 +122,7 @@ func TestBloomQuerier(t *testing.T) {

t.Run("client called once for each day of the interval", func(t *testing.T) {
c := &noopClient{}
bq := NewQuerier(c, limits, resolver, nil, logger)
bq := NewQuerier(c, cfg, limits, resolver, nil, logger)

ctx := context.Background()
from := mktime("2024-04-16 22:00")
Expand Down
10 changes: 7 additions & 3 deletions pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ var (
)

type errTooFarBehind struct {
// original timestmap of the entry itself.
entryTs time.Time

// cutoff is the oldest acceptable timstamp of the `stream` that entry belongs to.
cutoff time.Time
}

Expand All @@ -32,12 +36,12 @@ func IsErrTooFarBehind(err error) bool {
return ok
}

func ErrTooFarBehind(cutoff time.Time) error {
return &errTooFarBehind{cutoff: cutoff}
func ErrTooFarBehind(entryTs, cutoff time.Time) error {
return &errTooFarBehind{entryTs: entryTs, cutoff: cutoff}
}

func (m *errTooFarBehind) Error() string {
return "entry too far behind, oldest acceptable timestamp is: " + m.cutoff.Format(time.RFC3339)
return fmt.Sprintf("entry too far behind, entry timestamp is: %s, oldest acceptable timestamp is: %s", m.entryTs.Format(time.RFC3339), m.cutoff.Format(time.RFC3339))
}

func IsOutOfOrderErr(err error) bool {
Expand Down
Loading

0 comments on commit da3bec4

Please sign in to comment.