Don't kill busy workers #14

Closed · wants to merge 1 commit
57 changes: 51 additions & 6 deletions internal/auto_scaler.go
@@ -40,6 +40,12 @@ func (s AutoScaler) Scale(ctx context.Context, cfg RuntimeConfig) error {
return fmt.Errorf("could not get worker pool: %w", err)
}

// kill workers that were drained in a previous autoscaler run but were still busy at the time
err = s.extractAndKillDrainedWorkers(ctx, logger, workerPool)
if err != nil {
return fmt.Errorf("could not extract and kill already drained workers: %w", err)
}

asg, err := s.controller.GetAutoscalingGroup(ctx)
if err != nil {
return fmt.Errorf("could not get autoscaling group: %w", err)
@@ -111,6 +117,24 @@ func (s AutoScaler) Scale(ctx context.Context, cfg RuntimeConfig) error {

idleWorkers := state.IdleWorkers()

err = s.drainWorkers(ctx, decision, idleWorkers, logger)
if err != nil {
return fmt.Errorf("could not drain workers: %w", err)
}

// fetch the worker pool again and kill only the drained, non-busy workers
workerPool, err = s.controller.GetWorkerPool(ctx)
if err != nil {
return fmt.Errorf("could not get worker pool: %w", err)
}
err = s.extractAndKillDrainedWorkers(ctx, logger, workerPool)
if err != nil {
return fmt.Errorf("could not extract and kill drained workers: %w", err)
}
return nil
}

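// drainWorkers marks up to decision.ScalingSize idle workers as drained so
// they stop accepting new runs; killing their instances happens separately.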
func (s AutoScaler) drainWorkers(ctx context.Context, decision Decision, idleWorkers []Worker, logger *slog.Logger) error {
for i := 0; i < decision.ScalingSize; i++ {
worker := idleWorkers[i]

@@ -122,20 +146,41 @@ func (s AutoScaler) Scale(ctx context.Context, cfg RuntimeConfig) error {
)
logger.Info("scaling down ASG and killing worker")

- drained, err := s.controller.DrainWorker(ctx, worker.ID)
+ _, err := s.controller.DrainWorker(ctx, worker.ID)
if err != nil {
return fmt.Errorf("could not drain worker: %w", err)
}
}
return nil
}

- if !drained {
- logger.Warn("worker was busy, stopping the scaling down process")
- return nil
- }
func (s AutoScaler) extractAndKillDrainedWorkers(ctx context.Context, logger *slog.Logger, workerPool *WorkerPool) error {
drainedWorkers := workerPool.ExtractDrainedWorkers()
return s.killWorkers(ctx, logger, drainedWorkers)
}

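// killWorkers terminates the instance behind each given worker, skipping
// any worker that is still busy with a run.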
func (s AutoScaler) killWorkers(ctx context.Context, logger *slog.Logger, workers []Worker) error {
for _, worker := range workers {
if worker.Busy {
continue
}
_, instanceID, err := worker.InstanceIdentity()
if err != nil {
logger.Error("could not determine instance ID", "worker_id", worker.ID, "error", err)
continue
}
logger.With(
"worker_id", worker.ID,
"instance_id", instanceID,
).Info("killing drained worker")

if err := s.controller.KillInstance(ctx, string(instanceID)); err != nil {
return fmt.Errorf("could not kill instance: %w", err)
return fmt.Errorf("could not kill drained instance: %w", err)
}
}

return nil
}
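
Taken together, the changes turn scale-down into a two-pass flow. Below is a minimal sketch of that flow (illustrative only, not part of the diff; the name scaleDownSketch is hypothetical, but it uses only methods visible above):

func (s AutoScaler) scaleDownSketch(ctx context.Context, decision Decision, idleWorkers []Worker, logger *slog.Logger) error {
	// Pass 1: mark the selected idle workers as drained so they stop
	// accepting new runs; nothing is killed yet.
	if err := s.drainWorkers(ctx, decision, idleWorkers, logger); err != nil {
		return fmt.Errorf("could not drain workers: %w", err)
	}
	// Pass 2: re-fetch the pool so the Busy flags are current, then kill
	// only workers that are drained and no longer busy. A drained-but-busy
	// worker survives until a later autoscaler run reaps it.
	workerPool, err := s.controller.GetWorkerPool(ctx)
	if err != nil {
		return fmt.Errorf("could not get worker pool: %w", err)
	}
	return s.extractAndKillDrainedWorkers(ctx, logger, workerPool)
}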
56 changes: 55 additions & 1 deletion internal/auto_scaler_test.go
@@ -42,6 +42,47 @@ func TestAutoScalerScalingNone(t *testing.T) {
require.NoError(t, err)
}

func TestAutoScalerScalingNoneWithDrainedWorkers(t *testing.T) {
var buf bytes.Buffer
h := slog.NewTextHandler(&buf, nil)

cfg := internal.RuntimeConfig{}

ctrl := new(MockController)
defer ctrl.AssertExpectations(t)

scaler := internal.NewAutoScaler(ctrl, slog.New(h))

ctrl.On("GetWorkerPool", mock.Anything).Return(&internal.WorkerPool{
Workers: []internal.Worker{
{
ID: "1",
Metadata: `{"asg_id": "group", "instance_id": "instance"}`,
},
{
ID: "2",
Metadata: `{"asg_id": "group", "instance_id": "instance2"}`,
Drained: true,
},
{
ID: "3",
Metadata: `{"asg_id": "group", "instance_id": "instance3"}`,
Drained: true,
Busy: true,
},
},
}, nil)
ctrl.On("KillInstance", mock.Anything, "instance2").Return(nil)
ctrl.On("GetAutoscalingGroup", mock.Anything).Return(&types.AutoScalingGroup{
AutoScalingGroupName: ptr("group"),
MinSize: ptr(int32(1)),
MaxSize: ptr(int32(3)),
DesiredCapacity: ptr(int32(2)),
}, nil)
err := scaler.Scale(context.Background(), cfg)
require.NoError(t, err)
}

func TestAutoScalerScalingUp(t *testing.T) {
var buf bytes.Buffer
h := slog.NewTextHandler(&buf, nil)
@@ -100,7 +141,20 @@ func TestAutoScalerScalingDown(t *testing.T) {
Metadata: `{"asg_id": "group", "instance_id": "instance2"}`,
},
},
- }, nil)
+ }, nil).Once()
ctrl.On("GetWorkerPool", mock.Anything).Return(&internal.WorkerPool{
Workers: []internal.Worker{
{
ID: "1",
Metadata: `{"asg_id": "group", "instance_id": "instance"}`,
Drained: true,
},
{
ID: "2",
Metadata: `{"asg_id": "group", "instance_id": "instance2"}`,
},
},
}, nil).Once()
ctrl.On("GetAutoscalingGroup", mock.Anything).Return(&types.AutoScalingGroup{
AutoScalingGroupName: ptr("group"),
MinSize: ptr(int32(1)),
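
The paired .Once() expectations rely on testify consuming same-method stubs in registration order: the first GetWorkerPool call inside Scale sees the pre-drain pool, the second sees worker "1" already drained. Schematically (preDrainPool and postDrainPool are hypothetical stand-ins for the pool literals above):

// testify matches a call against the first registered expectation that has
// repeatability left; .Once() retires each stub after a single use.
ctrl.On("GetWorkerPool", mock.Anything).Return(preDrainPool, nil).Once()  // first call: before draining
ctrl.On("GetWorkerPool", mock.Anything).Return(postDrainPool, nil).Once() // second call: after draining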
14 changes: 14 additions & 0 deletions internal/worker_pool_details.go
@@ -5,6 +5,20 @@ type WorkerPool struct {
Workers []Worker `graphql:"workers" json:"workers"`
}

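// ExtractDrainedWorkers removes every drained worker from the pool in place
// and returns the removed workers; only non-drained workers stay in wp.Workers.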
func (wp *WorkerPool) ExtractDrainedWorkers() []Worker {
drainedWorkers := make([]Worker, 0)
nonDrainedWorkers := make([]Worker, 0)
for _, worker := range wp.Workers {
if worker.Drained {
drainedWorkers = append(drainedWorkers, worker)
continue
}
nonDrainedWorkers = append(nonDrainedWorkers, worker)
}
wp.Workers = nonDrainedWorkers
return drainedWorkers
}

type WorkerPoolDetails struct {
Pool *WorkerPool `graphql:"workerPool(id: $workerPool)"`
}
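
A hypothetical usage sketch with illustrative values. Note that ExtractDrainedWorkers extracts a drained worker even while it is busy; the busy check happens later in killWorkers:

pool := &WorkerPool{Workers: []Worker{
	{ID: "1"},                            // not drained: stays in the pool
	{ID: "2", Drained: true},             // extracted; eligible to be killed
	{ID: "3", Drained: true, Busy: true}, // extracted, but killWorkers skips it
}}
drained := pool.ExtractDrainedWorkers()
// now len(pool.Workers) == 1 and len(drained) == 2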