Skip to content

Commit

Permalink
Rename man variables to mgr
Browse files Browse the repository at this point in the history
  • Loading branch information
mcastorina committed Jul 26, 2023
1 parent 9b34de0 commit 41f0dfe
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 20 deletions.
12 changes: 6 additions & 6 deletions pkg/sources/source_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,31 +43,31 @@ type apiClient interface {

// WithAPI adds an API client to the manager for tracking jobs and progress.
func WithAPI(api apiClient) func(*SourceManager) {
return func(man *SourceManager) { man.api = api }
return func(mgr *SourceManager) { mgr.api = api }
}

// WithConcurrency limits the concurrent number of sources a manager can run.
func WithConcurrency(concurrency int) func(*SourceManager) {
return func(man *SourceManager) { man.pool.SetLimit(concurrency) }
return func(mgr *SourceManager) { mgr.pool.SetLimit(concurrency) }
}

// WithBufferedOutput sets the size of the buffer used for the Chunks() channel.
func WithBufferedOutput(size int) func(*SourceManager) {
return func(man *SourceManager) { man.outputChunks = make(chan *Chunk, size) }
return func(mgr *SourceManager) { mgr.outputChunks = make(chan *Chunk, size) }
}

// NewManager creates a new manager with the provided options.
func NewManager(opts ...func(*SourceManager)) *SourceManager {
man := SourceManager{
mgr := SourceManager{
// Default to the headless API. Can be overwritten by the WithAPI option.
api: &headlessAPI{},
handles: make(map[handle]SourceInitFunc),
outputChunks: make(chan *Chunk),
}
for _, opt := range opts {
opt(&man)
opt(&mgr)
}
return &man
return &mgr
}

// Enroll informs the SourceManager to track and manage a Source.
Expand Down
28 changes: 14 additions & 14 deletions pkg/sources/source_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (c *counterChunker) Chunks(_ context.Context, ch chan *Chunk) error {
}

// enrollDummy is a helper function to enroll a DummySource with a SourceManager.
func enrollDummy(man *SourceManager, chunkMethod chunker) (handle, error) {
return man.Enroll(context.Background(), "dummy", 1337,
func enrollDummy(mgr *SourceManager, chunkMethod chunker) (handle, error) {
return mgr.Enroll(context.Background(), "dummy", 1337,
func(ctx context.Context, jobID, sourceID int64) (Source, error) {
source := &DummySource{chunker: chunkMethod}
if err := source.Init(ctx, "dummy", jobID, sourceID, true, nil, 42); err != nil {
Expand All @@ -69,16 +69,16 @@ func tryRead(ch <-chan *Chunk) (*Chunk, error) {
}

func TestSourceManagerRun(t *testing.T) {
man := NewManager(WithBufferedOutput(8))
handle, err := enrollDummy(man, &counterChunker{count: 1})
mgr := NewManager(WithBufferedOutput(8))
handle, err := enrollDummy(mgr, &counterChunker{count: 1})
if err != nil {
t.Fatalf("unexpected error enrolling source: %v", err)
}
for i := 0; i < 3; i++ {
if err := man.Run(context.Background(), handle); err != nil {
if err := mgr.Run(context.Background(), handle); err != nil {
t.Fatalf("unexpected error running source: %v", err)
}
chunk, err := tryRead(man.Chunks())
chunk, err := tryRead(mgr.Chunks())
if err != nil {
t.Fatalf("reading chunk failed: %v", err)
}
Expand All @@ -87,33 +87,33 @@ func TestSourceManagerRun(t *testing.T) {
}

// The Chunks channel should be empty now.
if chunk, err := tryRead(man.Chunks()); err == nil {
if chunk, err := tryRead(mgr.Chunks()); err == nil {
t.Fatalf("unexpected chunk found: %+v", chunk)
}
}
}

func TestSourceManagerWait(t *testing.T) {
man := NewManager()
handle, err := enrollDummy(man, &counterChunker{count: 1})
mgr := NewManager()
handle, err := enrollDummy(mgr, &counterChunker{count: 1})
if err != nil {
t.Fatalf("unexpected error enrolling source: %v", err)
}
// Asynchronously run the source.
if err := man.ScheduleRun(context.Background(), handle); err != nil {
if err := mgr.ScheduleRun(context.Background(), handle); err != nil {
t.Fatalf("unexpected error scheduling run: %v", err)
}
// Read the 1 chunk we're expecting so Waiting completes.
<-man.Chunks()
<-mgr.Chunks()
// Wait for all resources to complete.
if err := man.Wait(); err != nil {
if err := mgr.Wait(); err != nil {
t.Fatalf("unexpected error waiting: %v", err)
}
// Enroll and run should return an error now.
if _, err := enrollDummy(man, &counterChunker{count: 1}); err == nil {
if _, err := enrollDummy(mgr, &counterChunker{count: 1}); err == nil {
t.Fatalf("expected enroll to fail")
}
if err := man.ScheduleRun(context.Background(), handle); err == nil {
if err := mgr.ScheduleRun(context.Background(), handle); err == nil {
t.Fatalf("expected scheduling run to fail")
}
}

0 comments on commit 41f0dfe

Please sign in to comment.