From be8e254288a077602b593903ceaf1074ded02b0f Mon Sep 17 00:00:00 2001 From: Miccah Castorina Date: Tue, 31 Oct 2023 21:57:57 -0700 Subject: [PATCH] [chore] Replace chunks channel with ChunkReporter in git based sources ChunkReporter is more flexible and will allow code reuse for unit chunking. ChanReporter was added as a way to maintain the original channel functionality, so this PR should not alter existing behavior. --- hack/snifftest/main.go | 2 +- pkg/handlers/archive_test.go | 14 +++--- pkg/handlers/handlers.go | 18 ++++--- pkg/sources/chan_reporter.go | 22 +++++++++ pkg/sources/filesystem/filesystem.go | 4 +- pkg/sources/gcs/gcs.go | 4 +- pkg/sources/git/git.go | 71 ++++++++++++++++++---------- pkg/sources/git/git_test.go | 2 +- pkg/sources/github/github.go | 2 +- pkg/sources/gitlab/gitlab.go | 2 +- pkg/sources/s3/s3.go | 4 +- 11 files changed, 92 insertions(+), 53 deletions(-) create mode 100644 pkg/sources/chan_reporter.go diff --git a/hack/snifftest/main.go b/hack/snifftest/main.go index 1a1062d373fb..217ecd079f27 100644 --- a/hack/snifftest/main.go +++ b/hack/snifftest/main.go @@ -204,7 +204,7 @@ func main() { }) logger.Info("scanning repo", "repo", r) - err = s.ScanRepo(ctx, repo, path, git.NewScanOptions(), chunksChan) + err = s.ScanRepo(ctx, repo, path, git.NewScanOptions(), sources.ChanReporter{Ch: chunksChan}) if err != nil { logFatal(err, "error scanning repo") } diff --git a/pkg/handlers/archive_test.go b/pkg/handlers/archive_test.go index 3a2d9e5e4c30..8f2186f47ef0 100644 --- a/pkg/handlers/archive_test.go +++ b/pkg/handlers/archive_test.go @@ -107,12 +107,12 @@ func TestArchiveHandler(t *testing.T) { } func TestHandleFile(t *testing.T) { - ch := make(chan *sources.Chunk, 2) + reporter := sources.ChanReporter{Ch: make(chan *sources.Chunk, 2)} // Context cancels the operation. canceledCtx, cancel := context.WithCancel(context.Background()) cancel() - assert.False(t, HandleFile(canceledCtx, strings.NewReader("file"), &sources.Chunk{}, ch)) + assert.False(t, HandleFile(canceledCtx, strings.NewReader("file"), &sources.Chunk{}, reporter)) // Only one chunk is sent on the channel. // TODO: Embed a zip without making an HTTP request. @@ -124,9 +124,9 @@ func TestHandleFile(t *testing.T) { reader, err := diskbufferreader.New(resp.Body) assert.NoError(t, err) - assert.Equal(t, 0, len(ch)) - assert.True(t, HandleFile(context.Background(), reader, &sources.Chunk{}, ch)) - assert.Equal(t, 1, len(ch)) + assert.Equal(t, 0, len(reporter.Ch)) + assert.True(t, HandleFile(context.Background(), reader, &sources.Chunk{}, reporter)) + assert.Equal(t, 1, len(reporter.Ch)) } func TestReadToMax(t *testing.T) { @@ -209,7 +209,7 @@ func TestExtractTarContent(t *testing.T) { chunkCh := make(chan *sources.Chunk) go func() { defer close(chunkCh) - ok := HandleFile(ctx, file, &sources.Chunk{}, chunkCh) + ok := HandleFile(ctx, file, &sources.Chunk{}, sources.ChanReporter{Ch: chunkCh}) assert.True(t, ok) }() @@ -262,7 +262,7 @@ func TestNestedDirArchive(t *testing.T) { go func() { defer close(sourceChan) - HandleFile(ctx, file, &sources.Chunk{}, sourceChan) + HandleFile(ctx, file, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan}) }() count := 0 diff --git a/pkg/handlers/handlers.go b/pkg/handlers/handlers.go index 131e77c963a4..d2792c0e69f6 100644 --- a/pkg/handlers/handlers.go +++ b/pkg/handlers/handlers.go @@ -34,10 +34,10 @@ type Handler interface { // HandleFile processes a given file by selecting an appropriate handler from DefaultHandlers. // It first checks if the handler implements SpecializedHandler for any special processing, // then falls back to regular file type handling. If successful, it reads the file in chunks, -// packages them in the provided chunk skeleton, and sends them to chunksChan. +// packages them in the provided chunk skeleton, and reports them to the chunk reporter. // The function returns true if processing was successful and false otherwise. // Context is used for cancellation, and the caller is responsible for canceling it if needed. -func HandleFile(ctx context.Context, file io.Reader, chunkSkel *sources.Chunk, chunksChan chan *sources.Chunk) bool { +func HandleFile(ctx context.Context, file io.Reader, chunkSkel *sources.Chunk, reporter sources.ChunkReporter) bool { aCtx := logContext.AddLogger(ctx) for _, h := range DefaultHandlers() { h.New() @@ -51,7 +51,7 @@ func HandleFile(ctx context.Context, file io.Reader, chunkSkel *sources.Chunk, c return false } - if success := processHandler(aCtx, h, reReader, chunkSkel, chunksChan); success { + if success := processHandler(aCtx, h, reReader, chunkSkel, reporter); success { return true } } @@ -59,14 +59,14 @@ func HandleFile(ctx context.Context, file io.Reader, chunkSkel *sources.Chunk, c return false } -func processHandler(ctx logContext.Context, h Handler, reReader *diskbufferreader.DiskBufferReader, chunkSkel *sources.Chunk, chunksChan chan *sources.Chunk) bool { +func processHandler(ctx logContext.Context, h Handler, reReader *diskbufferreader.DiskBufferReader, chunkSkel *sources.Chunk, reporter sources.ChunkReporter) bool { defer reReader.Close() defer reReader.Stop() if specialHandler, ok := h.(SpecializedHandler); ok { file, isSpecial, err := specialHandler.HandleSpecialized(ctx, reReader) if isSpecial { - return handleChunks(ctx, h.FromFile(ctx, file), chunkSkel, chunksChan) + return handleChunks(ctx, h.FromFile(ctx, file), chunkSkel, reporter) } if err != nil { ctx.Logger().Error(err, "error handling file") @@ -82,10 +82,10 @@ func processHandler(ctx logContext.Context, h Handler, reReader *diskbufferreade return false } - return handleChunks(ctx, h.FromFile(ctx, reReader), chunkSkel, chunksChan) + return handleChunks(ctx, h.FromFile(ctx, reReader), chunkSkel, reporter) } -func handleChunks(ctx context.Context, handlerChan chan []byte, chunkSkel *sources.Chunk, chunksChan chan *sources.Chunk) bool { +func handleChunks(ctx context.Context, handlerChan chan []byte, chunkSkel *sources.Chunk, reporter sources.ChunkReporter) bool { for { select { case data, open := <-handlerChan: @@ -94,9 +94,7 @@ func handleChunks(ctx context.Context, handlerChan chan []byte, chunkSkel *sourc } chunk := *chunkSkel chunk.Data = data - select { - case chunksChan <- &chunk: - case <-ctx.Done(): + if err := reporter.ChunkOk(logContext.AddLogger(ctx), chunk); err != nil { return false } case <-ctx.Done(): diff --git a/pkg/sources/chan_reporter.go b/pkg/sources/chan_reporter.go new file mode 100644 index 000000000000..11c1619806e4 --- /dev/null +++ b/pkg/sources/chan_reporter.go @@ -0,0 +1,22 @@ +package sources + +import ( + "github.com/trufflesecurity/trufflehog/v3/pkg/common" + "github.com/trufflesecurity/trufflehog/v3/pkg/context" +) + +var _ ChunkReporter = (*ChanReporter)(nil) + +// ChanReporter is a ChunkReporter that writes to a channel. +type ChanReporter struct { + Ch chan<- *Chunk +} + +func (c ChanReporter) ChunkOk(ctx context.Context, chunk Chunk) error { + return common.CancellableWrite(ctx, c.Ch, &chunk) +} + +func (ChanReporter) ChunkErr(ctx context.Context, err error) error { + ctx.Logger().Error(err, "error chunking") + return nil +} diff --git a/pkg/sources/filesystem/filesystem.go b/pkg/sources/filesystem/filesystem.go index 791f5eb93f60..e969a12b1f4c 100644 --- a/pkg/sources/filesystem/filesystem.go +++ b/pkg/sources/filesystem/filesystem.go @@ -7,9 +7,9 @@ import ( "os" "path/filepath" - diskbufferreader "github.com/trufflesecurity/disk-buffer-reader" "github.com/go-errors/errors" "github.com/go-logr/logr" + diskbufferreader "github.com/trufflesecurity/disk-buffer-reader" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" @@ -172,7 +172,7 @@ func (s *Source) scanFile(ctx context.Context, path string, chunksChan chan *sou }, Verify: s.verify, } - if handlers.HandleFile(ctx, reReader, chunkSkel, chunksChan) { + if handlers.HandleFile(ctx, reReader, chunkSkel, sources.ChanReporter{Ch: chunksChan}) { return nil } diff --git a/pkg/sources/gcs/gcs.go b/pkg/sources/gcs/gcs.go index 202752085ddd..72ac33708fee 100644 --- a/pkg/sources/gcs/gcs.go +++ b/pkg/sources/gcs/gcs.go @@ -10,9 +10,9 @@ import ( "sync" "cloud.google.com/go/storage" - diskbufferreader "github.com/trufflesecurity/disk-buffer-reader" "github.com/go-errors/errors" "github.com/go-logr/logr" + diskbufferreader "github.com/trufflesecurity/disk-buffer-reader" "golang.org/x/oauth2" "golang.org/x/oauth2/endpoints" "google.golang.org/protobuf/proto" @@ -375,7 +375,7 @@ func (s *Source) readObjectData(ctx context.Context, o object, chunk *sources.Ch } defer reader.Close() - if handlers.HandleFile(ctx, reader, chunk, s.chunksCh) { + if handlers.HandleFile(ctx, reader, chunk, sources.ChanReporter{Ch: s.chunksCh}) { ctx.Logger().V(3).Info("File was handled", "name", s.name, "bucket", o.bucket, "object", o.name) return nil, nil } diff --git a/pkg/sources/git/git.go b/pkg/sources/git/git.go index 807c7b459744..c01fee4e58c4 100644 --- a/pkg/sources/git/git.go +++ b/pkg/sources/git/git.go @@ -25,7 +25,6 @@ import ( "google.golang.org/protobuf/types/known/anypb" "github.com/trufflesecurity/trufflehog/v3/pkg/cleantemp" - "github.com/trufflesecurity/trufflehog/v3/pkg/common" "github.com/trufflesecurity/trufflehog/v3/pkg/context" "github.com/trufflesecurity/trufflehog/v3/pkg/gitparse" "github.com/trufflesecurity/trufflehog/v3/pkg/handlers" @@ -157,10 +156,11 @@ func (s *Source) Init(aCtx context.Context, name string, jobId sources.JobID, so // Chunks emits chunks of bytes over a channel. func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk, _ ...sources.ChunkingTarget) error { - if err := s.scanRepos(ctx, chunksChan); err != nil { + reporter := sources.ChanReporter{Ch: chunksChan} + if err := s.scanRepos(ctx, reporter); err != nil { return err } - if err := s.scanDirs(ctx, chunksChan); err != nil { + if err := s.scanDirs(ctx, reporter); err != nil { return err } @@ -174,7 +174,7 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk, _ . } // scanRepos scans the configured repositories in s.conn.Repositories. -func (s *Source) scanRepos(ctx context.Context, chunksChan chan *sources.Chunk) error { +func (s *Source) scanRepos(ctx context.Context, reporter sources.ChunkReporter) error { if len(s.conn.Repositories) == 0 { return nil } @@ -196,7 +196,7 @@ func (s *Source) scanRepos(ctx context.Context, chunksChan chan *sources.Chunk) if err != nil { return err } - return s.git.ScanRepo(ctx, repo, path, s.scanOptions, chunksChan) + return s.git.ScanRepo(ctx, repo, path, s.scanOptions, reporter) }(repoURI) if err != nil { ctx.Logger().Info("error scanning repository", "repo", repoURI, "error", err) @@ -215,7 +215,7 @@ func (s *Source) scanRepos(ctx context.Context, chunksChan chan *sources.Chunk) if err != nil { return err } - return s.git.ScanRepo(ctx, repo, path, s.scanOptions, chunksChan) + return s.git.ScanRepo(ctx, repo, path, s.scanOptions, reporter) }(repoURI) if err != nil { ctx.Logger().Info("error scanning repository", "repo", repoURI, "error", err) @@ -234,7 +234,7 @@ func (s *Source) scanRepos(ctx context.Context, chunksChan chan *sources.Chunk) if err != nil { return err } - return s.git.ScanRepo(ctx, repo, path, s.scanOptions, chunksChan) + return s.git.ScanRepo(ctx, repo, path, s.scanOptions, reporter) }(repoURI) if err != nil { ctx.Logger().Info("error scanning repository", "repo", repoURI, "error", err) @@ -248,7 +248,7 @@ func (s *Source) scanRepos(ctx context.Context, chunksChan chan *sources.Chunk) } // scanDirs scans the configured directories in s.conn.Directories. -func (s *Source) scanDirs(ctx context.Context, chunksChan chan *sources.Chunk) error { +func (s *Source) scanDirs(ctx context.Context, reporter sources.ChunkReporter) error { totalRepos := len(s.conn.Repositories) + len(s.conn.Directories) for i, gitDir := range s.conn.Directories { s.SetProgressComplete(len(s.conn.Repositories)+i, totalRepos, fmt.Sprintf("Repo: %s", gitDir), "") @@ -272,7 +272,7 @@ func (s *Source) scanDirs(ctx context.Context, chunksChan chan *sources.Chunk) e defer os.RemoveAll(repoPath) } - return s.git.ScanRepo(ctx, repo, repoPath, s.scanOptions, chunksChan) + return s.git.ScanRepo(ctx, repo, repoPath, s.scanOptions, reporter) }(gitDir) if err != nil { ctx.Logger().Info("error scanning repository", "repo", gitDir, "error", err) @@ -445,7 +445,7 @@ func (s *Git) CommitsScanned() uint64 { return atomic.LoadUint64(&s.metrics.commitsScanned) } -func (s *Git) ScanCommits(ctx context.Context, repo *git.Repository, path string, scanOptions *ScanOptions, chunksChan chan *sources.Chunk) error { +func (s *Git) ScanCommits(ctx context.Context, repo *git.Repository, path string, scanOptions *ScanOptions, reporter sources.ChunkReporter) error { if err := GitCmdCheck(); err != nil { return err } @@ -505,18 +505,18 @@ func (s *Git) ScanCommits(ctx context.Context, repo *git.Repository, path string SourceMetadata: metadata, Verify: s.verify, } - if err := handleBinary(ctx, repo, chunksChan, chunkSkel, commitHash, fileName); err != nil { + if err := handleBinary(ctx, repo, reporter, chunkSkel, commitHash, fileName); err != nil { logger.V(1).Info("error handling binary file", "error", err, "filename", fileName, "commit", commitHash, "file", diff.PathB) } continue } if diff.Content.Len() > sources.ChunkSize+sources.PeekSize { - s.gitChunk(ctx, diff, fileName, email, hash, when, urlMetadata, chunksChan) + s.gitChunk(ctx, diff, fileName, email, hash, when, urlMetadata, reporter) continue } metadata := s.sourceMetadataFunc(fileName, email, hash, when, urlMetadata, int64(diff.LineStart)) - chunksChan <- &sources.Chunk{ + chunk := sources.Chunk{ SourceName: s.sourceName, SourceID: s.sourceID, JobID: s.jobID, @@ -525,12 +525,15 @@ func (s *Git) ScanCommits(ctx context.Context, repo *git.Repository, path string Data: diff.Content.Bytes(), Verify: s.verify, } + if err := reporter.ChunkOk(ctx, chunk); err != nil { + return err + } } } return nil } -func (s *Git) gitChunk(ctx context.Context, diff gitparse.Diff, fileName, email, hash, when, urlMetadata string, chunksChan chan *sources.Chunk) { +func (s *Git) gitChunk(ctx context.Context, diff gitparse.Diff, fileName, email, hash, when, urlMetadata string, reporter sources.ChunkReporter) { originalChunk := bufio.NewScanner(&diff.Content) newChunkBuffer := bytes.Buffer{} lastOffset := 0 @@ -543,7 +546,7 @@ func (s *Git) gitChunk(ctx context.Context, diff gitparse.Diff, fileName, email, if newChunkBuffer.Len() > 0 { // Send the existing fragment. metadata := s.sourceMetadataFunc(fileName, email, hash, when, urlMetadata, int64(diff.LineStart+lastOffset)) - chunksChan <- &sources.Chunk{ + chunk := sources.Chunk{ SourceName: s.sourceName, SourceID: s.sourceID, JobID: s.jobID, @@ -552,13 +555,18 @@ func (s *Git) gitChunk(ctx context.Context, diff gitparse.Diff, fileName, email, Data: append([]byte{}, newChunkBuffer.Bytes()...), Verify: s.verify, } + if err := reporter.ChunkOk(ctx, chunk); err != nil { + // TODO: Return error. + return + } + newChunkBuffer.Reset() lastOffset = offset } if len(line) > sources.ChunkSize { // Send the oversize line. metadata := s.sourceMetadataFunc(fileName, email, hash, when, urlMetadata, int64(diff.LineStart+offset)) - chunksChan <- &sources.Chunk{ + chunk := sources.Chunk{ SourceName: s.sourceName, SourceID: s.sourceID, JobID: s.jobID, @@ -567,6 +575,10 @@ func (s *Git) gitChunk(ctx context.Context, diff gitparse.Diff, fileName, email, Data: line, Verify: s.verify, } + if err := reporter.ChunkOk(ctx, chunk); err != nil { + // TODO: Return error. + return + } continue } } @@ -578,7 +590,7 @@ func (s *Git) gitChunk(ctx context.Context, diff gitparse.Diff, fileName, email, // Send anything still in the new chunk buffer if newChunkBuffer.Len() > 0 { metadata := s.sourceMetadataFunc(fileName, email, hash, when, urlMetadata, int64(diff.LineStart+lastOffset)) - chunksChan <- &sources.Chunk{ + chunk := sources.Chunk{ SourceName: s.sourceName, SourceID: s.sourceID, JobID: s.jobID, @@ -587,11 +599,15 @@ func (s *Git) gitChunk(ctx context.Context, diff gitparse.Diff, fileName, email, Data: append([]byte{}, newChunkBuffer.Bytes()...), Verify: s.verify, } + if err := reporter.ChunkOk(ctx, chunk); err != nil { + // TODO: Return error. + return + } } } // ScanStaged chunks staged changes. -func (s *Git) ScanStaged(ctx context.Context, repo *git.Repository, path string, scanOptions *ScanOptions, chunksChan chan *sources.Chunk) error { +func (s *Git) ScanStaged(ctx context.Context, repo *git.Repository, path string, scanOptions *ScanOptions, reporter sources.ChunkReporter) error { // Get the URL metadata for reporting (may be empty). urlMetadata := getSafeRemoteURL(repo, "origin") @@ -652,14 +668,14 @@ func (s *Git) ScanStaged(ctx context.Context, repo *git.Repository, path string, SourceMetadata: metadata, Verify: s.verify, } - if err := handleBinary(ctx, repo, chunksChan, chunkSkel, commitHash, fileName); err != nil { + if err := handleBinary(ctx, repo, reporter, chunkSkel, commitHash, fileName); err != nil { logger.V(1).Info("error handling binary file", "error", err, "filename", fileName) } continue } metadata := s.sourceMetadataFunc(fileName, email, "Staged", when, urlMetadata, int64(diff.LineStart)) - chunksChan <- &sources.Chunk{ + chunk := sources.Chunk{ SourceName: s.sourceName, SourceID: s.sourceID, JobID: s.jobID, @@ -668,12 +684,15 @@ func (s *Git) ScanStaged(ctx context.Context, repo *git.Repository, path string, Data: diff.Content.Bytes(), Verify: s.verify, } + if err := reporter.ChunkOk(ctx, chunk); err != nil { + return err + } } } return nil } -func (s *Git) ScanRepo(ctx context.Context, repo *git.Repository, repoPath string, scanOptions *ScanOptions, chunksChan chan *sources.Chunk) error { +func (s *Git) ScanRepo(ctx context.Context, repo *git.Repository, repoPath string, scanOptions *ScanOptions, reporter sources.ChunkReporter) error { if scanOptions == nil { scanOptions = NewScanOptions() } @@ -682,11 +701,11 @@ func (s *Git) ScanRepo(ctx context.Context, repo *git.Repository, repoPath strin } start := time.Now().Unix() - if err := s.ScanCommits(ctx, repo, repoPath, scanOptions, chunksChan); err != nil { + if err := s.ScanCommits(ctx, repo, repoPath, scanOptions, reporter); err != nil { return err } if !scanOptions.Bare { - if err := s.ScanStaged(ctx, repo, repoPath, scanOptions, chunksChan); err != nil { + if err := s.ScanStaged(ctx, repo, repoPath, scanOptions, reporter); err != nil { ctx.Logger().V(1).Info("error scanning unstaged changes", "error", err) } } @@ -934,7 +953,7 @@ func getSafeRemoteURL(repo *git.Repository, preferred string) string { return safeURL } -func handleBinary(ctx context.Context, repo *git.Repository, chunksChan chan *sources.Chunk, chunkSkel *sources.Chunk, commitHash plumbing.Hash, path string) error { +func handleBinary(ctx context.Context, repo *git.Repository, reporter sources.ChunkReporter, chunkSkel *sources.Chunk, commitHash plumbing.Hash, path string) error { ctx.Logger().V(5).Info("handling binary file", "path", path) commit, err := repo.CommitObject(commitHash) if err != nil { @@ -958,7 +977,7 @@ func handleBinary(ctx context.Context, repo *git.Repository, chunksChan chan *so } defer reader.Close() - if handlers.HandleFile(ctx, reader, chunkSkel, chunksChan) { + if handlers.HandleFile(ctx, reader, chunkSkel, reporter) { return nil } @@ -976,7 +995,7 @@ func handleBinary(ctx context.Context, repo *git.Repository, chunksChan chan *so if err := data.Error(); err != nil { return err } - if err := common.CancellableWrite(ctx, chunksChan, &chunk); err != nil { + if err := reporter.ChunkOk(ctx, chunk); err != nil { return err } } diff --git a/pkg/sources/git/git_test.go b/pkg/sources/git/git_test.go index 15914b9ecda9..e9ec417f3d54 100644 --- a/pkg/sources/git/git_test.go +++ b/pkg/sources/git/git_test.go @@ -236,7 +236,7 @@ func TestSource_Chunks_Integration(t *testing.T) { if err != nil { panic(err) } - err = s.git.ScanRepo(ctx, repo, repoPath, &tt.scanOptions, chunksCh) + err = s.git.ScanRepo(ctx, repo, repoPath, &tt.scanOptions, sources.ChanReporter{Ch: chunksCh}) if err != nil { panic(err) } diff --git a/pkg/sources/github/github.go b/pkg/sources/github/github.go index 0cfd3af305eb..6e60ff239cd1 100644 --- a/pkg/sources/github/github.go +++ b/pkg/sources/github/github.go @@ -797,7 +797,7 @@ func (s *Source) scan(ctx context.Context, installationClient *github.Client, ch logger.V(2).Info(fmt.Sprintf("scanned %d/%d repos", scanned, len(s.repos)), "repo_size", repoSize, "duration_seconds", time.Since(start).Seconds()) }(now) - if err = s.git.ScanRepo(ctx, repo, path, s.scanOptions, chunksChan); err != nil { + if err = s.git.ScanRepo(ctx, repo, path, s.scanOptions, sources.ChanReporter{Ch: chunksChan}); err != nil { scanErrs.Add(fmt.Errorf("error scanning repo %s: %w", repoURL, err)) return nil } diff --git a/pkg/sources/gitlab/gitlab.go b/pkg/sources/gitlab/gitlab.go index f1efa8917931..6923d63f0cd5 100644 --- a/pkg/sources/gitlab/gitlab.go +++ b/pkg/sources/gitlab/gitlab.go @@ -453,7 +453,7 @@ func (s *Source) scanRepos(ctx context.Context, chunksChan chan *sources.Chunk) } logger.V(2).Info(fmt.Sprintf("Starting to scan repo %d/%d", i+1, len(s.repos))) - if err = s.git.ScanRepo(ctx, repo, path, s.scanOptions, chunksChan); err != nil { + if err = s.git.ScanRepo(ctx, repo, path, s.scanOptions, sources.ChanReporter{Ch: chunksChan}); err != nil { scanErrs.Add(err) return nil } diff --git a/pkg/sources/s3/s3.go b/pkg/sources/s3/s3.go index e21830b7a5a1..977cc33b2adc 100644 --- a/pkg/sources/s3/s3.go +++ b/pkg/sources/s3/s3.go @@ -14,9 +14,9 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/sts" - diskbufferreader "github.com/trufflesecurity/disk-buffer-reader" "github.com/go-errors/errors" "github.com/go-logr/logr" + diskbufferreader "github.com/trufflesecurity/disk-buffer-reader" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" @@ -376,7 +376,7 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan }, Verify: s.verify, } - if handlers.HandleFile(ctx, reader, chunkSkel, chunksChan) { + if handlers.HandleFile(ctx, reader, chunkSkel, sources.ChanReporter{Ch: chunksChan}) { atomic.AddUint64(objectCount, 1) s.log.V(5).Info("S3 object scanned.", "object_count", objectCount, "page_number", pageNumber) return nil