Skip to content

Commit

Permalink
Merge branch 'main' into tomasmota-patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasmota authored Jul 24, 2024
2 parents 4024458 + 2642718 commit 7f466df
Show file tree
Hide file tree
Showing 17 changed files with 2,477 additions and 297 deletions.
25 changes: 25 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,31 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set
# querier.
[querier: <querier>]

querier_rf1:
# Enable the RF1 querier. If set, replaces the usual querier with a RF-1
# querier when using 'ALL' target.
# CLI flag: -querier-rf1.enabled
[enabled: <boolean> | default = false]

# Time to wait before sending more than the minimum successful query requests.
# CLI flag: -querier-rf1.extra-query-delay
[extra_query_delay: <duration> | default = 0s]

engine:
# The maximum amount of time to look back for log lines. Used only for
# instant log queries.
# CLI flag: -querier-rf1.engine.max-lookback-period
[max_look_back_period: <duration> | default = 30s]

# The maximum number of queries that can be simultaneously processed by the
# querier.
# CLI flag: -querier-rf1.max-concurrent
[max_concurrent: <int> | default = 4]

# When true, querier limits sent via a header are enforced.
# CLI flag: -querier-rf1.per-request-limits-enabled
[per_request_limits_enabled: <boolean> | default = false]

# The query_scheduler block configures the Loki query scheduler. When configured
# it separates the tenant query queues from the query-frontend.
[query_scheduler: <query_scheduler>]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package metastore

import (
"context"
"slices"
"strings"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
)

func (m *Metastore) ListBlocksForQuery(
ctx context.Context,
request *metastorepb.ListBlocksForQueryRequest,
) (
*metastorepb.ListBlocksForQueryResponse, error,
) {
return m.state.listBlocksForQuery(ctx, request)
}

func (m *metastoreState) listBlocksForQuery(
_ context.Context,
request *metastorepb.ListBlocksForQueryRequest,
) (
*metastorepb.ListBlocksForQueryResponse, error,
) {
if len(request.TenantId) == 0 {
return nil, status.Error(codes.InvalidArgument, "tenant_id is required")
}

if request.StartTime > request.EndTime {
return nil, status.Error(codes.InvalidArgument, "start_time must be less than or equal to end_time")
}
var resp metastorepb.ListBlocksForQueryResponse
m.segmentsMutex.Lock()
defer m.segmentsMutex.Unlock()

for _, segment := range m.segments {
for _, tenants := range segment.TenantStreams {
if tenants.TenantId == request.TenantId && inRange(segment.MinTime, segment.MaxTime, request.StartTime, request.EndTime) {
resp.Blocks = append(resp.Blocks, cloneBlockForQuery(segment))
break
}
}
}
slices.SortFunc(resp.Blocks, func(a, b *metastorepb.BlockMeta) int {
return strings.Compare(a.Id, b.Id)
})
return &resp, nil
}

func inRange(blockStart, blockEnd, queryStart, queryEnd int64) bool {
return blockStart <= queryEnd && blockEnd >= queryStart
}

func cloneBlockForQuery(b *metastorepb.BlockMeta) *metastorepb.BlockMeta {
res := &metastorepb.BlockMeta{
Id: b.Id,
MinTime: b.MinTime,
MaxTime: b.MaxTime,
CompactionLevel: b.CompactionLevel,
FormatVersion: b.FormatVersion,
IndexRef: metastorepb.DataRef{
Offset: b.IndexRef.Offset,
Length: b.IndexRef.Length,
},
TenantStreams: make([]*metastorepb.TenantStreams, 0, len(b.TenantStreams)),
}
for _, svc := range b.TenantStreams {
res.TenantStreams = append(res.TenantStreams, &metastorepb.TenantStreams{
TenantId: svc.TenantId,
MinTime: svc.MinTime,
MaxTime: svc.MaxTime,
})
}
return res
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package metastore

import (
"context"
"testing"

"github.com/stretchr/testify/require"

metastorepb "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
)

func TestMetastore_ListBlocksForQuery(t *testing.T) {
block1, block2, block3 := &metastorepb.BlockMeta{
Id: "block1",
MinTime: 0,
MaxTime: 100,
TenantStreams: []*metastorepb.TenantStreams{
{
TenantId: "tenant1",
MinTime: 0,
MaxTime: 50,
},
},
}, &metastorepb.BlockMeta{
Id: "block2",
MinTime: 100,
MaxTime: 200,
TenantStreams: []*metastorepb.TenantStreams{
{
TenantId: "tenant1",
MinTime: 100,
MaxTime: 150,
},
},
}, &metastorepb.BlockMeta{
Id: "block3",
MinTime: 200,
MaxTime: 300,
TenantStreams: []*metastorepb.TenantStreams{
{
TenantId: "tenant2",
MinTime: 200,
MaxTime: 250,
},
{
TenantId: "tenant1",
MinTime: 200,
MaxTime: 250,
},
},
}
m := &Metastore{
state: &metastoreState{
segments: map[string]*metastorepb.BlockMeta{
"block1": block1,
"block2": block2,
"block3": block3,
},
},
}

tests := []struct {
name string
request *metastorepb.ListBlocksForQueryRequest
expectedResponse *metastorepb.ListBlocksForQueryResponse
}{
{
name: "Matching tenant and time range",
request: &metastorepb.ListBlocksForQueryRequest{
TenantId: "tenant1",
StartTime: 0,
EndTime: 100,
},
expectedResponse: &metastorepb.ListBlocksForQueryResponse{
Blocks: []*metastorepb.BlockMeta{
block1,
block2,
},
},
},
{
name: "Matching tenant but partial time range",
request: &metastorepb.ListBlocksForQueryRequest{
TenantId: "tenant1",
StartTime: 50,
EndTime: 150,
},
expectedResponse: &metastorepb.ListBlocksForQueryResponse{
Blocks: []*metastorepb.BlockMeta{
block1,
block2,
},
},
},
{
name: "Non-matching tenant",
request: &metastorepb.ListBlocksForQueryRequest{
TenantId: "tenant3",
StartTime: 0,
EndTime: 100,
},
expectedResponse: &metastorepb.ListBlocksForQueryResponse{},
},
{
name: "Matching one tenant but not the other",
request: &metastorepb.ListBlocksForQueryRequest{
TenantId: "tenant1",
StartTime: 100,
EndTime: 550,
},
expectedResponse: &metastorepb.ListBlocksForQueryResponse{
Blocks: []*metastorepb.BlockMeta{
block1,
block2,
block3,
},
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
resp, err := m.ListBlocksForQuery(context.Background(), test.request)
require.NoError(t, err)
require.Equal(t, test.expectedResponse, resp)
})
}
}
Loading

0 comments on commit 7f466df

Please sign in to comment.