From 2f2182a832b5ddad37521888a033fbb292fab68b Mon Sep 17 00:00:00 2001 From: Joshua Pare <65102852+joshuapare@users.noreply.github.com> Date: Mon, 22 Apr 2024 18:29:07 -0500 Subject: [PATCH] fix(promtail): Fix UDP receiver on syslog transport (#10708) Co-authored-by: Callum Styan --- CHANGELOG.md | 1 + .../pkg/promtail/targets/syslog/transport.go | 33 ++++++++++++++----- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d6ee3e811265..f9373bcf289a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -135,6 +135,7 @@ Starting with the 3.0 release we began using [conventional commits](https://www. ##### Fixes +* [10708](https://github.com/grafana/loki/pull/10708) **joshuapare**: Fix UDP receiver on syslog transport * [10631](https://github.com/grafana/loki/pull/10631) **thampiotr**: Fix race condition in cleaning up metrics when stopping to tail files. * [10798](https://github.com/grafana/loki/pull/10798) **hainenber**: Fix agent panicking after reloaded due to duplicate metric collector registration. * [10848](https://github.com/grafana/loki/pull/10848) **rgroothuijsen**: Correctly parse list of drop stage sources from YAML. diff --git a/clients/pkg/promtail/targets/syslog/transport.go b/clients/pkg/promtail/targets/syslog/transport.go index 6b1bdfeb91c1..b1c51df2c805 100644 --- a/clients/pkg/promtail/targets/syslog/transport.go +++ b/clients/pkg/promtail/targets/syslog/transport.go @@ -1,6 +1,7 @@ package syslog import ( + "bytes" "context" "crypto/tls" "crypto/x509" @@ -380,16 +381,32 @@ func (t *UDPTransport) handleRcv(c *ConnPipe) { defer t.openConnections.Done() lbs := t.connectionLabels(c.addr.String()) - err := syslogparser.ParseStream(c, func(result *syslog.Result) { - if err := result.Error; err != nil { - t.handleMessageError(err) - } else { - t.handleMessage(lbs.Copy(), result.Message) + + for { + datagram := make([]byte, t.maxMessageLength()) + n, err := c.Read(datagram) + if err != nil { + if err == io.EOF { + break + } + + level.Warn(t.logger).Log("msg", "error reading from pipe", "err", err) + continue } - }, t.maxMessageLength()) - if err != nil { - level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err) + r := bytes.NewReader(datagram[:n]) + + err = syslogparser.ParseStream(r, func(result *syslog.Result) { + if err := result.Error; err != nil { + t.handleMessageError(err) + } else { + t.handleMessage(lbs.Copy(), result.Message) + } + }, t.maxMessageLength()) + + if err != nil { + level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err) + } } }