Skip to content

Commit

Permalink
[Disk Manager] internal/pkg/tasks/errors should not depend on interna…
Browse files Browse the repository at this point in the history
…l/pkg/persistence
  • Loading branch information
SvartMetal authored Jan 8, 2024
1 parent 0c20bd1 commit 27b93ea
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 53 deletions.
36 changes: 0 additions & 36 deletions cloud/disk_manager/internal/pkg/tasks/errors/errors.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package errors

import (
"context"
"fmt"
"runtime/debug"

"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/errors"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/logging"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/persistence"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/tasks/common/protos"
grpc_codes "google.golang.org/grpc/codes"
grpc_status "google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -297,38 +293,6 @@ func (e *DetailedError) GRPCStatus() *grpc_status.Status {

////////////////////////////////////////////////////////////////////////////////

func isCancelledError(err error) bool {
switch {
case
errors.Is(err, context.Canceled),
persistence.IsTransportError(err, grpc_codes.Canceled):
return true
default:
return false
}
}

func LogError(
ctx context.Context,
err error,
format string,
args ...interface{},
) {

description := fmt.Sprintf(format, args...)

if Is(err, NewWrongGenerationError()) ||
Is(err, NewInterruptExecutionError()) ||
isCancelledError(err) {

logging.Debug(logging.AddCallerSkip(ctx, 1), "%v: %v", description, err)
} else {
logging.Warn(logging.AddCallerSkip(ctx, 1), "%v: %v", description, err)
}
}

////////////////////////////////////////////////////////////////////////////////

func New(text string) error {
return errors.New(text)
}
Expand Down
37 changes: 35 additions & 2 deletions cloud/disk_manager/internal/pkg/tasks/execution_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/persistence"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/tasks/errors"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/tasks/storage"
grpc_codes "google.golang.org/grpc/codes"
)

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -262,7 +263,7 @@ func (c *executionContext) setError(ctx context.Context, e error) error {
return taskState
})
if err != nil {
errors.LogError(
logError(
ctx,
err,
"failed to commit non retriable error for %v with task id %v",
Expand All @@ -273,7 +274,7 @@ func (c *executionContext) setError(ctx context.Context, e error) error {
}

if !errors.IsSilent(e) {
errors.LogError(
logError(
ctx,
e,
"commited fatal error for %v with task id %v",
Expand Down Expand Up @@ -339,3 +340,35 @@ func newExecutionContext(
taskState: taskState,
}
}

////////////////////////////////////////////////////////////////////////////////

func isCancelledError(err error) bool {
switch {
case
errors.Is(err, context.Canceled),
persistence.IsTransportError(err, grpc_codes.Canceled):
return true
default:
return false
}
}

func logError(
ctx context.Context,
err error,
format string,
args ...interface{},
) {

description := fmt.Sprintf(format, args...)

if errors.Is(err, errors.NewWrongGenerationError()) ||
errors.Is(err, errors.NewInterruptExecutionError()) ||
isCancelledError(err) {

logging.Debug(logging.AddCallerSkip(ctx, 1), "%v: %v", description, err)
} else {
logging.Warn(logging.AddCallerSkip(ctx, 1), "%v: %v", description, err)
}
}
28 changes: 14 additions & 14 deletions cloud/disk_manager/internal/pkg/tasks/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (r *runnerForRun) executeTask(
// If there was no error, task has completed successfully.
err = execCtx.finish(ctx)
if err != nil {
errors.LogError(
logError(
ctx,
err,
"failed to commit finishing for %v with task id %v",
Expand All @@ -147,7 +147,7 @@ func (r *runnerForRun) executeTask(
return
}

errors.LogError(
logError(
ctx,
err,
"got error for %v with task id %v",
Expand All @@ -158,7 +158,7 @@ func (r *runnerForRun) executeTask(

if errors.IsPanicError(err) {
if execCtx.taskState.PanicCount >= r.maxPanicCount {
errors.LogError(
logError(
ctx,
err,
"panic count exceeded for %v with task id %v",
Expand All @@ -178,7 +178,7 @@ func (r *runnerForRun) executeTask(

err = execCtx.incrementPanicCount(ctx)
if err != nil {
errors.LogError(
logError(
ctx,
err,
"failed to increment panic count for %v with task id %v",
Expand All @@ -198,7 +198,7 @@ func (r *runnerForRun) executeTask(
if errors.Is(err, errors.NewEmptyNonCancellableError()) {
err = execCtx.setNonCancellableError(ctx, err)
if err != nil {
errors.LogError(
logError(
ctx,
err,
"failed to commit non cancellable error for %v with task id %v",
Expand All @@ -222,7 +222,7 @@ func (r *runnerForRun) executeTask(
// Restart task from the beginning.
err = execCtx.clearState(ctx)
if err != nil {
errors.LogError(
logError(
ctx,
err,
"failed to clear state for %v with task id %v",
Expand All @@ -239,7 +239,7 @@ func (r *runnerForRun) executeTask(
if !retriableError.IgnoreRetryLimit &&
execCtx.getRetriableErrorCount() >= r.maxRetriableErrorCount {

errors.LogError(
logError(
ctx,
err,
"retriable error count exceeded for %v with task id %v",
Expand All @@ -259,7 +259,7 @@ func (r *runnerForRun) executeTask(

err = execCtx.incrementRetriableErrorCount(ctx)
if err != nil {
errors.LogError(
logError(
ctx,
err,
"failed to increment retriable error count for %v with task id %v",
Expand Down Expand Up @@ -374,7 +374,7 @@ func (r *runnerForCancel) executeTask(
}

if err != nil {
errors.LogError(
logError(
ctx,
err,
"got error for %v with task id %v",
Expand All @@ -397,7 +397,7 @@ func (r *runnerForCancel) executeTask(
// completed.
err = execCtx.setCancelled(ctx)
if err != nil {
errors.LogError(
logError(
ctx,
err,
"failed to commit cancellation for %v with task id %v",
Expand Down Expand Up @@ -443,7 +443,7 @@ func taskPinger(
err := execCtx.ping(ctx)
// Pinger being cancelled does not constitute an error.
if err != nil && ctx.Err() == nil {
errors.LogError(
logError(
ctx,
err,
"failed to ping %v",
Expand Down Expand Up @@ -479,7 +479,7 @@ func lockAndExecuteTask(

taskState, err := runner.lockTask(ctx, taskInfo)
if err != nil {
errors.LogError(
logError(
ctx,
err,
"failed to lock task %v",
Expand All @@ -491,7 +491,7 @@ func lockAndExecuteTask(

task, err := registry.NewTask(taskState.TaskType)
if err != nil {
errors.LogError(
logError(
ctx,
err,
"failed to construct task %v",
Expand All @@ -505,7 +505,7 @@ func lockAndExecuteTask(

err = task.Load(taskState.Request, taskState.State)
if err != nil {
errors.LogError(
logError(
ctx,
err,
"failed to load task %v",
Expand Down
2 changes: 1 addition & 1 deletion cloud/disk_manager/internal/pkg/tasks/scheduler_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (s *scheduler) WaitTask(

err := execCtx.AddTaskDependency(ctx, taskID)
if err != nil {
errors.LogError(ctx, err, "failed to add task dependency %v", taskID)
logError(ctx, err, "failed to add task dependency %v", taskID)
return nil, err
}

Expand Down

0 comments on commit 27b93ea

Please sign in to comment.