diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 2e129b439..326de2b39 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -79,6 +79,7 @@ type ebpfFlowFetcher interface { Register(iface ifaces.Interface) error LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics + DeleteMapsStaleEntries(timeOut time.Duration) ReadRingBuf() (ringbuf.Record, error) } @@ -163,7 +164,7 @@ func flowsAgent(cfg *Config, return iface } - mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout) + mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout, cfg.StaleEntriesEvictTimeout) rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout) accounter := flow.NewAccounter( cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now) diff --git a/pkg/agent/config.go b/pkg/agent/config.go index 426afe428..bf1c8241a 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -146,4 +146,7 @@ type Config struct { EnablePktDrops bool `env:"ENABLE_PKT_DROPS" envDefault:"false"` // EnableDNSTracking enable DNS tracking eBPF hook to track dns query/response flows EnableDNSTracking bool `env:"ENABLE_DNS_TRACKING" envDefault:"false"` + // StaleEntriesEvictTimeout specifies the maximum duration that stale entries are kept + // before being deleted, default is 5 seconds. 
+	StaleEntriesEvictTimeout time.Duration `env:"STALE_ENTRIES_EVICT_TIMEOUT" envDefault:"5s"`
 }
diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go
index 0d80c2094..48c49bf3e 100644
--- a/pkg/ebpf/tracer.go
+++ b/pkg/ebpf/tracer.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io/fs"
 	"strings"
+	"time"
 
 	"github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces"
 	"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
@@ -14,6 +15,7 @@ import (
 	"github.com/cilium/ebpf/link"
 	"github.com/cilium/ebpf/ringbuf"
 	"github.com/cilium/ebpf/rlimit"
+	"github.com/gavv/monotime"
 	"github.com/sirupsen/logrus"
 	"github.com/vishvananda/netlink"
 	"golang.org/x/sys/unix"
@@ -432,5 +434,69 @@ func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId]*BpfFlowMetrics {
 		*metricPtr = metric
 		flow[id] = metricPtr
 	}
+
 	return flow
 }
