Skip to content

Commit

Permalink
Support fatal errors in job reports
Browse files Browse the repository at this point in the history
  • Loading branch information
mcastorina committed Jul 27, 2023
1 parent e391e89 commit 3603d7d
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 42 deletions.
9 changes: 9 additions & 0 deletions pkg/sources/errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sources

import (
"errors"
"fmt"
"sync"
)
Expand Down Expand Up @@ -32,5 +33,13 @@ func (s *ScanErrors) Count() uint64 {
}

func (s *ScanErrors) String() string {
s.mu.RLock()
defer s.mu.RUnlock()
return fmt.Sprintf("%v", s.errors)
}

func (s *ScanErrors) Errors() error {
s.mu.RLock()
defer s.mu.RUnlock()
return errors.Join(s.errors...)
}
86 changes: 71 additions & 15 deletions pkg/sources/job_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,92 @@ package sources

import (
"errors"
"fmt"
"sync"
"time"
)

// Fatal is a wrapper around error to differentiate non-fatal errors from fatal
// ones. A fatal error is typically from a finished context or any error
// returned from a source's Init, Chunks, Enumerate, or ChunkUnit methods.
type Fatal struct{ error }

func (f Fatal) Error() string { return fmt.Sprintf("fatal: %s", f.error.Error()) }
func (f Fatal) Unwrap() error { return f.error }

// JobReport aggregates information about a run of a Source.
type JobReport struct {
SourceID int64
JobID int64
StartTime time.Time
EndTime time.Time
TotalChunks uint64
errors []error
errorsLock sync.Mutex
SourceID int64
JobID int64
StartTime time.Time
EndTime time.Time
TotalChunks uint64
errors ScanErrors
chunkErrors map[string][]error
chunkErrorsLock sync.Mutex
}

// AddError adds a non-nil error to the aggregate of errors encountered during
// scanning.
// AddError adds a non-nil error to the aggregate of errors
// encountered during scanning.
func (jr *JobReport) AddError(err error) {
if err == nil {
return
}
jr.errorsLock.Lock()
defer jr.errorsLock.Unlock()
jr.errors = append(jr.errors, err)
jr.errors.Add(err)
}

// AddChunkError adds a non-nil error to the aggregate of errors encountered
// during chunking.
func (jr *JobReport) AddChunkError(unit SourceUnit, err error) {
if err == nil {
return
}
id := ""
if unit != nil {
id = unit.SourceUnitID()
}
jr.chunkErrorsLock.Lock()
defer jr.chunkErrorsLock.Unlock()
if jr.chunkErrors == nil {
jr.chunkErrors = make(map[string][]error)
}
jr.chunkErrors[id] = append(jr.chunkErrors[id], err)
}

// Errors joins all aggregated errors into one. If there were no errors, nil is
// returned. errors.Is can be used to check for specific errors.
func (jr *JobReport) Errors() error {
jr.errorsLock.Lock()
defer jr.errorsLock.Unlock()
return errors.Join(jr.errors...)
return errors.Join(jr.EnumerationErrors(), jr.ChunkErrors())
}

// EnumerationErrors joins all errors encountered during initialization or
// enumeration.
func (jr *JobReport) EnumerationErrors() error {
return jr.errors.Errors()
}

// ChunkErrors joins all errors encountered during chunking.
func (jr *JobReport) ChunkErrors() error {
jr.chunkErrorsLock.Lock()
defer jr.chunkErrorsLock.Unlock()
// Check if we only have errors without unit information.
if errs, ok := jr.chunkErrors[""]; ok && len(jr.chunkErrors) == 1 {
return errors.Join(errs...)
}

aggregate := make([]error, 0, len(jr.chunkErrors))
for id, errs := range jr.chunkErrors {
err := fmt.Errorf("unit %q\n%w\n", id, errors.Join(errs...))
aggregate = append(aggregate, err)
}
return errors.Join(aggregate...)
}

// FatalError returns the first Fatal error, if any, encountered in the scan.
func (jr *JobReport) FatalError() error {
var err Fatal
if found := errors.As(jr.Errors(), &err); found {
return err
}
return nil
}
34 changes: 34 additions & 0 deletions pkg/sources/job_report_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package sources

