
Commit 64aa16c

Merge branch 'main' of github.com:grafana/loki into poyzannur/bug-report-issues/9042

poyzannur committed Jan 23, 2024
2 parents 27ec48a + de251c3
Showing 16 changed files with 249 additions and 24 deletions.
9 changes: 9 additions & 0 deletions docs/sources/setup/install/helm/reference.md
@@ -1242,6 +1242,15 @@ null
 <td><pre lang="json">
 null
 </pre>
 </td>
 </tr>
+<tr>
+<td>gateway.nginxConfig.enableIPv6</td>
+<td>bool</td>
+<td>Enable listener for IPv6, disable on IPv4-only systems</td>
+<td><pre lang="json">
+true
+</pre>
+</td>
+</tr>
 <tr>
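For IPv4-only clusters the new value can simply be overridden. A minimal sketch of the relevant `values.yaml` fragment, following the key path in the reference entry above (the surrounding layout is assumed, not taken from this commit):

```yaml
# values.yaml (sketch): disable the gateway's nginx IPv6 listener on
# IPv4-only systems, where an IPv6 listen address cannot be bound.
gateway:
  nginxConfig:
    enableIPv6: false # defaults to true
```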
17 changes: 12 additions & 5 deletions integration/loki_micro_services_test.go
@@ -1073,6 +1073,7 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
 		"-ingester.wal-enabled=false",
 		"-query-scheduler.use-scheduler-ring=false",
 		"-store.index-cache-read.embedded-cache.enabled=true",
+		"-querier.split-queries-by-interval=24h",
 	}

 	tenantID := randStringRunes()
@@ -1173,7 +1174,7 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
 	)
 	require.NoError(t, clu.Run())

-	now := time.Now()
+	now := time.Date(2024, time.January, 19, 12, 0, 0, 0, time.UTC)

 	cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
 	cliDistributor.Now = now
@@ -1195,16 +1196,22 @@ func TestBloomFiltersEndToEnd(t *testing.T) {

 	lineTpl := `caller=loki_micro_services_test.go msg="push log line" id="%s"`
 	// ingest logs from 10 different pods
+	// from now-60m to now-55m
 	// each line contains a random, unique string
 	// that string is used to verify filtering using bloom gateway
-	uniqueStrings := make([]string, 600)
+	uniqueStrings := make([]string, 5*60)
 	for i := 0; i < len(uniqueStrings); i++ {
 		id := randStringRunes()
 		id = fmt.Sprintf("%s-%d", id, i)
 		uniqueStrings[i] = id
 		pod := fmt.Sprintf("pod-%d", i%10)
 		line := fmt.Sprintf(lineTpl, id)
-		err := cliDistributor.PushLogLine(line, now.Add(-1*time.Hour).Add(time.Duration(i-len(uniqueStrings))*time.Second), nil, map[string]string{"pod": pod})
+		err := cliDistributor.PushLogLine(
+			line,
+			now.Add(-1*time.Hour).Add(time.Duration(i)*time.Second),
+			nil,
+			map[string]string{"pod": pod},
+		)
 		require.NoError(t, err)
 	}

@@ -1225,8 +1232,8 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
 	// use bloom gateway to perform needle in the haystack queries
 	randIdx := rand.Intn(len(uniqueStrings))
 	q := fmt.Sprintf(`{job="varlog"} |= "%s"`, uniqueStrings[randIdx])
-	end := now.Add(-1 * time.Second)
-	start := end.Add(-24 * time.Hour)
+	start := now.Add(-90 * time.Minute)
+	end := now.Add(-30 * time.Minute)
 	resp, err := cliQueryFrontend.RunRangeQueryWithStartEnd(context.Background(), q, start, end)
 	require.NoError(t, err)

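With the clock pinned, the new ingestion window is easy to sanity-check: 5*60 lines, one second apart, starting at now-60m, land entirely inside the new [now-90m, now-30m] query range. A stand-alone sketch, not part of the test:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Same fixed reference time as the test.
	now := time.Date(2024, time.January, 19, 12, 0, 0, 0, time.UTC)
	n := 5 * 60 // 300 log lines, one per second

	first := now.Add(-1 * time.Hour) // i == 0
	last := now.Add(-1 * time.Hour).Add(time.Duration(n-1) * time.Second)

	fmt.Println(first) // 2024-01-19 11:00:00 +0000 UTC (now-60m)
	fmt.Println(last)  // 2024-01-19 11:04:59 +0000 UTC (just before now-55m)
}
```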
1 change: 1 addition & 0 deletions operator/CHANGELOG.md
@@ -1,5 +1,6 @@
 ## Main

+- [11513](https://github.com/grafana/loki/pull/11513) **btaani**: Add a custom metric that collects LokiStacks requiring a schema upgrade
 - [11718](https://github.com/grafana/loki/pull/11718) **periklis**: Upgrade k8s.io, sigs.k8s.io and openshift deps
 - [11671](https://github.com/grafana/loki/pull/11671) **JoaoBraveCoding**: Update mixins to fix structured metadata dashboards
 - [11624](https://github.com/grafana/loki/pull/11624) **xperimental**: React to changes in ConfigMap used for storage CA
25 changes: 25 additions & 0 deletions operator/docs/lokistack/sop.md
@@ -308,3 +308,28 @@ The query queue is currently under high load.
 ### Steps

 - Increase the number of queriers
+
+## Lokistack Storage Schema Warning
+
+### Impact
+
+The LokiStack warns when a newer object storage schema is available for configuration.
+
+### Summary
+
+The schema configuration does not contain the most recent schema version and needs an update.
+
+### Severity
+
+`Warning`
+
+### Access Required
+
+- Console access to the cluster
+- Edit access to the namespace where the LokiStack is deployed:
+  - OpenShift
+    - `openshift-logging` (LokiStack)
+
+### Steps
+
+- Add a new object storage schema V13 with a future EffectiveDate, as sketched below
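A minimal sketch of that step, assuming a LokiStack named `logging-loki` in `openshift-logging` (names and dates are illustrative; the `version` and `effectiveDate` fields follow the LokiStack storage schema API):

```yaml
apiVersion: loki.grafana.com/v1
kind: LokiStack
metadata:
  name: logging-loki # illustrative
  namespace: openshift-logging
spec:
  storage:
    schemas:
      - version: v12
        effectiveDate: "2022-06-01" # existing entry, left unchanged
      - version: v13
        effectiveDate: "2024-10-01" # must still be in the future when applied
```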
4 changes: 2 additions & 2 deletions operator/internal/handlers/lokistack_create_or_update.go
@@ -208,9 +208,9 @@ func CreateOrUpdateLokiStack(
 		return kverrors.New("failed to configure lokistack resources", "name", req.NamespacedName)
 	}

-	// 1x.extra-small is used only for development, so the metrics will not
+	// 1x.demo is used only for development, so the metrics will not
 	// be collected.
-	if opts.Stack.Size != lokiv1.SizeOneXExtraSmall && opts.Stack.Size != lokiv1.SizeOneXDemo {
+	if opts.Stack.Size != lokiv1.SizeOneXDemo {
 		metrics.Collect(&opts.Stack, opts.Name)
 	}
10 changes: 10 additions & 0 deletions operator/internal/manifests/internal/alerts/prometheus-alerts.yaml
@@ -175,3 +175,13 @@ groups:
         for: 15m
         labels:
           severity: warning
+      - alert: LokistackSchemaUpgradesRequired
+        annotations:
+          message: |-
+            Object storage schema needs upgrade.
+          summary: "The applied storage schema config is old and should be upgraded."
+          runbook_url: "[[ .RunbookURL ]]#Lokistack-Schema-Upgrades-Required"
+        expr: sum by(stack_id) (lokistack_warnings_count) > 0
+        labels:
+          severity: warning
+          resource: '{{ $labels.stack_id}}'
20 changes: 20 additions & 0 deletions operator/internal/metrics/metrics.go
@@ -51,6 +51,14 @@
 		},
 		[]string{"size", "stack_id"},
 	)
+
+	lokistackWarningsCount = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "lokistack_warnings_count",
+			Help: "Counts the number of warnings set on a LokiStack.",
+		},
+		[]string{"reason", "stack_id"},
+	)
 )

 // RegisterMetricCollectors registers the prometheus collectors with the k8s default metrics
@@ -60,6 +68,7 @@ func RegisterMetricCollectors() {
 		userDefinedLimitsMetric,
 		globalStreamLimitMetric,
 		averageTenantStreamLimitMetric,
+		lokistackWarningsCount,
 	}

 	for _, collector := range metricCollectors {
@@ -104,6 +113,17 @@ func Collect(spec *lokiv1.LokiStackSpec, stackName string) {
 		setGlobalStreamLimitMetric(size, stackName, globalRate)
 		setAverageTenantStreamLimitMetric(size, stackName, tenantRate)
 	}
+
+	if len(spec.Storage.Schemas) > 0 && spec.Storage.Schemas[len(spec.Storage.Schemas)-1].Version != lokiv1.ObjectStorageSchemaV13 {
+		setLokistackSchemaUpgradesRequired(stackName, true)
+	}
 }
+
+func setLokistackSchemaUpgradesRequired(identifier string, active bool) {
+	lokistackWarningsCount.With(prometheus.Labels{
+		"reason": string(lokiv1.ReasonStorageNeedsSchemaUpdate),
+		"stack_id": identifier,
+	}).Set(boolValue(active))
+}

 func setDeploymentMetric(size lokiv1.LokiStackSizeType, identifier string, active bool) {
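A hedged sketch of how the new gauge behaves, using a local copy of the metric and the upstream `prometheus/testutil` helper; the reason string and stack ID below are illustrative, not values taken from this commit:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// Stand-alone copy of the gauge added in this commit (same name and labels).
var lokistackWarningsCount = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "lokistack_warnings_count",
		Help: "Counts the number of warnings set on a LokiStack.",
	},
	[]string{"reason", "stack_id"},
)

func main() {
	// Setting the warning flips the per-stack series to 1, which is what
	// the LokistackSchemaUpgradesRequired alert expression sums over.
	lokistackWarningsCount.With(prometheus.Labels{
		"reason":   "StorageNeedsSchemaUpdate",       // illustrative reason
		"stack_id": "openshift-logging/logging-loki", // illustrative stack
	}).Set(1)

	// With exactly one series set, ToFloat64 returns its value.
	fmt.Println(testutil.ToFloat64(lokistackWarningsCount)) // prints 1
}
```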
23 changes: 9 additions & 14 deletions pkg/bloomgateway/multiplexing.go
@@ -159,15 +159,9 @@ func (it *FilterIter[T]) Err() error {
 	return nil
 }

-// FilterRequest extends v1.Request with an error channel
-type FilterRequest struct {
-	v1.Request
-	Error chan<- error
-}
-
 // taskMergeIterator implements v1.Iterator
 type taskMergeIterator struct {
-	curr FilterRequest
+	curr v1.Request
 	heap *v1.HeapIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]]
 	tasks []Task
 	day time.Time
@@ -178,7 +172,7 @@ type taskMergeIterator struct {
 func newTaskMergeIterator(day time.Time, tokenizer *v1.NGramTokenizer, tasks ...Task) v1.PeekingIterator[v1.Request] {
 	it := &taskMergeIterator{
 		tasks: tasks,
-		curr: FilterRequest{},
+		curr: v1.Request{},
 		day: day,
 		tokenizer: tokenizer,
 	}
@@ -210,16 +204,17 @@ func (it *taskMergeIterator) Next() bool {
 	group := it.heap.At()
 	task := it.tasks[group.Index()]

-	it.curr.Fp = model.Fingerprint(group.Value().Fingerprint)
-	it.curr.Chks = convertToChunkRefs(group.Value().Refs)
-	it.curr.Searches = convertToSearches(task.Request.Filters, it.tokenizer)
-	it.curr.Response = task.ResCh
-	it.curr.Error = task.ErrCh
+	it.curr = v1.Request{
+		Fp: model.Fingerprint(group.Value().Fingerprint),
+		Chks: convertToChunkRefs(group.Value().Refs),
+		Searches: convertToSearches(task.Request.Filters, it.tokenizer),
+		Response: task.ResCh,
+	}
 	return true
 }

 func (it *taskMergeIterator) At() v1.Request {
-	return it.curr.Request
+	return it.curr
 }

 func (it *taskMergeIterator) Err() error {
120 changes: 120 additions & 0 deletions pkg/storage/bloom/v1/builder_test.go
@@ -241,3 +241,123 @@ func TestBlockReset(t *testing.T) {

 	require.Equal(t, rounds[0], rounds[1])
 }
+
+// This test is a basic roundtrip test for the merge builder.
+// It creates one set of blocks with the same (duplicate) data, and another set of blocks with
+// disjoint data. It then merges the two sets of blocks and ensures that the merged blocks contain
+// one copy of the first set (duplicate data) and one copy of the second set (disjoint data).
+func TestMergeBuilder_Roundtrip(t *testing.T) {
+	numSeries := 100
+	numKeysPerSeries := 100
+	minTs, maxTs := model.Time(0), model.Time(10000)
+	xs, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, minTs, maxTs)
+
+	var data [][]*SeriesWithBloom
+
+	// First, we build the blocks
+
+	sets := []int{
+		2, // 2 blocks containing the same data
+		1, // 1 block containing disjoint data
+	}
+
+	for i, copies := range sets {
+		for j := 0; j < copies; j++ {
+			// references for linking in memory reader+writer
+			indexBuf := bytes.NewBuffer(nil)
+			bloomsBuf := bytes.NewBuffer(nil)
+
+			writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
+			reader := NewByteReader(indexBuf, bloomsBuf)
+
+			builder, err := NewBlockBuilder(
+				BlockOptions{
+					schema: Schema{
+						version: DefaultSchemaVersion,
+						encoding: chunkenc.EncSnappy,
+					},
+					SeriesPageSize: 100,
+					BloomPageSize: 10 << 10,
+				},
+				writer,
+			)
+
+			require.Nil(t, err)
+			// each set of copies gets a different slice of the data
+			minIdx, maxIdx := i*len(xs)/len(sets), (i+1)*len(xs)/len(sets)
+			itr := NewSliceIter[SeriesWithBloom](xs[minIdx:maxIdx])
+			_, err = builder.BuildFrom(itr)
+			require.Nil(t, err)
+			block := NewBlock(reader)
+			querier := NewBlockQuerier(block)
+
+			// rather than use the block querier directly, collect its data
+			// so we can use it in a few places later
+			var tmp []*SeriesWithBloom
+			for querier.Next() {
+				tmp = append(tmp, querier.At())
+			}
+			data = append(data, tmp)
+		}
+	}
+
+	// we keep 2 copies of the data as iterators. One for the blocks, and one for the "store"
+	// which will force it to reference the same series
+	var blocks []PeekingIterator[*SeriesWithBloom]
+	var store []PeekingIterator[*SeriesWithBloom]
+
+	for _, x := range data {
+		blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewSliceIter[*SeriesWithBloom](x)))
+		store = append(store, NewPeekingIter[*SeriesWithBloom](NewSliceIter[*SeriesWithBloom](x)))
+	}
+
+	orderedStore := NewHeapIterForSeriesWithBloom(store...)
+	dedupedStore := NewDedupingIter[*SeriesWithBloom, *Series](
+		func(a *SeriesWithBloom, b *Series) bool {
+			return a.Series.Fingerprint == b.Fingerprint
+		},
+		func(swb *SeriesWithBloom) *Series {
+			return swb.Series
+		},
+		func(a *SeriesWithBloom, b *Series) *Series {
+			if len(a.Series.Chunks) > len(b.Chunks) {
+				return a.Series
+			}
+			return b
+		},
+		NewPeekingIter[*SeriesWithBloom](orderedStore),
+	)
+
+	// build the new block from the old ones
+	indexBuf, bloomBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
+	writer := NewMemoryBlockWriter(indexBuf, bloomBuf)
+	reader := NewByteReader(indexBuf, bloomBuf)
+	mb := NewMergeBuilder(
+		blocks,
+		dedupedStore,
+		func(s *Series, b *Bloom) error {
+			// We're not actually indexing new data in this test
+			return nil
+		},
+	)
+	builder, err := NewBlockBuilder(NewBlockOptions(4, 0), writer)
+	require.Nil(t, err)
+
+	checksum, err := mb.Build(builder)
+	require.Nil(t, err)
+	require.Equal(t, uint32(0x779633b5), checksum)
+
+	// ensure the new block contains one copy of all the data
+	// by comparing it against an iterator over the source data
+	mergedBlockQuerier := NewBlockQuerier(NewBlock(reader))
+	sourceItr := NewSliceIter[*SeriesWithBloom](PointerSlice[SeriesWithBloom](xs))
+
+	EqualIterators[*SeriesWithBloom](
+		t,
+		func(a, b *SeriesWithBloom) {
+			require.Equal(t, a.Series.Fingerprint, b.Series.Fingerprint)
+		},
+		sourceItr,
+		mergedBlockQuerier,
+	)
+}
30 changes: 30 additions & 0 deletions pkg/storage/bloom/v1/merge_test.go
@@ -33,6 +33,36 @@ func TestMergeBlockQuerier_NonOverlapping(t *testing.T) {
 	require.False(t, mbq.Next())
 }

+func TestMergeBlockQuerier_Duplicate(t *testing.T) {
+	var (
+		numSeries = 100
+		numKeysPerSeries = 10000
+		numQueriers = 2
+		queriers []PeekingIterator[*SeriesWithBloom]
+		data, _ = mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)
+	)
+	for i := 0; i < numQueriers; i++ {
+		queriers = append(
+			queriers,
+			NewPeekingIter[*SeriesWithBloom](
+				NewSliceIter[*SeriesWithBloom](
+					PointerSlice[SeriesWithBloom](data),
+				),
+			),
+		)
+	}
+
+	mbq := NewHeapIterForSeriesWithBloom(queriers...)
+
+	for i := 0; i < numSeries*2; i++ {
+		require.True(t, mbq.Next())
+		exp := data[i/2].Series.Fingerprint
+		got := mbq.At().Series.Fingerprint
+		require.Equal(t, exp, got, "on iteration %d", i)
+	}
+	require.False(t, mbq.Next())
+}
+
 func TestMergeBlockQuerier_Overlapping(t *testing.T) {
 	var (
 		numSeries = 100
4 changes: 4 additions & 0 deletions production/helm/loki/CHANGELOG.md
@@ -14,6 +14,10 @@ Entries should include a reference to the pull request that introduced the chang
 [//]: # (<AUTOMATED_UPDATES_LOCATOR> : do not remove this line. This locator is used by the CI pipeline to automatically create a changelog entry for each new Loki release. Add other chart versions and respective changelog entries below this line.)


+## 5.41.8
+
+- [BUGFIX] Fix gateway: add the option to disable listening on IPv6, to prevent a crash on IPv4-only systems.
+
 ## 5.41.7

 - [FEATURE] Add support to disable specific alert rules