Skip to content

Commit

Permalink
Fix RTT calculation error for container hooks
Browse files Browse the repository at this point in the history
Signed-off-by: Dushyant Behl <[email protected]>
  • Loading branch information
dushyantbehl committed Jul 27, 2023
1 parent bae7ff7 commit 9898807
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 94 deletions.
2 changes: 1 addition & 1 deletion bpf/dns_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ static inline int trace_dns(struct sk_buff *skb) {
if ((bpf_ntohs(dns.flags) & DNS_QR_FLAG) == 0) { /* dns query */
fill_dns_id(&id, &dns_req, bpf_ntohs(dns.id), false);
if (bpf_map_lookup_elem(&dns_flows, &dns_req) == NULL) {
u64 ts = bpf_ktime_get_ns();
u64 ts = bpf_ktime_get_ns();
bpf_map_update_elem(&dns_flows, &dns_req, &ts, BPF_ANY);
}
id.direction = EGRESS;
Expand Down
21 changes: 7 additions & 14 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,14 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
return TC_ACT_OK;
}

// Record the current time first.
u64 current_time = bpf_ktime_get_ns();
pkt_info pkt;
__builtin_memset(&pkt, 0, sizeof(pkt));

flow_id id;
__builtin_memset(&id, 0, sizeof(id));

pkt_info pkt;
__builtin_memset(&pkt, 0, sizeof(pkt));

pkt.current_ts = bpf_ktime_get_ns(); // Record the current time first.
pkt.id = &id;
pkt.current_ts = current_time;

void *data_end = (void *)(long)skb->data_end;
void *data = (void *)(long)skb->data;
Expand All @@ -77,15 +74,11 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
if (aggregate_flow != NULL) {
aggregate_flow->packets += 1;
aggregate_flow->bytes += skb->len;
aggregate_flow->end_mono_time_ts = current_time;
aggregate_flow->end_mono_time_ts = pkt.current_ts;
aggregate_flow->flags |= pkt.flags;

// Does not matter the gate. Will be zero if not enabled.
if (pkt.rtt > 0) {
/* RTT is only calculated for a few packets, so the flow is updated only when the value is non-zero.
* Without this check, a packet for which RTT could not be calculated would overwrite the previous
* valid RTT with 0.
*/
if (pkt.rtt > aggregate_flow->flow_rtt) {
aggregate_flow->flow_rtt = pkt.rtt;
}

Expand All @@ -103,8 +96,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
flow_metrics new_flow = {
.packets = 1,
.bytes = skb->len,
.start_mono_time_ts = current_time,
.end_mono_time_ts = current_time,
.start_mono_time_ts = pkt.current_ts,
.end_mono_time_ts = pkt.current_ts,
.flags = pkt.flags,
.flow_rtt = pkt.rtt
};
Expand Down
128 changes: 67 additions & 61 deletions bpf/rtt_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
#include "utils.h"
#include "maps_definition.h"

static __always_inline void fill_flow_seq_id(flow_seq_id *seq_id, pkt_info *pkt, u32 seq, u8 reversed) {
static __always_inline void fill_flow_seq_id(flow_seq_id *seq_id, pkt_info *pkt, u32 seq, bool reverse) {
flow_id *id = pkt->id;
if (reversed) {
if (reverse) {
__builtin_memcpy(seq_id->src_ip, id->dst_ip, IP_MAX_LEN);
__builtin_memcpy(seq_id->dst_ip, id->src_ip, IP_MAX_LEN);
seq_id->src_port = id->dst_port;
Expand All @@ -23,42 +23,76 @@ static __always_inline void fill_flow_seq_id(flow_seq_id *seq_id, pkt_info *pkt,
seq_id->src_port = id->src_port;
seq_id->dst_port = id->dst_port;
}
seq_id->transport_protocol = id->transport_protocol;
seq_id->seq_id = seq;
seq_id->if_index = id->if_index;
}

static __always_inline void reverse_flow_id(flow_id *o, flow_id *r) {
/* eth_protocol, transport_protocol and if_index remain the same */
r->eth_protocol = o->eth_protocol;
r->transport_protocol = o->transport_protocol;
r->if_index = o->if_index;
/* reverse the direction */
r->direction = (o->direction == INGRESS) ? EGRESS : INGRESS;
/* src mac and dst mac gets reversed */
__builtin_memcpy(r->src_mac, o->dst_mac, ETH_ALEN);
__builtin_memcpy(r->dst_mac, o->src_mac, ETH_ALEN);
/* src ip and dst ip gets reversed */
__builtin_memcpy(r->src_ip, o->dst_ip, IP_MAX_LEN);
__builtin_memcpy(r->dst_ip, o->src_ip, IP_MAX_LEN);
/* src port and dst port gets reversed */
r->src_port = o->dst_port;
r->dst_port = o->src_port;
/*
 * Fill *dst with the flow identifier of the opposite direction of *src.
 *
 * MAC addresses, IP addresses and ports are swapped between their source and
 * destination slots, the direction is flipped between INGRESS and EGRESS, and
 * eth_protocol, transport_protocol and if_index are copied unchanged.
 * No other field of *dst is written here.
 */
static __always_inline void reverse_flow_id_struct(flow_id *src, flow_id *dst) {
    // Endpoints trade places: link layer, network layer, then transport layer.
    __builtin_memcpy(dst->src_mac, src->dst_mac, ETH_ALEN);
    __builtin_memcpy(dst->dst_mac, src->src_mac, ETH_ALEN);
    __builtin_memcpy(dst->src_ip, src->dst_ip, IP_MAX_LEN);
    __builtin_memcpy(dst->dst_ip, src->src_ip, IP_MAX_LEN);
    dst->src_port = src->dst_port;
    dst->dst_port = src->src_port;

    // Anything that is not INGRESS maps to INGRESS in the reversed id.
    if (src->direction == INGRESS) {
        dst->direction = EGRESS;
    } else {
        dst->direction = INGRESS;
    }

    // Properties shared by both directions of the flow.
    dst->if_index = src->if_index;
    dst->transport_protocol = src->transport_protocol;
    dst->eth_protocol = src->eth_protocol;

    /* ICMP type can be ignored for now. We only deal with TCP packets for now. */
}

static __always_inline void update_reverse_flow_rtt(pkt_info *pkt) {
static __always_inline void update_reverse_flow_rtt(pkt_info *pkt, u32 seq) {
flow_id rev_flow_id;
__builtin_memset(&rev_flow_id, 0, sizeof(rev_flow_id));

reverse_flow_id(pkt->id, &rev_flow_id);
reverse_flow_id_struct(pkt->id, &rev_flow_id);

flow_metrics *reverse_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &rev_flow_id);
if (reverse_flow != NULL) {
reverse_flow->flow_rtt = pkt->rtt;
long ret = bpf_map_update_elem(&aggregated_flows, &rev_flow_id, reverse_flow, BPF_ANY);
if (pkt->rtt > reverse_flow->flow_rtt) {
reverse_flow->flow_rtt = pkt->rtt;
long ret = bpf_map_update_elem(&aggregated_flows, &rev_flow_id, reverse_flow, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("error updating rtt value in flow %d\n", ret);
}
}
}
}

/*
 * Try to compute the RTT for a TCP packet that carries an ACK.
 *
 * The key for the reversed flow with sequence number (ack_seq - 1) is written
 * into *seq_id and looked up in flow_sequences. On a hit, pkt->rtt is set to
 * the difference between pkt->current_ts and the stored timestamp, the entry
 * is deleted from flow_sequences, and the RTT is propagated to the reverse
 * flow. On a miss, pkt is left unchanged.
 *
 * seq_id is caller-provided storage and is overwritten by this call.
 */
static __always_inline void __calculate_tcp_rtt(pkt_info *pkt, struct tcphdr *tcp, flow_seq_id *seq_id) {
    // Stored sequence should be ack_seq - 1
    u32 seq = bpf_ntohl(tcp->ack_seq) - 1;
    // check reversed flow
    fill_flow_seq_id(seq_id, pkt, seq, true);

    u64 *prev_ts = (u64 *)bpf_map_lookup_elem(&flow_sequences, seq_id);
    if (prev_ts == NULL) {
        // No timestamp was recorded for this sequence; nothing to compute.
        return;
    }

    pkt->rtt = pkt->current_ts - *prev_ts;

    // Delete the flow from flow sequence map so if it
    // restarts we have a new RTT calculation.
    long ret = bpf_map_delete_elem(&flow_sequences, seq_id);
    if (trace_messages && ret != 0) {
        // Report the failed delete once, with the message that matches it.
        bpf_printk("error evicting flow sequence: %d", ret);
    }

    // This is an ACK packet with valid sequence id so a SYN must
    // have been sent. We can safely update the reverse flow RTT here.
    update_reverse_flow_rtt(pkt, seq);
}

/*
 * Remember when a SYN-carrying TCP packet was seen.
 *
 * Writes the non-reversed key for the packet's TCP sequence number into
 * *seq_id and stores pkt->current_ts under it in flow_sequences. The update
 * uses BPF_NOEXIST, so a failure is reported (when trace_messages is set)
 * rather than replacing an entry that is already present.
 */
static __always_inline void __store_tcp_ts(pkt_info *pkt, struct tcphdr *tcp, flow_seq_id *seq_id) {
    fill_flow_seq_id(seq_id, pkt, bpf_ntohl(tcp->seq), false);

    long err = bpf_map_update_elem(&flow_sequences, seq_id, &pkt->current_ts, BPF_NOEXIST);
    if (!trace_messages || err == 0) {
        return;
    }
    bpf_printk("err saving flow sequence record %d", err);
}

static __always_inline void calculate_flow_rtt_tcp(pkt_info *pkt, u8 direction, void *data_end, flow_seq_id *seq_id) {
Expand All @@ -67,43 +101,16 @@ static __always_inline void calculate_flow_rtt_tcp(pkt_info *pkt, u8 direction,
return;
}

switch (direction) {
case EGRESS: {
if (IS_SYN_PACKET(pkt)) {
// Record the outgoing syn sequence number
u32 seq = bpf_ntohl(tcp->seq);
fill_flow_seq_id(seq_id, pkt, seq, 0);

long ret = bpf_map_update_elem(&flow_sequences, seq_id, &pkt->current_ts, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("err saving flow sequence record %d", ret);
}
}
break;
/* We calculate RTT for both SYN/SYN+ACK and SYN+ACK/ACK and take the maximum of both.*/
if (tcp->syn && tcp->ack) { // SYN ACK Packet
__calculate_tcp_rtt(pkt, tcp, seq_id);
__store_tcp_ts(pkt, tcp, seq_id);
}
case INGRESS: {
if (IS_ACK_PACKET(pkt)) {
// Stored sequence should be ack_seq - 1
u32 seq = bpf_ntohl(tcp->ack_seq) - 1;
// check reversed flow
fill_flow_seq_id(seq_id, pkt, seq, 1);

u64 *prev_ts = (u64 *)bpf_map_lookup_elem(&flow_sequences, seq_id);
if (prev_ts != NULL) {
pkt->rtt = pkt->current_ts - *prev_ts;
// Delete the flow from flow sequence map so if it
// restarts we have a new RTT calculation.
long ret = bpf_map_delete_elem(&flow_sequences, seq_id);
if (trace_messages && ret != 0) {
bpf_printk("error evicting flow sequence: %d", ret);
}
// This is an ACK packet with valid sequence id so a SYN must
// have been sent. We can safely update the reverse flow RTT here.
update_reverse_flow_rtt(pkt);
}
}
break;
else if (tcp->ack) {
__calculate_tcp_rtt(pkt, tcp, seq_id);
}
else if (tcp->syn) {
__store_tcp_ts(pkt, tcp, seq_id);
}
}

Expand All @@ -121,5 +128,4 @@ static __always_inline void calculate_flow_rtt(pkt_info *pkt, u8 direction, void
}
}

#endif /* __RTT_TRACKER_H__ */

#endif /* __RTT_TRACKER_H__ */
8 changes: 4 additions & 4 deletions bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
#define FIN_ACK_FLAG 0x200
#define RST_ACK_FLAG 0x400

#define IS_SYN_PACKET(pkt) ((pkt->flags & SYN_FLAG) || (pkt->flags & SYN_ACK_FLAG))
#define IS_ACK_PACKET(pkt) ((pkt->flags & ACK_FLAG) || (pkt->flags & SYN_ACK_FLAG))

#if defined(__BYTE_ORDER__) && defined(__ORDER_LITTLE_ENDIAN__) && \
__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
#define bpf_ntohs(x) __builtin_bswap16(x)
Expand Down Expand Up @@ -124,14 +121,17 @@ typedef struct flow_id_t {
// Force emitting struct flow_id into the ELF.
const struct flow_id_t *unused2 __attribute__((unused));

// Standard 4 tuple and a sequence identifier.
// Standard 4 tuple, transport protocol and a sequence identifier.
// No need to emit this struct. It's used only in kernel space
// Key identifying one recorded TCP sequence number of a flow. The struct is
// packed, so fields are laid out back to back with no implicit padding.
typedef struct flow_seq_id_t {
// Transport-layer ports of the flow.
u16 src_port;
u16 dst_port;
// Source and destination addresses, IP_MAX_LEN bytes each.
u8 src_ip[IP_MAX_LEN];
u8 dst_ip[IP_MAX_LEN];
// TCP sequence number the entry is recorded under.
u32 seq_id;
u8 transport_protocol;
// NOTE(review): because of the packing, this 4-byte field directly follows
// the single-byte transport_protocol and therefore sits at an odd offset.
u32 if_index; // OS interface index
// Explicit trailing pad byte. NOTE(review): presumably rounds the key size
// up to an even number of bytes; confirm the intent and that it is always
// zeroed before the struct is used as a key.
u8 __padding;
} __attribute__((packed)) flow_seq_id;

// Flow record is a tuple containing both flow identifier and metrics. It is used to send
Expand Down
8 changes: 4 additions & 4 deletions examples/flowlogs-dump/server/flowlogs-dump-collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func main() {
for records := range receivedRecords {
for _, record := range records.Entries {
if record.EthProtocol == ipv6 {
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt %v\n",
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v\n",
ipProto[record.EthProtocol],
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
record.Interface,
Expand All @@ -91,10 +91,10 @@ func main() {
record.GetDnsId(),
record.GetDnsFlags(),
record.DnsLatency.AsDuration().Milliseconds(),
record.TimeFlowRtt.AsDuration().Microseconds(),
record.TimeFlowRtt.AsDuration().Nanoseconds(),
)
} else {
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt %v\n",
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v\n",
ipProto[record.EthProtocol],
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
record.Interface,
Expand All @@ -113,7 +113,7 @@ func main() {
record.GetDnsId(),
record.GetDnsFlags(),
record.DnsLatency.AsDuration().Milliseconds(),
record.TimeFlowRtt.AsDuration().Microseconds(),
record.TimeFlowRtt.AsDuration().Nanoseconds(),
)
}
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/ebpf/bpf_bpfeb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_bpfeb.o
Binary file not shown.
13 changes: 8 additions & 5 deletions pkg/ebpf/bpf_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_bpfel.o
Binary file not shown.
2 changes: 2 additions & 0 deletions pkg/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
if enableRtt == 0 {
// Cannot set the size of map to be 0 so set it to 1.
spec.Maps[flowSequencesMap].MaxEntries = uint32(1)
} else {
log.Infof("RTT calculations are enabled")
}

if err := spec.RewriteConstants(map[string]interface{}{
Expand Down

0 comments on commit 9898807

Please sign in to comment.