Skip to content

Commit

Permalink
Refactor runner - Remove 'DoneWhenAllIdle', Add 'ActiveThreads' counter
Browse files Browse the repository at this point in the history
  • Loading branch information
omerzi authored Oct 2, 2022
2 parents a6f23b4 + 4492401 commit 58efc9a
Showing 1 changed file with 47 additions and 92 deletions.
139 changes: 47 additions & 92 deletions parallel/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package parallel

import (
"errors"
"strconv"
"sync"
"sync/atomic"
"time"
)

type Runner interface {
Expand All @@ -15,9 +13,11 @@ type Runner interface {
Done()
Cancel()
Errors() map[int]error
DoneWhenAllIdle(int) error
RunningThreads() int
ActiveThreads() uint32
OpenThreads() int
IsStarted() bool
SetMaxParallel(int)
TotalTasksInQueue() uint32
}

type TaskFunc func(int) error
Expand All @@ -34,8 +34,7 @@ type runner struct {
// Tasks waiting to be executed.
tasks chan *task
// Tasks counter, used to give each task an identifier (task.num).
taskCount uint32

taskId uint32
// A channel that is closed when the runner is cancelled.
cancel chan struct{}
// Used to make sure the cancel channel is closed only once.
Expand All @@ -44,20 +43,20 @@ type runner struct {
maxParallel int
// If true, the runner will be cancelled on the first error thrown from a task.
failFast bool
// Indicates that the runner received some tasks and started executing them.
started bool
// A WaitGroup that waits for all the threads to close.
threadsWaitGroup sync.WaitGroup
// Threads counter, used to give each thread an identifier (threadId).
threadCount uint32
// The number of open threads.
runningThreads int
// A lock on runningThreads.
runningThreadsLock sync.Mutex

// A map of all open threads with a boolean indicating whether they're idle (open threads that do not run any task at the moment).
idle sync.Map
// A map of all open threads with the last time they ended a task.
lastActive sync.Map

openThreads int
// A lock on openThreads.
openThreadsLock sync.Mutex
// The number of threads currently running tasks.
activeThreads uint32
// The number of tasks in the queue.
totalTasksInQueue uint32
// A map of errors keyed by threadId.
errors map[int]error
// A lock on the errors map.
Expand Down Expand Up @@ -107,13 +106,14 @@ func (r *runner) AddTaskWithError(t TaskFunc, errorHandler OnErrorFunc) (int, er
}

func (r *runner) addTask(t TaskFunc, errorHandler OnErrorFunc) (int, error) {
nextCount := atomic.AddUint32(&r.taskCount, 1)
nextCount := atomic.AddUint32(&r.taskId, 1)
task := &task{run: t, num: nextCount - 1, onError: errorHandler}

select {
case <-r.cancel:
return -1, errors.New("runner stopped")
default:
atomic.AddUint32(&r.totalTasksInQueue, 1)
r.tasks <- task
return int(task.num), nil
}
Expand All @@ -129,11 +129,16 @@ func (r *runner) Run() {
r.threadsWaitGroup.Wait()
}

// The producer notifies that no more tasks will be produced.
// Done is used to notify that no more tasks will be produced.
// It closes the tasks channel, so worker loops that range over r.tasks
// terminate once the remaining queued tasks have been received.
// NOTE(review): closing an already closed channel panics, and so does a send
// on a closed channel (addTask sends on r.tasks) - confirm that callers invoke
// Done exactly once, after the last task was added.
func (r *runner) Done() {
close(r.tasks)
}

// IsStarted is true when a task was executed, false otherwise.
// The flag is set to true by a worker goroutine right before it runs a task.
// NOTE(review): r.started is a plain bool that is written by worker goroutines
// and read here without a lock or an atomic operation - consider guarding it
// if this method is called concurrently with running workers.
func (r *runner) IsStarted() bool {
return r.started
}

// Cancel stops the Runner from getting new tasks and empties the tasks queue.
// No new tasks will be executed, and tasks that already started will continue running and won't be interrupted.
// If this Runner is already cancelled, then this function will do nothing.
Expand All @@ -148,71 +153,22 @@ func (r *runner) Cancel() {
})
}

// Returns a map of errors keyed by the task number
// Errors returns the map of errors collected by the runner.
// NOTE(review): this comment used to say the map is keyed by the task number,
// while the comment on the errors field says it is keyed by threadId - confirm
// which one is correct.
// The internal map itself is returned (not a copy) and no lock is taken here,
// so presumably it should only be read after all the threads have finished - verify against callers.
func (r *runner) Errors() map[int]error {
return r.errors
}

// Define the work as done when all consumers are idle for idleThresholdSeconds.
// The function will wait until all consumers are idle.
// Can be run by the producer as a go routine right after starting to produce.
// CAUTION - Might panic if no task is added on the initial idleThresholdSeconds.
func (r *runner) DoneWhenAllIdle(idleThresholdSeconds int) error {
for {
time.Sleep(time.Duration(idleThresholdSeconds) * time.Second)
allIdle := true
var e error
// Iterate over all open threads to check if all of them are idle.
r.idle.Range(func(key, value interface{}) bool {
threadId, ok := key.(int)
if !ok {
e = errors.New("thread ID must be a number")
// This will break the iteration.
return false
}
threadIdle, ok := value.(bool)
if !ok {
e = errors.New("thread idle value must be a boolean")
// This will break the iteration.
return false
}

if !threadIdle {
allIdle = false
return false
}

lastActiveValue, _ := r.lastActive.Load(threadId)
threadLastActive, _ := lastActiveValue.(string)
idleTimestamp, err := strconv.ParseInt(threadLastActive, 10, 64)
if err != nil {
e = errors.New("unexpected idle timestamp on consumer. err: " + err.Error())
return false
}

idleTime := time.Unix(idleTimestamp, 0)
// Check if the time passed since the thread was recently active is shorter than idleThresholdSeconds.
if time.Since(idleTime).Seconds() < float64(idleThresholdSeconds) {
allIdle = false
return false
}
return true
})
if e != nil {
return e
}
// ActiveThreads returns the number of threads that are currently running a task.
// The counter is incremented and decremented by the worker goroutines using
// atomic operations, so it is loaded atomically here as well to avoid a data race.
// The returned value is a snapshot and may be outdated by the time the caller uses it.
func (r *runner) ActiveThreads() uint32 {
	return atomic.LoadUint32(&r.activeThreads)
}

// All consumers are idle for the required time.
if allIdle {
close(r.tasks)
return nil
}
}
// OpenThreads returns the number of open threads (including idle threads).
// The counter is modified by the worker goroutines while holding openThreadsLock,
// so the same lock is taken here to read it without a data race.
// The lock is not reentrant: this method must not be called by code that
// already holds openThreadsLock.
func (r *runner) OpenThreads() int {
	r.openThreadsLock.Lock()
	defer r.openThreadsLock.Unlock()
	return r.openThreads
}

// RunningThreads returns the number of open threads (including idle threads).
func (r *runner) RunningThreads() int {
return r.runningThreads
// TotalTasksInQueue returns the number of tasks that were added to the runner
// and have not finished running yet. The counter is increased when a task is
// added and decreased only after the task's function returns, so tasks that are
// currently being executed are included in the count.
// The counter is updated with atomic operations, so it is loaded atomically
// here as well to avoid a data race. The returned value is a snapshot.
func (r *runner) TotalTasksInQueue() uint32 {
	return atomic.LoadUint32(&r.totalTasksInQueue)
}

func (r *runner) SetMaxParallel(newVal int) {
Expand All @@ -237,17 +193,21 @@ func (r *runner) addThread() {
nextThreadId := atomic.AddUint32(&r.threadCount, 1) - 1
go func(threadId int) {
defer r.threadsWaitGroup.Done()

r.runningThreadsLock.Lock()
r.runningThreads++
r.runningThreadsLock.Unlock()
r.openThreadsLock.Lock()
r.openThreads++
r.openThreadsLock.Unlock()

// Keep on taking tasks from the queue.
for t := range r.tasks {
r.idle.Store(threadId, false)

// Increase the total of active threads.
atomic.AddUint32(&r.activeThreads, 1)
r.started = true
// Run the task.
e := t.run(threadId)
// Decrease the total of active threads.
atomic.AddUint32(&r.activeThreads, ^uint32(0))
// Decrease the total of in progress tasks.
atomic.AddUint32(&r.totalTasksInQueue, ^uint32(0))
if e != nil {
if t.onError != nil {
t.onError(e)
Expand All @@ -263,20 +223,15 @@ func (r *runner) addThread() {
break
}
}
r.idle.Store(threadId, true)
// Save the current time as the thread's last active time.
r.lastActive.Store(threadId, strconv.FormatInt(time.Now().Unix(), 10))

r.runningThreadsLock.Lock()
// If there are more open threads than maxParallel, then this thread will be closed.
if r.runningThreads > r.maxParallel {
r.runningThreads--
r.runningThreadsLock.Unlock()
r.idle.Delete(threadId)
r.lastActive.Delete(threadId)
r.openThreadsLock.Lock()
// If the total of open threads is larger than the maximum (maxParallel), then this thread should be closed.
if r.openThreads > r.maxParallel {
r.openThreads--
r.openThreadsLock.Unlock()
break
}
r.runningThreadsLock.Unlock()
r.openThreadsLock.Unlock()
}
}(int(nextThreadId))
}

0 comments on commit 58efc9a

Please sign in to comment.