From 025d2c32b46cf77e6443c3a588df9878bfce8e4f Mon Sep 17 00:00:00 2001 From: Joshua Pare Date: Mon, 25 Sep 2023 21:05:43 -0500 Subject: [PATCH] ensure syslog parser gets an EOF-terminated reader on udp receive --- CHANGELOG.md | 1 + .../pkg/promtail/targets/syslog/transport.go | 33 ++++++++++++++----- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b9a7c8af187d..f4309464a1f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ ##### Fixes * [10631](https://github.com/grafana/loki/pull/10631) **thampiotr**: Fix race condition in cleaning up metrics when stopping to tail files. +* [10708](https://github.com/grafana/loki/pull/10708) **joshuapare**: Fix UDP receiver on syslog transport #### LogCLI diff --git a/clients/pkg/promtail/targets/syslog/transport.go b/clients/pkg/promtail/targets/syslog/transport.go index 67a78136e311..aafa86ed26d5 100644 --- a/clients/pkg/promtail/targets/syslog/transport.go +++ b/clients/pkg/promtail/targets/syslog/transport.go @@ -1,6 +1,7 @@ package syslog import ( + "bytes" "context" "crypto/tls" "crypto/x509" @@ -380,16 +381,32 @@ func (t *UDPTransport) handleRcv(c *ConnPipe) { defer t.openConnections.Done() lbs := t.connectionLabels(c.addr.String()) - err := syslogparser.ParseStream(c, func(result *syslog.Result) { - if err := result.Error; err != nil { - t.handleMessageError(err) - } else { - t.handleMessage(lbs.Copy(), result.Message) + + for { + datagram := make([]byte, t.maxMessageLength()) + n, err := c.Read(datagram) + if err != nil { + if err == io.EOF { + break + } + + level.Warn(t.logger).Log("msg", "error reading from pipe", "err", err) + continue } - }, t.maxMessageLength()) - if err != nil { - level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err) + r := bytes.NewReader(datagram[:n]) + + err = syslogparser.ParseStream(r, func(result *syslog.Result) { + if err := result.Error; err != nil { + t.handleMessageError(err) + } else { + t.handleMessage(lbs.Copy(), result.Message) + } + }, t.maxMessageLength()) + + if err != nil { + level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err) + } } }