diff --git a/bpf/configs.h b/bpf/configs.h
index cdd9d58dc..fcf55db55 100644
--- a/bpf/configs.h
+++ b/bpf/configs.h
@@ -8,5 +8,5 @@ volatile const u8 trace_messages = 0;
 volatile const u8 enable_rtt = 0;
 volatile const u16 pca_port = 0;
 volatile const u8 pca_proto = 0;
-
+volatile const u8 enable_dns_tracking = 0;
 #endif //__CONFIGS_H__
diff --git a/bpf/dns_tracker.h b/bpf/dns_tracker.h
index 48506eba8..cc8a5b2c8 100644
--- a/bpf/dns_tracker.h
+++ b/bpf/dns_tracker.h
@@ -1,5 +1,5 @@
 /*
-    light weight DNS tracker using trace points.
+    lightweight DNS tracker.
 */
 
 #ifndef __DNS_TRACKER_H__
@@ -19,27 +19,6 @@ struct dns_header {
     u16 arcount;
 };
 
-static inline void find_or_create_dns_flow(flow_id *id, struct dns_header *dns, int len, u16 flags, u64 latency) {
-    flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, id);
-    u64 current_time = bpf_ktime_get_ns();
-    // net_dev_queue trace point hook will run before TC hooks, so the flow shouldn't exists, if it does
-    // that indicates we have a stale DNS query/response or in the middle of TCP flow so we will do nothing
-    if (aggregate_flow == NULL) {
-        // there is no matching flows so lets create new one and dns info
-        flow_metrics new_flow;
-        __builtin_memset(&new_flow, 0, sizeof(new_flow));
-        new_flow.start_mono_time_ts = current_time;
-        new_flow.end_mono_time_ts = current_time;
-        new_flow.packets = 1;
-        new_flow.bytes = len;
-        new_flow.flags = flags;
-        new_flow.dns_record.id = bpf_ntohs(dns->id);
-        new_flow.dns_record.flags = bpf_ntohs(dns->flags);
-        new_flow.dns_record.latency = latency;
-        bpf_map_update_elem(&aggregated_flows, id, &new_flow, BPF_ANY);
-    }
-}
-
 static inline void fill_dns_id (flow_id *id, dns_flow_id *dns_flow, u16 dns_id, bool reverse) {
     dns_flow->id = dns_id;
     dns_flow->protocol = id->transport_protocol;
@@ -56,73 +35,66 @@ static inline void fill_dns_id (flow_id *id, dns_flow_id *dns_flow, u16 dns_id,
     }
 }
 
-static inline int trace_dns(struct sk_buff *skb) {
-    flow_id id;
-    u8 protocol = 0;
-    u16 family = 0,flags = 0, len = 0;
-
-    __builtin_memset(&id, 0, sizeof(id));
-
-    id.if_index = skb->skb_iif;
-
-    // read L2 info
-    set_key_with_l2_info(skb, &id, &family);
-
-    // read L3 info
-    set_key_with_l3_info(skb, family, &id, &protocol);
-
-    switch (protocol) {
-    case IPPROTO_UDP:
-        len = set_key_with_udp_info(skb, &id, IPPROTO_UDP);
-        // make sure udp payload doesn't exceed max msg size
-        if (len - sizeof(struct udphdr) > UDP_MAXMSG) {
-            return -1;
+static __always_inline u8 calc_dns_header_offset(pkt_info *pkt, void *data_end) {
+    u8 len = 0;
+    switch (pkt->id->transport_protocol) {
+    case IPPROTO_TCP: {
+        struct tcphdr *tcp = (struct tcphdr *) pkt->l4_hdr;
+        if (!tcp || ((void *)tcp + sizeof(*tcp) > data_end)) {
+            return 0;
+        }
+        len = tcp->doff * sizeof(u32) + 2; // DNS over TCP carries a 2-byte length prefix before the header
+        break;
+    }
+    case IPPROTO_UDP: {
+        struct udphdr *udp = (struct udphdr *) pkt->l4_hdr;
+        if (!udp || ((void *)udp + sizeof(*udp) > data_end)) {
+            return 0;
+        }
+        u16 udp_len = bpf_ntohs(udp->len); // keep all 16 bits so large datagrams aren't truncated
+        // make sure udp payload doesn't exceed max msg size
+        if (udp_len - sizeof(struct udphdr) > UDP_MAXMSG) {
+            return 0;
+        }
+        // set the length to udp hdr size as it will be used to locate dns header
+        len = sizeof(struct udphdr);
+        break;
     }
-        // set the length to udp hdr size as it will be used below to locate dns header
-        len = sizeof(struct udphdr);
-        break;
-    case IPPROTO_TCP:
-        len = set_key_with_tcp_info(skb, &id, IPPROTO_TCP, &flags);
-        break;
-    default:
-        return -1;
     }
+    return len;
+}
 
-    // check for DNS packets
-    if (id.dst_port == DNS_PORT || id.src_port == DNS_PORT) {
-        struct dns_header dns;
+static __always_inline void track_dns_packet(pkt_info *pkt, void *data_end) {
+    if (pkt->id->dst_port == DNS_PORT || pkt->id->src_port == DNS_PORT) {
         dns_flow_id dns_req;
-        u64 latency = 0;
-        bpf_probe_read(&dns, sizeof(dns), (struct dns_header *)(skb->head + skb->transport_header + len));
-        if ((bpf_ntohs(dns.flags) & DNS_QR_FLAG) == 0) { /* dns query */
-            fill_dns_id(&id, &dns_req, bpf_ntohs(dns.id), false);
+
+        u8 len = calc_dns_header_offset(pkt, data_end);
+        if (!len)
+            return;
+        struct dns_header *dns = (struct dns_header *)(pkt->l4_hdr + len);
+        if (!dns || ((void *)dns + sizeof(*dns) > data_end)) {
+            return;
+        }
+        u16 dns_id = bpf_ntohs(dns->id);
+        u16 flags = bpf_ntohs(dns->flags);
+        u64 ts = bpf_ktime_get_ns();
+
+        if ((flags & DNS_QR_FLAG) == 0) { /* dns query */
+            fill_dns_id(pkt->id, &dns_req, dns_id, false);
            if (bpf_map_lookup_elem(&dns_flows, &dns_req) == NULL) {
-                u64 ts = bpf_ktime_get_ns();
                 bpf_map_update_elem(&dns_flows, &dns_req, &ts, BPF_ANY);
             }
-            id.direction = EGRESS;
         } else { /* dns response */
-            id.direction = INGRESS;
-            fill_dns_id(&id, &dns_req, bpf_ntohs(dns.id), true);
+            fill_dns_id(pkt->id, &dns_req, dns_id, true);
             u64 *value = bpf_map_lookup_elem(&dns_flows, &dns_req);
             if (value != NULL) {
-                latency = bpf_ktime_get_ns() - *value;
+                pkt->dns_latency = ts - *value;
+                pkt->dns_id = dns_id;
+                pkt->dns_flags = flags;
                 bpf_map_delete_elem(&dns_flows, &dns_req);
-                find_or_create_dns_flow(&id, &dns, skb->len, flags, latency);
             }
         } // end of dns response
     } // end of dns port check
-
-    return 0;
-}
-
-SEC("tracepoint/net/net_dev_queue")
-int trace_net_packets(struct trace_event_raw_net_dev_template *args) {
-    struct sk_buff skb;
-
-    __builtin_memset(&skb, 0, sizeof(skb));
-    bpf_probe_read(&skb, sizeof(struct sk_buff), args->skbaddr);
-    return trace_dns(&skb);
 }
 
 #endif // __DNS_TRACKER_H__
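The query/response correlation above hinges on the dns_flows map: the query path stores a timestamp keyed by the DNS transaction id plus the transport tuple (fill_dns_id reverses the tuple for responses, so both directions produce the same key), and the response path computes the latency and deletes the entry. A minimal userspace sketch of that bookkeeping, with key fields assumed to mirror dns_flow_id:

    // Sketch only: mirrors the dns_flows map logic from bpf/dns_tracker.h in plain Go.
    package main

    import "fmt"

    type dnsFlowKey struct {
        id       uint16 // DNS transaction id
        protocol uint8
        srcPort  uint16
        dstPort  uint16
    }

    var inFlight = map[dnsFlowKey]uint64{} // query timestamps, ns

    func onQuery(k dnsFlowKey, ts uint64) {
        if _, ok := inFlight[k]; !ok { // keep the first query's timestamp
            inFlight[k] = ts
        }
    }

    func onResponse(k dnsFlowKey, ts uint64) (uint64, bool) {
        t0, ok := inFlight[k]
        if !ok {
            return 0, false // response without a tracked query
        }
        delete(inFlight, k)
        return ts - t0, true // latency in ns
    }

    func main() {
        k := dnsFlowKey{id: 0x1a2b, protocol: 17, srcPort: 53411, dstPort: 53}
        onQuery(k, 1_000_000)
        if lat, ok := onResponse(k, 3_500_000); ok {
            fmt.Printf("dns latency: %dns\n", lat)
        }
    }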
diff --git a/bpf/flows.c b/bpf/flows.c
index e51e60a8a..0d21ea924 100644
--- a/bpf/flows.c
+++ b/bpf/flows.c
@@ -41,7 +41,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
 
     // If sampling is defined, will only parse 1 out of "sampling" flows
-    if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) {
+    if (!enable_dns_tracking && sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) {
         return TC_ACT_OK;
     }
 
@@ -73,6 +73,9 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
         calculate_flow_rtt(&pkt, direction, data_end);
     }
 
+    if (enable_dns_tracking) {
+        track_dns_packet(&pkt, data_end);
+    }
     // TODO: we need to add spinlock here when we deprecate versions prior to 5.1, or provide
     // a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
     flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
@@ -91,7 +94,9 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
         if (pkt.rtt > aggregate_flow->flow_rtt) {
             aggregate_flow->flow_rtt = pkt.rtt;
         }
-
+        aggregate_flow->dns_record.id = pkt.dns_id;
+        aggregate_flow->dns_record.flags = pkt.dns_flags;
+        aggregate_flow->dns_record.latency = pkt.dns_latency;
         long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
         if (trace_messages && ret != 0) {
             // usually error -16 (-EBUSY) is printed here.
@@ -111,6 +116,9 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
             .flags = pkt.flags,
             .flow_rtt = pkt.rtt,
             .dscp = pkt.dscp,
+            .dns_record.id = pkt.dns_id,
+            .dns_record.flags = pkt.dns_flags,
+            .dns_record.latency = pkt.dns_latency,
         };
 
         // even if we know that the entry is new, another CPU might be concurrently inserting a flow
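With this change the DNS record travels inside the aggregated flow metrics instead of a separate synthetic flow. A hedged sketch of reading it on the collector side, using the generated BpfDnsRecordT binding that the deduper test below also uses (the helper name and module path are assumptions):

    package main

    import (
        "fmt"
        "time"

        "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
    )

    // dnsSummary is a hypothetical helper: it renders the DNS enrichment carried
    // by an aggregated flow. Latency stays 0 until a response matched its query.
    func dnsSummary(m ebpf.BpfFlowMetrics) string {
        if m.DnsRecord.Latency == 0 {
            return "no DNS latency recorded"
        }
        return fmt.Sprintf("dns id=%d flags=%#x latency=%s",
            m.DnsRecord.Id, m.DnsRecord.Flags, time.Duration(m.DnsRecord.Latency))
    }

    func main() {
        m := ebpf.BpfFlowMetrics{DnsRecord: ebpf.BpfDnsRecordT{Id: 1, Flags: 0x8180, Latency: 2_500_000}}
        fmt.Println(dnsSummary(m))
    }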
diff --git a/bpf/types.h b/bpf/types.h
index 6084b1490..f9790427b 100644
--- a/bpf/types.h
+++ b/bpf/types.h
@@ -159,6 +159,9 @@ typedef struct pkt_info_t {
     void *l4_hdr;    // Stores the actual l4 header
     u64 rtt;         // rtt calculated from the flow if possible. else zero
     u8 dscp;         // IPv4/6 DSCP value
+    u16 dns_id;      // DNS transaction id, host byte order
+    u16 dns_flags;   // DNS header flags, host byte order
+    u64 dns_latency; // DNS response latency in ns (response ts - query ts)
 } pkt_info;
 
 // Structure for payload metadata
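The new pkt_info fields hold the DNS header values after byte-order conversion with bpf_ntohs. For reference, the id and flags are the first two big-endian u16s of the DNS header, and the QR bit of the flags word (0x8000, what DNS_QR_FLAG masks) separates queries from responses. A standalone illustration:

    // Illustration of the wire layout the tracker reads; values are assumptions.
    package main

    import (
        "encoding/binary"
        "fmt"
    )

    const dnsQRFlag = 0x8000 // QR bit: 0 = query, 1 = response

    func main() {
        hdr := []byte{0x1a, 0x2b, 0x81, 0x80} // id=0x1a2b, flags=0x8180 (a response)
        id := binary.BigEndian.Uint16(hdr[0:2])
        flags := binary.BigEndian.Uint16(hdr[2:4])
        fmt.Printf("id=%#x flags=%#x response=%v\n", id, flags, flags&dnsQRFlag != 0)
    }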
diff --git a/pkg/ebpf/bpf_bpfeb.go b/pkg/ebpf/bpf_bpfeb.go
index e95e7b997..bb10a1591 100644
--- a/pkg/ebpf/bpf_bpfeb.go
+++ b/pkg/ebpf/bpf_bpfeb.go
@@ -128,7 +128,6 @@ type BpfProgramSpecs struct {
     IngressFlowParse *ebpf.ProgramSpec `ebpf:"ingress_flow_parse"`
     IngressPcaParse  *ebpf.ProgramSpec `ebpf:"ingress_pca_parse"`
     KfreeSkb         *ebpf.ProgramSpec `ebpf:"kfree_skb"`
-    TraceNetPackets  *ebpf.ProgramSpec `ebpf:"trace_net_packets"`
 }
 
 // BpfMapSpecs contains maps before they are loaded into the kernel.
@@ -187,7 +186,6 @@ type BpfPrograms struct {
     IngressFlowParse *ebpf.Program `ebpf:"ingress_flow_parse"`
     IngressPcaParse  *ebpf.Program `ebpf:"ingress_pca_parse"`
     KfreeSkb         *ebpf.Program `ebpf:"kfree_skb"`
-    TraceNetPackets  *ebpf.Program `ebpf:"trace_net_packets"`
 }
 
 func (p *BpfPrograms) Close() error {
@@ -197,7 +195,6 @@ func (p *BpfPrograms) Close() error {
         p.IngressFlowParse,
         p.IngressPcaParse,
         p.KfreeSkb,
-        p.TraceNetPackets,
     )
 }
diff --git a/pkg/ebpf/bpf_bpfeb.o b/pkg/ebpf/bpf_bpfeb.o
index 670b6687a..56ef79795 100644
Binary files a/pkg/ebpf/bpf_bpfeb.o and b/pkg/ebpf/bpf_bpfeb.o differ
diff --git a/pkg/ebpf/bpf_bpfel.go b/pkg/ebpf/bpf_bpfel.go
index 4a84d59b4..8556d19a5 100644
--- a/pkg/ebpf/bpf_bpfel.go
+++ b/pkg/ebpf/bpf_bpfel.go
@@ -128,7 +128,6 @@ type BpfProgramSpecs struct {
     IngressFlowParse *ebpf.ProgramSpec `ebpf:"ingress_flow_parse"`
     IngressPcaParse  *ebpf.ProgramSpec `ebpf:"ingress_pca_parse"`
     KfreeSkb         *ebpf.ProgramSpec `ebpf:"kfree_skb"`
-    TraceNetPackets  *ebpf.ProgramSpec `ebpf:"trace_net_packets"`
 }
 
 // BpfMapSpecs contains maps before they are loaded into the kernel.
@@ -187,7 +186,6 @@ type BpfPrograms struct {
     IngressFlowParse *ebpf.Program `ebpf:"ingress_flow_parse"`
     IngressPcaParse  *ebpf.Program `ebpf:"ingress_pca_parse"`
     KfreeSkb         *ebpf.Program `ebpf:"kfree_skb"`
-    TraceNetPackets  *ebpf.Program `ebpf:"trace_net_packets"`
 }
 
 func (p *BpfPrograms) Close() error {
@@ -197,7 +195,6 @@ func (p *BpfPrograms) Close() error {
         p.IngressFlowParse,
         p.IngressPcaParse,
         p.KfreeSkb,
-        p.TraceNetPackets,
     )
 }
diff --git a/pkg/ebpf/bpf_bpfel.o b/pkg/ebpf/bpf_bpfel.o
index cb818ed55..1c1fef030 100644
Binary files a/pkg/ebpf/bpf_bpfel.o and b/pkg/ebpf/bpf_bpfel.o differ
diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go
index 57631b29b..cf213ce6e 100644
--- a/pkg/ebpf/tracer.go
+++ b/pkg/ebpf/tracer.go
@@ -35,14 +35,14 @@ const (
     flowSequencesMap = "flow_sequences"
     dnsLatencyMap    = "dns_flows"
     // constants defined in flows.c as "volatile const"
-    constSampling      = "sampling"
-    constTraceMessages = "trace_messages"
-    constEnableRtt     = "enable_rtt"
-    pktDropHook        = "kfree_skb"
-    dnsTraceHook       = "net_dev_queue"
-    constPcaPort       = "pca_port"
-    constPcaProto      = "pca_proto"
-    pcaRecordsMap      = "packet_record"
+    constSampling          = "sampling"
+    constTraceMessages     = "trace_messages"
+    constEnableRtt         = "enable_rtt"
+    constEnableDNSTracking = "enable_dns_tracking"
+    pktDropHook            = "kfree_skb"
+    constPcaPort           = "pca_port"
+    constPcaProto          = "pca_proto"
+    pcaRecordsMap          = "packet_record"
 )
 
 var log = logrus.WithField("component", "ebpf.FlowFetcher")
@@ -53,16 +53,15 @@ var plog = logrus.WithField("component", "ebpf.PacketFetcher")
 // and to flows that are forwarded by the kernel via ringbuffer because could not be aggregated
 // in the map
 type FlowFetcher struct {
-    objects              *BpfObjects
-    qdiscs               map[ifaces.Interface]*netlink.GenericQdisc
-    egressFilters        map[ifaces.Interface]*netlink.BpfFilter
-    ingressFilters       map[ifaces.Interface]*netlink.BpfFilter
-    ringbufReader        *ringbuf.Reader
-    cacheMaxSize         int
-    enableIngress        bool
-    enableEgress         bool
-    pktDropsTracePoint   link.Link
-    dnsTrackerTracePoint link.Link
+    objects            *BpfObjects
+    qdiscs             map[ifaces.Interface]*netlink.GenericQdisc
+    egressFilters      map[ifaces.Interface]*netlink.BpfFilter
+    ingressFilters     map[ifaces.Interface]*netlink.BpfFilter
+    ringbufReader      *ringbuf.Reader
+    cacheMaxSize       int
+    enableIngress      bool
+    enableEgress       bool
+    pktDropsTracePoint link.Link
 }
 
 type FlowFetcherConfig struct {
@@ -113,14 +112,20 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
         log.Debugf("RTT calculations are enabled")
     }
 
-    if !cfg.DNSTracker {
+    enableDNSTracking := 0
+    if cfg.DNSTracker {
+        enableDNSTracking = 1
+    }
+
+    if enableDNSTracking == 0 {
         spec.Maps[dnsLatencyMap].MaxEntries = 1
     }
 
     if err := spec.RewriteConstants(map[string]interface{}{
-        constSampling:      uint32(cfg.Sampling),
-        constTraceMessages: uint8(traceMsgs),
-        constEnableRtt:     uint8(enableRtt),
+        constSampling:          uint32(cfg.Sampling),
+        constTraceMessages:     uint8(traceMsgs),
+        constEnableRtt:         uint8(enableRtt),
+        constEnableDNSTracking: uint8(enableDNSTracking),
     }); err != nil {
         return nil, fmt.Errorf("rewriting BPF constants definition: %w", err)
     }
@@ -149,30 +154,21 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
         }
     }
 
-    var dnsTrackerLink link.Link
-    if cfg.DNSTracker {
-        dnsTrackerLink, err = link.Tracepoint("net", dnsTraceHook, objects.TraceNetPackets, nil)
-        if err != nil {
-            return nil, fmt.Errorf("failed to attach the BPF program to trace_net_packets: %w", err)
-        }
-    }
-
     // read events from igress+egress ringbuffer
     flows, err := ringbuf.NewReader(objects.DirectFlows)
     if err != nil {
         return nil, fmt.Errorf("accessing to ringbuffer: %w", err)
     }
 
     return &FlowFetcher{
-        objects:              &objects,
-        ringbufReader:        flows,
-        egressFilters:        map[ifaces.Interface]*netlink.BpfFilter{},
-        ingressFilters:       map[ifaces.Interface]*netlink.BpfFilter{},
-        qdiscs:               map[ifaces.Interface]*netlink.GenericQdisc{},
-        cacheMaxSize:         cfg.CacheMaxSize,
-        enableIngress:        cfg.EnableIngress,
-        enableEgress:         cfg.EnableEgress,
-        pktDropsTracePoint:   pktDropsLink,
-        dnsTrackerTracePoint: dnsTrackerLink,
+        objects:            &objects,
+        ringbufReader:      flows,
+        egressFilters:      map[ifaces.Interface]*netlink.BpfFilter{},
+        ingressFilters:     map[ifaces.Interface]*netlink.BpfFilter{},
+        qdiscs:             map[ifaces.Interface]*netlink.GenericQdisc{},
+        cacheMaxSize:       cfg.CacheMaxSize,
+        enableIngress:      cfg.EnableIngress,
+        enableEgress:       cfg.EnableEgress,
+        pktDropsTracePoint: pktDropsLink,
     }, nil
 }
 
@@ -301,11 +297,6 @@ func (m *FlowFetcher) Close() error {
             errs = append(errs, err)
         }
     }
-    if m.dnsTrackerTracePoint != nil {
-        if err := m.dnsTrackerTracePoint.Close(); err != nil {
-            errs = append(errs, err)
-        }
-    }
     // m.ringbufReader.Read is a blocking operation, so we need to close the ring buffer
     // from another goroutine to avoid the system not being able to exit if there
     // isn't traffic in a given interface
@@ -491,7 +482,6 @@ func kernelSpecificLoadAndAssign(oldKernel bool, spec *ebpf.CollectionSpec) (Bpf
     type NewBpfPrograms struct {
         EgressFlowParse  *ebpf.Program `ebpf:"egress_flow_parse"`
         IngressFlowParse *ebpf.Program `ebpf:"ingress_flow_parse"`
-        TraceNetPackets  *ebpf.Program `ebpf:"trace_net_packets"`
     }
     type NewBpfObjects struct {
         NewBpfPrograms
@@ -517,7 +507,6 @@ func kernelSpecificLoadAndAssign(oldKernel bool, spec *ebpf.CollectionSpec) (Bpf
         objects.FlowSequences = newObjects.FlowSequences
         objects.EgressFlowParse = newObjects.EgressFlowParse
         objects.IngressFlowParse = newObjects.IngressFlowParse
-        objects.TraceNetPackets = newObjects.TraceNetPackets
         objects.KfreeSkb = nil
     } else {
         if err := spec.LoadAndAssign(&objects, nil); err != nil {
@@ -574,13 +563,12 @@ func NewPacketFetcher(
     objects.DirectFlows = nil
     objects.AggregatedFlows = nil
     objects.FlowSequences = nil
-    objects.TraceNetPackets = nil
     delete(spec.Programs, aggregatedFlowsMap)
     delete(spec.Programs, flowSequencesMap)
     delete(spec.Programs, constSampling)
     delete(spec.Programs, constTraceMessages)
     delete(spec.Programs, constEnableRtt)
-    delete(spec.Programs, dnsTraceHook)
+    delete(spec.Programs, constEnableDNSTracking)
 
     pcaPort := 0
     pcaProto := 0
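With the tracepoint link gone, enabling DNS tracking is purely a configuration concern: NewFlowFetcher rewrites the enable_dns_tracking constant to 1 when DNSTracker is set and shrinks the dns_flows map to a single entry when it is not. A hedged sketch of the caller's side; only the config fields shown in this diff are guaranteed, and the module path is an assumption:

    package main

    import (
        "log"

        "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
    )

    func main() {
        fetcher, err := ebpf.NewFlowFetcher(&ebpf.FlowFetcherConfig{
            Sampling:      1, // note: flow sampling is bypassed while DNS tracking is on
            CacheMaxSize:  10000,
            EnableIngress: true,
            EnableEgress:  true,
            DNSTracker:    true,
        })
        if err != nil {
            log.Fatalf("creating flow fetcher: %v", err)
        }
        defer fetcher.Close()
    }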
diff --git a/pkg/flow/deduper.go b/pkg/flow/deduper.go
index cb1cca38f..9021db995 100644
--- a/pkg/flow/deduper.go
+++ b/pkg/flow/deduper.go
@@ -26,7 +26,6 @@ type deduperCache struct {
 
 type entry struct {
     key        *ebpf.BpfFlowId
-    dnsRecord  *ebpf.BpfDnsRecordT
     ifIndex    uint32
     expiryTime time.Time
 }
@@ -70,17 +69,6 @@ func (c *deduperCache) checkDupe(r *Record, justMark bool, fwd *[]*Record) {
         fEntry := ele.Value.(*entry)
         fEntry.expiryTime = timeNow().Add(c.expire)
         c.entries.MoveToFront(ele)
-        // The input flow is duplicate if its interface is different to the interface
-        // of the non-duplicate flow that was first registered in the cache
-        // except if the new flow has DNS enrichment in this case will enrich the flow in the cache
-        // with DNS info and mark the current flow as duplicate
-        if r.Metrics.DnsRecord.Latency != 0 && fEntry.dnsRecord.Latency == 0 {
-            // copy DNS record to the cached entry and mark it as duplicate
-            fEntry.dnsRecord.Flags = r.Metrics.DnsRecord.Flags
-            fEntry.dnsRecord.Id = r.Metrics.DnsRecord.Id
-            fEntry.dnsRecord.Latency = r.Metrics.DnsRecord.Latency
-            // fall through to do interface check
-        }
         if fEntry.ifIndex != r.Id.IfIndex {
             if justMark {
                 r.Duplicate = true
@@ -95,7 +83,6 @@ func (c *deduperCache) checkDupe(r *Record, justMark bool, fwd *[]*Record) {
     // so we register it for that concrete interface
     e := entry{
         key:        &rk,
-        dnsRecord:  &r.Metrics.DnsRecord,
         ifIndex:    r.Id.IfIndex,
         expiryTime: timeNow().Add(c.expire),
     }
diff --git a/pkg/flow/deduper_test.go b/pkg/flow/deduper_test.go
index 7d6cf6cb2..7dd859621 100644
--- a/pkg/flow/deduper_test.go
+++ b/pkg/flow/deduper_test.go
@@ -36,20 +36,6 @@ var (
     }, Metrics: ebpf.BpfFlowMetrics{
         Packets: 2, Bytes: 456, Flags: 1,
     }}, Interface: "123456789"}
-    // another flow from 2 different interfaces and directions with DNS latency set on the latest
-    threeIf1 = &Record{RawRecord: RawRecord{Id: ebpf.BpfFlowId{
-        EthProtocol: 1, Direction: 1, SrcPort: 433, DstPort: 456,
-        DstMac: MacAddr{0x1}, SrcMac: MacAddr{0x1}, IfIndex: 1,
-    }, Metrics: ebpf.BpfFlowMetrics{
-        Packets: 2, Bytes: 456, Flags: 1,
-    }}, Interface: "eth0"}
-    threeIf2 = &Record{RawRecord: RawRecord{Id: ebpf.BpfFlowId{
-        EthProtocol: 1, Direction: 0, SrcPort: 433, DstPort: 456,
-        DstMac: MacAddr{0x2}, SrcMac: MacAddr{0x2}, IfIndex: 2,
-    }, Metrics: ebpf.BpfFlowMetrics{
-        Packets: 2, Bytes: 456, Flags: 1,
-        DnsRecord: ebpf.BpfDnsRecordT{Id: 1, Flags: 100, Latency: 1000},
-    }}, Interface: "123456789", DNSLatency: time.Millisecond}
 )
 
 func TestDedupe(t *testing.T) {
@@ -59,17 +45,15 @@ func TestDedupe(t *testing.T) {
     go Dedupe(time.Minute, false)(input, output)
 
     input <- []*Record{
-        oneIf2,   // record 1 at interface 2: should be accepted
-        twoIf1,   // record 2 at interface 1: should be accepted
-        oneIf1,   // record 1 duplicate at interface 1: should NOT be accepted
-        oneIf1,   // (same record key, different interface)
-        twoIf2,   // record 2 duplicate at interface 2: should NOT be accepted
-        oneIf2,   // record 1 at interface 1: should be accepted (same record key, same interface)
-        threeIf1, // record 1 has no DNS so it get enriched with DNS record from the following record
-        threeIf2, // record 2 is duplicate of record1 and have DNS info , should not be accepted
+        oneIf2, // record 1 at interface 2: should be accepted
+        twoIf1, // record 2 at interface 1: should be accepted
+        oneIf1, // record 1 duplicate at interface 1: should NOT be accepted
+        oneIf1, // (same record key, different interface)
+        twoIf2, // record 2 duplicate at interface 2: should NOT be accepted
+        oneIf2, // record 1 again at interface 2: should be accepted (same record key, same interface)
     }
     deduped := receiveTimeout(t, output)
-    assert.Equal(t, []*Record{oneIf2, twoIf1, oneIf2, threeIf1}, deduped)
+    assert.Equal(t, []*Record{oneIf2, twoIf1, oneIf2}, deduped)
 
     // should still accept records with same key, same interface,
     // and discard these with same key, different interface
@@ -77,10 +61,6 @@ func TestDedupe(t *testing.T) {
     deduped = receiveTimeout(t, output)
     assert.Equal(t, []*Record{oneIf2}, deduped)
-
-    // make sure flow with no DNS get enriched with the DNS record
-    assert.Equal(t, threeIf1.Metrics.DnsRecord.Id, threeIf2.Metrics.DnsRecord.Id)
-    assert.Equal(t, threeIf1.Metrics.DnsRecord.Flags, threeIf2.Metrics.DnsRecord.Flags)
-    assert.Equal(t, threeIf1.Metrics.DnsRecord.Latency, threeIf2.Metrics.DnsRecord.Latency)
 }
 
 func TestDedupe_EvictFlows(t *testing.T) {
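Since the TC hook now stamps the dns_record onto every per-interface copy of a flow, the deduper no longer needs the cross-record DNS enrichment, and dedup is back to strictly first-interface-wins per flow key. A test-style fragment reusing the fixtures above, under the same channel-based Dedupe contract the test exercises:

    // justMark=false drops duplicates instead of only flagging them
    input := make(chan []*Record)
    output := make(chan []*Record)
    go Dedupe(time.Minute, false)(input, output)

    input <- []*Record{oneIf2, oneIf1} // same flow key, interfaces 2 then 1
    deduped := <-output                // forwards only oneIf2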