import (
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

func TestJobReportFatalErrors(t *testing.T) {
var jr JobReport

// Add a non-fatal error.
jr.AddError(fmt.Errorf("oh no"))
assert.Error(t, jr.Errors())
assert.NoError(t, jr.FatalError())
assert.NoError(t, jr.ChunkErrors())

// Add a fatal error and make sure we can test comparison.
err := fmt.Errorf("fatal error")
jr.AddError(Fatal{err})
assert.Error(t, jr.Errors())
assert.Error(t, jr.FatalError())
assert.NoError(t, jr.ChunkErrors())
assert.True(t, errors.Is(jr.FatalError(), err))

// Add another fatal error and test we still return the first.
jr.AddError(Fatal{fmt.Errorf("second fatal error")})
assert.Error(t, jr.Errors())
assert.Error(t, jr.FatalError())
assert.NoError(t, jr.ChunkErrors())
assert.True(t, errors.Is(jr.FatalError(), err))
}
47 changes: 20 additions & 27 deletions pkg/sources/source_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,12 @@ func (s *SourceManager) Wait() error {
// TODO: Maybe switch to using a semaphore.Weighted.
_ = s.pool.Wait()

// Aggregate all errors from all job reports.
// TODO: This should probably only be the fatal errors. We'll also need
// to rewrite this for when the reports start getting culled.
// Aggregate the first fatal errors from all job reports.
s.reportLock.Lock()
defer s.reportLock.Unlock()
errs := make([]error, 0, len(s.report))
for _, report := range s.report {
errs = append(errs, report.Errors())
errs = append(errs, report.FatalError())
}
s.doneErr = errors.Join(errs...)
return s.doneErr
Expand Down Expand Up @@ -251,8 +249,8 @@ func (s *SourceManager) run(ctx context.Context, handle handle) (*JobReport, err
// Initialize the source.
source, err := initFunc(ctx, jobID, int64(handle))
if err != nil {
report.AddError(err)
return report, err
report.AddError(Fatal{err})
return report, Fatal{err}
}
// Check for the preferred method of tracking source units.
if enumChunker, ok := source.(SourceUnitEnumChunker); ok && s.useSourceUnits {
Expand Down Expand Up @@ -283,8 +281,8 @@ func (s *SourceManager) runWithoutUnits(ctx context.Context, handle handle, sour
defer wg.Wait()
defer close(ch)
if err := source.Chunks(ctx, ch); err != nil {
report.AddError(err)
return report, err
report.AddChunkError(nil, Fatal{err})
return report, Fatal{err}
}
return report, nil
}
Expand All @@ -295,13 +293,14 @@ func (s *SourceManager) runWithoutUnits(ctx context.Context, handle handle, sour
func (s *SourceManager) runWithUnits(ctx context.Context, handle handle, source SourceUnitEnumChunker, report *JobReport) (*JobReport, error) {
reporter := &mgrUnitReporter{
unitCh: make(chan SourceUnit),
report: report,
}
// Produce units.
go func() {
// TODO: Catch panics and add to report.
defer close(reporter.unitCh)
if err := source.Enumerate(ctx, reporter); err != nil {
report.AddError(err)
report.AddError(Fatal{err})
}
}()
var wg sync.WaitGroup
Expand All @@ -312,17 +311,18 @@ func (s *SourceManager) runWithUnits(ctx context.Context, handle handle, source
unitPool.SetLimit(s.concurrentUnits)
}
for unit := range reporter.unitCh {
unit := unit
reporter := &mgrChunkReporter{
unitID: unit.SourceUnitID(),
unit: unit,
chunkCh: make(chan *Chunk),
report: report,
}
unit := unit
// Consume units and produce chunks.
unitPool.Go(func() error {
// TODO: Catch panics and add to report.
defer close(reporter.chunkCh)
if err := source.ChunkUnit(ctx, unit, reporter); err != nil {
report.AddError(err)
report.AddError(Fatal{err})
}
return nil
})
Expand All @@ -338,8 +338,7 @@ func (s *SourceManager) runWithUnits(ctx context.Context, handle handle, source
}()
}
wg.Wait()
// TODO: Return fatal errors.
return report, nil
return report, report.FatalError()
}

// getInitFunc is a helper method for safe concurrent access to the
Expand Down Expand Up @@ -368,37 +367,31 @@ func (api *headlessAPI) GetJobID(ctx context.Context, id int64) (int64, error) {

// mgrUnitReporter implements the UnitReporter interface.
type mgrUnitReporter struct {
unitCh chan SourceUnit
unitErrs []error
unitErrsLock sync.Mutex
unitCh chan SourceUnit
report *JobReport
}

func (s *mgrUnitReporter) UnitOk(ctx context.Context, unit SourceUnit) error {
return common.CancellableWrite(ctx, s.unitCh, unit)
}

func (s *mgrUnitReporter) UnitErr(ctx context.Context, err error) error {
s.unitErrsLock.Lock()
defer s.unitErrsLock.Unlock()
s.unitErrs = append(s.unitErrs, err)
s.report.AddError(err)
return nil
}

// mgrChunkReporter implements the ChunkReporter interface.
type mgrChunkReporter struct {
unitID string
chunkCh chan *Chunk
chunkErrs []error
chunkErrsLock sync.Mutex
unit SourceUnit
chunkCh chan *Chunk
report *JobReport
}

func (s *mgrChunkReporter) ChunkOk(ctx context.Context, chunk Chunk) error {
return common.CancellableWrite(ctx, s.chunkCh, &chunk)
}

func (s *mgrChunkReporter) ChunkErr(ctx context.Context, err error) error {
s.chunkErrsLock.Lock()
defer s.chunkErrsLock.Unlock()
s.chunkErrs = append(s.chunkErrs, err)
s.report.AddChunkError(s.unit, err)
return nil
}

0 comments on commit 3603d7d

Please sign in to comment.