Skip to content

Commit

Permalink
Revert "change aggregation flow map to hashmap instead perCPU hashmap (
Browse files Browse the repository at this point in the history
…#118)"

This reverts commit b6e2b87.
  • Loading branch information
msherif1234 committed Aug 31, 2023
1 parent 6d9d2e7 commit e6a5200
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 42 deletions.
2 changes: 1 addition & 1 deletion bpf/maps_definition.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ struct {

// Key: the flow identifier. Value: the flow metrics for that identifier.
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
__type(key, flow_id);
__type(value, flow_metrics);
__uint(max_entries, 1 << 24);
Expand Down
2 changes: 1 addition & 1 deletion docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ flowchart TD
E(ebpf.FlowFetcher) --> |"pushes via<br/>RingBuffer"| RB(flow.RingBufTracer)
style E fill:#990
E --> |"polls<br/>HashMap"| M(flow.MapTracer)
E --> |"polls<br/>PerCPUHashMap"| M(flow.MapTracer)
RB --> |chan *flow.Record| ACC(flow.Accounter)
RB -.-> |flushes| M
ACC --> |"chan []*flow.Record"| DD(flow.Deduper)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type ebpfFlowFetcher interface {
io.Closer
Register(iface ifaces.Interface) error

LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics
LookupAndDeleteMap() map[ebpf.BpfFlowId][]*ebpf.BpfFlowMetrics
DeleteMapsStaleEntries(timeOut time.Duration)
ReadRingBuf() (ringbuf.Record, error)
}
Expand Down
57 changes: 48 additions & 9 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ var (
DstPort: 456,
IfIndex: 3,
}
key1Dupe = ebpf.BpfFlowId{
SrcPort: 123,
DstPort: 456,
IfIndex: 4,
}

key2 = ebpf.BpfFlowId{
SrcPort: 333,
Expand Down Expand Up @@ -76,11 +81,21 @@ func TestFlowsAgent_Deduplication(t *testing.T) {
receivedKeys[f.Id] = struct{}{}
switch f.Id {
case key1:
assert.EqualValues(t, 3, f.Metrics.Packets)
assert.EqualValues(t, 44, f.Metrics.Bytes)
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "foo", f.Interface)
key1Flows = append(key1Flows, f)
case key1Dupe:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "bar", f.Interface)
key1Flows = append(key1Flows, f)
case key2:
assert.EqualValues(t, 7, f.Metrics.Packets)
assert.EqualValues(t, 33, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
}
}
assert.Lenf(t, key1Flows, 1, "only one flow should have been forwarded: %#v", key1Flows)
Expand All @@ -104,12 +119,23 @@ func TestFlowsAgent_DeduplicationJustMark(t *testing.T) {
receivedKeys[f.Id] = struct{}{}
switch f.Id {
case key1:
assert.EqualValues(t, 3, f.Metrics.Packets)
assert.EqualValues(t, 44, f.Metrics.Bytes)
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
if f.Duplicate {
duplicates++
}
assert.Equal(t, "foo", f.Interface)
case key1Dupe:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
if f.Duplicate {
duplicates++
}
assert.Equal(t, "bar", f.Interface)
case key2:
assert.EqualValues(t, 7, f.Metrics.Packets)
assert.EqualValues(t, 33, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
}
}
assert.Equalf(t, 0, duplicates, "exported flows should have only one duplicate: %#v", exported)
Expand All @@ -132,11 +158,21 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) {
receivedKeys[f.Id] = struct{}{}
switch f.Id {
case key1:
assert.EqualValues(t, 3, f.Metrics.Packets)
assert.EqualValues(t, 44, f.Metrics.Bytes)
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "foo", f.Interface)
key1Flows = append(key1Flows, f)
case key1Dupe:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "bar", f.Interface)
key1Flows = append(key1Flows, f)
case key2:
assert.EqualValues(t, 7, f.Metrics.Packets)
assert.EqualValues(t, 33, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
}
}
assert.Lenf(t, key1Flows, 1, "both key1 flows should have been forwarded: %#v", key1Flows)
Expand Down Expand Up @@ -183,10 +219,13 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake {
})

