Skip to content

Commit

Permalink
Add support for RX threads.
Browse files Browse the repository at this point in the history
This should support NFQ in both autofp and workers mode. It also
adds very basic support for the IPS packet metrics, such as

  suricata_ip_blocked_packets_total

Closes #17
  • Loading branch information
awelzel committed Aug 5, 2024
1 parent 73713b0 commit f293546
Show file tree
Hide file tree
Showing 5 changed files with 13,363 additions and 30 deletions.
108 changes: 78 additions & 30 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,19 @@ var (
newPerThreadCounterMetric("flow_bypassed", "local_capture_bytes_total", "", "local_capture_bytes"),
}

// Basic per-thread IPS packet counters from the thread's "ips"
// object. The "ips" object is not always present (the consumer
// guards the lookup with an existence check), e.g. it appears in
// NFQ/IPS setups.
perThreadIpsMetrics = []metricInfo{
newPerThreadCounterMetric("ips", "accepted_packets_total", "", "accepted"),
newPerThreadCounterMetric("ips", "blocked_packets_total", "", "blocked"),
newPerThreadCounterMetric("ips", "rejected_packets_total", "", "rejected"),
newPerThreadCounterMetric("ips", "replaced_packets_total", "", "replaced"),
}

// Per-thread TCP counters that are produced by receive (RX) threads
// as well as worker threads; these were split out of perThreadTcpMetrics
// so the shared receive handler can emit them for both thread kinds.
perThreadTcpMetricsReceive = []metricInfo{
newPerThreadCounterMetric("tcp", "syn_packets_total", "", "syn"),
newPerThreadCounterMetric("tcp", "synack_packets_total", "", "synack"),
newPerThreadCounterMetric("tcp", "rst_packets_total", "", "rst"),
}

// From .thread.tcp
perThreadTcpMetrics = []metricInfo{
// New in 7.0.0
Expand All @@ -220,9 +233,6 @@ var (
newPerThreadCounterMetric("tcp", "invalid_checksum_packets_total", "", "invalid_checksum"),
// Removed in 7.0.0: 0360cb654293c333e3be70204705fa7ec328512e
newPerThreadCounterMetric("tcp", "no_flow_total", "", "no_flow").Optional(),
newPerThreadCounterMetric("tcp", "syn_packets_total", "", "syn"),
newPerThreadCounterMetric("tcp", "synack_packets_total", "", "synack"),
newPerThreadCounterMetric("tcp", "rst_packets_total", "", "rst"),
newPerThreadCounterMetric("tcp", "midstream_pickups_total", "", "midstream_pickups"),
newPerThreadCounterMetric("tcp", "pkt_on_wrong_thread_total", "", "pkt_on_wrong_thread"),
newPerThreadCounterMetric("tcp", "segment_memcap_drop_total", "", "segment_memcap_drop"),
Expand Down Expand Up @@ -533,7 +543,12 @@ func handleNapatechMetrics(ch chan<- prometheus.Metric, message map[string]any)
}
}

