Skip to content

Commit

Permalink
Add AvailableCapacity method to SourceManager (#1665)
Browse files Browse the repository at this point in the history
  • Loading branch information
mcastorina authored Aug 29, 2023
1 parent 2b1b1b5 commit 7ba880f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
22 changes: 20 additions & 2 deletions pkg/sources/source_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ type SourceManager struct {
handles map[handle]sourceInfo
handlesLock sync.Mutex
// Pool limiting the amount of concurrent sources running.
pool errgroup.Group
pool errgroup.Group
poolLimit int
currentRunningCount int32
// Max number of units to scan concurrently per source.
concurrentUnits int
// Run the sources using source unit enumeration / chunking if available.
useSourceUnits bool
Expand Down Expand Up @@ -68,7 +71,10 @@ func WithReportHook(hook JobProgressHook) func(*SourceManager) {

// WithConcurrentSources limits the concurrent number of sources a manager can
// run. The limit is applied to the manager's worker pool and is also recorded
// in poolLimit so the manager can later report how much capacity remains.
func WithConcurrentSources(concurrency int) func(*SourceManager) {
	return func(mgr *SourceManager) {
		// Keep the pool's own limit and the recorded limit in lockstep; the
		// pool does not expose its limit for reading back.
		mgr.pool.SetLimit(concurrency)
		mgr.poolLimit = concurrency
	}
}

// WithBufferedOutput sets the size of the buffer used for the Chunks() channel.
Expand Down Expand Up @@ -167,6 +173,8 @@ func (s *SourceManager) asyncRun(ctx context.Context, handle handle) (JobProgres
ctx, cancel := context.WithCancel(ctx)
progress := NewJobProgress(jobID, int64(handle), sourceName, WithHooks(s.hooks...), WithCancel(cancel))
s.pool.Go(func() error {
atomic.AddInt32(&s.currentRunningCount, 1)
defer atomic.AddInt32(&s.currentRunningCount, -1)
ctx := context.WithValues(ctx,
"job_id", jobID,
"source_manager_worker_id", common.RandomID(5),
Expand Down Expand Up @@ -207,6 +215,16 @@ func (s *SourceManager) ScanChunk(chunk *Chunk) {
s.outputChunks <- chunk
}

// AvailableCapacity returns the number of concurrent jobs the manager can
// accommodate at this time. If there is no limit, -1 is returned.
//
// A poolLimit of zero (never configured) or a negative poolLimit (which
// errgroup.Group.SetLimit documents as "no limit") are both reported as
// unlimited. Otherwise the result is the configured limit minus the number of
// jobs currently running; the running count is read atomically, so the value
// is a point-in-time snapshot.
func (s *SourceManager) AvailableCapacity() int {
	if s.poolLimit <= 0 {
		return -1
	}
	running := int(atomic.LoadInt32(&s.currentRunningCount))
	return s.poolLimit - running
}

// preflightChecks is a helper method to check the Manager or the context isn't
// done and that the handle is valid.
func (s *SourceManager) preflightChecks(ctx context.Context, handle handle) error {
Expand Down
21 changes: 21 additions & 0 deletions pkg/sources/source_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,3 +317,24 @@ func TestSourceManagerCancelRun(t *testing.T) {
assert.Error(t, ref.Snapshot().FatalError())
assert.True(t, errors.Is(ref.Snapshot().FatalError(), returnedErr))
}

// TestSourceManagerAvailableCapacity checks that AvailableCapacity reports the
// configured limit while idle, one less while a single job is running, and the
// full limit again once that job has finished.
func TestSourceManagerAvailableCapacity(t *testing.T) {
	const limit = 1337
	mgr := NewManager(WithConcurrentSources(limit))

	// Unbuffered channels hold the job inside its callback so the capacity
	// can be observed while exactly one job is running.
	started, release := make(chan struct{}), make(chan struct{})
	blocking := callbackChunker{func(context.Context, chan *Chunk) error {
		started <- struct{}{} // Tell the test the job is running.
		<-release             // Stay running until the test lets go.
		return nil
	}}

	handle, err := enrollDummy(mgr, blocking)
	assert.NoError(t, err)

	// Nothing scheduled yet: the whole limit is available.
	assert.Equal(t, limit, mgr.AvailableCapacity())

	ref, err := mgr.ScheduleRun(context.Background(), handle)
	assert.NoError(t, err)

	// One job in flight.
	<-started
	assert.Equal(t, limit-1, mgr.AvailableCapacity())

	// Let the job finish and confirm its slot is returned.
	release <- struct{}{}
	<-ref.Done()
	assert.Equal(t, limit, mgr.AvailableCapacity())
}

0 comments on commit 7ba880f

Please sign in to comment.