Skip to content

Commit

Permalink
Clean up idle streams in UDP syslog target
Browse files Browse the repository at this point in the history
The UDP transport in the syslog target creates a goroutine for each
remote address to handle multiple packets coming from the same address.
However, these goroutines were not cleaned up.

This commit adds an idle timeout for these goroutines waiting for
packets to be processed in order to prevent goroutine leaking.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Jan 9, 2024
1 parent 61a4205 commit 6a70bb5
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 8 deletions.
100 changes: 92 additions & 8 deletions clients/pkg/promtail/targets/syslog/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ func (c *idleTimeoutConn) setDeadline() {
_ = c.Conn.SetDeadline(time.Now().Add(c.idleTimeout))
}

type PipeReaderWriter interface {
Read(data []byte) (n int, err error)
Write(data []byte) (n int, err error)
Close() error
Addr() string
}

type ConnPipe struct {
addr net.Addr
*io.PipeReader
Expand All @@ -149,10 +156,47 @@ func NewConnPipe(addr net.Addr) *ConnPipe {
}
}

func (pipe *ConnPipe) Addr() string {
return pipe.addr.String()
}

func (pipe *ConnPipe) Close() error {
return pipe.PipeWriter.Close()
}

type IdleTimeoutConnPipe struct {
*ConnPipe
deadline time.Time
idleTimeout time.Duration
}

func NewIdleTimeoutConnPipe(addr net.Addr, idleTimeout time.Duration) *IdleTimeoutConnPipe {
pipe := &IdleTimeoutConnPipe{
ConnPipe: NewConnPipe(addr),
idleTimeout: idleTimeout,
}
pipe.setDeadline()
return pipe
}

func (pipe *IdleTimeoutConnPipe) setDeadline() {
pipe.deadline = time.Now().Add(pipe.idleTimeout)
}

func (pipe *IdleTimeoutConnPipe) Read(data []byte) (n int, err error) {
pipe.setDeadline()
return pipe.PipeReader.Read(data)
}

func (pipe *IdleTimeoutConnPipe) Write(data []byte) (n int, err error) {
pipe.setDeadline()
return pipe.PipeWriter.Write(data)
}

func (pipe *IdleTimeoutConnPipe) IsIdle(t time.Time) bool {
return pipe.deadline.Before(t)
}

type TCPTransport struct {
*baseTransport
listener net.Listener
Expand Down Expand Up @@ -303,11 +347,14 @@ func (t *TCPTransport) Addr() net.Addr {
type UDPTransport struct {
*baseTransport
udpConn *net.UDPConn
streams map[string]*IdleTimeoutConnPipe
mtx sync.Mutex
}

func NewSyslogUDPTransport(config *scrapeconfig.SyslogTargetConfig, handleMessage handleMessage, handleError handleMessageError, logger log.Logger) Transport {
return &UDPTransport{
baseTransport: newBaseTransport(config, handleMessage, handleError, logger),
streams: make(map[string]*IdleTimeoutConnPipe),
}
}

Expand All @@ -325,8 +372,10 @@ func (t *UDPTransport) Run() error {
_ = t.udpConn.SetReadBuffer(1024 * 1024)
level.Info(t.logger).Log("msg", "syslog listening on address", "address", t.Addr().String(), "protocol", protocolUDP)

t.openConnections.Add(1)
// t.openConnections does not track connections in the UDP transport, but rather running goroutines
t.openConnections.Add(2)
go t.acceptPackets()
go t.cleanupIdleConnections()
return nil
}

Expand All @@ -344,13 +393,14 @@ func (t *UDPTransport) acceptPackets() {
addr net.Addr
err error
)
streams := make(map[string]*ConnPipe)
buf := make([]byte, t.maxMessageLength())

for {
if !t.Ready() {
level.Info(t.logger).Log("msg", "syslog server shutting down", "protocol", protocolUDP, "err", t.ctx.Err())
for _, stream := range streams {
t.mtx.Lock()
defer t.mtx.Unlock()
for _, stream := range t.streams {
if err = stream.Close(); err != nil {
level.Error(t.logger).Log("msg", "failed to close pipe", "err", err)
}
Expand All @@ -363,23 +413,57 @@ func (t *UDPTransport) acceptPackets() {
continue
}

stream, ok := streams[addr.String()]
t.mtx.Lock()
stream, ok := t.streams[addr.String()]
if !ok {
stream = NewConnPipe(addr)
streams[addr.String()] = stream
stream = NewIdleTimeoutConnPipe(addr, time.Minute)
t.streams[addr.String()] = stream
t.openConnections.Add(1)
go t.handleRcv(stream)
}
if _, err := stream.Write(buf[:n]); err != nil {
level.Warn(t.logger).Log("msg", "failed to write to stream", "addr", addr, "err", err)
}
t.mtx.Unlock()
}
}

func (t *UDPTransport) cleanupIdleConnections() {
defer t.openConnections.Done()

ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
if !t.Ready() {
return
}
select {
case <-t.ctx.Done():
// Closing of streams when shutting down is handled by the acceptPackets loop
return
case now := <-ticker.C:
t.mtx.Lock()
for addr, stream := range t.streams {
if stream.IsIdle(now) {
if err := stream.Close(); err != nil {
level.Warn(t.logger).Log("msg", "failed to close stream during cleanup", "addr", addr, "err", err)
}
delete(t.streams, addr)
t.openConnections.Done()
}
}
t.mtx.Unlock()
default:
return
}
}
}

func (t *UDPTransport) handleRcv(c *ConnPipe) {
func (t *UDPTransport) handleRcv(c PipeReaderWriter) {
defer t.openConnections.Done()

lbs := t.connectionLabels(c.addr.String())
lbs := t.connectionLabels(c.Addr())
err := syslogparser.ParseStream(c, func(result *syslog.Result) {
if err := result.Error; err != nil {
t.handleMessageError(err)
Expand Down
33 changes: 33 additions & 0 deletions clients/pkg/promtail/targets/syslog/transport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package syslog

import (
"net"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestIdleTimeoutConnPipe(t *testing.T) {
addrs, _ := net.InterfaceAddrs()
timeout := 100 * time.Millisecond

p := NewIdleTimeoutConnPipe(addrs[0], timeout)
// upon creation, the idle timeout is set
require.False(t, p.IsIdle(time.Now()))
time.Sleep(50 * time.Millisecond)
require.False(t, p.IsIdle(time.Now()))

// When reading or writing, the deadline is extended
go func() {
buf := make([]byte, 0, 1024)
_, err := p.Read(buf)
require.NoError(t, err)
}()
_, err := p.Write([]byte{104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100})
require.NoError(t, err)
time.Sleep(50 * time.Millisecond)
require.False(t, p.IsIdle(time.Now()))
time.Sleep(80 * time.Millisecond)
require.True(t, p.IsIdle(time.Now()))
}

0 comments on commit 6a70bb5

Please sign in to comment.