diff --git a/pkg/flow/deduper.go b/pkg/flow/deduper.go index 7d43bbc5c..2c5f18552 100644 --- a/pkg/flow/deduper.go +++ b/pkg/flow/deduper.go @@ -26,6 +26,7 @@ type deduperCache struct { type entry struct { key *ebpf.BpfFlowId + dnsRecord *ebpf.BpfDnsRecordT ifIndex uint32 expiryTime time.Time } @@ -46,14 +47,7 @@ func Dedupe(expireTime time.Duration, justMark bool) func(in <-chan []*Record, o cache.removeExpired() fwd := make([]*Record, 0, len(records)) for _, record := range records { - if cache.isDupe((*ebpf.BpfFlowId)(&record.Id)) { - if justMark { - record.Duplicate = true - } else { - continue - } - } - fwd = append(fwd, record) + cache.checkDupe(record, justMark, &fwd) } if len(fwd) > 0 { out <- fwd @@ -62,10 +56,9 @@ func Dedupe(expireTime time.Duration, justMark bool) func(in <-chan []*Record, o } } -// isDupe returns whether the passed record has been already checked for duplicate for -// another interface -func (c *deduperCache) isDupe(key *ebpf.BpfFlowId) bool { - rk := *key +// checkDupe check current record if its already available nad if not added to fwd records list +func (c *deduperCache) checkDupe(r *Record, justMark bool, fwd *[]*Record) { + rk := r.Id // zeroes fields from key that should be ignored from the flow comparison rk.IfIndex = 0 rk.SrcMac = [MacLen]uint8{0, 0, 0, 0, 0, 0} @@ -79,17 +72,39 @@ func (c *deduperCache) isDupe(key *ebpf.BpfFlowId) bool { c.entries.MoveToFront(ele) // The input flow is duplicate if its interface is different to the interface // of the non-duplicate flow that was first registered in the cache - return fEntry.ifIndex != key.IfIndex + // except if the new flow has DNS enrichment in this case we will + // forward both flows + if r.DNSLatency != 0 && fEntry.dnsRecord.Latency == 0 && fEntry.ifIndex != r.Id.IfIndex { + // copy DNS record to the cached entry and mark it as duplicate + fEntry.dnsRecord.Flags = r.Metrics.DnsRecord.Flags + fEntry.dnsRecord.Id = r.Metrics.DnsRecord.Id + fEntry.dnsRecord.Latency = r.Metrics.DnsRecord.Latency + if justMark { + r.Duplicate = true + *fwd = append(*fwd, r) + } + return + } + if fEntry.ifIndex != r.Id.IfIndex { + if justMark { + r.Duplicate = true + *fwd = append(*fwd, r) + } + return + } + *fwd = append(*fwd, r) + return } // The flow has not been accounted previously (or was forgotten after expiration) // so we register it for that concrete interface e := entry{ key: &rk, - ifIndex: key.IfIndex, + dnsRecord: &r.Metrics.DnsRecord, + ifIndex: r.Id.IfIndex, expiryTime: timeNow().Add(c.expire), } c.ifaces[rk] = c.entries.PushFront(&e) - return false + *fwd = append(*fwd, r) } func (c *deduperCache) removeExpired() { diff --git a/pkg/flow/deduper_test.go b/pkg/flow/deduper_test.go index b5bb0e929..7d6cf6cb2 100644 --- a/pkg/flow/deduper_test.go +++ b/pkg/flow/deduper_test.go @@ -23,7 +23,7 @@ var ( }, Metrics: ebpf.BpfFlowMetrics{ Packets: 2, Bytes: 456, Flags: 1, }}, Interface: "123456789"} - // another fow from 2 different interfaces and directions + // another flow from 2 different interfaces and directions twoIf1 = &Record{RawRecord: RawRecord{Id: ebpf.BpfFlowId{ EthProtocol: 1, Direction: 1, SrcPort: 333, DstPort: 456, DstMac: MacAddr{0x1}, SrcMac: MacAddr{0x1}, IfIndex: 1, @@ -36,6 +36,20 @@ var ( }, Metrics: ebpf.BpfFlowMetrics{ Packets: 2, Bytes: 456, Flags: 1, }}, Interface: "123456789"} + // another flow from 2 different interfaces and directions with DNS latency set on the latest + threeIf1 = &Record{RawRecord: RawRecord{Id: ebpf.BpfFlowId{ + EthProtocol: 1, Direction: 1, SrcPort: 433, DstPort: 456, + DstMac: MacAddr{0x1}, SrcMac: MacAddr{0x1}, IfIndex: 1, + }, Metrics: ebpf.BpfFlowMetrics{ + Packets: 2, Bytes: 456, Flags: 1, + }}, Interface: "eth0"} + threeIf2 = &Record{RawRecord: RawRecord{Id: ebpf.BpfFlowId{ + EthProtocol: 1, Direction: 0, SrcPort: 433, DstPort: 456, + DstMac: MacAddr{0x2}, SrcMac: MacAddr{0x2}, IfIndex: 2, + }, Metrics: ebpf.BpfFlowMetrics{ + Packets: 2, Bytes: 456, Flags: 1, + DnsRecord: ebpf.BpfDnsRecordT{Id: 1, Flags: 100, Latency: 1000}, + }}, Interface: "123456789", DNSLatency: time.Millisecond} ) func TestDedupe(t *testing.T) { @@ -45,21 +59,28 @@ func TestDedupe(t *testing.T) { go Dedupe(time.Minute, false)(input, output) input <- []*Record{ - oneIf2, // record 1 at interface 2: should be accepted - twoIf1, // record 2 at interface 1: should be accepted - oneIf1, // record 1 duplicate at interface 1: should NOT be accepted - oneIf1, // (same record key, different interface) - twoIf2, // record 2 duplicate at interface 2: should NOT be accepted - oneIf2, // record 1 at interface 1: should be accepted (same record key, same interface) + oneIf2, // record 1 at interface 2: should be accepted + twoIf1, // record 2 at interface 1: should be accepted + oneIf1, // record 1 duplicate at interface 1: should NOT be accepted + oneIf1, // (same record key, different interface) + twoIf2, // record 2 duplicate at interface 2: should NOT be accepted + oneIf2, // record 1 at interface 1: should be accepted (same record key, same interface) + threeIf1, // record 1 has no DNS so it get enriched with DNS record from the following record + threeIf2, // record 2 is duplicate of record1 and have DNS info , should not be accepted } deduped := receiveTimeout(t, output) - assert.Equal(t, []*Record{oneIf2, twoIf1, oneIf2}, deduped) + assert.Equal(t, []*Record{oneIf2, twoIf1, oneIf2, threeIf1}, deduped) // should still accept records with same key, same interface, // and discard these with same key, different interface input <- []*Record{oneIf1, oneIf2} deduped = receiveTimeout(t, output) assert.Equal(t, []*Record{oneIf2}, deduped) + + // make sure flow with no DNS get enriched with the DNS record + assert.Equal(t, threeIf1.Metrics.DnsRecord.Id, threeIf2.Metrics.DnsRecord.Id) + assert.Equal(t, threeIf1.Metrics.DnsRecord.Flags, threeIf2.Metrics.DnsRecord.Flags) + assert.Equal(t, threeIf1.Metrics.DnsRecord.Latency, threeIf2.Metrics.DnsRecord.Latency) } func TestDedupe_EvictFlows(t *testing.T) {