diff --git a/pkg/sources/job_progress.go b/pkg/sources/job_progress.go index 5fffd79aba5d..252bef1c5680 100644 --- a/pkg/sources/job_progress.go +++ b/pkg/sources/job_progress.go @@ -43,9 +43,9 @@ type JobProgressHook interface { // If the job supports it, the reference can also be used to cancel running via // CancelRun. type JobProgressRef struct { - JobID JobID - SourceID SourceID - SourceName string + JobID JobID `json:"job_id"` + SourceID SourceID `json:"source_id"` + SourceName string `json:"source_name"` jobProgress *JobProgress } @@ -120,28 +120,28 @@ type JobProgress struct { // JobProgressMetrics tracks the metrics of a job. type JobProgressMetrics struct { - StartTime time.Time - EndTime time.Time + StartTime *time.Time `json:"start_time,omitempty"` + EndTime *time.Time `json:"end_time,omitempty"` // Total number of units found by the Source. - TotalUnits uint64 + TotalUnits uint64 `json:"total_units,omitempty"` // Total number of units that have finished chunking. - FinishedUnits uint64 + FinishedUnits uint64 `json:"finished_units,omitempty"` // Total number of chunks produced. This metric updates before the // chunk is sent on the output channel. - TotalChunks uint64 + TotalChunks uint64 `json:"total_chunks,omitempty"` // All errors encountered. - Errors []error + Errors []error `json:"errors,omitempty"` // Set to true if the source supports enumeration and has finished // enumerating. If the source does not support enumeration, this field // is always false. - DoneEnumerating bool + DoneEnumerating bool `json:"done_enumerating,omitempty"` // Progress information reported by the source. - SourcePercent int64 - SourceMessage string - SourceEncodedResumeInfo string - SourceSectionsCompleted int32 - SourceSectionsRemaining int32 + SourcePercent int64 `json:"source_percent,omitempty"` + SourceMessage string `json:"source_message,omitempty"` + SourceEncodedResumeInfo string `json:"source_encoded_resume_info,omitempty"` + SourceSectionsCompleted int32 `json:"source_sections_completed,omitempty"` + SourceSectionsRemaining int32 `json:"source_sections_remaining,omitempty"` } // WithHooks adds hooks to be called when an event triggers. @@ -189,14 +189,14 @@ func (jp *JobProgress) executeHooks(todo func(hook JobProgressHook)) { // without the JobProgressRef parameter. func (jp *JobProgress) Start(start time.Time) { jp.metricsLock.Lock() - jp.metrics.StartTime = start + jp.metrics.StartTime = &start jp.metricsLock.Unlock() jp.executeHooks(func(hook JobProgressHook) { hook.Start(jp.Ref(), start) }) } func (jp *JobProgress) End(end time.Time) { jp.metricsLock.Lock() - jp.metrics.EndTime = end + jp.metrics.EndTime = &end jp.metricsLock.Unlock() jp.executeHooks(func(hook JobProgressHook) { hook.End(jp.Ref(), end) }) @@ -248,6 +248,17 @@ func (jp *JobProgress) Snapshot() JobProgressMetrics { defer jp.metricsLock.Unlock() metrics := jp.metrics + + // Make a copy of the fields to make them read only. + if jp.metrics.StartTime != nil { + startTime := *jp.metrics.StartTime + metrics.StartTime = &startTime + } + if jp.metrics.EndTime != nil { + endTime := *jp.metrics.EndTime + metrics.EndTime = &endTime + } + metrics.Errors = make([]error, len(metrics.Errors)) copy(metrics.Errors, jp.metrics.Errors) @@ -343,13 +354,13 @@ func (m JobProgressMetrics) PercentComplete() int { // has been running. If it hasn't started yet, 0 is returned. If it has // finished, the total time is returned. func (m JobProgressMetrics) ElapsedTime() time.Duration { - if m.StartTime.IsZero() { + if m.StartTime == nil { return 0 } - if m.EndTime.IsZero() { - return time.Since(m.StartTime) + if m.EndTime == nil { + return time.Since(*m.StartTime) } - return m.EndTime.Sub(m.StartTime) + return m.EndTime.Sub(*m.StartTime) } // ErrorsFor returns all the errors for the given SourceUnit. If there are no diff --git a/pkg/sources/job_progress_hook.go b/pkg/sources/job_progress_hook.go index 5a592da3a0b3..1592bcb385b0 100644 --- a/pkg/sources/job_progress_hook.go +++ b/pkg/sources/job_progress_hook.go @@ -60,7 +60,7 @@ func (u *UnitHook) StartUnitChunking(ref JobProgressRef, unit SourceUnit, start u.metrics.Add(id, &UnitMetrics{ Unit: unit, Parent: ref, - StartTime: start, + StartTime: &start, }) } @@ -73,7 +73,7 @@ func (u *UnitHook) EndUnitChunking(ref JobProgressRef, unit SourceUnit, end time if !ok { return } - metrics.EndTime = end + metrics.EndTime = &end } func (u *UnitHook) ReportChunk(ref JobProgressRef, unit SourceUnit, chunk *Chunk) { @@ -168,17 +168,17 @@ func (u *UnitHook) UnitMetrics() []UnitMetrics { } type UnitMetrics struct { - Unit SourceUnit - Parent JobProgressRef + Unit SourceUnit `json:"unit,omitempty"` + Parent JobProgressRef `json:"parent,omitempty"` // Start and end time for chunking this unit. - StartTime time.Time - EndTime time.Time + StartTime *time.Time `json:"start_time,omitempty"` + EndTime *time.Time `json:"end_time,omitempty"` // Total number of chunks produced from this unit. - TotalChunks uint64 + TotalChunks uint64 `json:"total_chunks,omitempty"` // Total number of bytes produced from this unit. - TotalBytes uint64 + TotalBytes uint64 `json:"total_bytes,omitempty"` // All errors encountered by this unit. - Errors []error + Errors []error `json:"errors,omitempty"` // Flag to mark that these metrics were intentionally evicted from // the cache. handled bool @@ -192,13 +192,13 @@ func (u UnitMetrics) IsFinished() bool { // has been running. If it hasn't started yet, 0 is returned. If it has // finished, the total time is returned. func (u UnitMetrics) ElapsedTime() time.Duration { - if u.StartTime.IsZero() { + if u.StartTime == nil { return 0 } - if u.EndTime.IsZero() { - return time.Since(u.StartTime) + if u.EndTime == nil { + return time.Since(*u.StartTime) } - return u.EndTime.Sub(u.StartTime) + return u.EndTime.Sub(*u.StartTime) } // NoopHook implements JobProgressHook by doing nothing. This is useful for diff --git a/pkg/sources/job_progress_test.go b/pkg/sources/job_progress_test.go index 47724edf3bbc..05d4c6e6ae86 100644 --- a/pkg/sources/job_progress_test.go +++ b/pkg/sources/job_progress_test.go @@ -120,10 +120,12 @@ func TestJobProgressElapsedTime(t *testing.T) { metrics := JobProgressMetrics{} assert.Equal(t, time.Duration(0), metrics.ElapsedTime()) - metrics.StartTime = time.Date(2022, time.March, 30, 0, 0, 0, 0, time.UTC) + startTime := time.Date(2022, time.March, 30, 0, 0, 0, 0, time.UTC) + metrics.StartTime = &startTime assert.Greater(t, metrics.ElapsedTime(), time.Duration(0)) - metrics.EndTime = metrics.StartTime.Add(1 * time.Hour) + endTime := metrics.StartTime.Add(1 * time.Hour) + metrics.EndTime = &endTime assert.Equal(t, metrics.ElapsedTime(), 1*time.Hour) }