Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] - Introduce Fatal/Non-Fatal File Handling Errors #3521

Open
wants to merge 4 commits into
base: refactor-data-or-err
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions pkg/handlers/ar.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,18 @@ func (h *arHandler) HandleFile(ctx logContext.Context, input fileReader) chan Da
} else {
panicErr = fmt.Errorf("panic occurred: %v", r)
}
ctx.Logger().Error(panicErr, "Panic occurred when attempting to open ar archive")
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: panic error: %v", ErrProcessingFatal, panicErr),
}
}
}()

start := time.Now()
arReader, err := deb.LoadAr(input)
if err != nil {
ctx.Logger().Error(err, "Error loading AR file")
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: loading AR error: %v", ErrProcessingFatal, err),
}
return
}

Expand Down Expand Up @@ -85,12 +89,19 @@ func (h *arHandler) processARFiles(ctx logContext.Context, reader *deb.Ar, dataO

rdr, err := newMimeTypeReader(arEntry.Data)
if err != nil {
return fmt.Errorf("error creating mime-type reader: %w", err)
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: error creating AR mime-type reader: %v", ErrProcessingWarning, err),
}
h.metrics.incErrors()
continue
}

if err := h.handleNonArchiveContent(fileCtx, rdr, dataOrErrChan); err != nil {
fileCtx.Logger().Error(err, "error handling archive content in AR")
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: error handling archive content in AR: %v", ErrProcessingWarning, err),
}
h.metrics.incErrors()
continue
}

h.metrics.incFilesProcessed()
Expand Down
4 changes: 3 additions & 1 deletion pkg/handlers/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ func (h *archiveHandler) HandleFile(ctx logContext.Context, input fileReader) ch
} else {
panicErr = fmt.Errorf("panic occurred: %v", r)
}
ctx.Logger().Error(panicErr, "Panic occurred when attempting to open archive")
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: panic error: %v", ErrProcessingFatal, panicErr),
}
}
}()

