Skip to content

Commit

Permalink
panic investigation 2
Browse files Browse the repository at this point in the history
Signed-off-by: Vladyslav Diachenko <[email protected]>
  • Loading branch information
vlad-diachenko committed Jan 23, 2024
1 parent aaa8b3d commit 3d53d43
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,23 +174,30 @@ func (w *worker) running(ctx context.Context) error {
}

for day, tasks := range tasksByDay {
logger := log.With(w.logger, "day", day)
level.Debug(logger).Log("msg", "tasks per day", "tasks_len", len(tasks))
for _, task := range tasks {
level.Debug(w.logger).Log("msg", "individual task", "task", task.ID, "closed", task.closed)
}

// Remove tasks that are already cancelled
tasks = slices.DeleteFunc(tasks, func(t Task) bool {
return t.Err() != nil
})
level.Debug(logger).Log("msg", "not cancelled tasks per day", "tasks_len", len(tasks))
// no tasks to process, continue with next day
if len(tasks) == 0 {
level.Debug(logger).Log("msg", "no tasks to process, continue with next day")
continue
}

logger := log.With(w.logger, "day", day)
level.Debug(logger).Log("msg", "process tasks", "tasks", len(tasks))

storeFetchStart := time.Now()
blockRefs, err := w.shipper.GetBlockRefs(iterationCtx, tasks[0].Tenant, toModelTime(day), toModelTime(day.Add(Day).Add(-1*time.Nanosecond)))
w.metrics.storeAccessLatency.WithLabelValues(w.id, "GetBlockRefs").Observe(time.Since(storeFetchStart).Seconds())
if err != nil {
level.Debug(logger).Log("msg", "error processing tasks. notifying all task's channels and go to the next day", "err", err)
// send error to error channel of each task
for _, t := range tasks {
t.ErrCh <- err
Expand Down

0 comments on commit 3d53d43

Please sign in to comment.