Merge branch 'main' into GH_12155
JStickler authored May 2, 2024
2 parents 9108c06 + fd2301f commit 57ba2f4
Showing 20 changed files with 383 additions and 145 deletions.
75 changes: 14 additions & 61 deletions docs/sources/alert/_index.md
Original file line number Diff line number Diff line change
@@ -66,13 +66,13 @@ groups:
annotations:
summary: High request latency
- name: credentials_leak
rules:
rules:
- alert: http-credentials-leaked
annotations:
annotations:
message: "{{ $labels.job }} is leaking http basic auth credentials."
expr: 'sum by (cluster, job, pod) (count_over_time({namespace="prod"} |~ "http(s?)://(\\w+):(\\w+)@" [5m]) > 0)'
for: 10m
labels:
labels:
severity: critical
```
@@ -160,7 +160,7 @@ Here is an example of a remote-write configuration for sending data to a local P
```yaml
ruler:
... other settings ...
remote_write:
enabled: true
client:
@@ -186,13 +186,13 @@ We don't always control the source code of applications we run. Load balancers a
Sometimes you want to know whether _any_ instance of something has occurred. Alerting based on logs can be a great way to handle this, such as finding examples of leaked authentication credentials:
```yaml
- name: credentials_leak
rules:
rules:
- alert: http-credentials-leaked
annotations:
annotations:
message: "{{ $labels.job }} is leaking http basic auth credentials."
expr: 'sum by (cluster, job, pod) (count_over_time({namespace="prod"} |~ "http(s?)://(\\w+):(\\w+)@" [5m]) > 0)'
for: 10m
labels:
labels:
severity: critical
```
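
To sanity-check the line filter used in the alert expression above, the pattern can be exercised outside of Loki. The following is a minimal sketch, assuming the regex reduces to `http(s?)://(\w+):(\w+)@` once the YAML and LogQL string escaping is removed; the helper name `leaksCredentials` and the sample log lines are hypothetical and not part of this commit.

```go
package main

import (
	"fmt"
	"regexp"
)

// credentialsPattern mirrors the regex from the alert expression,
// assuming the doubled backslashes in the rule collapse to single ones.
var credentialsPattern = regexp.MustCompile(`http(s?)://(\w+):(\w+)@`)

// leaksCredentials (hypothetical helper) reports whether a log line
// embeds HTTP basic auth credentials in a URL.
func leaksCredentials(line string) bool {
	return credentialsPattern.MatchString(line)
}

func main() {
	// Sample lines are made up for illustration.
	lines := []string{
		"GET http://admin:hunter2@example.com/health",
		"GET https://example.com/health",
		"GET https://user@example.com/health",
	}
	for _, line := range lines {
		fmt.Printf("%t\t%s\n", leaksCredentials(line), line)
	}
}
```

Only the first sample line carries both a user name and a password before the `@`, so it is the only one the pattern matches.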

@@ -208,76 +208,29 @@ As an example, we can use LogQL v2 to help Loki to monitor _itself_, alerting us

## Interacting with the Ruler

### Cortextool
Because the rule files are identical to Prometheus rule files, we can interact with the Loki Ruler via [`cortextool`](https://github.com/grafana/cortex-tools#rules). The CLI is in early development, but it works with both Loki and Cortex. Pass the `--backend=loki` option when using it with Loki.

{{% admonition type="note" %}}
Not all commands in cortextool currently support Loki.
{{% /admonition %}}
### Lokitool
Because the rule files are identical to Prometheus rule files, we can interact with the Loki Ruler via `lokitool`.

{{% admonition type="note" %}}
cortextool was intended to run against multi-tenant Loki, commands need an `--id=` flag set to the Loki instance ID or set the environment variable `CORTEX_TENANT_ID`. If Loki is running in single tenant mode, the required ID is `fake`.
lokitool is intended to run against multi-tenant Loki. The commands need an `--id=` flag set to the Loki instance ID or set the environment variable `LOKI_TENANT_ID`. If Loki is running in single tenant mode, the required ID is `fake`.
{{% /admonition %}}

An example workflow is included below:

```sh
# lint the rules.yaml file ensuring it's valid and reformatting it if necessary
cortextool rules lint --backend=loki ./output/rules.yaml
lokitool rules lint ./output/rules.yaml
# diff rules against the currently managed ruleset in Loki
cortextool rules diff --rule-dirs=./output --backend=loki
lokitool rules diff --rule-dirs=./output
# ensure the remote ruleset matches your local ruleset, creating/updating/deleting remote rules which differ from your local specification.
cortextool rules sync --rule-dirs=./output --backend=loki
lokitool rules sync --rule-dirs=./output
# print the remote ruleset
cortextool rules print --backend=loki
lokitool rules print
```

### Cortextool Github Actions
There is also a [github action](https://github.com/grafana/cortex-rules-action) available for `cortex-tool`, so you can add it into your CI/CD pipelines!

For instance, you can sync rules on master builds via
```yaml
name: sync-cortex-rules-and-alerts
on:
push:
branches:
- master
env:
CORTEX_ADDRESS: '<fill me in>'
CORTEX_TENANT_ID: '<fill me in>'
CORTEX_API_KEY: ${{ secrets.API_KEY }}
RULES_DIR: 'output/'
jobs:
sync-loki-alerts:
runs-on: ubuntu-18.04
steps:
- name: Lint Rules
uses: grafana/[email protected]
env:
ACTION: 'lint'
with:
args: --backend=loki
- name: Diff rules
uses: grafana/[email protected]
env:
ACTION: 'diff'
with:
args: --backend=loki
- name: Sync rules
if: ${{ !contains(steps.diff-rules.outputs.detailed, 'no changes detected') }}
uses: grafana/[email protected]
env:
ACTION: 'sync'
with:
args: --backend=loki
- name: Print rules
uses: grafana/[email protected]
env:
ACTION: 'print'
```
### Terraform

With the [Terraform provider for Loki](https://registry.terraform.io/providers/fgouteroux/loki/latest), you can manage alerts and recording rules in Terraform HCL format:
10 changes: 5 additions & 5 deletions docs/sources/query/template_functions.md
@@ -303,26 +303,26 @@ Example:

Use this function to test to see if one string is contained inside of another.

Signature: `contains(s string, src string) bool`
Signature: `contains(src string, s string) bool`

Examples:

```template
{{ if contains .err "ErrTimeout" }} timeout {{end}}
{{ if contains "ErrTimeout" .err }} timeout {{end}}
{{ if contains "he" "hello" }} yes {{end}}
```
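
The argument order in the updated examples can be illustrated with Go's `text/template`. This is a sketch under stated assumptions, not Loki's implementation: the `funcs` map and the `render` helper are hypothetical, and `contains` is defined here to take the substring first and the string to search second, which is what the updated examples imply.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// funcs is a hypothetical function map. "contains" takes the substring
// first and the string to search second (an assumption based on the
// updated examples, not taken from Loki's source).
var funcs = template.FuncMap{
	"contains": func(substr, s string) bool {
		return strings.Contains(s, substr)
	},
}

// render (hypothetical helper) executes a template against a label map.
func render(text string, data map[string]string) (string, error) {
	tmpl, err := template.New("line").Funcs(funcs).Parse(text)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := tmpl.Execute(&sb, data); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	labels := map[string]string{"err": "rpc failed: ErrTimeout"}
	out, err := render(`{{ if contains "ErrTimeout" .err }}timeout{{ end }}`, labels)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

With this ordering, swapping the two arguments (`contains "hello" "he"`) evaluates to false, because the longer string cannot be contained in the shorter one.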

## eq

Use this function to test to see if one string has exact matching inside of another.

Signature: `eq(s string, src string) bool`
Signature: `eq(src string, s string) bool`

Examples:

```template
{{ if eq .err "ErrTimeout" }} timeout {{end}}
{{ if eq "he" "hello" }} yes {{end}}
{{ if eq "ErrTimeout" .err}} timeout {{end}}
{{ if eq "hello" "hello" }} yes {{end}}
```

## hasPrefix and hasSuffix
2 changes: 1 addition & 1 deletion go.mod
@@ -75,7 +75,7 @@ require (
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
github.com/ncw/swift v1.0.53
github.com/oklog/run v1.1.0
github.com/oklog/ulid v1.3.1
github.com/oklog/ulid v1.3.1 // indirect
github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e
github.com/opentracing-contrib/go-stdlib v1.0.0
github.com/opentracing/opentracing-go v1.2.0
7 changes: 1 addition & 6 deletions pkg/bloomgateway/bloomgateway.go
@@ -278,10 +278,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
tasks := make([]Task, 0, len(seriesByDay))
responses := make([][]v1.Output, 0, len(seriesByDay))
for _, seriesForDay := range seriesByDay {
task, err := NewTask(ctx, tenantID, seriesForDay, filters, blocks)
if err != nil {
return nil, err
}
task := newTask(ctx, tenantID, seriesForDay, filters, blocks)
// TODO(owen-d): include capacity in constructor?
task.responses = responsesPool.Get(len(seriesForDay.series))
tasks = append(tasks, task)
@@ -298,7 +295,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
for _, task := range tasks {
task := task
task.enqueueTime = time.Now()
level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "table", task.table, "series", len(task.series))

// TODO(owen-d): gracefully handle full queues
if err := g.queue.Enqueue(tenantID, nil, task, func() {
@@ -329,7 +325,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
stats.Status = "cancel"
return nil, errors.Wrap(ctx.Err(), "request failed")
case task := <-tasksCh:
level.Info(logger).Log("msg", "task done", "task", task.ID, "err", task.Err())
if task.Err() != nil {
stats.Status = labelFailure
return nil, errors.Wrap(task.Err(), "request failed")
31 changes: 6 additions & 25 deletions pkg/bloomgateway/multiplexing.go
@@ -2,11 +2,9 @@ package bloomgateway

import (
"context"
"math/rand"
"sync"
"time"

"github.com/oklog/ulid"
"github.com/prometheus/common/model"

"github.com/grafana/loki/v3/pkg/logproto"
@@ -20,10 +18,6 @@ const (
Day = 24 * time.Hour
)

var (
entropy = rand.New(rand.NewSource(time.Now().UnixNano()))
)

type tokenSettings struct {
nGramLen int
}
@@ -45,10 +39,8 @@ func (e *wrappedError) Set(err error) {

// Task is the data structure that is enqueued to the internal queue and dequeued by query workers
type Task struct {
// ID is a lexcographically sortable unique identifier of the task
ID ulid.ULID
// Tenant is the tenant ID
Tenant string
// tenant is the tenant ID
tenant string

// channel to write partial responses to
resCh chan v1.Output
@@ -79,18 +71,9 @@ type Task struct {
enqueueTime time.Time
}

// NewTask returns a new Task that can be enqueued to the task queue.
// In addition, it returns a result and an error channel, as well
// as an error if the instantiation fails.
func NewTask(ctx context.Context, tenantID string, refs seriesWithInterval, filters []syntax.LineFilterExpr, blocks []bloomshipper.BlockRef) (Task, error) {
key, err := ulid.New(ulid.Now(), entropy)
if err != nil {
return Task{}, err
}

task := Task{
ID: key,
Tenant: tenantID,
func newTask(ctx context.Context, tenantID string, refs seriesWithInterval, filters []syntax.LineFilterExpr, blocks []bloomshipper.BlockRef) Task {
return Task{
tenant: tenantID,
err: new(wrappedError),
resCh: make(chan v1.Output),
filters: filters,
@@ -101,7 +84,6 @@ func NewTask(ctx context.Context, tenantID string, refs seriesWithInterval, filt
ctx: ctx,
done: make(chan struct{}),
}
return task, nil
}

// Bounds implements Bounded
@@ -130,9 +112,8 @@ func (t Task) CloseWithError(err error) {

// Copy returns a copy of the existing task but with a new slice of grouped chunk refs
func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
// do not copy ID to distinguish it as copied task
return Task{
Tenant: t.Tenant,
tenant: t.tenant,
err: t.err,
resCh: t.resCh,
filters: t.filters,
8 changes: 3 additions & 5 deletions pkg/bloomgateway/multiplexing_test.go
@@ -31,8 +31,7 @@ func TestTask(t *testing.T) {
},
}
swb := partitionRequest(req)[0]
task, err := NewTask(context.Background(), "tenant", swb, nil, nil)
require.NoError(t, err)
task := newTask(context.Background(), "tenant", swb, nil, nil)
from, through := task.Bounds()
require.Equal(t, ts.Add(-1*time.Hour), from)
require.Equal(t, ts, through)
@@ -45,8 +44,7 @@ func createTasksForRequests(t *testing.T, tenant string, requests ...*logproto.F
tasks := make([]Task, 0, len(requests))
for _, r := range requests {
for _, swb := range partitionRequest(r) {
task, err := NewTask(context.Background(), tenant, swb, nil, nil)
require.NoError(t, err)
task := newTask(context.Background(), tenant, swb, nil, nil)
tasks = append(tasks, task)
}
}
@@ -63,7 +61,7 @@ func TestTask_RequestIterator(t *testing.T) {
interval: bloomshipper.Interval{Start: 0, End: math.MaxInt64},
series: []*logproto.GroupedChunkRefs{},
}
task, _ := NewTask(context.Background(), tenant, swb, []syntax.LineFilterExpr{}, nil)
task := newTask(context.Background(), tenant, swb, []syntax.LineFilterExpr{}, nil)
it := task.RequestIter(tokenizer)
// nothing to iterate over
require.False(t, it.Next())
4 changes: 2 additions & 2 deletions pkg/bloomgateway/processor.go
@@ -40,7 +40,7 @@ func (p *processor) run(ctx context.Context, tasks []Task) error {
}

func (p *processor) runWithBounds(ctx context.Context, tasks []Task, bounds v1.MultiFingerprintBounds) error {
tenant := tasks[0].Tenant
tenant := tasks[0].tenant
level.Info(p.logger).Log(
"msg", "process tasks with bounds",
"tenant", tenant,
@@ -157,7 +157,7 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie
for _, task := range tasks {
if sp := opentracing.SpanFromContext(task.ctx); sp != nil {
md, _ := blockQuerier.Metadata()
blk := bloomshipper.BlockRefFrom(task.Tenant, task.table.String(), md)
blk := bloomshipper.BlockRefFrom(task.tenant, task.table.String(), md)
sp.LogKV("process block", blk.String(), "series", len(task.series))
}

6 changes: 3 additions & 3 deletions pkg/bloomgateway/processor_test.go
@@ -126,7 +126,7 @@ func TestProcessor(t *testing.T) {
}

t.Log("series", len(swb.series))
task, _ := NewTask(ctx, "fake", swb, filters, nil)
task := newTask(ctx, "fake", swb, filters, nil)
tasks := []Task{task}

results := atomic.NewInt64(0)
@@ -178,7 +178,7 @@ func TestProcessor(t *testing.T) {
}

t.Log("series", len(swb.series))
task, _ := NewTask(ctx, "fake", swb, filters, blocks)
task := newTask(ctx, "fake", swb, filters, blocks)
tasks := []Task{task}

results := atomic.NewInt64(0)
@@ -227,7 +227,7 @@ func TestProcessor(t *testing.T) {
}

t.Log("series", len(swb.series))
task, _ := NewTask(ctx, "fake", swb, filters, nil)
task := newTask(ctx, "fake", swb, filters, nil)
tasks := []Task{task}

results := atomic.NewInt64(0)
1 change: 0 additions & 1 deletion pkg/bloomgateway/worker.go
@@ -100,7 +100,6 @@ func (w *worker) running(_ context.Context) error {
w.queue.ReleaseRequests(items)
return errors.Errorf("failed to cast dequeued item to Task: %v", item)
}
level.Debug(w.logger).Log("msg", "dequeued task", "task", task.ID)
_ = w.pending.Dec()
w.metrics.queueDuration.WithLabelValues(w.id).Observe(time.Since(task.enqueueTime).Seconds())
FromContext(task.ctx).AddQueueTime(time.Since(task.enqueueTime))
3 changes: 3 additions & 0 deletions pkg/loghttp/query.go
@@ -537,6 +537,9 @@ func ParseIndexShardsQuery(r *http.Request) (*RangeQuery, datasize.ByteSize, err
return nil, 0, err
}
targetBytes, err := parseBytes(r, "targetBytesPerShard", true)
if targetBytes <= 0 {
return nil, 0, errors.New("targetBytesPerShard must be a positive value")
}
return parsed, targetBytes, err
}

1 change: 0 additions & 1 deletion pkg/pattern/drain/drain.go
@@ -268,7 +268,6 @@ func (d *Drain) Match(content string) *LogCluster {
}

func (d *Drain) getContentAsTokens(content string) []string {
content = strings.TrimSpace(content)
for _, extraDelimiter := range d.config.ExtraDelimiters {
content = strings.Replace(content, extraDelimiter, " ", -1)
}