Skip to content

Commit

Permalink
Loki: Implement initial phase of limited_log_push_errors (#9556)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
Adds an initial implementation of `limited_log_push_errors`.
This initial implementation encompass:
- Runtime per-tenant configuration
- Simple per-tenant rate-limiting based on error message size

Notable features that will be added in future phases:
- Instead of a single error with the final string, give to the manager
the list of all entries
- Once it supports per-entry error, make the rate-limiting separated
per-reason
- Hash the entry error and avoid repeating errors by caching seen errors
in memory
  • Loading branch information
DylanGuedes authored May 31, 2023
1 parent c9a3ff5 commit 7d67b63
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 1 deletion.
7 changes: 7 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,13 @@ rate_store:
# If enabled, detailed logs and spans will be emitted.
# CLI flag: -distributor.rate-store.debug
[debug: <boolean> | default = false]
# Experimental. Customize the logging of write failures.
write_failures_logging:
# Experimental and subject to change. Log volume allowed (per second).
# Default: 1KB.
# CLI flag: -distributor.write-failures-logging.rate
[rate: <int> | default = 1KB]
```

### querier
Expand Down
11 changes: 11 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/grafana/loki/pkg/analytics"
"github.com/grafana/loki/pkg/distributor/clientpool"
"github.com/grafana/loki/pkg/distributor/shardstreams"
"github.com/grafana/loki/pkg/distributor/writefailures"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
Expand Down Expand Up @@ -63,13 +64,18 @@ type Config struct {
// For testing.
factory ring_client.PoolFactory `yaml:"-"`

// RateStore customizes the rate storing used by stream sharding.
RateStore RateStoreConfig `yaml:"rate_store"`

// WriteFailuresLoggingCfg customizes write failures logging behavior.
WriteFailuresLogging writefailures.Cfg `yaml:"write_failures_logging" doc:"description=Experimental. Customize the logging of write failures."`
}

// RegisterFlags registers distributor-related flags.
func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
cfg.DistributorRing.RegisterFlags(fs)
cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs)
cfg.WriteFailuresLogging.RegisterFlagsWithPrefix("distributor.write-failures-logging", fs)
}

// RateStore manages the ingestion rate of streams, populated by data fetched from ingesters.
Expand Down Expand Up @@ -105,6 +111,10 @@ type Distributor struct {
// Per-user rate limiter.
ingestionRateLimiter *limiter.RateLimiter
labelCache *lru.Cache

// Push failures rate limiter.
writeFailuresManager *writefailures.Manager

// metrics
ingesterAppends *prometheus.CounterVec
ingesterAppendFailures *prometheus.CounterVec
Expand Down Expand Up @@ -184,6 +194,7 @@ func New(
Name: "stream_sharding_count",
Help: "Total number of times the distributor has sharded streams",
}),
writeFailuresManager: writefailures.NewManager(util_log.Logger, cfg.WriteFailuresLogging, configs),
}

if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
Expand Down
4 changes: 4 additions & 0 deletions pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
"err", err,
)
}
d.writeFailuresManager.Log(tenantID, err)

http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand All @@ -60,6 +62,8 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
return
}

d.writeFailuresManager.Log(tenantID, err)

resp, ok := httpgrpc.HTTPResponseFromError(err)
if ok {
body := string(resp.Body)
Expand Down
17 changes: 17 additions & 0 deletions pkg/distributor/writefailures/cfg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package writefailures

import (
"flag"

"github.com/grafana/loki/pkg/util/flagext"
)

type Cfg struct {
LogRate flagext.ByteSize `yaml:"rate" category:"experimental"`
}

// RegisterFlags registers distributor-related flags.
func (cfg *Cfg) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {
_ = cfg.LogRate.Set("1KB")
fs.Var(&cfg.LogRate, prefix+".rate", "Experimental and subject to change. Log volume allowed (per second). Default: 1KB.")
}
40 changes: 40 additions & 0 deletions pkg/distributor/writefailures/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package writefailures

import (
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/limiter"

"github.com/grafana/loki/pkg/runtime"
)

type Manager struct {
limiter *limiter.RateLimiter
logger log.Logger
tenantCfgs *runtime.TenantConfigs
}

func NewManager(logger log.Logger, cfg Cfg, tenants *runtime.TenantConfigs) *Manager {
logger = log.With(logger, "path", "write", "insight", "true")

strat := newStrategy(cfg.LogRate.Val(), float64(cfg.LogRate.Val()))

return &Manager{
limiter: limiter.NewRateLimiter(strat, time.Minute),
logger: logger,
tenantCfgs: tenants,
}
}

func (m *Manager) Log(tenantID string, err error) {
if !m.tenantCfgs.LimitedLogPushErrors(tenantID) {
return
}

errMsg := err.Error()
if m.limiter.AllowN(time.Now(), tenantID, len(errMsg)) {
level.Error(m.logger).Log("msg", "write operation failed", "err", errMsg)
}
}
162 changes: 162 additions & 0 deletions pkg/distributor/writefailures/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package writefailures

import (
"bytes"
"fmt"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/util/flagext"
)

func TestWriteFailuresLogging(t *testing.T) {
t.Run("it only logs for the configured tenants and is disabled by default", func(t *testing.T) {
buf := bytes.NewBuffer(nil)
logger := log.NewLogfmtLogger(buf)

f := func(tenantID string) *runtime.Config {
if tenantID == "good-tenant" {
return &runtime.Config{
LimitedLogPushErrors: true,
}
}
if tenantID == "bad-tenant" {
return &runtime.Config{
LimitedLogPushErrors: false,
}
}
return &runtime.Config{}
}

runtimeCfg, err := runtime.NewTenantConfigs(f)
require.NoError(t, err)

manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(1000)}, runtimeCfg)

manager.Log("bad-tenant", fmt.Errorf("bad-tenant contains invalid entry"))
manager.Log("good-tenant", fmt.Errorf("good-tenant contains invalid entry"))
manager.Log("unknown-tenant", fmt.Errorf("unknown-tenant contains invalid entry"))

content := buf.String()
require.NotEmpty(t, content)
require.Contains(t, content, "good-tenant")
require.NotContains(t, content, "bad-tenant")
require.NotContains(t, content, "unknown-tenant")
})
}

func TestWriteFailuresRateLimiting(t *testing.T) {
buf := bytes.NewBuffer(nil)
logger := log.NewLogfmtLogger(buf)

f := func(tenantID string) *runtime.Config {
return &runtime.Config{
LimitedLogPushErrors: true,
}
}
runtimeCfg, err := runtime.NewTenantConfigs(f)
require.NoError(t, err)

t.Run("with zero rate limiting", func(t *testing.T) {
manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(0)}, runtimeCfg)

manager.Log("known-tenant", fmt.Errorf("known-tenant entry error"))

content := buf.String()
require.Empty(t, content)
})

t.Run("bytes exceeded on single message", func(t *testing.T) {
manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(1000)}, runtimeCfg)

errorStr := strings.Builder{}
for i := 0; i < 1001; i++ {
errorStr.WriteRune('z')
}

manager.Log("known-tenant", fmt.Errorf(errorStr.String()))

content := buf.String()
require.Empty(t, content)
})

t.Run("valid bytes", func(t *testing.T) {
manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(1002)}, runtimeCfg)

errorStr := strings.Builder{}
for i := 0; i < 1001; i++ {
errorStr.WriteRune('z')
}

manager.Log("known-tenant", fmt.Errorf(errorStr.String()))

content := buf.String()
require.NotEmpty(t, content)
require.Contains(t, content, errorStr.String())
})

t.Run("limit is reset after a second", func(t *testing.T) {
manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(1000)}, runtimeCfg)

errorStr1 := strings.Builder{}
errorStr2 := strings.Builder{}
errorStr3 := strings.Builder{}
for i := 0; i < 999; i++ {
errorStr1.WriteRune('z')
errorStr2.WriteRune('w')
errorStr2.WriteRune('y')
}

manager.Log("known-tenant", fmt.Errorf(errorStr1.String()))
manager.Log("known-tenant", fmt.Errorf(errorStr2.String())) // more than 1KB/s
time.Sleep(time.Second)
manager.Log("known-tenant", fmt.Errorf(errorStr3.String()))

content := buf.String()
require.NotEmpty(t, content)
require.Contains(t, content, errorStr1.String())
require.NotContains(t, content, errorStr2.String())
require.Contains(t, content, errorStr3.String())
})

t.Run("limit is per-tenant", func(t *testing.T) {
runtimeCfg, err := runtime.NewTenantConfigs(f)
require.NoError(t, err)
manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(1000)}, runtimeCfg)

errorStr1 := strings.Builder{}
errorStr2 := strings.Builder{}
errorStr3 := strings.Builder{}
for i := 0; i < 998; i++ {
errorStr1.WriteRune('z')
errorStr2.WriteRune('w')
errorStr3.WriteRune('y')
}

manager.Log("tenant1", fmt.Errorf("1%s", errorStr1.String()))
manager.Log("tenant2", fmt.Errorf("2%s", errorStr1.String()))

manager.Log("tenant1", fmt.Errorf("1%s", errorStr2.String())) // limit exceeded for tenant1, Str2 shouldn't be present.
manager.Log("tenant3", fmt.Errorf("3%s", errorStr1.String())) // all fine with tenant3.

time.Sleep(time.Second)
manager.Log("tenant1", fmt.Errorf("1%s", errorStr3.String())) // tenant1 is fine again.
manager.Log("tenant3", fmt.Errorf("3%s", errorStr1.String())) // all fine with tenant3.

content := buf.String()
require.NotEmpty(t, content)
require.Contains(t, content, "1z")
require.Contains(t, content, "2z")

require.NotContains(t, content, "1w") // Str2
require.Contains(t, content, "3z")

require.Contains(t, content, "1y")
require.Contains(t, content, "3z")
})
}
21 changes: 21 additions & 0 deletions pkg/distributor/writefailures/strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package writefailures

type strategy struct {
burst int
rate float64
}

func newStrategy(burst int, rate float64) *strategy {
return &strategy{
burst: burst,
rate: rate,
}
}

func (s *strategy) Burst(tenantID string) int {
return s.burst
}

func (s *strategy) Limit(tenantID string) float64 {
return s.rate
}
1 change: 0 additions & 1 deletion pkg/runtime/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ type Config struct {
LogPushRequestStreams bool `yaml:"log_push_request_streams"`

// LimitedLogPushErrors is to be implemented and will allow logging push failures at a controlled pace.
// TODO(dylanguedes): implement it.
LimitedLogPushErrors bool `yaml:"limited_log_push_errors"`
}

Expand Down

0 comments on commit 7d67b63

Please sign in to comment.