Commit

Merge branch 'main' into doanbutar-patch-3

JStickler authored Mar 26, 2024
2 parents dbbce19 + c0c7a19 commit 36533ef
Showing 129 changed files with 6,302 additions and 950 deletions.
35 changes: 21 additions & 14 deletions docs/sources/configure/_index.md
@@ -2695,18 +2695,18 @@ ring:
# CLI flag: -bloom-compactor.compaction-interval
[compaction_interval: <duration> | default = 10m]
# How many index periods (days) to wait before building bloom filters for a
# table. This can be used to lower cost by not re-writing data to object storage
# too frequently since recent data changes more often.
# CLI flag: -bloom-compactor.min-table-compaction-period
[min_table_compaction_period: <int> | default = 1]
# The maximum number of index periods (days) to build bloom filters for a table.
# This can be used to lower cost by not trying to compact older data which
# doesn't change. This can be optimized by aligning it with the maximum
# `reject_old_samples_max_age` setting of any tenant.
# CLI flag: -bloom-compactor.max-table-compaction-period
[max_table_compaction_period: <int> | default = 7]
# Newest day-table offset (from today, inclusive) to compact. Increase to lower
# cost by not re-writing data to object storage too frequently since recent data
# changes more often at the cost of not having blooms available as quickly.
# CLI flag: -bloom-compactor.min-table-offset
[min_table_offset: <int> | default = 1]
# Oldest day-table offset (from today, inclusive) to compact. This can be used
# to lower cost by not trying to compact older data which doesn't change. This
# can be optimized by aligning it with the maximum `reject_old_samples_max_age`
# setting of any tenant.
# CLI flag: -bloom-compactor.max-table-offset
[max_table_offset: <int> | default = 2]

# Number of workers to run in parallel for compaction.
# CLI flag: -bloom-compactor.worker-parallelism
@@ -2871,11 +2871,18 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -querier.tsdb-max-query-parallelism
[tsdb_max_query_parallelism: <int> | default = 128]

# Maximum number of bytes assigned to a single sharded query. Also expressible
# in human readable forms (1GB, etc).
# Target maximum number of bytes assigned to a single sharded query. Also
# expressible in human readable forms (1GB, etc). Note: This is a _target_ and
# not an absolute limit. The actual limit can be higher, but the query planner
# will try to build shards up to this limit.
# CLI flag: -querier.tsdb-max-bytes-per-shard
[tsdb_max_bytes_per_shard: <int> | default = 600MB]

# Sharding strategy to use in query planning. Using `bounded` is suggested once
# all nodes can recognize it.
# CLI flag: -limits.tsdb-sharding-strategy
[tsdb_sharding_strategy: <string> | default = "power_of_two"]

# Cardinality limit for index queries.
# CLI flag: -store.cardinality-limit
[cardinality_limit: <int> | default = 100000]
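The reworded `tsdb_max_bytes_per_shard` entry and the new `tsdb_sharding_strategy` limit describe a planner that sizes shards toward a byte target rather than enforcing a hard cap. The sketch below only illustrates that intent; `estimateShards` is a hypothetical helper, and the real Loki planner and the exact rounding of each strategy may differ.

```go
package main

import (
	"fmt"
	"math"
)

// estimateShards sketches how a planner might pick a shard count from a byte
// estimate. "power_of_two" rounds the count up to the next power of two;
// "bounded" keeps the exact count. This illustrates the documented intent,
// not the actual Loki planner logic.
func estimateShards(queryBytes, targetBytesPerShard uint64, strategy string) int {
	if queryBytes == 0 || targetBytesPerShard == 0 {
		return 1
	}
	shards := int(math.Ceil(float64(queryBytes) / float64(targetBytesPerShard)))
	if strategy == "power_of_two" && shards > 1 {
		shards = 1 << uint(math.Ceil(math.Log2(float64(shards))))
	}
	return shards
}

func main() {
	const target = 600 << 20 // 600MB, the tsdb_max_bytes_per_shard default
	fmt.Println(estimateShards(5<<30, target, "power_of_two")) // 16
	fmt.Println(estimateShards(5<<30, target, "bounded"))      // 9
}
```

With the 600MB default, a query estimated at 5GiB would plan roughly 9 shards under `bounded` and round up to 16 under `power_of_two` in this simplified model.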
28 changes: 26 additions & 2 deletions docs/sources/release-notes/v2-9.md
@@ -32,14 +32,38 @@ Grafana Labs is excited to announce the release of Loki 2.9.0 Here's a summary o
- The `-ingester.unordered-writes` CLI flag is deprecated and will always default to `true` in the next major release.
- For the full list of deprecations, see CHANGELOG.md


## Bug fixes

### 2.9.5 (2024-02-28)

* Bump base images and Go dependencies to address CVEs ([#12092](https://github.com/grafana/loki/issues/12092)) ([eee3598](https://github.com/grafana/loki/commit/eee35983f38fe04543b169ffa8ece76c23c4217b)).

For a full list of all changes and fixes, refer to the [CHANGELOG](https://github.com/grafana/loki/blob/release-2.9.x/CHANGELOG.md).

### 2.9.4 (2024-01-24)

- Fixed a couple of data races that can cause panics due to concurrent read-write access of tenant configs.
- Fixed a bug in the log results cache.
- Fixed the cache to atomically check background cache size limit correctly.
- Fixed the discrepancy between the semantics of logs and metrics queries.
- Fixed promtail default scrape config causing CPU and memory load.
- Updated golang.org/x/crypto to v0.18.0.

For a full list of all changes and fixes, refer to the [CHANGELOG](https://github.com/grafana/loki/blob/release-2.9.x/CHANGELOG.md).

### 2.9.3 (2023-12-11)

* Upgrade otelhttp from 0.40.0 -> 0.44.0 and the base Alpine image from 3.18.3 -> 3.18.5 to fix a few CVEs (CVE-2023-45142, CVE-2022-21698, CVE-2023-5363).
* Fix querying the ingester for label values with a matcher (previously the matcher was not respected).
* Ensure all lifecycler configs reference a valid IPv6 address and port combination.

For a full list of all changes and fixes, refer to the [CHANGELOG](https://github.com/grafana/loki/blob/release-2.9.x/CHANGELOG.md).

### 2.9.2 (2023-10-16)

* Upgrade Go to v1.21.3, golang.org/x/net to v0.17.0, and grpc-go to v1.56.3 to patch CVE-2023-39325 / CVE-2023-44487.

For a full list of all changes and fixes, look at the [CHANGELOG](https://github.com/grafana/loki/blob/release-2.9.x/CHANGELOG.md).
For a full list of all changes and fixes, refer to the [CHANGELOG](https://github.com/grafana/loki/blob/release-2.9.x/CHANGELOG.md).

### 2.9.1 (2023-09-14)

2 changes: 1 addition & 1 deletion operator/go.mod
@@ -150,7 +150,7 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.28.4 // indirect
4 changes: 2 additions & 2 deletions operator/go.sum
@@ -1916,8 +1916,8 @@ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
20 changes: 8 additions & 12 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -55,8 +55,7 @@ type Compactor struct {

sharding util_ring.TenantSharding

metrics *Metrics
btMetrics *v1.Metrics
metrics *Metrics
}

func New(
@@ -67,7 +66,7 @@ func New(
fetcherProvider stores.ChunkFetcherProvider,
sharding util_ring.TenantSharding,
limits Limits,
store bloomshipper.Store,
store bloomshipper.StoreWithMetrics,
logger log.Logger,
r prometheus.Registerer,
) (*Compactor, error) {
@@ -78,6 +77,7 @@ func New(
sharding: sharding,
limits: limits,
bloomStore: store,
metrics: NewMetrics(r, store.BloomMetrics()),
}

tsdbStore, err := NewTSDBStores(schemaCfg, storeCfg, clientMetrics, logger)
@@ -86,10 +86,6 @@ }
}
c.tsdbStore = tsdbStore

// initialize metrics
c.btMetrics = v1.NewMetrics(prometheus.WrapRegistererWithPrefix("loki_bloom_tokenizer_", r))
c.metrics = NewMetrics(r, c.btMetrics)

chunkLoader := NewStoreChunkLoader(
fetcherProvider,
c.metrics,
@@ -258,12 +254,12 @@ func (c *Compactor) runOne(ctx context.Context) error {
func (c *Compactor) tables(ts time.Time) *dayRangeIterator {
// adjust the minimum by one to make it inclusive, which is more intuitive
// for a configuration variable
adjustedMin := min(c.cfg.MinTableCompactionPeriod - 1)
minCompactionPeriod := time.Duration(adjustedMin) * config.ObjectStorageIndexRequiredPeriod
maxCompactionPeriod := time.Duration(c.cfg.MaxTableCompactionPeriod) * config.ObjectStorageIndexRequiredPeriod
adjustedMin := min(c.cfg.MinTableOffset - 1)
minCompactionDelta := time.Duration(adjustedMin) * config.ObjectStorageIndexRequiredPeriod
maxCompactionDelta := time.Duration(c.cfg.MaxTableOffset) * config.ObjectStorageIndexRequiredPeriod

from := ts.Add(-maxCompactionPeriod).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)
through := ts.Add(-minCompactionPeriod).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)
from := ts.Add(-maxCompactionDelta).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)
through := ts.Add(-minCompactionDelta).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)

fromDay := config.NewDayTime(model.TimeFromUnixNano(from))
throughDay := config.NewDayTime(model.TimeFromUnixNano(through))
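The renaming from compaction "periods" to table "offsets" reads most naturally as days back from today, inclusive on both ends, which is why `tables()` subtracts one from the minimum offset before computing the range. A rough sketch of that arithmetic, using a hypothetical `tableRange` helper and a fixed 24h period in place of `config.ObjectStorageIndexRequiredPeriod`:

```go
package main

import (
	"fmt"
	"time"
)

// tableRange mirrors the idea in Compactor.tables(): minOffset and maxOffset
// are day-table offsets from "today", both inclusive. Subtracting one from
// minOffset is what makes the newest table inclusive, matching the adjustedMin
// logic in the diff. The fixed 24h period stands in for
// config.ObjectStorageIndexRequiredPeriod.
func tableRange(now time.Time, minOffset, maxOffset int) (from, through time.Time) {
	const period = 24 * time.Hour
	truncate := func(t time.Time) time.Time { return t.Truncate(period) }
	from = truncate(now.Add(-time.Duration(maxOffset) * period))
	through = truncate(now.Add(-time.Duration(minOffset-1) * period))
	return from, through
}

func main() {
	now := time.Date(2024, 3, 26, 15, 0, 0, 0, time.UTC)
	from, through := tableRange(now, 1, 2) // the new defaults: min=1, max=2
	fmt.Println(from, through)             // from=2024-03-24, through=2024-03-26 (UTC day boundaries)
}
```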
25 changes: 12 additions & 13 deletions pkg/bloomcompactor/config.go
@@ -22,14 +22,14 @@ type Config struct {
// section and the ingester configuration by default).
Ring ring.RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom-compactor servers. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
// Enabled configures whether bloom-compactors should be used to compact index values into bloomfilters
Enabled bool `yaml:"enabled"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
MinTableCompactionPeriod int `yaml:"min_table_compaction_period"`
MaxTableCompactionPeriod int `yaml:"max_table_compaction_period"`
WorkerParallelism int `yaml:"worker_parallelism"`
RetryMinBackoff time.Duration `yaml:"compaction_retries_min_backoff"`
RetryMaxBackoff time.Duration `yaml:"compaction_retries_max_backoff"`
CompactionRetries int `yaml:"compaction_retries"`
Enabled bool `yaml:"enabled"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
MinTableOffset int `yaml:"min_table_offset"`
MaxTableOffset int `yaml:"max_table_offset"`
WorkerParallelism int `yaml:"worker_parallelism"`
RetryMinBackoff time.Duration `yaml:"compaction_retries_min_backoff"`
RetryMaxBackoff time.Duration `yaml:"compaction_retries_max_backoff"`
CompactionRetries int `yaml:"compaction_retries"`

MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
}
@@ -40,15 +40,14 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.CompactionInterval, "bloom-compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.")
f.IntVar(&cfg.WorkerParallelism, "bloom-compactor.worker-parallelism", 1, "Number of workers to run in parallel for compaction.")
// TODO(owen-d): This is a confusing name. Rename it to `min_table_offset`
f.IntVar(&cfg.MinTableCompactionPeriod, "bloom-compactor.min-table-compaction-period", 1, "How many index periods (days) to wait before building bloom filters for a table. This can be used to lower cost by not re-writing data to object storage too frequently since recent data changes more often.")
f.IntVar(&cfg.MinTableOffset, "bloom-compactor.min-table-offset", 1, "Newest day-table offset (from today, inclusive) to compact. Increase to lower cost by not re-writing data to object storage too frequently since recent data changes more often at the cost of not having blooms available as quickly.")
// TODO(owen-d): ideally we'd set this per tenant based on their `reject_old_samples_max_age` setting,
// but due to how we need to discover tenants, we can't do that yet. Tenant+Period discovery is done by
// iterating the table periods in object storage and looking for tenants within that period.
// In order to have this done dynamically, we'd need to account for tenant specific overrides, which are also
// dynamically reloaded.
// I'm doing it the simple way for now.
// TODO(owen-d): This is a confusing name. Rename it to `max_table_offset`
f.IntVar(&cfg.MaxTableCompactionPeriod, "bloom-compactor.max-table-compaction-period", 7, "The maximum number of index periods (days) to build bloom filters for a table. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.")
f.IntVar(&cfg.MaxTableOffset, "bloom-compactor.max-table-offset", 2, "Oldest day-table offset (from today, inclusive) to compact. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.")
f.DurationVar(&cfg.RetryMinBackoff, "bloom-compactor.compaction-retries-min-backoff", 10*time.Second, "Minimum backoff time between retries.")
f.DurationVar(&cfg.RetryMaxBackoff, "bloom-compactor.compaction-retries-max-backoff", time.Minute, "Maximum backoff time between retries.")
f.IntVar(&cfg.CompactionRetries, "bloom-compactor.compaction-retries", 3, "Number of retries to perform when compaction fails.")
Expand All @@ -67,8 +66,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
}

func (cfg *Config) Validate() error {
if cfg.MinTableCompactionPeriod > cfg.MaxTableCompactionPeriod {
return fmt.Errorf("min_compaction_age must be less than or equal to max_compaction_age")
if cfg.MinTableOffset > cfg.MaxTableOffset {
return fmt.Errorf("min-table-offset (%d) must be less than or equal to max-table-offset (%d)", cfg.MinTableOffset, cfg.MaxTableOffset)
}
if cfg.Ring.ReplicationFactor != ringReplicationFactor {
return errors.New("Replication factor must not be changed as it will not take effect")
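The tightened `Validate` error now names the offending values, which makes a misconfiguration easier to spot. A small sanity check, written as a hypothetical test that assumes it lives in the `bloomcompactor` package alongside `config.go`:

```go
package bloomcompactor

import "testing"

// Hypothetical test: an inverted offset pair should be rejected by Validate,
// and the error message reports both configured values.
func TestValidateRejectsInvertedOffsets(t *testing.T) {
	cfg := Config{MinTableOffset: 5, MaxTableOffset: 2}
	if err := cfg.Validate(); err == nil {
		t.Fatal("expected an error when min-table-offset > max-table-offset")
	}
}
```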
2 changes: 1 addition & 1 deletion pkg/bloomcompactor/spec.go
@@ -217,7 +217,7 @@ func (b *LazyBlockBuilderIterator) Next() bool {
return false
}

b.curr = v1.NewBlock(reader)
b.curr = v1.NewBlock(reader, b.metrics.bloomMetrics)
return true
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/bloomcompactor/spec_test.go
@@ -50,7 +50,7 @@ func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fro
_, err = builder.BuildFrom(itr)
require.Nil(t, err)

res = append(res, v1.NewBlock(reader))
res = append(res, v1.NewBlock(reader, v1.NewMetrics(nil)))
ref := genBlockRef(data[minIdx].Series.Fingerprint, data[maxIdx-1].Series.Fingerprint)
t.Log("create block", ref)
refs = append(refs, ref)
@@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b),
BlockQuerier: v1.NewBlockQuerier(b, false),
})
}

@@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBloom, 0, len(data))
for _, block := range outputBlocks {
bq := block.Querier()
bq := v1.NewBlockQuerier(block, false)
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
24 changes: 7 additions & 17 deletions pkg/bloomcompactor/tsdb.go
@@ -21,6 +21,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
)

const (
@@ -121,34 +122,22 @@ func (b *BloomTSDBStore) LoadTSDB(
}
}()

return NewTSDBSeriesIter(ctx, idx, bounds)
return NewTSDBSeriesIter(ctx, tenant, idx, bounds)
}

// TSDBStore is an interface for interacting with the TSDB,
// modeled off a relevant subset of the `tsdb.TSDBIndex` struct
type forSeries interface {
ForSeries(
ctx context.Context,
fpFilter index.FingerprintFilter,
from model.Time,
through model.Time,
fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta),
matchers ...*labels.Matcher,
) error
}

func NewTSDBSeriesIter(ctx context.Context, f forSeries, bounds v1.FingerprintBounds) (v1.Iterator[*v1.Series], error) {
func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (v1.Iterator[*v1.Series], error) {
// TODO(salvacorts): Create a pool
series := make([]*v1.Series, 0, 100)

if err := f.ForSeries(
ctx,
user,
bounds,
0, math.MaxInt64,
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
select {
case <-ctx.Done():
return
return true
default:
res := &v1.Series{
Fingerprint: fp,
Expand All @@ -163,6 +152,7 @@ func NewTSDBSeriesIter(ctx context.Context, f forSeries, bounds v1.FingerprintBo
}

series = append(series, res)
return false
}
},
labels.MustNewMatcher(labels.MatchEqual, "", ""),
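The local `forSeries` interface is replaced by the shared `sharding.ForSeries`, whose callback now takes the tenant and returns a `stop` boolean, letting `NewTSDBSeriesIter` bail out as soon as its context is cancelled instead of silently skipping the rest of the series. A minimal sketch of that early-stop callback pattern, with a hypothetical `walkSeries` helper standing in for the real interface:

```go
package main

import (
	"context"
	"fmt"
)

// walkSeries is a stand-in for sharding.ForSeries: it calls fn for each
// fingerprint and stops as soon as fn returns true.
func walkSeries(ctx context.Context, fps []uint64, fn func(fp uint64) (stop bool)) error {
	for _, fp := range fps {
		if fn(fp) {
			return ctx.Err() // nil if the callback stopped for another reason
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var collected []uint64

	err := walkSeries(ctx, []uint64{1, 2, 3, 4}, func(fp uint64) (stop bool) {
		select {
		case <-ctx.Done():
			return true // same early-exit shape as the new NewTSDBSeriesIter callback
		default:
			collected = append(collected, fp)
			if fp == 2 {
				cancel() // simulate cancellation mid-iteration
			}
			return false
		}
	})
	fmt.Println(collected, err) // [1 2] context canceled
}
```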
9 changes: 5 additions & 4 deletions pkg/bloomcompactor/tsdb_test.go
@@ -17,10 +17,11 @@ type forSeriesTestImpl []*v1.Series

func (f forSeriesTestImpl) ForSeries(
_ context.Context,
_ string,
_ index.FingerprintFilter,
_ model.Time,
_ model.Time,
fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta),
fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) bool,
_ ...*labels.Matcher,
) error {
for i := range f {
@@ -61,7 +62,7 @@ func TestTSDBSeriesIter(t *testing.T) {
},
}
srcItr := v1.NewSliceIter(input)
itr, err := NewTSDBSeriesIter(context.Background(), forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
require.NoError(t, err)

v1.EqualIterators[*v1.Series](
@@ -78,7 +79,7 @@ func TestTSDBSeriesIter_Expiry(t *testing.T) {
t.Run("expires on creation", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
itr, err := NewTSDBSeriesIter(ctx, forSeriesTestImpl{
itr, err := NewTSDBSeriesIter(ctx, "", forSeriesTestImpl{
{}, // a single entry
}, v1.NewBounds(0, math.MaxUint64))
require.Error(t, err)
Expand All @@ -87,7 +88,7 @@ func TestTSDBSeriesIter_Expiry(t *testing.T) {

t.Run("expires during consumption", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
itr, err := NewTSDBSeriesIter(ctx, forSeriesTestImpl{
itr, err := NewTSDBSeriesIter(ctx, "", forSeriesTestImpl{
{},
{},
}, v1.NewBounds(0, math.MaxUint64))