Skip to content

Commit

Permalink
Refactor UnitHook to block the scan if finished metrics aren't handled
Browse files Browse the repository at this point in the history
  • Loading branch information
mcastorina committed Jan 18, 2024
1 parent b0fd951 commit db3106a
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 62 deletions.
3 changes: 2 additions & 1 deletion pkg/sources/job_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ func (jp *JobProgress) TrackProgress(progress *Progress) {
// closure.
func (jp *JobProgress) executeHooks(todo func(hook JobProgressHook)) {
for _, hook := range jp.hooks {
// TODO: Non-blocking?
// Execute hooks synchronously so they can provide
// back-pressure to the source: todo runs to completion for one hook
// before the next hook is visited, so a hook that blocks stalls the
// caller of executeHooks.
todo(hook)
}
}
Expand Down
97 changes: 55 additions & 42 deletions pkg/sources/job_progress_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sources
import (
"errors"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -14,19 +13,27 @@ import (
// UnitHook implements JobProgressHook for tracking the progress of each
// individual unit.
type UnitHook struct {
metrics *lru.Cache[string, *UnitMetrics]
mu sync.Mutex
metrics *lru.Cache[string, *UnitMetrics]
mu sync.Mutex
finishedMetrics chan UnitMetrics
NoopHook
}

type UnitHookOpt func(*UnitHook)

func WithUnitHookCache(cache *lru.Cache[string, *UnitMetrics]) UnitHookOpt {
return func(hook *UnitHook) { hook.metrics = cache }
// WithUnitHookFinishBufferSize sets the buffer size of the channel used to
// deliver finished metrics (default is 1024). If the buffer fills, then
// scanning will stop until there is room.
//
// A size of 0 produces an unbuffered channel, so every finished unit blocks
// until its metrics are received. Negative sizes are treated as 0, because
// make panics at run time when given a negative channel capacity.
func WithUnitHookFinishBufferSize(buf int) UnitHookOpt {
	if buf < 0 {
		buf = 0
	}
	return func(hook *UnitHook) {
		hook.finishedMetrics = make(chan UnitMetrics, buf)
	}
}

func NewUnitHook(ctx context.Context, opts ...UnitHookOpt) *UnitHook {
// lru.NewWithEvict can only fail if the size is < 0.
func NewUnitHook(ctx context.Context, opts ...UnitHookOpt) (*UnitHook, <-chan UnitMetrics) {
// lru.NewWithEvict can only fail if the size is not positive (<= 0),
// which cannot happen with the constant used below. The size is the
// maximum number of concurrent in-progress units that this hook can
// handle.
cache, _ := lru.NewWithEvict(1024, func(key string, value *UnitMetrics) {
if value.handled {
return
Expand All @@ -36,11 +43,14 @@ func NewUnitHook(ctx context.Context, opts ...UnitHookOpt) *UnitHook {
"metric", value,
)
})
hook := UnitHook{metrics: cache}
hook := UnitHook{
metrics: cache,
finishedMetrics: make(chan UnitMetrics, 1024),
}
for _, opt := range opts {
opt(&hook)
}
return &hook
return &hook, hook.finishedMetrics
}

// id is a helper method to generate an ID for the given job and unit.
Expand All @@ -66,14 +76,28 @@ func (u *UnitHook) StartUnitChunking(ref JobProgressRef, unit SourceUnit, start

// EndUnitChunking finalizes the metrics tracked for the given unit: it stamps
// them with end as their EndTime and sends a copy on the finished-metrics
// channel. If finishUnit reports no metrics for the unit, nothing is sent.
func (u *UnitHook) EndUnitChunking(ref JobProgressRef, unit SourceUnit, end time.Time) {
	if finished, found := u.finishUnit(u.id(ref, unit)); found {
		finished.EndTime = &end
		// The send is deliberately blocking: when nobody is draining the
		// channel the hook does not return, which applies back-pressure
		// to the source.
		u.finishedMetrics <- *finished
	}
}

func (u *UnitHook) finishUnit(id string) (*UnitMetrics, bool) {
u.mu.Lock()
defer u.mu.Unlock()

metrics, ok := u.metrics.Get(id)
if !ok {
return
return nil, false
}
metrics.EndTime = &end
metrics.handled = true
u.metrics.Remove(id)
return metrics, true
}

func (u *UnitHook) ReportChunk(ref JobProgressRef, unit SourceUnit, chunk *Chunk) {
Expand Down Expand Up @@ -122,51 +146,40 @@ func (u *UnitHook) ReportError(ref JobProgressRef, err error) {
}

func (u *UnitHook) Finish(ref JobProgressRef) {
u.mu.Lock()
defer u.mu.Unlock()
// Clear out any metrics on this job. This covers the case for the
// source running without unit support.
prefix := u.id(ref, nil)
for _, id := range u.metrics.Keys() {
if !strings.HasPrefix(id, prefix) {
continue
}
metric, ok := u.metrics.Get(id)
if !ok {
continue
}
// If the unit is nil, the source does not support units.
// Use the overall job metrics instead.
if metric.Unit == nil {
snap := ref.Snapshot()
metric.StartTime = snap.StartTime
metric.EndTime = snap.EndTime
metric.Errors = snap.Errors
}
id := u.id(ref, nil)
metrics, ok := u.finishUnit(id)
if !ok {
return
}
snap := ref.Snapshot()
metrics.StartTime = snap.StartTime
metrics.EndTime = snap.EndTime
metrics.Errors = snap.Errors
// Intentionally block the hook from returning to supply back-pressure
// to the source.
u.finishedMetrics <- *metrics
}

// UnitMetrics gets all the currently active or newly finished metrics for this
// job. If a unit returned from this method has finished, it will be removed
// from the cache and no longer returned in successive calls to UnitMetrics().
func (u *UnitHook) UnitMetrics() []UnitMetrics {
// InProgressSnapshot gets all the currently active metrics across all jobs.
func (u *UnitHook) InProgressSnapshot() []UnitMetrics {
u.mu.Lock()
defer u.mu.Unlock()
output := make([]UnitMetrics, 0, u.metrics.Len())
for _, id := range u.metrics.Keys() {
metric, ok := u.metrics.Get(id)
if !ok {
continue
}
output = append(output, *metric)
if metric.IsFinished() {
metric.handled = true
u.metrics.Remove(id)
if metrics, ok := u.metrics.Get(id); ok {
output = append(output, *metrics)
}
}
return output
}

// Close closes the finished-metrics channel so that a receiver ranging over
// it terminates once the remaining buffered metrics are drained. It always
// returns nil.
//
// Close must be called at most once, and only after the hook will receive no
// further EndUnitChunking or Finish callbacks: closing an already-closed
// channel panics, and so does sending on a closed channel.
func (u *UnitHook) Close() error {
close(u.finishedMetrics)
return nil
}

type UnitMetrics struct {
Unit SourceUnit `json:"unit,omitempty"`
Parent JobProgressRef `json:"parent,omitempty"`
Expand Down
6 changes: 6 additions & 0 deletions pkg/sources/source_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sources

import (
"fmt"
"io"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -169,6 +170,11 @@ func (s *SourceManager) Wait() error {
}
close(s.outputChunks)
close(s.firstErr)
for _, hook := range s.hooks {
if hookCloser, ok := hook.(io.Closer); ok {
_ = hookCloser.Close()
}
}
return s.waitErr
}

Expand Down
49 changes: 30 additions & 19 deletions pkg/sources/source_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"sort"
"testing"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/anypb"

Expand Down Expand Up @@ -316,7 +316,7 @@ func TestSourceManagerAvailableCapacity(t *testing.T) {
}

func TestSourceManagerUnitHook(t *testing.T) {
hook := NewUnitHook(context.TODO())
hook, ch := NewUnitHook(context.TODO())

input := []unitChunk{
{unit: "1 one", output: "bar"},
Expand All @@ -333,9 +333,13 @@ func TestSourceManagerUnitHook(t *testing.T) {
ref, err := mgr.Run(context.Background(), "dummy", source)
assert.NoError(t, err)
<-ref.Done()
assert.NoError(t, mgr.Wait())

metrics := hook.UnitMetrics()
assert.Equal(t, 3, len(metrics))
assert.Equal(t, 0, len(hook.InProgressSnapshot()))
var metrics []UnitMetrics
for metric := range ch {
metrics = append(metrics, metric)
}
sort.Slice(metrics, func(i, j int) bool {
return metrics[i].Unit.SourceUnitID() < metrics[j].Unit.SourceUnitID()
})
Expand Down Expand Up @@ -366,14 +370,10 @@ func TestSourceManagerUnitHook(t *testing.T) {
assert.Equal(t, 1, len(m2.Errors))
}

// TestSourceManagerUnitHookNoBlock tests that the UnitHook drops metrics if
// they aren't handled fast enough.
func TestSourceManagerUnitHookNoBlock(t *testing.T) {
var evictedKeys []string
cache, _ := lru.NewWithEvict(1, func(key string, _ *UnitMetrics) {
evictedKeys = append(evictedKeys, key)
})
hook := NewUnitHook(context.TODO(), WithUnitHookCache(cache))
// TestSourceManagerUnitHookBackPressure tests that the UnitHook blocks if the
// finished metrics aren't handled fast enough.
func TestSourceManagerUnitHookBackPressure(t *testing.T) {
hook, ch := NewUnitHook(context.TODO(), WithUnitHookFinishBufferSize(0))

input := []unitChunk{
{unit: "one", output: "bar"},
Expand All @@ -389,18 +389,25 @@ func TestSourceManagerUnitHookNoBlock(t *testing.T) {
assert.NoError(t, err)
ref, err := mgr.Run(context.Background(), "dummy", source)
assert.NoError(t, err)
<-ref.Done()

assert.Equal(t, 2, len(evictedKeys))
metrics := hook.UnitMetrics()
assert.Equal(t, 1, len(metrics))
assert.Equal(t, "three", metrics[0].Unit.SourceUnitID())
var metrics []UnitMetrics
for i := 0; i < len(input); i++ {
select {
case <-ref.Done():
t.Fatal("job should not finish until metrics have been collected")
case <-time.After(1 * time.Millisecond):
}
metrics = append(metrics, <-ch)
}

assert.NoError(t, mgr.Wait())
assert.Equal(t, 3, len(metrics), metrics)
}

// TestSourceManagerUnitHookNoUnits tests whether the UnitHook works for
// sources that don't support units.
func TestSourceManagerUnitHookNoUnits(t *testing.T) {
hook := NewUnitHook(context.TODO())
hook, ch := NewUnitHook(context.TODO())

mgr := NewManager(
WithBufferedOutput(8),
Expand All @@ -412,8 +419,12 @@ func TestSourceManagerUnitHookNoUnits(t *testing.T) {
ref, err := mgr.Run(context.Background(), "dummy", source)
assert.NoError(t, err)
<-ref.Done()
assert.NoError(t, mgr.Wait())

metrics := hook.UnitMetrics()
var metrics []UnitMetrics
for metric := range ch {
metrics = append(metrics, metric)
}
assert.Equal(t, 1, len(metrics))

m := metrics[0]
Expand Down

0 comments on commit db3106a

Please sign in to comment.