Skip to content

Commit

Permalink
NBS-4789: [Disk Manager] refactor sre tools (#177)
Browse files Browse the repository at this point in the history
* refactor sre tools

* resolve issues

* fix build

* fix issue
  • Loading branch information
BarkovBG authored Jan 19, 2024
1 parent 093ecde commit a2fe687
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 33 deletions.
22 changes: 16 additions & 6 deletions cloud/disk_manager/pkg/admin/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func newListReadyToRunCmd(
serverConfig,
"ready_to_run",
func(ctx context.Context, storage tasks_storage.Storage, limit uint64) ([]string, error) {
taskInfos, err := storage.ListTasksReadyToRun(
tasks, err := storage.ListTasksReadyToRun(
ctx,
limit,
nil, // taskTypeWhitelist
Expand All @@ -234,7 +234,7 @@ func newListReadyToRunCmd(
return []string{}, err
}

return getTaskIDs(taskInfos), nil
return getTaskIDs(tasks), nil
},
)
}
Expand All @@ -249,7 +249,7 @@ func newListReadyToCancelCmd(
serverConfig,
"ready_to_cancel",
func(ctx context.Context, storage tasks_storage.Storage, limit uint64) ([]string, error) {
taskInfos, err := storage.ListTasksReadyToCancel(
tasks, err := storage.ListTasksReadyToCancel(
ctx,
limit,
nil, // taskTypeWhitelist
Expand All @@ -258,7 +258,7 @@ func newListReadyToCancelCmd(
return []string{}, err
}

return getTaskIDs(taskInfos), nil
return getTaskIDs(tasks), nil
},
)
}
Expand All @@ -273,7 +273,12 @@ func newListRunningCmd(
serverConfig,
"running",
func(ctx context.Context, storage tasks_storage.Storage, limit uint64) ([]string, error) {
return storage.ListTasksRunning(ctx, limit)
tasks, err := storage.ListTasksRunning(ctx, limit)
if err != nil {
return []string{}, err
}

return getTaskIDs(tasks), nil
},
)
}
Expand All @@ -288,7 +293,12 @@ func newListCancellingCmd(
serverConfig,
"cancelling",
func(ctx context.Context, storage tasks_storage.Storage, limit uint64) ([]string, error) {
return storage.ListTasksCancelling(ctx, limit)
tasks, err := storage.ListTasksCancelling(ctx, limit)
if err != nil {
return []string{}, err
}

return getTaskIDs(tasks), nil
},
)
}
Expand Down
8 changes: 4 additions & 4 deletions cloud/tasks/storage/compound_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,9 @@ func (s *compoundStorage) ListTasksStallingWhileCancelling(
func (s *compoundStorage) ListTasksRunning(
ctx context.Context,
limit uint64,
) ([]string, error) {
) ([]TaskInfo, error) {

tasks := []string{}
tasks := []TaskInfo{}
err := s.visit(ctx, func(storage Storage) error {
values, err := storage.ListTasksRunning(ctx, limit)
tasks = append(tasks, values...)
Expand All @@ -240,9 +240,9 @@ func (s *compoundStorage) ListTasksRunning(
func (s *compoundStorage) ListTasksCancelling(
ctx context.Context,
limit uint64,
) ([]string, error) {
) ([]TaskInfo, error) {

tasks := []string{}
tasks := []TaskInfo{}
err := s.visit(ctx, func(storage Storage) error {
values, err := storage.ListTasksCancelling(ctx, limit)
tasks = append(tasks, values...)
Expand Down
8 changes: 4 additions & 4 deletions cloud/tasks/storage/mocks/storage_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,19 @@ func (s *StorageMock) ListTasksStallingWhileCancelling(
func (s *StorageMock) ListTasksRunning(
ctx context.Context,
limit uint64,
) ([]string, error) {
) ([]tasks_storage.TaskInfo, error) {

args := s.Called(ctx, limit)
res, _ := args.Get(0).([]string)
res, _ := args.Get(0).([]tasks_storage.TaskInfo)
return res, args.Error(1)
}
func (s *StorageMock) ListTasksCancelling(
ctx context.Context,
limit uint64,
) ([]string, error) {
) ([]tasks_storage.TaskInfo, error) {

args := s.Called(ctx, limit)
res, _ := args.Get(0).([]string)
res, _ := args.Get(0).([]tasks_storage.TaskInfo)
return res, args.Error(1)
}

Expand Down
4 changes: 2 additions & 2 deletions cloud/tasks/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,8 @@ type Storage interface {
) ([]TaskInfo, error)

// Used for SRE tools.
ListTasksRunning(ctx context.Context, limit uint64) ([]string, error)
ListTasksCancelling(ctx context.Context, limit uint64) ([]string, error)
ListTasksRunning(ctx context.Context, limit uint64) ([]TaskInfo, error)
ListTasksCancelling(ctx context.Context, limit uint64) ([]TaskInfo, error)
ListFailedTasks(ctx context.Context, since time.Time) ([]string, error)
ListSlowTasks(ctx context.Context, since time.Time, estimateMiss time.Duration) ([]string, error)

Expand Down
12 changes: 6 additions & 6 deletions cloud/tasks/storage/storage_ydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,15 @@ func (s *storageYDB) ListTasksStallingWhileCancelling(
func (s *storageYDB) ListTasksRunning(
ctx context.Context,
limit uint64,
) ([]string, error) {
) ([]TaskInfo, error) {

var tasks []string
var tasks []TaskInfo

err := s.db.Execute(
ctx,
func(ctx context.Context, session *persistence.Session) error {
var err error
tasks, err = s.listTaskIDs(
tasks, err = s.listTasks(
ctx,
session,
"running",
Expand All @@ -232,15 +232,15 @@ func (s *storageYDB) ListTasksRunning(
func (s *storageYDB) ListTasksCancelling(
ctx context.Context,
limit uint64,
) ([]string, error) {
) ([]TaskInfo, error) {

var tasks []string
var tasks []TaskInfo

err := s.db.Execute(
ctx,
func(ctx context.Context, session *persistence.Session) error {
var err error
tasks, err = s.listTaskIDs(
tasks, err = s.listTasks(
ctx,
session,
"cancelling",
Expand Down
36 changes: 26 additions & 10 deletions cloud/tasks/storage/storage_ydb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,14 +1114,22 @@ func TestStorageYDBListTasksRunning(t *testing.T) {
})
require.NoError(t, err)

expectedTasks := []string{
taskIDRunning,
taskIDStallingWhileRunning,
expectedTasks := []TaskInfo{
TaskInfo{
ID: taskIDRunning,
GenerationID: generationID,
TaskType: "task1",
},
TaskInfo{
ID: taskIDStallingWhileRunning,
GenerationID: generationID,
TaskType: "task1",
},
}

taskIDs, err := storage.ListTasksRunning(ctx, 100500)
tasks, err := storage.ListTasksRunning(ctx, 100500)
require.NoError(t, err)
require.ElementsMatch(t, expectedTasks, taskIDs)
require.ElementsMatch(t, expectedTasks, tasks)
}

func TestStorageYDBListTasksCancelling(t *testing.T) {
Expand Down Expand Up @@ -1284,14 +1292,22 @@ func TestStorageYDBListTasksCancelling(t *testing.T) {
})
require.NoError(t, err)

expectedTasks := []string{
taskIDCancelling,
taskIDStallingWhileCancelling,
expectedTasks := []TaskInfo{
TaskInfo{
ID: taskIDCancelling,
GenerationID: generationID,
TaskType: "task1",
},
TaskInfo{
ID: taskIDStallingWhileCancelling,
GenerationID: generationID,
TaskType: "task1",
},
}

taskIDs, err := storage.ListTasksCancelling(ctx, 100500)
tasks, err := storage.ListTasksCancelling(ctx, 100500)
require.NoError(t, err)
require.ElementsMatch(t, expectedTasks, taskIDs)
require.ElementsMatch(t, expectedTasks, tasks)
}

func TestStorageYDBListFailedTasks(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cloud/tasks/tasks_tests/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ func TestTasksRunningLimit(t *testing.T) {

runningLongTaskCount := 0
for _, task := range runningTasks {
taskState, err := s.storage.GetTask(ctx, task)
taskState, err := s.storage.GetTask(ctx, task.ID)
require.NoError(t, err)

if taskState.TaskType == "long" {
Expand Down

0 comments on commit a2fe687

Please sign in to comment.