
Commit

Merge branch 'main' into tpatterson/pipeline-wrapper-cache-fix
MasslessParticle authored May 6, 2024
2 parents 9f297f8 + db7c05c commit 6bc60a6
Showing 39 changed files with 3,037 additions and 174 deletions.
1 change: 1 addition & 0 deletions .github/workflows/helm-release.yaml
@@ -4,6 +4,7 @@ on:
push:
branches:
- main
- helm-5.48
paths:
- 'production/helm/loki/Chart.yaml'

33 changes: 25 additions & 8 deletions docs/sources/shared/configuration.md
@@ -543,19 +543,19 @@ The `alibabacloud_storage_config` block configures the connection to Alibaba Clo

```yaml
# Name of OSS bucket.
# CLI flag: -common.storage.oss.bucketname
# CLI flag: -<prefix>.storage.oss.bucketname
[bucket: <string> | default = ""]
# oss Endpoint to connect to.
# CLI flag: -common.storage.oss.endpoint
# CLI flag: -<prefix>.storage.oss.endpoint
[endpoint: <string> | default = ""]
# alibabacloud Access Key ID
# CLI flag: -common.storage.oss.access-key-id
# CLI flag: -<prefix>.storage.oss.access-key-id
[access_key_id: <string> | default = ""]
# alibabacloud Secret Access Key
# CLI flag: -common.storage.oss.secret-access-key
# CLI flag: -<prefix>.storage.oss.secret-access-key
[secret_access_key: <string> | default = ""]
```

@@ -2236,10 +2236,23 @@ The `frontend_worker` configures the worker - running within the Loki querier -
# CLI flag: -querier.id
[id: <string> | default = ""]
# The grpc_client block configures the gRPC client used to communicate between a
# client and server component in Loki.
# Configures the querier gRPC client used to communicate with the
# query-frontend. Shouldn't be used in conjunction with 'grpc_client_config'.
# The CLI flags prefix for this block configuration is:
# querier.frontend-grpc-client
[query_frontend_grpc_client: <grpc_client>]
# Configures the querier gRPC client used to communicate with the query-frontend
# and with the query-scheduler if 'query_scheduler_grpc_client' isn't defined.
# This shouldn't be used if 'query_frontend_grpc_client' is defined.
# The CLI flags prefix for this block configuration is: querier.frontend-client
[grpc_client_config: <grpc_client>]
# Configures the querier gRPC client used to communicate with the
# query-scheduler. If not defined, 'grpc_client_config' is used instead.
# The CLI flags prefix for this block configuration is:
# querier.scheduler-grpc-client
[query_scheduler_grpc_client: <grpc_client>]
```
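
For orientation, the sketch below shows how these blocks could sit together in a querier configuration. It is illustrative only: the `frontend_worker` placement and the `max_send_msg_size` field are assumed to follow the standard `grpc_client` block documented elsewhere in this file, and the values are arbitrary.

```yaml
frontend_worker:
  # Dedicated client for the query-frontend
  # (flags live under the querier.frontend-grpc-client prefix).
  query_frontend_grpc_client:
    max_send_msg_size: 104857600
  # Dedicated client for the query-scheduler
  # (flags live under the querier.scheduler-grpc-client prefix).
  # If this block were omitted, grpc_client_config would be used for the
  # scheduler as well.
  query_scheduler_grpc_client:
    max_send_msg_size: 104857600
```

With both dedicated blocks set, `grpc_client_config` is left out, since the description above says it should not be combined with `query_frontend_grpc_client`.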

### gcs_storage_config
@@ -2297,6 +2310,8 @@ The `grpc_client` block configures the gRPC client used to communicate between a
- `ingester.client`
- `pattern-ingester.client`
- `querier.frontend-client`
- `querier.frontend-grpc-client`
- `querier.scheduler-grpc-client`
- `query-scheduler.grpc-client-config`
- `ruler.client`
- `tsdb.shipper.index-gateway-client.grpc`
@@ -2925,8 +2940,10 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
[discover_service_name: <list of strings> | default = [service app application name app_kubernetes_io_name container container_name component workload job]]

# Discover and add log levels during ingestion, if not present already. Levels
# would be added to Structured Metadata with name 'level' and one of the values
# from 'debug', 'info', 'warn', 'error', 'critical', 'fatal'.
# would be added to Structured Metadata with name
# level/LEVEL/Level/Severity/severity/SEVERITY/lvl/LVL/Lvl (case-sensitive) and
# one of the values from 'trace', 'debug', 'info', 'warn', 'error', 'critical',
# 'fatal' (case insensitive).
# CLI flag: -validation.discover-log-levels
[discover_log_levels: <boolean> | default = true]
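
A minimal, hedged example of enabling this in a Loki configuration file (the top-level `limits_config` placement is assumed; the setting defaults to true, so this simply makes the choice explicit):

```yaml
limits_config:
  # Detect a log level from recognized fields (such as level, severity, or lvl)
  # at ingestion time and attach it as structured metadata when missing.
  discover_log_levels: true
```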

1 change: 1 addition & 0 deletions go.mod
@@ -119,6 +119,7 @@ require (
github.com/IBM/go-sdk-core/v5 v5.13.1
github.com/IBM/ibm-cos-sdk-go v1.10.0
github.com/axiomhq/hyperloglog v0.0.0-20240124082744-24bca3a5b39b
github.com/buger/jsonparser v1.1.1
github.com/d4l3k/messagediff v1.2.1
github.com/dolthub/swiss v0.2.1
github.com/efficientgo/core v1.0.0-rc.2
2 changes: 2 additions & 0 deletions go.sum
@@ -401,6 +401,8 @@ github.com/bmatcuk/doublestar v1.3.4/go.mod h1:wiQtGV+rzVYxB7WIlirSN++5HPtPlXEo9
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b h1:6+ZFm0flnudZzdSE0JxlhR2hKnGPcNB35BjQf4RYQDY=
github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M=
github.com/caddyserver/caddy v1.0.4/go.mod h1:uruyfVsyMcDb3IOzSKsi1x0wOjy1my/PxOSTcD+24jM=
26 changes: 18 additions & 8 deletions pkg/bloomgateway/bloomgateway.go
@@ -63,7 +63,9 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
util_log "github.com/grafana/loki/v3/pkg/util/log"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)

var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")
@@ -87,7 +89,7 @@ type Gateway struct {

queue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreWithMetrics

pendingTasks *atomic.Int64

@@ -109,7 +111,7 @@ func (l *fixedQueueLimits) MaxConsumers(_ string, _ int) int {
}

// New returns a new instance of the Bloom Gateway.
func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
func New(cfg Config, store bloomshipper.StoreWithMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
utillog.WarnExperimentalUse("Bloom Gateway", logger)
g := &Gateway{
cfg: cfg,
@@ -203,13 +205,15 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return nil, err
}

logger := log.With(g.logger, "tenant", tenantID)

sp, ctx := opentracing.StartSpanFromContext(ctx, "bloomgateway.FilterChunkRefs")
stats, ctx := ContextWithEmptyStats(ctx)
logger := spanlogger.FromContextWithFallback(
ctx,
util_log.WithContext(ctx, g.logger),
)

defer func() {
level.Info(logger).Log(stats.KVArgs()...)
sp.LogKV(stats.KVArgs()...)
sp.Finish()
}()

@@ -270,7 +274,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
"series_requested", len(req.Refs),
)

if len(seriesByDay) != 1 {
if len(seriesByDay) > 1 {
stats.Status = labelFailure
return nil, errors.New("request time range must span exactly one day")
}
@@ -319,6 +323,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
preFilterChunks += len(series.Refs)
}

combinedRecorder := v1.NewBloomRecorder(ctx, "combined")
for remaining > 0 {
select {
case <-ctx.Done():
@@ -330,10 +335,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return nil, errors.Wrap(task.Err(), "request failed")
}
responses = append(responses, task.responses)
combinedRecorder.Merge(task.recorder)
remaining--
}
}

combinedRecorder.Report(util_log.WithContext(ctx, g.logger), g.bloomStore.BloomMetrics())
sp.LogKV("msg", "received all responses")

start := time.Now()
@@ -348,7 +355,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk

postFilterSeries := len(filtered)

for _, group := range req.Refs {
for _, group := range filtered {
postFilterChunks += len(group.Refs)
}
g.metrics.requestedSeries.Observe(float64(preFilterSeries))
@@ -421,7 +428,10 @@ func orderedResponsesByFP(responses [][]v1.Output) v1.Iterator[v1.Output] {
// TODO(owen-d): improve perf. This can be faster with a more specialized impl
// NB(owen-d): `req` is mutated in place for performance, but `responses` is not
// Removals of the outputs must be sorted.
func filterChunkRefs(req *logproto.FilterChunkRefRequest, responses [][]v1.Output) []*logproto.GroupedChunkRefs {
func filterChunkRefs(
req *logproto.FilterChunkRefRequest,
responses [][]v1.Output,
) []*logproto.GroupedChunkRefs {
res := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs))

// dedupe outputs, merging the same series.
28 changes: 19 additions & 9 deletions pkg/bloomgateway/multiplexing.go
@@ -69,11 +69,15 @@ type Task struct {

// log enqueue time so we can observe the time spent in the queue
enqueueTime time.Time

// recorder
recorder *v1.BloomRecorder
}

func newTask(ctx context.Context, tenantID string, refs seriesWithInterval, filters []syntax.LineFilterExpr, blocks []bloomshipper.BlockRef) Task {
return Task{
tenant: tenantID,
recorder: v1.NewBloomRecorder(ctx, "task"),
err: new(wrappedError),
resCh: make(chan v1.Output),
filters: filters,
@@ -113,6 +117,7 @@ func (t Task) CloseWithError(err error) {
// Copy returns a copy of the existing task but with a new slice of grouped chunk refs
func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
return Task{
recorder: t.recorder,
tenant: t.tenant,
err: t.err,
resCh: t.resCh,
@@ -126,22 +131,26 @@ func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
}
}

func (t Task) RequestIter(tokenizer *v1.NGramTokenizer) v1.Iterator[v1.Request] {
func (t Task) RequestIter(
tokenizer *v1.NGramTokenizer,
) v1.Iterator[v1.Request] {
return &requestIterator{
series: v1.NewSliceIter(t.series),
search: v1.FiltersToBloomTest(tokenizer, t.filters...),
channel: t.resCh,
curr: v1.Request{},
recorder: t.recorder,
series: v1.NewSliceIter(t.series),
search: v1.FiltersToBloomTest(tokenizer, t.filters...),
channel: t.resCh,
curr: v1.Request{},
}
}

var _ v1.Iterator[v1.Request] = &requestIterator{}

type requestIterator struct {
series v1.Iterator[*logproto.GroupedChunkRefs]
search v1.BloomTest
channel chan<- v1.Output
curr v1.Request
recorder *v1.BloomRecorder
series v1.Iterator[*logproto.GroupedChunkRefs]
search v1.BloomTest
channel chan<- v1.Output
curr v1.Request
}

// At implements v1.Iterator.
@@ -162,6 +171,7 @@ func (it *requestIterator) Next() bool {
}
group := it.series.At()
it.curr = v1.Request{
Recorder: it.recorder,
Fp: model.Fingerprint(group.Fingerprint),
Chks: convertToChunkRefs(group.Refs),
Search: it.search,
15 changes: 9 additions & 6 deletions pkg/bloomgateway/processor.go
@@ -7,7 +7,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"

"github.com/grafana/dskit/concurrency"
@@ -155,11 +154,15 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie
iters := make([]v1.PeekingIterator[v1.Request], 0, len(tasks))

for _, task := range tasks {
if sp := opentracing.SpanFromContext(task.ctx); sp != nil {
md, _ := blockQuerier.Metadata()
blk := bloomshipper.BlockRefFrom(task.tenant, task.table.String(), md)
sp.LogKV("process block", blk.String(), "series", len(task.series))
}
// NB(owen-d): can be helpful for debugging, but is noisy
// and don't feel like threading this through a configuration

// if sp := opentracing.SpanFromContext(task.ctx); sp != nil {
// md, _ := blockQuerier.Metadata()
// blk := bloomshipper.BlockRefFrom(task.tenant, task.table.String(), md)
// blockID := blk.String()
// sp.LogKV("process block", blockID, "series", len(task.series))
// }

it := v1.NewPeekingIter(task.RequestIter(tokenizer))
iters = append(iters, it)
5 changes: 5 additions & 0 deletions pkg/bloomgateway/processor_test.go
@@ -15,6 +15,7 @@ import (
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/logql/syntax"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
@@ -40,6 +41,10 @@ type dummyStore struct {
err error
}

func (s *dummyStore) BloomMetrics() *v1.Metrics {
return v1.NewMetrics(nil)
}

func (s *dummyStore) ResolveMetas(_ context.Context, _ bloomshipper.MetaSearchParams) ([][]bloomshipper.MetaRef, []*bloomshipper.Fetcher, error) {
time.Sleep(s.delay)

11 changes: 2 additions & 9 deletions pkg/bloomgateway/util.go
@@ -2,7 +2,6 @@ package bloomgateway

import (
"sort"
"time"

"github.com/prometheus/common/model"
"golang.org/x/exp/slices"
@@ -13,13 +12,8 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)

func getDayTime(ts model.Time) time.Time {
return ts.Time().UTC().Truncate(Day)
}

func truncateDay(ts model.Time) model.Time {
// model.minimumTick is time.Millisecond
return ts - (ts % model.Time(24*time.Hour/time.Millisecond))
return model.TimeFromUnix(ts.Time().Truncate(Day).Unix())
}
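
As a side note, the following self-contained sketch exercises the new `truncateDay` body. The `Day` constant is assumed to mirror the package's 24-hour constant, and the snippet is illustrative rather than code from the repository.

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"
)

// Day is assumed to match the bloomgateway package constant (24 hours).
const Day = 24 * time.Hour

// truncateDay truncates a model.Time (milliseconds) to the start of its UTC day,
// mirroring the updated helper above.
func truncateDay(ts model.Time) model.Time {
	return model.TimeFromUnix(ts.Time().Truncate(Day).Unix())
}

func main() {
	ts := model.TimeFromUnixNano(time.Date(2024, 5, 6, 15, 4, 5, 0, time.UTC).UnixNano())
	// Prints 2024-05-06 00:00:00 +0000 UTC
	fmt.Println(truncateDay(ts).Time().UTC())
}
```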

// getFromThrough assumes a list of ShortRefs sorted by From time
@@ -125,7 +119,7 @@ func partitionSeriesByDay(from, through model.Time, seriesWithChunks []*logproto
})

// All chunks fall outside of the range
if min == len(chunks) || max == 0 {
if min == len(chunks) || max == 0 || min == max {
continue
}

@@ -135,7 +129,6 @@ if chunks[max-1].Through > maxTs {
if chunks[max-1].Through > maxTs {
maxTs = chunks[max-1].Through
}
// fmt.Println("day", day, "series", series.Fingerprint, "minTs", minTs, "maxTs", maxTs)

res = append(res, &logproto.GroupedChunkRefs{
Fingerprint: series.Fingerprint,