Commit 76c2b20

Optimization and bug fix.
ahrav committed Jan 31, 2024
1 parent 404b2b6 commit 76c2b20
Showing 3 changed files with 62 additions and 34 deletions.
22 changes: 17 additions & 5 deletions pkg/engine/ahocorasick/ahocorasickcore.go
@@ -68,15 +68,27 @@ func NewAhoCorasickCore(allDetectors []detectors.Detector) *AhoCorasickCore {
  // it will be called once per chunk and that many allocations has a noticeable performance cost.
  func (ac *AhoCorasickCore) PopulateMatchingDetectors(chunkData string, dts map[DetectorKey]detectors.Detector) []detectors.Detector {
      matches := ac.prefilter.MatchString(strings.ToLower(chunkData))
-     d := make([]detectors.Detector, 0, len(matches))
-     for _, m := range ac.prefilter.MatchString(strings.ToLower(chunkData)) {
+
+     // Use a map to avoid adding duplicate detectors to the slice.
+     addedDetectors := make(map[DetectorKey]struct{})
+     uniqueDetectors := make([]detectors.Detector, 0, len(matches))
+
+     for _, m := range matches {
          for _, k := range ac.keywordsToDetectors[m.MatchString()] {
-             dts[k] = ac.detectorsByKey[k]
-             d = append(d, ac.detectorsByKey[k])
+             if _, exists := addedDetectors[k]; exists {
+                 continue
+             }
+             // Add to the map to track already added detectors.
+             addedDetectors[k] = struct{}{}
+
+             // Add the detector to the map and slice.
+             detector := ac.detectorsByKey[k]
+             dts[k] = detector
+             uniqueDetectors = append(uniqueDetectors, detector)
          }
      }

-     return d
+     return uniqueDetectors
  }

  // createDetectorKey creates a unique key for each detector from its type, version, and, for
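The change above also removes a redundant second call to MatchString and replaces a slice that could accumulate the same detector several times with a set-guarded append, so each detector is returned at most once per chunk. A minimal standalone sketch of the dedup pattern (the names are hypothetical stand-ins, not the engine's real types):

```go
package main

import "fmt"

// DetectorKey stands in for the engine's detector key type.
type DetectorKey string

// uniqueByKey appends each key at most once, preserving first-seen order.
func uniqueByKey(keys []DetectorKey) []DetectorKey {
	seen := make(map[DetectorKey]struct{}, len(keys))
	out := make([]DetectorKey, 0, len(keys))
	for _, k := range keys {
		if _, exists := seen[k]; exists {
			continue
		}
		seen[k] = struct{}{}
		out = append(out, k)
	}
	return out
}

func main() {
	fmt.Println(uniqueByKey([]DetectorKey{"aws", "github", "aws"})) // [aws github]
}
```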
67 changes: 41 additions & 26 deletions pkg/engine/engine.go
@@ -321,7 +321,7 @@ func Start(ctx context.Context, options ...Option) (*Engine, error) {
      return e, nil
  }

- const defaultChannelBuffer = 1
+ var defaultChannelBuffer = runtime.NumCPU()

  // initialize prepares the engine's internal structures. The LRU cache optimizes
  // deduplication efforts, allowing the engine to quickly check if a chunk has
@@ -334,11 +334,25 @@ func (e *Engine) initialize(ctx context.Context, options ...Option) error {
      if err != nil {
          return fmt.Errorf("failed to initialize LRU cache: %w", err)
      }
+     const (
+         // detectableChunksChanMultiplier is set to accommodate a high number of concurrent worker goroutines.
+         // This multiplier ensures that the detectableChunksChan channel has sufficient buffer capacity
+         // to hold messages from multiple worker groups (detector workers / reverifier workers) without blocking.
+         // A large buffer helps accommodate the fact that workers can produce data faster
+         // than it can be consumed.
+         detectableChunksChanMultiplier = 50
+         // reverifiableChunksChanMultiplier uses a smaller buffer compared to detectableChunksChanMultiplier.
+         // This reflects the anticipated lower volume of data that needs re-verification.
+         // The buffer size is a trade-off between memory usage and the need to prevent blocking.
+         reverifiableChunksChanMultiplier = 25
+     )

      // Channels are used for communication between different parts of the engine,
      // ensuring that data flows smoothly without race conditions.
-     e.detectableChunksChan = make(chan detectableChunk, defaultChannelBuffer)
-     e.reverifiableChunksChan = make(chan reVerifiableChunk, defaultChannelBuffer)
+     // The buffer sizes for these channels are set to multiples of defaultChannelBuffer,
+     // considering the expected concurrency and workload in the system.
+     e.detectableChunksChan = make(chan detectableChunk, defaultChannelBuffer*detectableChunksChanMultiplier)
+     e.reverifiableChunksChan = make(chan reVerifiableChunk, defaultChannelBuffer*reverifiableChunksChanMultiplier)
      e.results = make(chan detectors.ResultWithMetadata, defaultChannelBuffer)
      e.dedupeCache = cache
      e.printer = new(output.PlainPrinter)
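To make the sizing concrete: with defaultChannelBuffer set to runtime.NumCPU(), an 8-core machine gets an 8*50 = 400-slot buffer for detectable chunks and an 8*25 = 200-slot buffer for reverifiable chunks. A small sketch of the arithmetic, using only the names introduced in the hunk above:

```go
package main

import (
	"fmt"
	"runtime"
)

func main() {
	// Mirrors the sizing scheme in engine.go: buffers scale with core count,
	// weighted by each channel's expected traffic.
	defaultChannelBuffer := runtime.NumCPU()
	const (
		detectableChunksChanMultiplier   = 50
		reverifiableChunksChanMultiplier = 25
	)
	fmt.Println("detectable buffer:", defaultChannelBuffer*detectableChunksChanMultiplier)     // 400 on 8 cores
	fmt.Println("reverifiable buffer:", defaultChannelBuffer*reverifiableChunksChanMultiplier) // 200 on 8 cores
}
```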
@@ -430,8 +444,9 @@ func (e *Engine) startWorkers(ctx context.Context) {

      // Reverifier workers handle verification of chunks that have been detected by multiple detectors.
      // They ensure that verification is disabled for any secrets that have been detected by multiple detectors.
+     const reverifierWorkerMultiplier = detectorWorkerMultiplier / 2
      ctx.Logger().V(2).Info("starting reverifier workers", "count", e.concurrency)
-     for worker := uint64(0); worker < uint64(e.concurrency*100); worker++ {
+     for worker := uint64(0); worker < uint64(e.concurrency*reverifierWorkerMultiplier); worker++ {
          e.reverifiersWg.Add(1)
          go func() {
              ctx := context.WithValue(ctx, "secret_worker_id", common.RandomID(5))
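The cap on reverifier goroutines drops from a hardcoded concurrency*100 to half the detector-worker multiplier (detectorWorkerMultiplier is defined elsewhere in engine.go and is not visible in this diff). A minimal sketch of the spawn pattern, with placeholder values:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const (
		concurrency                = 4
		reverifierWorkerMultiplier = 2 // placeholder; the real value comes from detectorWorkerMultiplier / 2
	)

	work := make(chan int, concurrency)
	var wg sync.WaitGroup

	// A WaitGroup tracks concurrency*multiplier goroutines draining one channel.
	for worker := 0; worker < concurrency*reverifierWorkerMultiplier; worker++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range work {
				fmt.Println("processed", item)
			}
		}()
	}

	for i := 0; i < 3; i++ {
		work <- i
	}
	close(work)
	wg.Wait()
}
```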
@@ -526,7 +541,7 @@ func (e *Engine) detectorWorker(ctx context.Context) {
      var wgReverify sync.WaitGroup

      // Reuse the same map to avoid allocations.
-     const avgDetectorsPerChunk = 2
+     const avgDetectorsPerChunk = 8
      chunkSpecificDetectors := make(map[ahocorasick.DetectorKey]detectors.Detector, avgDetectorsPerChunk)
      for originalChunk := range e.ChunksChan() {
          for chunk := range sources.Chunker(originalChunk) {
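Raising avgDetectorsPerChunk from 2 to 8 only changes the map's initial capacity hint; the allocation savings come from clearing and refilling a single map per chunk instead of allocating a fresh one each iteration. A minimal sketch of that reuse pattern, with hypothetical data:

```go
package main

import "fmt"

func main() {
	const avgDetectorsPerChunk = 8

	// One pre-sized map, reused across all chunks.
	perChunk := make(map[string]int, avgDetectorsPerChunk)

	for _, chunk := range [][]string{{"aws", "github"}, {"slack"}} {
		// Refill for this chunk.
		for i, name := range chunk {
			perChunk[name] = i
		}
		fmt.Println("detectors for chunk:", len(perChunk))

		// Empty the map in place so its storage is reused.
		// (Since Go 1.21, clear(perChunk) does the same thing.)
		for k := range perChunk {
			delete(perChunk, k)
		}
	}
}
```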
@@ -575,21 +590,31 @@ func (e *Engine) detectorWorker(ctx context.Context) {
      ctx.Logger().V(4).Info("finished scanning chunks")
  }

- func likelyDuplicate(ctx context.Context, val string, dupes map[string]struct{}) bool {
-     if _, ok := dupes[val]; ok {
+ func likelyDuplicate(ctx context.Context, val []byte, dupes map[string]struct{}) bool {
+     if _, ok := dupes[string(val)]; ok {
          return true
      }
-     for k := range dupes {
+
+     // The string conversion is purposefully placed after the dupes check to avoid the allocation.
+     // A []byte -> string conversion used directly in a map lookup does not allocate, due to compiler optimizations.
+     valStr := string(val)
+     const similarityThreshold = 0.9
+     for dupe := range dupes {
          // Avoid comparing strings of vastly different lengths.
-         if len(k)*10 < len(val)*9 || len(k)*10 > len(val)*11 {
+         if len(dupe)*10 < len(valStr)*9 || len(dupe)*10 > len(valStr)*11 {
              continue
          }

-         similarity := strutil.Similarity(val, k, metrics.NewLevenshtein())
+         similarity := strutil.Similarity(valStr, dupe, metrics.NewLevenshtein())

          // close enough
-         if similarity > 0.9 {
-             ctx.Logger().V(2).Info("found similar duplicate", "val", val, "k", k, "similarity", similarity)
+         if similarity > similarityThreshold {
+             ctx.Logger().V(2).Info(
+                 "found similar duplicate",
+                 "val", val,
+                 "dupe", dupe,
+                 "similarity", similarity,
+             )
              return true
          }
      }
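Two details in this rewrite deserve a note: the Go compiler elides the allocation when a []byte-to-string conversion appears directly inside a map index expression, and the *10 / *9 / *11 comparison keeps only candidates whose length is within roughly ten percent of the probe's, using integer math alone. A self-contained sketch of both micro-optimizations:

```go
package main

import "fmt"

// withinTenPercent reports whether len(a) is within about ±10% of len(b),
// i.e. 0.9*len(b) <= len(a) <= 1.1*len(b), using only integer arithmetic.
func withinTenPercent(a, b string) bool {
	return len(a)*10 >= len(b)*9 && len(a)*10 <= len(b)*11
}

func main() {
	dupes := map[string]struct{}{"secret-token": {}}
	val := []byte("secret-token")

	// The compiler recognizes m[string(b)] and skips the string copy.
	_, ok := dupes[string(val)]
	fmt.Println(ok) // true

	fmt.Println(withinTenPercent("secret-toke", "secret-token")) // true (lengths 11 vs 12)
	fmt.Println(withinTenPercent("abc", "secret-token"))         // false (too short to compare)
}
```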
@@ -604,7 +629,6 @@ func (e *Engine) reverifierWorker(ctx context.Context) {
      detectorsWithResult := make(map[detectors.Detector]struct{}, avgSecretsPerDetector)
      chunkSecrets := make(map[string]struct{}, avgSecretsPerDetector)

- nextChunk:
      for chunk := range e.reverifiableChunksChan {
          for _, detector := range chunk.detectors {
              // DO NOT VERIFY at this stage of the pipeline.
@@ -632,32 +656,23 @@
                  // Ex:
                  // - postman api key: PMAK-qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r
                  // - malicious detector "api key": qnwfsLyRSyfCwfpHaQP1UzDhrgpWvHjbYzjpRCMshjt417zWcrzyHUArs7r
-                 valStr := string(val)
-                 if likelyDuplicate(ctx, valStr, chunkSecrets) {
+                 if likelyDuplicate(ctx, val, chunkSecrets) {
                      // This indicates that the same secret was found by multiple detectors.
                      // We should NOT VERIFY this chunk's data.
                      if e.reverificationTracking != nil {
                          e.reverificationTracking.increment()
                      }
-                     chunk.reverifyWgDoneFn()
-
                      e.processResult(ctx, detectableChunk{
                          chunk:    chunk.chunk,
                          detector: detector,
                          decoder:  chunk.decoder,
                          wgDoneFn: wgDetect.Done,
                      }, res)

-                     // Empty the dupes and detectors slice.
-                     for k := range chunkSecrets {
-                         delete(chunkSecrets, k)
-                     }
-                     for k := range detectorsWithResult {
-                         delete(detectorsWithResult, k)
-                     }
-                     continue nextChunk
+                     // Remove the detector and secret from the maps.
+                     delete(detectorsWithResult, detector)
                  }
-                 chunkSecrets[valStr] = struct{}{}
+                 chunkSecrets[string(val)] = struct{}{}
              }
          }
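The deleted nextChunk label was the bug this commit fixes: when the first duplicate secret appeared, the old code wiped both maps and jumped to the next chunk, so any remaining detectors for the current chunk were silently skipped. The new code deletes only the offending entries and keeps iterating, which is why the test below now expects two unverified secrets instead of one. A toy reproduction of the control-flow difference, with hypothetical names:

```go
package main

import "fmt"

func main() {
	detectors := []string{"postman", "generic-api-key", "github"}
	dupes := map[string]struct{}{"generic-api-key": {}}

	// Old shape: a labeled continue abandons every remaining detector
	// for the chunk as soon as one duplicate is seen.
	found := 0
nextChunk:
	for range []int{0} { // a single chunk
		for _, d := range detectors {
			if _, ok := dupes[d]; ok {
				continue nextChunk // "github" is never processed
			}
			found++
		}
	}
	fmt.Println("old:", found) // old: 1

	// New shape: handle the duplicate in place and keep iterating.
	found = 0
	for _, d := range detectors {
		if _, ok := dupes[d]; ok {
			continue // only this detector is skipped
		}
		found++
	}
	fmt.Println("new:", found) // new: 2
}
```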
7 changes: 4 additions & 3 deletions pkg/engine/engine_test.go
@@ -199,7 +199,7 @@ func TestEngine_DuplicatSecrets(t *testing.T) {
          WithConcurrency(1),
          WithDecoders(decoders.DefaultDecoders()...),
          WithDetectors(DefaultDetectors()...),
-         WithVerify(true),
+         WithVerify(false),
          WithPrinter(new(discardPrinter)),
      )
      assert.Nil(t, err)
@@ -233,7 +233,7 @@ func TestReverifcationChunk(t *testing.T) {
          WithConcurrency(1),
          WithDecoders(decoders.DefaultDecoders()...),
          WithDetectors(conf.Detectors...),
-         WithVerify(true),
+         WithVerify(false),
          WithPrinter(new(discardPrinter)),
          withReverificationTracking(),
      )
@@ -246,7 +246,8 @@

      // Wait for all the chunks to be processed.
      assert.Nil(t, e.Finish(ctx))
-     want := uint64(1)
+     // We want TWO secrets, since the detected value matches both of the custom regexes.
+     want := uint64(2)
      assert.Equal(t, want, e.GetMetrics().UnverifiedSecretsFound)

      wantDupe := 1