+
+// DeleteMapsStaleEntries looks for stale entries in the DNS and RTT feature maps and
+// deletes every entry that is at least timeOut old.
+func (m *FlowFetcher) DeleteMapsStaleEntries(timeOut time.Duration) {
+	m.lookupAndDeleteDNSMap(timeOut)
+	m.lookupAndDeleteRTTMap(timeOut)
+}
+
+// lookupAndDeleteDNSMap iterates over the DNS queries map and deletes the stale DNS
+// request entries, i.e. those whose stored timestamp is at least timeOut old.
+func (m *FlowFetcher) lookupAndDeleteDNSMap(timeOut time.Duration) {
+	monotonicTimeNow := monotime.Now()
+	dnsMap := m.objects.DnsFlows
+	var dnsKey BpfDnsFlowId
+	var dnsVal uint64
+	var staleKeys []BpfDnsFlowId
+
+	// Collect the stale keys first and delete them once the iteration is over: when the
+	// map is modified during iteration there is no guarantee that every key is visited.
+	iterator := dnsMap.Iterate()
+	for iterator.Next(&dnsKey, &dnsVal) {
+		if time.Duration(uint64(monotonicTimeNow)-dnsVal) >= timeOut {
+			staleKeys = append(staleKeys, dnsKey)
+		}
+	}
+	if err := iterator.Err(); err != nil {
+		log.WithError(err).Warnf("couldn't iterate over DNS records map")
+	}
+	for _, key := range staleKeys {
+		if err := dnsMap.Delete(key); err != nil {
+			log.WithError(err).WithField("dnsKey", key).
+				Warnf("couldn't delete DNS record entry")
+		}
+	}
+}
+
+// lookupAndDeleteRTTMap iterates over the flow sequences map and deletes the stale
+// entries, i.e. those whose stored timestamp is at least timeOut old.
+func (m *FlowFetcher) lookupAndDeleteRTTMap(timeOut time.Duration) {
+	monotonicTimeNow := monotime.Now()
+	rttMap := m.objects.FlowSequences
+	var rttKey BpfFlowSeqId
+	var rttVal uint64
+	var staleKeys []BpfFlowSeqId
+
+	// Collect the stale keys first and delete them once the iteration is over: when the
+	// map is modified during iteration there is no guarantee that every key is visited.
+	iterator := rttMap.Iterate()
+	for iterator.Next(&rttKey, &rttVal) {
+		if time.Duration(uint64(monotonicTimeNow)-rttVal) >= timeOut {
+			staleKeys = append(staleKeys, rttKey)
+		}
+	}
+	if err := iterator.Err(); err != nil {
+		log.WithError(err).Warnf("couldn't iterate over RTT records map")
+	}
+	for _, key := range staleKeys {
+		if err := rttMap.Delete(key); err != nil {
+			log.WithError(err).WithField("rttKey", key).
+				Warnf("couldn't delete RTT record entry")
+		}
+	}
+}
diff --git a/pkg/flow/tracer_map.go b/pkg/flow/tracer_map.go
index 8a0118893..1cfd6d6df 100644
--- a/pkg/flow/tracer_map.go
+++ b/pkg/flow/tracer_map.go
@@ -18,8 +18,9 @@ var mtlog = logrus.WithField("component", "flow.MapTracer")
 // MapTracer accesses a mapped source of flows (the eBPF PerCPU HashMap), deserializes it into
 // a flow Record structure, and performs the accumulation of each perCPU-record into a single flow
 type MapTracer struct {
-	mapFetcher      mapFetcher
-	evictionTimeout time.Duration
+	mapFetcher               mapFetcher
+	evictionTimeout          time.Duration
+	staleEntriesEvictTimeout time.Duration
 	// manages the access to the eviction routines, avoiding two evictions happening at the same time
 	evictionCond   *sync.Cond
 	lastEvictionNs uint64
@@ -27,14 +28,16 @@ type MapTracer struct {
 
 type mapFetcher interface {
 	LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics
+	DeleteMapsStaleEntries(timeOut time.Duration)
 }
 
-func NewMapTracer(fetcher mapFetcher, evictionTimeout time.Duration) *MapTracer {
+func NewMapTracer(fetcher mapFetcher, evictionTimeout, staleEntriesEvictTimeout time.Duration) *MapTracer {
 	return &MapTracer{
-		mapFetcher:      fetcher,
-		evictionTimeout: evictionTimeout,
-		lastEvictionNs:  uint64(monotime.Now()),
-		evictionCond:    sync.NewCond(&sync.Mutex{}),
+		mapFetcher:               fetcher,
+		evictionTimeout:          evictionTimeout,
+		lastEvictionNs:           uint64(monotime.Now()),
+		evictionCond:             sync.NewCond(&sync.Mutex{}),
+		staleEntriesEvictTimeout: staleEntriesEvictTimeout,
 	}
 }
 
@@ -109,6 +112,7 @@ func
(m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows
 			uint64(monotonicTimeNow),
 		))
 	}
+	m.mapFetcher.DeleteMapsStaleEntries(m.staleEntriesEvictTimeout)
 	m.lastEvictionNs = laterFlowNs
 	select {
 	case <-ctx.Done():
diff --git a/pkg/test/tracer_fake.go b/pkg/test/tracer_fake.go
index 960de0c6e..522c63326 100644
--- a/pkg/test/tracer_fake.go
+++ b/pkg/test/tracer_fake.go
@@ -3,6 +3,7 @@ package test
 import (
 	"bytes"
 	"encoding/binary"
+	"time"
 
 	"github.com/cilium/ebpf/ringbuf"
 	"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
@@ -42,6 +43,10 @@ func (m *TracerFake) LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetric
 	}
 }
 
+// DeleteMapsStaleEntries is a no-op in the fake tracer: it ignores the timeout and deletes nothing.
+func (m *TracerFake) DeleteMapsStaleEntries(_ time.Duration) {
+}
+
 func (m *TracerFake) ReadRingBuf() (ringbuf.Record, error) {
 	return <-m.ringBuf, nil
 }