func handleWorkerThread(ch chan<- prometheus.Metric, threadName string, thread map[string]any) {
// Handle the shared RX and worker thread portions.
//
// Depending on autofp or workers runmode, the "capture" entry
// is in the RX threads.
func handleReceiveCommon(ch chan<- prometheus.Metric, threadName string, thread map[string]any) {

if capture, ok := thread["capture"].(map[string]any); ok {
for _, m := range perThreadCaptureMetrics {
if cm := newConstMetric(m, capture, threadName); cm != nil {
Expand Down Expand Up @@ -564,58 +579,88 @@ func handleWorkerThread(ch chan<- prometheus.Metric, threadName string, thread m
}
}

tcp := thread["tcp"].(map[string]any)
for _, m := range perThreadTcpMetrics {
if cm := newConstMetric(m, tcp, threadName); cm != nil {
ch <- cm
}
}

flow := thread["flow"].(map[string]any)
for _, m := range perThreadFlowMetrics {
if cm := newConstMetric(m, flow, threadName); cm != nil {
ch <- cm
}
}

wrk := flow["wrk"].(map[string]any)
for _, m := range perThreadFlowWrkMetrics {
if cm := newConstMetric(m, wrk, threadName); cm != nil {
// Convert all decoder entries that look like numbers
// as perThreadDecoder metric with a "kind" label.
decoder := thread["decoder"].(map[string]any)
for _, m := range perThreadDecoderMetrics {
if cm := newConstMetric(m, decoder, threadName); cm != nil {
ch <- cm
}
}

// Defrag stats from worker and receive threads.
defrag := thread["defrag"].(map[string]any)
defragIpv4 := defrag["ipv4"].(map[string]any)
defragIpv6 := defrag["ipv6"].(map[string]any)

for _, m := range perThreadDefragIpv4Metrics {
if cm := newConstMetric(m, defragIpv4, threadName); cm != nil {
ch <- cm
}
}

for _, m := range perThreadDefragIpv6Metrics {
if cm := newConstMetric(m, defragIpv6, threadName); cm != nil {
ch <- cm
}
}

for _, m := range perThreadDefragMetrics {
if cm := newConstMetric(m, defrag, threadName); cm != nil {
ch <- cm
}
}

detect := thread["detect"].(map[string]any)
for _, m := range perThreadDetectMetrics {
if cm := newConstMetric(m, detect, threadName); cm != nil {
tcp := thread["tcp"].(map[string]any)
for _, m := range perThreadTcpMetricsReceive {
if cm := newConstMetric(m, tcp, threadName); cm != nil {
ch <- cm
}
}

// Convert all decoder entries that look like numbers
// as perThreadDecoder metric with a "kind" label.
decoder := thread["decoder"].(map[string]any)
for _, m := range perThreadDecoderMetrics {
if cm := newConstMetric(m, decoder, threadName); cm != nil {
// Extract basic IPS metrics if they exist.
if ips, ok := thread["ips"].(map[string]any); ok {
for _, m := range perThreadIpsMetrics {
if cm := newConstMetric(m, ips, threadName); cm != nil {
ch <- cm
}
}
}

}

// handleReceiveThread emits per-thread metrics for an RX (receive)
// thread. RX threads carry the same receive-side stats as worker
// threads, so the shared handler covers them entirely; worker threads
// additionally report tcp/flow/detect metrics on top of this.
func handleReceiveThread(ch chan<- prometheus.Metric, threadName string, thread map[string]any) {
	handleReceiveCommon(ch, threadName, thread)
}

func handleWorkerThread(ch chan<- prometheus.Metric, threadName string, thread map[string]any) {
handleReceiveCommon(ch, threadName, thread)

tcp := thread["tcp"].(map[string]any)
for _, m := range perThreadTcpMetrics {
if cm := newConstMetric(m, tcp, threadName); cm != nil {
ch <- cm
}
}

flow := thread["flow"].(map[string]any)
for _, m := range perThreadFlowMetrics {
if cm := newConstMetric(m, flow, threadName); cm != nil {
ch <- cm
}
}

wrk := flow["wrk"].(map[string]any)
for _, m := range perThreadFlowWrkMetrics {
if cm := newConstMetric(m, wrk, threadName); cm != nil {
ch <- cm
}
}

detect := thread["detect"].(map[string]any)
for _, m := range perThreadDetectMetrics {
if cm := newConstMetric(m, detect, threadName); cm != nil {
ch <- cm
}
}
Expand Down Expand Up @@ -753,8 +798,10 @@ func produceMetrics(ch chan<- prometheus.Metric, counters map[string]any) {
// Produce per thread metrics
for threadName, thread_ := range message["threads"].(map[string]any) {
if thread, ok := thread_.(map[string]any); ok {
if strings.HasPrefix(threadName, "W#") {
if strings.HasPrefix(threadName, "W#") || strings.HasPrefix(threadName, "W-") {
handleWorkerThread(ch, threadName, thread)
} else if strings.HasPrefix(threadName, "RX") {
handleReceiveThread(ch, threadName, thread)
} else if strings.HasPrefix(threadName, "FM") {
handleFlowManagerThread(ch, threadName, thread)
} else if strings.HasPrefix(threadName, "FR") {
Expand Down Expand Up @@ -829,6 +876,7 @@ func main() {
r.MustRegister(&suricataCollector{NewSuricataClient(*socketPath), sync.Mutex{}})

http.Handle(*path, promhttp.HandlerFor(r, promhttp.HandlerOpts{}))

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte(`<html>
<head><title>Suricata Exporter</title></head>
Expand Down
92 changes: 92 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,3 +354,95 @@ func TestDump701(t *testing.T) {
t.Errorf("Unexpected number of suricata_flow_mgr_flows_checked_total: %v", len(tms))
}
}

func TestDump706NFQAutoFP(t *testing.T) {
data, err := os.ReadFile("./testdata/dump-counters-7.0.6-nfq-autofp.json")
if err != nil {
log.Panicf("Unable to open file: %s", err)
}

var counters map[string]any
err = json.Unmarshal(data, &counters)
if err != nil {
t.Error(err)
}

metrics := produceMetricsHelper(counters)
_ = aggregateMetrics(metrics)

// fmt.Printf("%v", agged)
}

func TestDump706NFQWorkers(t *testing.T) {
data, err := os.ReadFile("./testdata/dump-counters-7.0.6-nfq-workers.json")
if err != nil {
log.Panicf("Unable to open file: %s", err)
}

var counters map[string]any
err = json.Unmarshal(data, &counters)
if err != nil {
t.Error(err)
}

metrics := produceMetricsHelper(counters)
_ = aggregateMetrics(metrics)

// fmt.Printf("%v", _)
}

func TestDump706AFPacketAutoFP(t *testing.T) {
data, err := os.ReadFile("./testdata/dump-counters-7.0.6-afpacket-autofp.json")
if err != nil {
log.Panicf("Unable to open file: %s", err)
}

var counters map[string]any
err = json.Unmarshal(data, &counters)
if err != nil {
t.Error(err)
}

metrics := produceMetricsHelper(counters)
agged := aggregateMetrics(metrics)
tms, ok := agged["suricata_capture_kernel_packets_total"] // test metrics
if !ok {
t.Errorf("Failed to find suricata_capture_kernel_packets metrics")
}

if len(tms) != 2 {
t.Errorf("Unexpected number of suricata_kernel_packets metrics: %v", len(tms))
}

var tns []string
for _, tm := range tms {
tns = append(tns, tm.labels["thread"])
}

sort.Strings(tns)
threadNames := fmt.Sprintf("%v", tns)

if threadNames != "[RX#01 RX#02]" {
t.Errorf("Wrong threads %v", threadNames)
}

tms, ok = agged["suricata_decoder_packets_total"]
if !ok {
t.Errorf("Failed to find suricata_decoder_packets_total metrics")
}

// Decoder stats are reported for rx and worker threads.
if len(tms) != 8 {
t.Errorf("Unexpected number of suricata_decoder_packets_total metrics: %v", len(tms))
}

tms, ok = agged["suricata_tcp_syn_packets_total"]
if !ok {
t.Errorf("Failed to find suricata_tcp_syn_packets_total")
}

// TCP metrics report for rx and worker threads.
if len(tms) != 8 {
t.Errorf("Unexpected number of suricata_decoder_packets_total metrics: %v", len(tms))
}
}
Loading

0 comments on commit f293546

Please sign in to comment.