Expand Down
4 changes: 2 additions & 2 deletions pkg/handlers/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ func (h *defaultHandler) handleNonArchiveContent(
dataOrErr := DataOrErr{}
if err := data.Error(); err != nil {
h.metrics.incErrors()
dataOrErr.Err = fmt.Errorf("%w: error reading chunk", err)
dataOrErr.Err = fmt.Errorf("%w: error reading chunk: %v", ErrProcessingWarning, err)
if writeErr := common.CancellableWrite(ctx, dataOrErrChan, dataOrErr); writeErr != nil {
return fmt.Errorf("%w: error writing to data channel", writeErr)
return fmt.Errorf("%w: error writing to data channel: %v", ErrProcessingFatal, writeErr)
}
continue
}
Expand Down
37 changes: 34 additions & 3 deletions pkg/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handlers

import (
"bufio"
"context"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -38,7 +39,16 @@ type fileReader struct {
*iobuf.BufferedReadSeeker
}

var ErrEmptyReader = errors.New("reader is empty")
var (
	// ErrEmptyReader indicates that the underlying reader contained no data.
	ErrEmptyReader = errors.New("reader is empty")

	// ErrProcessingFatal indicates a severe error that requires stopping the file processing.
	ErrProcessingFatal = errors.New("fatal error processing file")

	// ErrProcessingWarning indicates a recoverable error that can be logged,
	// allowing processing to continue.
	ErrProcessingWarning = errors.New("error processing file")
)

// mimeTypeReader wraps an io.Reader with MIME type information.
// This type is used to pass content through the processing pipeline
Expand Down Expand Up @@ -351,7 +361,7 @@ func HandleFile(
// handleChunksWithError processes data and errors received from the dataErrChan channel.
// For each DataOrErr received:
// - If it contains data, the function creates a chunk based on chunkSkel and reports it through the reporter.
// - If it contains an error, the function logs the error.
// - If it contains a fatal error, the function returns it immediately; non-fatal errors are logged and processing continues.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for fatal errors, right? This comment seems like a good place to explain non-fatal errors as well. I would also like in some comment somewhere a description of what types of errors are fatal versus non-fatal - maybe in the doc comments of functions that use DataOrErr?

// The function also listens for context cancellation to gracefully terminate processing if the context is done.
// It returns nil upon successful processing of all data, or the first encountered fatal error.
func handleChunksWithError(
Expand All @@ -369,7 +379,10 @@ func handleChunksWithError(
return nil
}
if dataOrErr.Err != nil {
ctx.Logger().Error(dataOrErr.Err, "error processing chunk")
if isFatal(dataOrErr.Err) {
return dataOrErr.Err
}
ctx.Logger().Error(dataOrErr.Err, "non-critical error processing chunk")
continue
}
if len(dataOrErr.Data) > 0 {
Expand All @@ -384,3 +397,21 @@ func handleChunksWithError(
}
}
}

// isFatal reports whether err should terminate processing of the current file.
//
// Fatal errors are context cancellation, deadline exceeded, and
// ErrProcessingFatal. Every other error — including ErrProcessingWarning —
// is non-fatal: the caller may log it and continue processing.
func isFatal(err error) bool {
	// The original switch had separate ErrProcessingWarning and default arms
	// that both returned false; a single boolean expression is equivalent
	// and removes the redundant branches.
	return errors.Is(err, context.Canceled) ||
		errors.Is(err, context.DeadlineExceeded) ||
		errors.Is(err, ErrProcessingFatal)
}
77 changes: 77 additions & 0 deletions pkg/handlers/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handlers
import (
"archive/zip"
"bytes"
stdctx "context"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -781,3 +782,79 @@ func getGitCommitHash(t *testing.T, gitDir string) string {
commitHash := strings.TrimSpace(string(hashBytes))
return commitHash
}

// mockReporter is a test double for the reporter consumed by
// handleChunksWithError; it counts chunks reported successfully
// and discards reported errors.
type mockReporter struct{ reportedChunks int }

// ChunkOk records one successfully reported chunk.
func (m *mockReporter) ChunkOk(context.Context, sources.Chunk) error {
	m.reportedChunks++
	return nil
}

// ChunkErr accepts and ignores a reported error.
func (m *mockReporter) ChunkErr(context.Context, error) error { return nil }

// TestHandleChunksWithError verifies the fatal/non-fatal error contract of
// handleChunksWithError:
//   - fatal errors (ErrProcessingFatal, context cancellation, deadline
//     exceeded) are returned immediately,
//   - non-fatal errors (ErrProcessingWarning, io.EOF) are swallowed,
//   - every data payload is reported as a chunk.
func TestHandleChunksWithError(t *testing.T) {
	tests := []struct {
		name                   string
		input                  []DataOrErr
		expectedErr            error // matched via errors.Is; nil means no error expected
		expectedReportedChunks int   // chunks the reporter should have counted
	}{
		{
			name:  "Non-Critical Error",
			input: []DataOrErr{{Err: ErrProcessingWarning}},
		},
		{
			name:        "Critical Error",
			input:       []DataOrErr{{Err: ErrProcessingFatal}},
			expectedErr: ErrProcessingFatal,
		},
		{
			name: "No Error",
			input: []DataOrErr{
				{Data: []byte("test data")},
				{Data: []byte("more data")},
			},
			expectedReportedChunks: 2,
		},
		{
			name:        "Context Canceled",
			input:       []DataOrErr{{Err: stdctx.Canceled}},
			expectedErr: stdctx.Canceled,
		},
		{
			name:        "Context Deadline Exceeded",
			input:       []DataOrErr{{Err: stdctx.DeadlineExceeded}},
			expectedErr: stdctx.DeadlineExceeded,
		},
		{
			name:  "EOF Error",
			input: []DataOrErr{{Err: io.EOF}},
		},
	}

	for _, tc := range tests {
		tc := tc // capture range variable: required for t.Parallel() on pre-Go 1.22 toolchains
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			ctx := context.Background()
			chunkSkel := &sources.Chunk{}
			reporter := new(mockReporter)

			// Pre-fill and close a buffered channel so handleChunksWithError
			// drains it and returns without needing a producer goroutine.
			dataErrChan := make(chan DataOrErr, len(tc.input))
			for _, de := range tc.input {
				dataErrChan <- de
			}
			close(dataErrChan)

			err := handleChunksWithError(ctx, dataErrChan, chunkSkel, reporter)

			if tc.expectedErr != nil {
				assert.ErrorIs(t, err, tc.expectedErr, "handleChunksWithError should return the expected error")
			} else {
				assert.NoError(t, err, "handleChunksWithError should not return an error for non-critical errors")
			}

			assert.Equal(t, tc.expectedReportedChunks, reporter.reportedChunks, "should have reported the expected number of chunks")
		})
	}
}
16 changes: 12 additions & 4 deletions pkg/handlers/rpm.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,26 @@ func (h *rpmHandler) HandleFile(ctx logContext.Context, input fileReader) chan D
} else {
panicErr = fmt.Errorf("panic occurred: %v", r)
}
ctx.Logger().Error(panicErr, "Panic occurred when attempting to open rpm archive")
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: panic error: %v", ErrProcessingFatal, panicErr),
}
}
}()

start := time.Now()
rpm, err := rpmutils.ReadRpm(input)
if err != nil {
ctx.Logger().Error(err, "Error reading rpm file")
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: reading rpm error: %v", ErrProcessingFatal, err),
}
return
}

reader, err := rpm.PayloadReaderExtended()
if err != nil {
ctx.Logger().Error(err, "Error reading rpm file")
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: uncompressing rpm error: %v", ErrProcessingFatal, err),
}
return
}

Expand Down Expand Up @@ -99,7 +105,9 @@ func (h *rpmHandler) processRPMFiles(
}

if err := h.handleNonArchiveContent(fileCtx, rdr, dataOrErrChan); err != nil {
fileCtx.Logger().Error(err, "error handling archive content in RPM")
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: error processing RPM archive: %v", ErrProcessingWarning, err),
}
h.metrics.incErrors()
}

Expand Down
Loading