Skip to content

Commit

Permalink
[chore] Replace chunks channel with ChunkReporter in git based sources (
Browse files Browse the repository at this point in the history
#2082)

ChunkReporter is more flexible and will allow code reuse for unit
chunking. ChanReporter was added as a way to maintain the original
channel functionality, so this PR should not alter existing behavior.
  • Loading branch information
mcastorina authored Nov 1, 2023
1 parent d55cb56 commit 52600a8
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 53 deletions.
2 changes: 1 addition & 1 deletion hack/snifftest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func main() {
})

logger.Info("scanning repo", "repo", r)
err = s.ScanRepo(ctx, repo, path, git.NewScanOptions(), chunksChan)
err = s.ScanRepo(ctx, repo, path, git.NewScanOptions(), sources.ChanReporter{Ch: chunksChan})
if err != nil {
logFatal(err, "error scanning repo")
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/handlers/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ func TestArchiveHandler(t *testing.T) {
}

func TestHandleFile(t *testing.T) {
ch := make(chan *sources.Chunk, 2)
reporter := sources.ChanReporter{Ch: make(chan *sources.Chunk, 2)}

// Context cancels the operation.
canceledCtx, cancel := context.WithCancel(context.Background())
cancel()
assert.False(t, HandleFile(canceledCtx, strings.NewReader("file"), &sources.Chunk{}, ch))
assert.False(t, HandleFile(canceledCtx, strings.NewReader("file"), &sources.Chunk{}, reporter))

// Only one chunk is sent on the channel.
// TODO: Embed a zip without making an HTTP request.
Expand All @@ -124,9 +124,9 @@ func TestHandleFile(t *testing.T) {
reader, err := diskbufferreader.New(resp.Body)
assert.NoError(t, err)

assert.Equal(t, 0, len(ch))
assert.True(t, HandleFile(context.Background(), reader, &sources.Chunk{}, ch))
assert.Equal(t, 1, len(ch))
assert.Equal(t, 0, len(reporter.Ch))
assert.True(t, HandleFile(context.Background(), reader, &sources.Chunk{}, reporter))
assert.Equal(t, 1, len(reporter.Ch))
}

func TestReadToMax(t *testing.T) {
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestExtractTarContent(t *testing.T) {
chunkCh := make(chan *sources.Chunk)
go func() {
defer close(chunkCh)
ok := HandleFile(ctx, file, &sources.Chunk{}, chunkCh)
ok := HandleFile(ctx, file, &sources.Chunk{}, sources.ChanReporter{Ch: chunkCh})
assert.True(t, ok)
}()

Expand Down Expand Up @@ -262,7 +262,7 @@ func TestNestedDirArchive(t *testing.T) {

go func() {
defer close(sourceChan)
HandleFile(ctx, file, &sources.Chunk{}, sourceChan)
HandleFile(ctx, file, &sources.Chunk{}, sources.ChanReporter{Ch: sourceChan})
}()

count := 0
Expand Down
18 changes: 8 additions & 10 deletions pkg/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ type Handler interface {
// HandleFile processes a given file by selecting an appropriate handler from DefaultHandlers.
// It first checks if the handler implements SpecializedHandler for any special processing,
// then falls back to regular file type handling. If successful, it reads the file in chunks,
// packages them in the provided chunk skeleton, and sends them to chunksChan.
// packages them in the provided chunk skeleton, and reports them to the chunk reporter.
// The function returns true if processing was successful and false otherwise.
// Context is used for cancellation, and the caller is responsible for canceling it if needed.
func HandleFile(ctx context.Context, file io.Reader, chunkSkel *sources.Chunk, chunksChan chan *sources.Chunk) bool {
func HandleFile(ctx context.Context, file io.Reader, chunkSkel *sources.Chunk, reporter sources.ChunkReporter) bool {
aCtx := logContext.AddLogger(ctx)
for _, h := range DefaultHandlers() {
h.New()
Expand All @@ -51,22 +51,22 @@ func HandleFile(ctx context.Context, file io.Reader, chunkSkel *sources.Chunk, c
return false
}

if success := processHandler(aCtx, h, reReader, chunkSkel, chunksChan); success {
if success := processHandler(aCtx, h, reReader, chunkSkel, reporter); success {
return true
}
}

return false
}

func processHandler(ctx logContext.Context, h Handler, reReader *diskbufferreader.DiskBufferReader, chunkSkel *sources.Chunk, chunksChan chan *sources.Chunk) bool {
func processHandler(ctx logContext.Context, h Handler, reReader *diskbufferreader.DiskBufferReader, chunkSkel *sources.Chunk, reporter sources.ChunkReporter) bool {
defer reReader.Close()
defer reReader.Stop()

if specialHandler, ok := h.(SpecializedHandler); ok {
file, isSpecial, err := specialHandler.HandleSpecialized(ctx, reReader)
if isSpecial {
return handleChunks(ctx, h.FromFile(ctx, file), chunkSkel, chunksChan)
return handleChunks(ctx, h.FromFile(ctx, file), chunkSkel, reporter)
}
if err != nil {
ctx.Logger().Error(err, "error handling file")
Expand All @@ -82,10 +82,10 @@ func processHandler(ctx logContext.Context, h Handler, reReader *diskbufferreade
return false
}

return handleChunks(ctx, h.FromFile(ctx, reReader), chunkSkel, chunksChan)
return handleChunks(ctx, h.FromFile(ctx, reReader), chunkSkel, reporter)
}

func handleChunks(ctx context.Context, handlerChan chan []byte, chunkSkel *sources.Chunk, chunksChan chan *sources.Chunk) bool {
func handleChunks(ctx context.Context, handlerChan chan []byte, chunkSkel *sources.Chunk, reporter sources.ChunkReporter) bool {
for {
select {
case data, open := <-handlerChan:
Expand All @@ -94,9 +94,7 @@ func handleChunks(ctx context.Context, handlerChan chan []byte, chunkSkel *sourc
}
chunk := *chunkSkel
chunk.Data = data
select {
case chunksChan <- &chunk:
case <-ctx.Done():
if err := reporter.ChunkOk(logContext.AddLogger(ctx), chunk); err != nil {
return false
}
case <-ctx.Done():
Expand Down
22 changes: 22 additions & 0 deletions pkg/sources/chan_reporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package sources

import (
"github.com/trufflesecurity/trufflehog/v3/pkg/common"
"github.com/trufflesecurity/trufflehog/v3/pkg/context"
)

// Compile-time assertion that ChanReporter implements ChunkReporter.
var _ ChunkReporter = (*ChanReporter)(nil)

// ChanReporter is a ChunkReporter that writes to a channel.
// It adapts the legacy chunk-channel plumbing to the ChunkReporter
// interface so channel-based consumers keep working unchanged.
type ChanReporter struct {
	// Ch receives every chunk passed to ChunkOk.
	Ch chan<- *Chunk
}

// ChunkOk delivers chunk to the wrapped channel. The send is
// cancellable: if ctx is done before the channel accepts the value,
// the write is abandoned and an error is returned.
func (r ChanReporter) ChunkOk(ctx context.Context, chunk Chunk) error {
	// chunk is a by-value copy, so taking its address is safe here;
	// each report owns its own *Chunk.
	c := chunk
	return common.CancellableWrite(ctx, r.Ch, &c)
}

// ChunkErr records a chunking error without interrupting the scan:
// the error is logged and nil is returned so processing continues.
// This mirrors the original channel-based behavior, which had no
// error path at all.
func (ChanReporter) ChunkErr(ctx context.Context, err error) error {
	logger := ctx.Logger()
	logger.Error(err, "error chunking")
	return nil
}
4 changes: 2 additions & 2 deletions pkg/sources/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"os"
"path/filepath"

diskbufferreader "github.com/trufflesecurity/disk-buffer-reader"
"github.com/go-errors/errors"
"github.com/go-logr/logr"
diskbufferreader "github.com/trufflesecurity/disk-buffer-reader"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

Expand Down Expand Up @@ -172,7 +172,7 @@ func (s *Source) scanFile(ctx context.Context, path string, chunksChan chan *sou
},
Verify: s.verify,
}
if handlers.HandleFile(ctx, reReader, chunkSkel, chunksChan) {
if handlers.HandleFile(ctx, reReader, chunkSkel, sources.ChanReporter{Ch: chunksChan}) {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sources/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"sync"

"cloud.google.com/go/storage"
diskbufferreader "github.com/trufflesecurity/disk-buffer-reader"
"github.com/go-errors/errors"
"github.com/go-logr/logr"
diskbufferreader "github.com/trufflesecurity/disk-buffer-reader"
"golang.org/x/oauth2"
"golang.org/x/oauth2/endpoints"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -375,7 +375,7 @@ func (s *Source) readObjectData(ctx context.Context, o object, chunk *sources.Ch
}
defer reader.Close()

if handlers.HandleFile(ctx, reader, chunk, s.chunksCh) {
if handlers.HandleFile(ctx, reader, chunk, sources.ChanReporter{Ch: s.chunksCh}) {
ctx.Logger().V(3).Info("File was handled", "name", s.name, "bucket", o.bucket, "object", o.name)
return nil, nil
}
Expand Down
Loading

0 comments on commit 52600a8

Please sign in to comment.