Merge branch 'main' into main

MichelHollands authored Apr 2, 2024
2 parents dd0b383 + 71602eb commit 4c3268f
Showing 28 changed files with 853 additions and 537 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -63,6 +63,7 @@
* [11897](https://github.com/grafana/loki/pull/11897) **masslessparticle** Ksonnet: Introduces memory limits to the compactor configuration to avoid unbounded memory usage.
* [12318](https://github.com/grafana/loki/pull/12318) **DylanGuedes** Memcached: Add mTLS support.
* [12392](https://github.com/grafana/loki/pull/12392) **sandeepsukhani** Detect name of service emitting logs and add it as a label.
* [12398](https://github.com/grafana/loki/pull/12398) **kolesnikovae** LogQL: Introduces pattern match filter operators.

##### Fixes
* [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var.
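(The pattern match filter operators from [12398](https://github.com/grafana/loki/pull/12398) add `|>` and `!>` line filters to LogQL, e.g. a query like `{job="mysql"} |> "<_> error <_>"`; the implementation is in the pkg/logql/log/filter.go hunk below.)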
2 changes: 1 addition & 1 deletion clients/pkg/logentry/stages/extensions.go
@@ -128,7 +128,7 @@ type CriConfig struct {
MaxPartialLineSizeTruncate bool `mapstructure:"max_partial_line_size_truncate"`
}

// validateDropConfig validates the DropConfig for the dropStage
// validateCriConfig validates the CriConfig for the cri stage
func validateCriConfig(cfg *CriConfig) error {
if cfg.MaxPartialLines == 0 {
cfg.MaxPartialLines = MaxPartialLinesSize
4 changes: 2 additions & 2 deletions clients/pkg/logentry/stages/limit_test.go
@@ -60,7 +60,7 @@ var testNonAppLogLine = `

var plName = "testPipeline"

// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
// TestLimitWaitPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitWaitPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitWaitYaml), &plName, registry)
@@ -78,7 +78,7 @@ func TestLimitWaitPipeline(t *testing.T) {
assert.Equal(t, out[0].Line, testMatchLogLineApp1)
}

// TestLimitPipeline is used to verify we properly parse the yaml config and create a working pipeline
// TestLimitDropPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestLimitDropPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
pl, err := NewPipeline(util_log.Logger, loadConfig(testLimitDropYaml), &plName, registry)
15 changes: 5 additions & 10 deletions docs/sources/configure/_index.md
@@ -2351,7 +2351,8 @@ tsdb_shipper:

[ingesterdbretainperiod: <duration>]

# Configures Bloom Shipper.
# Configures the bloom shipper component, which contains the store abstraction
# to fetch bloom filters from and put them to object storage.
bloom_shipper:
# Working directory to store downloaded bloom blocks. Supports multiple
# directories, separated by comma.
@@ -2363,15 +2364,9 @@ bloom_shipper:
# CLI flag: -bloom.max-query-page-size
[max_query_page_size: <int> | default = 64MiB]

blocks_downloading_queue:
# The count of parallel workers that download Bloom Blocks.
# CLI flag: -bloom.shipper.blocks-downloading-queue.workers-count
[workers_count: <int> | default = 16]

  # Maximum number of tasks in the queue per tenant per bloom-gateway. Enqueuing
  # tasks above this limit will fail with an error.
# CLI flag: -bloom.shipper.blocks-downloading-queue.max_tasks_enqueued_per_tenant
[max_tasks_enqueued_per_tenant: <int> | default = 10000]
  # The maximum number of concurrent bloom block downloads.
# CLI flag: -bloom.download-parallelism
[download_parallelism: <int> | default = 16]

blocks_cache:
# Cache for bloom blocks. Soft limit of the cache in bytes. Exceeding this
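For reference, a minimal sketch of the resulting block under the new option (the value is the documented default; the directory path is a placeholder):

```yaml
bloom_shipper:
  working_directory: /var/loki/bloom-shipper  # placeholder path
  # Replaces the removed blocks_downloading_queue.workers_count setting
  # (CLI flag: -bloom.download-parallelism).
  download_parallelism: 16
```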
1 change: 1 addition & 0 deletions docs/sources/send-data/_index.md
@@ -59,6 +59,7 @@ These third-party clients also enable sending logs to Loki:
- [promtail-client](https://github.com/afiskon/promtail-client) (Go)
- [push-to-loki.py](https://github.com/sleleko/devops-kb/blob/master/python/push-to-loki.py) (Python 3)
- [python-logging-loki](https://pypi.org/project/python-logging-loki/) (Python 3)
- [nextlog](https://pypi.org/project/nextlog/) (Python 3)
- [Serilog-Sinks-Loki](https://github.com/JosephWoodward/Serilog-Sinks-Loki) (C#)
- [Vector Loki Sink](https://vector.dev/docs/reference/configuration/sinks/loki/)
- [winston-loki](https://github.com/JaniAnttonen/winston-loki) (JS)
2 changes: 0 additions & 2 deletions integration/cluster/cluster.go
@@ -82,8 +82,6 @@ storage_config:
cache_location: {{.dataPath}}/tsdb-cache
bloom_shipper:
working_directory: {{.dataPath}}/bloom-shipper
blocks_downloading_queue:
workers_count: 1
bloom_gateway:
enabled: false
6 changes: 2 additions & 4 deletions pkg/bloomcompactor/retention_test.go
@@ -802,10 +802,8 @@ func NewMockBloomStoreWithWorkDir(t *testing.T, workDir string) (*bloomshipper.B
Directory: workDir,
},
BloomShipperConfig: config.Config{
WorkingDirectory: []string{workDir},
BlocksDownloadingQueue: config.DownloadingQueueConfig{
WorkersCount: 1,
},
WorkingDirectory: []string{workDir},
DownloadParallelism: 1,
BlocksCache: config.BlocksCacheConfig{
SoftLimit: 1 << 20,
HardLimit: 2 << 20,
6 changes: 2 additions & 4 deletions pkg/bloomgateway/bloomgateway_test.go
@@ -72,10 +72,8 @@ func setupBloomStore(t *testing.T) *bloomshipper.BloomStore {
}
storageCfg := storage.Config{
BloomShipperConfig: bloomshipperconfig.Config{
WorkingDirectory: []string{t.TempDir()},
BlocksDownloadingQueue: bloomshipperconfig.DownloadingQueueConfig{
WorkersCount: 1,
},
WorkingDirectory: []string{t.TempDir()},
DownloadParallelism: 1,
BlocksCache: bloomshipperconfig.BlocksCacheConfig{
SoftLimit: flagext.Bytes(10 << 20),
HardLimit: flagext.Bytes(20 << 20),
45 changes: 45 additions & 0 deletions pkg/logql/log/filter.go
@@ -11,6 +11,7 @@ import (

"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/logql/log/pattern"
"github.com/grafana/loki/pkg/util"
)

@@ -23,6 +24,8 @@ const (
LineMatchNotEqual
LineMatchRegexp
LineMatchNotRegexp
LineMatchPattern
LineMatchNotPattern
)

func (t LineMatchType) String() string {
@@ -35,6 +38,10 @@ func (t LineMatchType) String() string {
return "|~"
case LineMatchNotRegexp:
return "!~"
case LineMatchPattern:
return "|>"
case LineMatchNotPattern:
return "!>"
default:
return ""
}
@@ -553,6 +560,10 @@ func NewFilter(match string, mt LineMatchType) (Filterer, error) {
return newContainsFilter([]byte(match), false), nil
case LineMatchNotEqual:
return NewNotFilter(newContainsFilter([]byte(match), false)), nil
case LineMatchPattern:
return newPatternFilterer([]byte(match), true)
case LineMatchNotPattern:
return newPatternFilterer([]byte(match), false)
default:
return nil, fmt.Errorf("unknown matcher: %v", match)
}
@@ -783,3 +794,37 @@ func (s *RegexSimplifier) simplifyConcatAlternate(reg *syntax.Regexp, literal []
}
return nil, false
}

type patternFilter struct {
matcher *pattern.Matcher
pattern []byte
}

func newPatternFilterer(p []byte, match bool) (MatcherFilterer, error) {
m, err := pattern.ParseLineFilter(p)
if err != nil {
return nil, err
}
filter := &patternFilter{
matcher: m,
pattern: p,
}
if !match {
return NewNotFilter(filter), nil
}
return filter, nil
}

func (f *patternFilter) Filter(line []byte) bool { return f.matcher.Test(line) }

func (f *patternFilter) Matches(test Checker) bool {
return test.Test(f.pattern, false, false)
}

func (f *patternFilter) ToStage() Stage {
return StageFunc{
process: func(_ int64, line []byte, _ *LabelsBuilder) ([]byte, bool) {
return line, f.Filter(line)
},
}
}
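A minimal sketch of exercising the new filter through `NewFilter` (the pattern string and log lines are illustrative; `|>` corresponds to `LineMatchPattern`, `!>` to `LineMatchNotPattern`):

```go
package main

import (
	"fmt"

	log "github.com/grafana/loki/pkg/logql/log"
)

func main() {
	// Equivalent to the LogQL line filter `|> "<_> error <_>"`.
	// Line filter patterns may only use unnamed captures (<_>);
	// named captures are rejected via pattern.ParseLineFilter.
	f, err := log.NewFilter("<_> error <_>", log.LineMatchPattern)
	if err != nil {
		panic(err)
	}
	fmt.Println(f.Filter([]byte("2024-04-02 error connection refused"))) // true
	fmt.Println(f.Filter([]byte("2024-04-02 info started")))            // false
}
```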
2 changes: 1 addition & 1 deletion pkg/logql/log/parser.go
@@ -373,7 +373,7 @@ func (l *LogfmtParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte
func (l *LogfmtParser) RequiredLabelNames() []string { return []string{} }

type PatternParser struct {
matcher pattern.Matcher
matcher *pattern.Matcher
names []string
}

33 changes: 24 additions & 9 deletions pkg/logql/log/pattern/ast.go
@@ -20,6 +20,21 @@ func (e expr) validate() error {
return ErrNoCapture
}
// Consecutive captures are not allowed.
if err := e.validateNoConsecutiveCaptures(); err != nil {
return err
}
caps := e.captures()
uniq := map[string]struct{}{}
for _, c := range caps {
if _, ok := uniq[c]; ok {
return fmt.Errorf("duplicate capture name (%s): %w", c, ErrInvalidExpr)
}
uniq[c] = struct{}{}
}
return nil
}

func (e expr) validateNoConsecutiveCaptures() error {
for i, n := range e {
if i+1 >= len(e) {
break
@@ -30,21 +45,21 @@ func (e expr) validate() error {
}
}
}
return nil
}

caps := e.captures()
uniq := map[string]struct{}{}
for _, c := range caps {
if _, ok := uniq[c]; ok {
return fmt.Errorf("duplicate capture name (%s): %w", c, ErrInvalidExpr)
func (e expr) validateNoNamedCaptures() error {
for i, n := range e {
if c, ok := e[i].(capture); ok && !c.isUnnamed() {
return fmt.Errorf("%w: found '%s'", ErrCaptureNotAllowed, n.String())
}
uniq[c] = struct{}{}
}
return nil
}

func (e expr) captures() (captures []string) {
for _, n := range e {
if c, ok := n.(capture); ok && !c.isUnamed() {
if c, ok := n.(capture); ok && !c.isUnnamed() {
captures = append(captures, c.Name())
}
}
@@ -65,8 +80,8 @@ func (c capture) Name() string {
return string(c)
}

func (c capture) isUnamed() bool {
return string(c) == underscore
func (c capture) isUnnamed() bool {
return len(c) == 1 && c[0] == underscore[0]
}

type literals []byte
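The split-out validation rules are easiest to see against concrete inputs; a hedged sketch as a same-package test (parseExpr and validate are unexported, and the exact error values are illustrative of the rules rather than asserted verbatim):

```go
package pattern

import "testing"

// Sketch of the rules enforced by expr.validate.
func TestValidateRulesSketch(t *testing.T) {
	if e, err := parseExpr("<ip> - <_>"); err != nil || e.validate() != nil {
		t.Fatal("a pattern with a capture and no duplicates should validate")
	}
	if e, err := parseExpr("<ip><port>"); err == nil && e.validate() == nil {
		t.Fatal("consecutive captures should be rejected")
	}
	if e, err := parseExpr("<ip> to <ip>"); err == nil && e.validate() == nil {
		t.Fatal("duplicate capture names should be rejected")
	}
	if e, err := parseExpr("no captures at all"); err == nil && e.validate() == nil {
		t.Fatal("a pattern without captures should fail (ErrNoCapture)")
	}
}
```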
6 changes: 5 additions & 1 deletion pkg/logql/log/pattern/parser.go
@@ -19,8 +19,12 @@ func init() {
}

func parseExpr(input string) (expr, error) {
return parseExprBytes([]byte(input))
}

func parseExprBytes(input []byte) (expr, error) {
l := newLexer()
l.setData([]byte(input))
l.setData(input)
e := exprNewParser().Parse(l)
if e != 0 || len(l.errs) > 0 {
return nil, l.errs[0]