now := uint64(monotime.Now())
key1Metrics := ebpf.BpfFlowMetrics{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000}
key1Metrics := []*ebpf.BpfFlowMetrics{
{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000},
{Packets: 1, Bytes: 22, StartMonoTimeTs: now, EndMonoTimeTs: now + 3000},
}

ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{
key1: &key1Metrics,
ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId][]*ebpf.BpfFlowMetrics{
key1: key1Metrics,
})
return export
}
Binary file modified pkg/ebpf/bpf_bpfeb.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_bpfel.o
Binary file not shown.
18 changes: 11 additions & 7 deletions pkg/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,24 +371,28 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) {
// TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
// Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
// Race conditions here causes that some flows are lost in high-load scenarios
func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId]*BpfFlowMetrics {
func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId][]*BpfFlowMetrics {
flowMap := m.objects.AggregatedFlows

iterator := flowMap.Iterate()
var flow = make(map[BpfFlowId]*BpfFlowMetrics, m.cacheMaxSize)
var flow = make(map[BpfFlowId][]*BpfFlowMetrics, m.cacheMaxSize)
var id BpfFlowId
var metric BpfFlowMetrics
var metrics []BpfFlowMetrics

// Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions
// TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively
for iterator.Next(&id, &metric) {
for iterator.Next(&id, &metrics) {
if err := flowMap.Delete(id); err != nil {
log.WithError(err).WithField("flowId", id).
Warnf("couldn't delete flow entry")
}
metricPtr := new(BpfFlowMetrics)
*metricPtr = metric
flow[id] = metricPtr
metricsPtr := make([]*BpfFlowMetrics, m.cacheMaxSize)
for _, m := range metrics {
metricPtr := new(BpfFlowMetrics)
*metricPtr = m
metricsPtr = append(metricsPtr, metricPtr)
}
flow[id] = append(flow[id], metricsPtr...)
}

return flow
Expand Down
4 changes: 3 additions & 1 deletion pkg/flow/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) {
alog.Debug("exiting account routine")
return
}
if _, ok := c.entries[record.Id]; !ok {
if stored, ok := c.entries[record.Id]; ok {
Accumulate(stored, &record.Metrics)
} else {
if len(c.entries) >= c.maxEntries {
evictingEntries := c.entries
c.entries = map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{}
Expand Down
20 changes: 10 additions & 10 deletions pkg/flow/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func TestEvict_MaxEntries(t *testing.T) {
RawRecord: RawRecord{
Id: k1,
Metrics: ebpf.BpfFlowMetrics{
Bytes: 123, Packets: 1, StartMonoTimeTs: 123, EndMonoTimeTs: 123, Flags: 1,
Bytes: 444, Packets: 2, StartMonoTimeTs: 123, EndMonoTimeTs: 789, Flags: 1,
},
},
TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond),
TimeFlowEnd: now.Add(-(1000 - 123) * time.Nanosecond),
TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond),
},
k2: {
RawRecord: RawRecord{
Expand Down Expand Up @@ -178,31 +178,31 @@ func TestEvict_Period(t *testing.T) {
RawRecord: RawRecord{
Id: k1,
Metrics: ebpf.BpfFlowMetrics{
Bytes: 10,
Packets: 1,
Bytes: 30,
Packets: 3,
StartMonoTimeTs: 123,
EndMonoTimeTs: 123,
EndMonoTimeTs: 789,
Flags: 1,
},
},
TimeFlowStart: now.Add(-1000 + 123),
TimeFlowEnd: now.Add(-1000 + 123),
TimeFlowEnd: now.Add(-1000 + 789),
}, *records[0])
records = receiveTimeout(t, evictor)
require.Len(t, records, 1)
assert.Equal(t, Record{
RawRecord: RawRecord{
Id: k1,
Metrics: ebpf.BpfFlowMetrics{
Bytes: 10,
Packets: 1,
Bytes: 20,
Packets: 2,
StartMonoTimeTs: 1123,
EndMonoTimeTs: 1123,
EndMonoTimeTs: 1456,
Flags: 1,
},
},
TimeFlowStart: now.Add(-1000 + 1123),
TimeFlowEnd: now.Add(-1000 + 1123),
TimeFlowEnd: now.Add(-1000 + 1456),
}, *records[0])

// no more flows are evicted
Expand Down
17 changes: 17 additions & 0 deletions pkg/flow/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,23 @@ func NewRecord(
return &record
}

func Accumulate(r *ebpf.BpfFlowMetrics, src *ebpf.BpfFlowMetrics) {
// time == 0 if the value has not been yet set
if r.StartMonoTimeTs == 0 || r.StartMonoTimeTs > src.StartMonoTimeTs {
r.StartMonoTimeTs = src.StartMonoTimeTs
}
if r.EndMonoTimeTs == 0 || r.EndMonoTimeTs < src.EndMonoTimeTs {
r.EndMonoTimeTs = src.EndMonoTimeTs
}
r.Bytes += src.Bytes
r.Packets += src.Packets
r.Flags |= src.Flags
// Accumulate Drop statistics
r.PktDrops.Bytes += src.PktDrops.Bytes
r.PktDrops.Packets += src.PktDrops.Packets
r.PktDrops.LatestFlags |= src.PktDrops.LatestFlags
}

// IP returns the net.IP equivalent object
func IP(ia IPAddr) net.IP {
return ia[:]
Expand Down
24 changes: 22 additions & 2 deletions pkg/flow/tracer_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type MapTracer struct {
}

type mapFetcher interface {
LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics
LookupAndDeleteMap() map[ebpf.BpfFlowId][]*ebpf.BpfFlowMetrics
DeleteMapsStaleEntries(timeOut time.Duration)
}

Expand Down Expand Up @@ -96,7 +96,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows
var forwardingFlows []*Record
laterFlowNs := uint64(0)
for flowKey, flowMetrics := range m.mapFetcher.LookupAndDeleteMap() {
aggregatedMetrics := flowMetrics
aggregatedMetrics := m.aggregate(flowMetrics)
// we ignore metrics that haven't been aggregated (e.g. all the mapped values are ignored)
if aggregatedMetrics.EndMonoTimeTs == 0 {
continue
Expand Down Expand Up @@ -126,3 +126,23 @@ func (m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows
}
mtlog.Debugf("%d flows evicted", len(forwardingFlows))
}

func (m *MapTracer) aggregate(metrics []*ebpf.BpfFlowMetrics) *ebpf.BpfFlowMetrics {
if len(metrics) == 0 {
mtlog.Warn("invoked aggregate with no values")
return &ebpf.BpfFlowMetrics{}
}
aggr := &ebpf.BpfFlowMetrics{}
for _, mt := range metrics {
if mt != nil {
// eBPF hashmap values are not zeroed when the entry is removed. That causes that we
// might receive entries from previous collect-eviction timeslots.
// We need to check the flow time and discard old flows.
if mt.StartMonoTimeTs <= m.lastEvictionNs || mt.EndMonoTimeTs <= m.lastEvictionNs {
continue
}
Accumulate(aggr, mt)
}
}
return aggr
}
21 changes: 16 additions & 5 deletions pkg/flow/tracer_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,36 @@ import (

func TestPacketAggregation(t *testing.T) {
type testCase struct {
input ebpf.BpfFlowMetrics
input []*ebpf.BpfFlowMetrics
expected ebpf.BpfFlowMetrics
}
tcs := []testCase{{
input: ebpf.BpfFlowMetrics{Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1},
input: []*ebpf.BpfFlowMetrics{
{Packets: 0, Bytes: 0, StartMonoTimeTs: 0, EndMonoTimeTs: 0, Flags: 1},
{Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1},
{Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1},
{Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1},
},
expected: ebpf.BpfFlowMetrics{
Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1,
},
}, {
input: ebpf.BpfFlowMetrics{Packets: 0x5, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1},
input: []*ebpf.BpfFlowMetrics{
{Packets: 0x3, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1},
{Packets: 0x2, Bytes: 0x8c, StartMonoTimeTs: 0x17f3e9633a7f, EndMonoTimeTs: 0x17f3e96f164e, Flags: 1},
{Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1},
{Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1},
},
expected: ebpf.BpfFlowMetrics{
Packets: 0x5, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1,
Packets: 0x5, Bytes: 0x5c4 + 0x8c, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1,
},
}}
ft := MapTracer{}
for i, tc := range tcs {
t.Run(fmt.Sprint(i), func(t *testing.T) {
assert.Equal(t,
tc.expected,
tc.input)
*ft.aggregate(tc.input))
})
}
}
10 changes: 5 additions & 5 deletions pkg/test/tracer_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import (
// TracerFake fakes the kernel-side eBPF map structures for testing
type TracerFake struct {
interfaces map[ifaces.Interface]struct{}
mapLookups chan map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics
mapLookups chan map[ebpf.BpfFlowId][]*ebpf.BpfFlowMetrics
ringBuf chan ringbuf.Record
}

func NewTracerFake() *TracerFake {
return &TracerFake{
interfaces: map[ifaces.Interface]struct{}{},
mapLookups: make(chan map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics, 100),
mapLookups: make(chan map[ebpf.BpfFlowId][]*ebpf.BpfFlowMetrics, 100),
ringBuf: make(chan ringbuf.Record, 100),
}
}
Expand All @@ -34,12 +34,12 @@ func (m *TracerFake) Register(iface ifaces.Interface) error {
return nil
}

func (m *TracerFake) LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics {
func (m *TracerFake) LookupAndDeleteMap() map[ebpf.BpfFlowId][]*ebpf.BpfFlowMetrics {
select {
case r := <-m.mapLookups:
return r
default:
return map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{}
return map[ebpf.BpfFlowId][]*ebpf.BpfFlowMetrics{}
}
}

Expand All @@ -50,7 +50,7 @@ func (m *TracerFake) ReadRingBuf() (ringbuf.Record, error) {
return <-m.ringBuf, nil
}

func (m *TracerFake) AppendLookupResults(results map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics) {
func (m *TracerFake) AppendLookupResults(results map[ebpf.BpfFlowId][]*ebpf.BpfFlowMetrics) {
m.mapLookups <- results
}

Expand Down

0 comments on commit e6a5200

Please sign in to comment.