diff --git a/cloud/disk_manager/internal/pkg/tasks/errors/errors.go b/cloud/disk_manager/internal/pkg/tasks/errors/errors.go index 4218921d2ad..525ada03343 100644 --- a/cloud/disk_manager/internal/pkg/tasks/errors/errors.go +++ b/cloud/disk_manager/internal/pkg/tasks/errors/errors.go @@ -1,15 +1,11 @@ package errors import ( - "context" "fmt" "runtime/debug" "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/errors" - "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/logging" - "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/persistence" "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/tasks/common/protos" - grpc_codes "google.golang.org/grpc/codes" grpc_status "google.golang.org/grpc/status" ) @@ -297,38 +293,6 @@ func (e *DetailedError) GRPCStatus() *grpc_status.Status { //////////////////////////////////////////////////////////////////////////////// -func isCancelledError(err error) bool { - switch { - case - errors.Is(err, context.Canceled), - persistence.IsTransportError(err, grpc_codes.Canceled): - return true - default: - return false - } -} - -func LogError( - ctx context.Context, - err error, - format string, - args ...interface{}, -) { - - description := fmt.Sprintf(format, args...) - - if Is(err, NewWrongGenerationError()) || - Is(err, NewInterruptExecutionError()) || - isCancelledError(err) { - - logging.Debug(logging.AddCallerSkip(ctx, 1), "%v: %v", description, err) - } else { - logging.Warn(logging.AddCallerSkip(ctx, 1), "%v: %v", description, err) - } -} - -//////////////////////////////////////////////////////////////////////////////// - func New(text string) error { return errors.New(text) } diff --git a/cloud/disk_manager/internal/pkg/tasks/execution_context.go b/cloud/disk_manager/internal/pkg/tasks/execution_context.go index dfaff37ff63..6674f03c8f2 100644 --- a/cloud/disk_manager/internal/pkg/tasks/execution_context.go +++ b/cloud/disk_manager/internal/pkg/tasks/execution_context.go @@ -10,6 +10,7 @@ import ( "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/persistence" "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/tasks/errors" "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/tasks/storage" + grpc_codes "google.golang.org/grpc/codes" ) //////////////////////////////////////////////////////////////////////////////// @@ -262,7 +263,7 @@ func (c *executionContext) setError(ctx context.Context, e error) error { return taskState }) if err != nil { - errors.LogError( + logError( ctx, err, "failed to commit non retriable error for %v with task id %v", @@ -273,7 +274,7 @@ func (c *executionContext) setError(ctx context.Context, e error) error { } if !errors.IsSilent(e) { - errors.LogError( + logError( ctx, e, "commited fatal error for %v with task id %v", @@ -339,3 +340,35 @@ func newExecutionContext( taskState: taskState, } } + +//////////////////////////////////////////////////////////////////////////////// + +func isCancelledError(err error) bool { + switch { + case + errors.Is(err, context.Canceled), + persistence.IsTransportError(err, grpc_codes.Canceled): + return true + default: + return false + } +} + +func logError( + ctx context.Context, + err error, + format string, + args ...interface{}, +) { + + description := fmt.Sprintf(format, args...) + + if errors.Is(err, errors.NewWrongGenerationError()) || + errors.Is(err, errors.NewInterruptExecutionError()) || + isCancelledError(err) { + + logging.Debug(logging.AddCallerSkip(ctx, 1), "%v: %v", description, err) + } else { + logging.Warn(logging.AddCallerSkip(ctx, 1), "%v: %v", description, err) + } +} diff --git a/cloud/disk_manager/internal/pkg/tasks/runner.go b/cloud/disk_manager/internal/pkg/tasks/runner.go index d5ffcd54d95..7c67e63e558 100644 --- a/cloud/disk_manager/internal/pkg/tasks/runner.go +++ b/cloud/disk_manager/internal/pkg/tasks/runner.go @@ -135,7 +135,7 @@ func (r *runnerForRun) executeTask( // If there was no error, task has completed successfully. err = execCtx.finish(ctx) if err != nil { - errors.LogError( + logError( ctx, err, "failed to commit finishing for %v with task id %v", @@ -147,7 +147,7 @@ func (r *runnerForRun) executeTask( return } - errors.LogError( + logError( ctx, err, "got error for %v with task id %v", @@ -158,7 +158,7 @@ func (r *runnerForRun) executeTask( if errors.IsPanicError(err) { if execCtx.taskState.PanicCount >= r.maxPanicCount { - errors.LogError( + logError( ctx, err, "panic count exceeded for %v with task id %v", @@ -178,7 +178,7 @@ func (r *runnerForRun) executeTask( err = execCtx.incrementPanicCount(ctx) if err != nil { - errors.LogError( + logError( ctx, err, "failed to increment panic count for %v with task id %v", @@ -198,7 +198,7 @@ func (r *runnerForRun) executeTask( if errors.Is(err, errors.NewEmptyNonCancellableError()) { err = execCtx.setNonCancellableError(ctx, err) if err != nil { - errors.LogError( + logError( ctx, err, "failed to commit non cancellable error for %v with task id %v", @@ -222,7 +222,7 @@ func (r *runnerForRun) executeTask( // Restart task from the beginning. err = execCtx.clearState(ctx) if err != nil { - errors.LogError( + logError( ctx, err, "failed to clear state for %v with task id %v", @@ -239,7 +239,7 @@ func (r *runnerForRun) executeTask( if !retriableError.IgnoreRetryLimit && execCtx.getRetriableErrorCount() >= r.maxRetriableErrorCount { - errors.LogError( + logError( ctx, err, "retriable error count exceeded for %v with task id %v", @@ -259,7 +259,7 @@ func (r *runnerForRun) executeTask( err = execCtx.incrementRetriableErrorCount(ctx) if err != nil { - errors.LogError( + logError( ctx, err, "failed to increment retriable error count for %v with task id %v", @@ -374,7 +374,7 @@ func (r *runnerForCancel) executeTask( } if err != nil { - errors.LogError( + logError( ctx, err, "got error for %v with task id %v", @@ -397,7 +397,7 @@ func (r *runnerForCancel) executeTask( // completed. err = execCtx.setCancelled(ctx) if err != nil { - errors.LogError( + logError( ctx, err, "failed to commit cancellation for %v with task id %v", @@ -443,7 +443,7 @@ func taskPinger( err := execCtx.ping(ctx) // Pinger being cancelled does not constitute an error. if err != nil && ctx.Err() == nil { - errors.LogError( + logError( ctx, err, "failed to ping %v", @@ -479,7 +479,7 @@ func lockAndExecuteTask( taskState, err := runner.lockTask(ctx, taskInfo) if err != nil { - errors.LogError( + logError( ctx, err, "failed to lock task %v", @@ -491,7 +491,7 @@ func lockAndExecuteTask( task, err := registry.NewTask(taskState.TaskType) if err != nil { - errors.LogError( + logError( ctx, err, "failed to construct task %v", @@ -505,7 +505,7 @@ func lockAndExecuteTask( err = task.Load(taskState.Request, taskState.State) if err != nil { - errors.LogError( + logError( ctx, err, "failed to load task %v", diff --git a/cloud/disk_manager/internal/pkg/tasks/scheduler_impl.go b/cloud/disk_manager/internal/pkg/tasks/scheduler_impl.go index d705057d1b3..fdfa21b0529 100644 --- a/cloud/disk_manager/internal/pkg/tasks/scheduler_impl.go +++ b/cloud/disk_manager/internal/pkg/tasks/scheduler_impl.go @@ -237,7 +237,7 @@ func (s *scheduler) WaitTask( err := execCtx.AddTaskDependency(ctx, taskID) if err != nil { - errors.LogError(ctx, err, "failed to add task dependency %v", taskID) + logError(ctx, err, "failed to add task dependency %v", taskID) return nil, err }