Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…into sidecar-branch
  • Loading branch information
Sheikh-Abubaker committed Jan 20, 2024
2 parents 20c874b + 7c60390 commit b899cc7
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 117 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
* [11195](https://github.com/grafana/loki/pull/11195) **canuteson** Generate tsdb_shipper storage_config even if using_boltdb_shipper is false
* [9831](https://github.com/grafana/loki/pull/9831) **sijmenhuizenga**: Fix Promtail excludepath not evaluated on newly added files.
* [11551](https://github.com/grafana/loki/pull/11551) **dannykopping** Do not reflect label names in request metrics' "route" label.
* [11563](https://github.com/grafana/loki/pull/11563) **ptqa** Fix duplicate logs from docker containers.
* [11601](https://github.com/grafana/loki/pull/11601) **dannykopping** Ruler: Fixed a panic that can be caused by concurrent read-write access of tenant configs when there are a large amount of rules.
* [11606](https://github.com/grafana/loki/pull/11606) **dannykopping** Fixed regression adding newlines to HTTP error response bodies which may break client integrations.
* [11657](https://github.com/grafana/loki/pull/11657) **ashwanthgoli** Log results cache: compose empty response based on the request being served to avoid returning incorrect limit or direction.
Expand Down
1 change: 1 addition & 0 deletions clients/pkg/promtail/targets/docker/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func (t *Target) process(r io.Reader, logStream string) {
}
t.metrics.dockerEntries.Inc()
t.positions.Put(positions.CursorKey(t.containerName), ts.Unix())
t.since = ts.Unix()
}
}

Expand Down
34 changes: 32 additions & 2 deletions clients/pkg/promtail/targets/docker/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ func Test_DockerTarget(t *testing.T) {
h := func(w http.ResponseWriter, r *http.Request) {
switch path := r.URL.Path; {
case strings.HasSuffix(path, "/logs"):
dat, err := os.ReadFile("testdata/flog.log")
var filePath string
if strings.Contains(r.URL.RawQuery, "since=0") {
filePath = "testdata/flog.log"
} else {
filePath = "testdata/flog_after_restart.log"
}
dat, err := os.ReadFile(filePath)
require.NoError(t, err)
_, err = w.Write(dat)
require.NoError(t, err)
Expand Down Expand Up @@ -59,7 +65,7 @@ func Test_DockerTarget(t *testing.T) {
})
require.NoError(t, err)

_, err = NewTarget(
target, err := NewTarget(
NewMetrics(prometheus.NewRegistry()),
logger,
entryHandler,
Expand Down Expand Up @@ -92,4 +98,28 @@ func Test_DockerTarget(t *testing.T) {
actualLines = append(actualLines, entry.Line)
}
require.ElementsMatch(t, actualLines, expectedLines)

// restart target to simulate container restart
target.startIfNotRunning()
entryHandler.Clear()
require.Eventually(t, func() bool {
return len(entryHandler.Received()) >= 5
}, 5*time.Second, 100*time.Millisecond)

receivedAfterRestart := entryHandler.Received()
sort.Slice(receivedAfterRestart, func(i, j int) bool {
return receivedAfterRestart[i].Timestamp.Before(receivedAfterRestart[j].Timestamp)
})
actualLinesAfterRestart := make([]string, 0, 5)
for _, entry := range receivedAfterRestart[:5] {
actualLinesAfterRestart = append(actualLinesAfterRestart, entry.Line)
}
expectedLinesAfterRestart := []string{
"243.115.12.215 - - [09/Dec/2023:09:16:57 +0000] \"DELETE /morph/exploit/granular HTTP/1.0\" 500 26468",
"221.41.123.237 - - [09/Dec/2023:09:16:57 +0000] \"DELETE /user-centric/whiteboard HTTP/2.0\" 205 22487",
"89.111.144.144 - - [09/Dec/2023:09:16:57 +0000] \"DELETE /open-source/e-commerce HTTP/1.0\" 401 11092",
"62.180.191.187 - - [09/Dec/2023:09:16:57 +0000] \"DELETE /cultivate/integrate/technologies HTTP/2.0\" 302 12979",
"156.249.2.192 - - [09/Dec/2023:09:16:57 +0000] \"POST /revolutionize/mesh/metrics HTTP/2.0\" 401 5297",
}
require.ElementsMatch(t, actualLinesAfterRestart, expectedLinesAfterRestart)
}
Binary file not shown.
19 changes: 6 additions & 13 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ of line filter expressions.
|
bloomgateway.Worker
|
bloomshipper.Store
|
bloomshipper.Shipper
|
bloomshipper.BloomFileClient
Expand Down Expand Up @@ -171,9 +169,9 @@ type Gateway struct {
workerMetrics *workerMetrics
queueMetrics *queue.Metrics

queue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
bloomStore bloomshipper.Store
queue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
bloomShipper bloomshipper.Interface

sharding ShardingStrategy

Expand Down Expand Up @@ -222,13 +220,8 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, o
return nil, err
}

bloomStore, err := bloomshipper.NewBloomStore(bloomShipper)
if err != nil {
return nil, err
}

// We need to keep a reference to be able to call Stop() on shutdown of the gateway.
g.bloomStore = bloomStore
g.bloomShipper = bloomShipper

if err := g.initServices(); err != nil {
return nil, err
Expand All @@ -243,7 +236,7 @@ func (g *Gateway) initServices() error {
svcs := []services.Service{g.queue, g.activeUsers}
for i := 0; i < g.cfg.WorkerConcurrency; i++ {
id := fmt.Sprintf("bloom-query-worker-%d", i)
w := newWorker(id, g.workerConfig, g.queue, g.bloomStore, g.pendingTasks, g.logger, g.workerMetrics)
w := newWorker(id, g.workerConfig, g.queue, g.bloomShipper, g.pendingTasks, g.logger, g.workerMetrics)
svcs = append(svcs, w)
}
g.serviceMngr, err = services.NewManager(svcs...)
Expand Down Expand Up @@ -295,7 +288,7 @@ func (g *Gateway) running(ctx context.Context) error {
}

func (g *Gateway) stopping(_ error) error {
g.bloomStore.Stop()
g.bloomShipper.Stop()
return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr)
}

Expand Down
24 changes: 7 additions & 17 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {

// replace store implementation and re-initialize workers and sub-services
bqs, data := createBlockQueriers(t, 5, now.Add(-8*time.Hour), now, 0, 1024)
gw.bloomStore = newMockBloomStore(bqs)
gw.bloomShipper = newMockBloomStore(bqs)
err = gw.initServices()
require.NoError(t, err)

Expand Down Expand Up @@ -331,7 +331,6 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), tenantID)
res, err := gw.FilterChunkRefs(ctx, req)
require.NoError(t, err)

expectedResponse := &logproto.FilterChunkRefResponse{
ChunkRefs: inputChunkRefs[:1],
}
Expand Down Expand Up @@ -373,15 +372,10 @@ type mockBloomStore struct {
bqs []bloomshipper.BlockQuerierWithFingerprintRange
}

var _ bloomshipper.Store = &mockBloomStore{}

// GetBlockQueriersForBlockRefs implements bloomshipper.Store.
func (s *mockBloomStore) GetBlockQueriersForBlockRefs(_ context.Context, _ string, _ []bloomshipper.BlockRef) ([]bloomshipper.BlockQuerierWithFingerprintRange, error) {
return s.bqs, nil
}
var _ bloomshipper.Interface = &mockBloomStore{}

// GetBlockRefs implements bloomshipper.Store.
func (s *mockBloomStore) GetBlockRefs(_ context.Context, tenant string, _, _ time.Time) ([]bloomshipper.BlockRef, error) {
// GetBlockRefs implements bloomshipper.Interface
func (s *mockBloomStore) GetBlockRefs(_ context.Context, tenant string, _, _ model.Time) ([]bloomshipper.BlockRef, error) {
blocks := make([]bloomshipper.BlockRef, 0, len(s.bqs))
for i := range s.bqs {
blocks = append(blocks, bloomshipper.BlockRef{
Expand All @@ -395,15 +389,11 @@ func (s *mockBloomStore) GetBlockRefs(_ context.Context, tenant string, _, _ tim
return blocks, nil
}

// GetBlockQueriers implements bloomshipper.Store.
func (s *mockBloomStore) GetBlockQueriers(_ context.Context, _ string, _, _ time.Time, _ []uint64) ([]bloomshipper.BlockQuerierWithFingerprintRange, error) {
return s.bqs, nil
}

// Stop implements bloomshipper.Interface
func (s *mockBloomStore) Stop() {}

// ForEach implements bloomshipper.Store.
func (s *mockBloomStore) ForEach(_ context.Context, _ string, _ []bloomshipper.BlockRef, callback bloomshipper.ForEachBlockCallback) error {
// Fetch implements bloomshipper.Interface
func (s *mockBloomStore) Fetch(_ context.Context, _ string, _ []bloomshipper.BlockRef, callback bloomshipper.ForEachBlockCallback) error {
shuffled := make([]bloomshipper.BlockQuerierWithFingerprintRange, len(s.bqs))
_ = copy(shuffled, s.bqs)

Expand Down
14 changes: 9 additions & 5 deletions pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,18 @@ type worker struct {
id string
cfg workerConfig
queue *queue.RequestQueue
store bloomshipper.Store
shipper bloomshipper.Interface
tasks *pendingTasks
logger log.Logger
metrics *workerMetrics
}

func newWorker(id string, cfg workerConfig, queue *queue.RequestQueue, store bloomshipper.Store, tasks *pendingTasks, logger log.Logger, metrics *workerMetrics) *worker {
func newWorker(id string, cfg workerConfig, queue *queue.RequestQueue, shipper bloomshipper.Interface, tasks *pendingTasks, logger log.Logger, metrics *workerMetrics) *worker {
w := &worker{
id: id,
cfg: cfg,
queue: queue,
store: store,
shipper: shipper,
tasks: tasks,
logger: log.With(logger, "worker", id),
metrics: metrics,
Expand Down Expand Up @@ -162,7 +162,7 @@ func (w *worker) running(ctx context.Context) error {
level.Debug(logger).Log("msg", "process tasks", "tasks", len(tasks))

storeFetchStart := time.Now()
blockRefs, err := w.store.GetBlockRefs(taskCtx, tasks[0].Tenant, day, day.Add(Day).Add(-1*time.Nanosecond))
blockRefs, err := w.shipper.GetBlockRefs(taskCtx, tasks[0].Tenant, toModelTime(day), toModelTime(day.Add(Day).Add(-1*time.Nanosecond)))
w.metrics.storeAccessLatency.WithLabelValues(w.id, "GetBlockRefs").Observe(time.Since(storeFetchStart).Seconds())
if err != nil {
for _, t := range tasks {
Expand Down Expand Up @@ -218,7 +218,7 @@ func (w *worker) stopping(err error) error {
}

func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, day time.Time, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error {
return w.store.ForEach(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error {
return w.shipper.Fetch(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error {
for _, b := range boundedRefs {
if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp {
return w.processBlock(bq, day, b.tasks)
Expand Down Expand Up @@ -250,3 +250,7 @@ func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day time.Time, task
w.metrics.bloomQueryLatency.WithLabelValues(w.id, "success").Observe(duration)
return nil
}

func toModelTime(t time.Time) model.Time {
return model.TimeFromUnixNano(t.UnixNano())
}
16 changes: 8 additions & 8 deletions pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) {
func Test_blockDownloader_downloadBlock(t *testing.T) {
tests := map[string]struct {
cacheEnabled bool
expectedTotalGetBlocksCalls int
expectedTotalGetBlocksCalls int32
}{
"cache disabled": {
cacheEnabled: false,
Expand Down Expand Up @@ -129,7 +129,7 @@ func Test_blockDownloader_downloadBlock(t *testing.T) {
case <-done:
}
require.Len(t, downloadedBlocks, 20, "all 20 block must be downloaded")
require.Equal(t, 20, blockClient.getBlockCalls)
require.Equal(t, int32(20), blockClient.getBlockCalls.Load())

blocksCh, errorsCh = downloader.downloadBlocks(context.Background(), "fake", blockReferences)
downloadedBlocks = make(map[string]any, len(blockReferences))
Expand All @@ -150,15 +150,15 @@ func Test_blockDownloader_downloadBlock(t *testing.T) {
case <-done:
}
require.Len(t, downloadedBlocks, 20, "all 20 block must be downloaded")
require.Equal(t, testData.expectedTotalGetBlocksCalls, blockClient.getBlockCalls)
require.Equal(t, testData.expectedTotalGetBlocksCalls, blockClient.getBlockCalls.Load())
})
}
}

func Test_blockDownloader_downloadBlock_deduplication(t *testing.T) {
tests := map[string]struct {
cacheEnabled bool
expectedTotalGetBlocksCalls int
expectedTotalGetBlocksCalls int32
}{
"requests to blockClient must be deduplicated by blockPath if cache is enabled": {
cacheEnabled: true,
Expand Down Expand Up @@ -195,7 +195,7 @@ func Test_blockDownloader_downloadBlock_deduplication(t *testing.T) {
t.Cleanup(downloader.stop)
require.NoError(t, err)

blocksDownloadedCount := atomic.Uint32{}
var blocksDownloadedCount atomic.Uint32
mutex := sync.Mutex{}
multiError := util.MultiError{}
waitGroup := sync.WaitGroup{}
Expand Down Expand Up @@ -225,7 +225,7 @@ func Test_blockDownloader_downloadBlock_deduplication(t *testing.T) {

require.NoError(t, multiError.Err())
require.Equal(t, uint32(10), blocksDownloadedCount.Load())
require.Equal(t, testData.expectedTotalGetBlocksCalls, blockClient.getBlockCalls)
require.Equal(t, testData.expectedTotalGetBlocksCalls, blockClient.getBlockCalls.Load())
})
}
}
Expand Down Expand Up @@ -340,11 +340,11 @@ type blockSupplier func() LazyBlock
type mockBlockClient struct {
responseDelay time.Duration
mockData map[string]blockSupplier
getBlockCalls int
getBlockCalls atomic.Int32
}

func (m *mockBlockClient) GetBlock(_ context.Context, reference BlockRef) (LazyBlock, error) {
m.getBlockCalls++
m.getBlockCalls.Inc()
time.Sleep(m.responseDelay)
supplier, exists := m.mockData[reference.BlockPath]
if exists {
Expand Down
14 changes: 14 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prometheus/common/model"
"golang.org/x/exp/slices"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
)

Expand All @@ -24,6 +25,19 @@ func (r fpRange) maxFp() uint64 {
return r[1]
}

type BlockQuerierWithFingerprintRange struct {
*v1.BlockQuerier
MinFp, MaxFp model.Fingerprint
}

type ForEachBlockCallback func(bq *v1.BlockQuerier, minFp, maxFp uint64) error

type Interface interface {
GetBlockRefs(ctx context.Context, tenant string, from, through model.Time) ([]BlockRef, error)
Fetch(ctx context.Context, tenant string, blocks []BlockRef, callback ForEachBlockCallback) error
Stop()
}

type Shipper struct {
client Client
config config.Config
Expand Down
61 changes: 0 additions & 61 deletions pkg/storage/stores/shipper/bloomshipper/store.go

This file was deleted.

11 changes: 0 additions & 11 deletions pkg/storage/stores/shipper/bloomshipper/store_test.go

This file was deleted.

0 comments on commit b899cc7

Please sign in to comment.