Skip to content

Commit

Permalink
fix: kill busy workers
Browse files Browse the repository at this point in the history
  • Loading branch information
ilya-hontarau committed Aug 8, 2024
1 parent 1413709 commit c7c2948
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 7 deletions.
47 changes: 40 additions & 7 deletions cmd/internal/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func Handle(ctx context.Context, logger *slog.Logger) error {
return fmt.Errorf("could not get worker pool: %w", err)
}

err = extractAndKillDrainedWorkers(ctx, logger, workerPool, controller)
if err != nil {
return fmt.Errorf("could not extract and kill already drained workers: %w", err)
}

asg, err := controller.GetAutoscalingGroup(ctx)
if err != nil {
return fmt.Errorf("could not get autoscaling group: %w", err)
Expand Down Expand Up @@ -82,7 +87,6 @@ func Handle(ctx context.Context, logger *slog.Logger) error {
}

decision := state.Decide(cfg.AutoscalingMaxCreate, cfg.AutoscalingMaxKill)

if decision.ScalingDirection == internal.ScalingDirectionNone {
logger.Info("no scaling decision to be made")
return nil
Expand Down Expand Up @@ -114,20 +118,49 @@ func Handle(ctx context.Context, logger *slog.Logger) error {
)
logger.Info("scaling down ASG and killing worker")

drained, err := controller.DrainWorker(ctx, worker.ID)
_, err := controller.DrainWorker(ctx, worker.ID)
if err != nil {
return fmt.Errorf("could not drain worker: %w", err)
}
}
workerPool, err = controller.GetWorkerPool(ctx)
if err != nil {
return fmt.Errorf("could not get worker pool: %w", err)
}
err = extractAndKillDrainedWorkers(ctx, logger, workerPool, controller)
if err != nil {
return fmt.Errorf("could not extract and kill drained workers: %w", err)
}
return nil
}

// extractAndKillDrainedWorkers removes the drained workers from workerPool
// (ExtractDrainedWorkers rewrites workerPool.Workers to hold only the
// non-drained ones) and hands the drained set to killWorkers.
//
// Any error reported by killWorkers is returned unchanged.
func extractAndKillDrainedWorkers(ctx context.Context, logger *slog.Logger, workerPool *internal.WorkerPool, controller *internal.Controller) error {
	return killWorkers(ctx, logger, workerPool.ExtractDrainedWorkers(), controller)
}

if !drained {
logger.Warn("worker was busy, stopping the scaling down process")
return nil
// killWorkers kills the instance behind every worker in workers that is not
// busy.
//
// Workers with Busy set are skipped. A worker whose instance identity cannot
// be resolved is logged and skipped, so one bad worker does not block the
// rest. The first KillInstance failure stops the loop and is returned wrapped;
// workers after it are not processed.
func killWorkers(ctx context.Context, logger *slog.Logger, workers []internal.Worker, controller *internal.Controller) error {
	for _, worker := range workers {
		if worker.Busy {
			continue
		}

		_, instanceID, err := worker.InstanceIdentity()
		if err != nil {
			// instanceID is not trustworthy when err is non-nil, so also record
			// which worker failed and why.
			logger.Error("could not determine instance ID",
				"worker_id", worker.ID,
				"instance_id", instanceID,
				"error", err,
			)
			continue
		}

		logger.With(
			"worker_id", worker.ID,
			"instance_id", instanceID,
		).Info("killing already idled worker")

		if err := controller.KillInstance(ctx, string(instanceID)); err != nil {
			return fmt.Errorf("could not kill instance: %w", err)
		}
	}

	return nil
}
14 changes: 14 additions & 0 deletions internal/worker_pool_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@ type WorkerPool struct {
Workers []Worker `graphql:"workers" json:"workers"`
}

// ExtractDrainedWorkers partitions the pool's workers by their Drained flag.
//
// The drained workers are returned, and wp.Workers is replaced with the
// remaining (non-drained) workers. Relative order is preserved in both
// slices, and both are always non-nil, even when empty.
func (wp *WorkerPool) ExtractDrainedWorkers() []Worker {
	var (
		drained = make([]Worker, 0)
		kept    = make([]Worker, 0)
	)

	for i := range wp.Workers {
		w := wp.Workers[i]
		if w.Drained {
			drained = append(drained, w)
		} else {
			kept = append(kept, w)
		}
	}

	wp.Workers = kept
	return drained
}

// WorkerPoolDetails wraps a single WorkerPool under the GraphQL selection
// `workerPool(id: $workerPool)`.
// NOTE(review): presumably the result shape for a worker-pool query that
// takes a $workerPool variable — confirm against the caller.
type WorkerPoolDetails struct {
// Pool is the worker pool selected by the `workerPool(id: $workerPool)` field.
Pool *WorkerPool `graphql:"workerPool(id: $workerPool)"`
}

0 comments on commit c7c2948

Please sign in to comment.