From 3d53d43c99bffc2e04a99734469c22c13eb8fa18 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Tue, 23 Jan 2024 17:53:41 +0200 Subject: [PATCH] panic investigation 2 Signed-off-by: Vladyslav Diachenko --- pkg/bloomgateway/worker.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index 03b6c26085f8..55d0ca943945 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -174,23 +174,30 @@ func (w *worker) running(ctx context.Context) error { } for day, tasks := range tasksByDay { + logger := log.With(w.logger, "day", day) + level.Debug(logger).Log("msg", "tasks per day", "tasks_len", len(tasks)) + for _, task := range tasks { + level.Debug(w.logger).Log("msg", "individual task", "task", task.ID, "closed", task.closed) + } // Remove tasks that are already cancelled tasks = slices.DeleteFunc(tasks, func(t Task) bool { return t.Err() != nil }) + level.Debug(logger).Log("msg", "not cancelled tasks per day", "tasks_len", len(tasks)) // no tasks to process, continue with next day if len(tasks) == 0 { + level.Debug(logger).Log("msg", "no tasks to process, continue with next day") continue } - logger := log.With(w.logger, "day", day) level.Debug(logger).Log("msg", "process tasks", "tasks", len(tasks)) storeFetchStart := time.Now() blockRefs, err := w.shipper.GetBlockRefs(iterationCtx, tasks[0].Tenant, toModelTime(day), toModelTime(day.Add(Day).Add(-1*time.Nanosecond))) w.metrics.storeAccessLatency.WithLabelValues(w.id, "GetBlockRefs").Observe(time.Since(storeFetchStart).Seconds()) if err != nil { + level.Debug(logger).Log("msg", "error processing tasks. notifying all task's channels and go to the next day", "err", err) // send error to error channel of each task for _, t := range tasks { t.ErrCh <- err