Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

all: add Unix datagram support #169

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ jobs:
- uses: actions/checkout@v4

- name: golangci-lint
uses: golangci/golangci-lint-action@v6.0.1
uses: golangci/golangci-lint-action@v6.1.0
with:
version: v1.59.1
version: v1.61.0
5 changes: 3 additions & 2 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ func (b *buffer) len() int {
return len(b.data)
}

func (b *buffer) flush(w io.Writer, n int) {
_, _ = w.Write(b.data[:n])
func (b *buffer) flush(w io.Writer, n int) error {
_, err := w.Write(b.data[:n])
n = copy(b.data, b.data[n:])
b.data = b.data[:n]
return err
}
77 changes: 59 additions & 18 deletions datadog/client.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package datadog

import (
"io"
"log"
"net"
"net/url"
"os"
"strings"
"time"

"github.com/segmentio/stats/v4"
Expand Down Expand Up @@ -40,6 +42,8 @@ var (
// The ClientConfig type is used to configure datadog clients.
type ClientConfig struct {
// Address of the datadog database to send metrics to.
// UDP: host:port (default)
// UDS: unixgram://dir/file.ext
Address string

// Maximum size of batch of events sent to datadog.
Expand Down Expand Up @@ -106,15 +110,23 @@ func NewClientWith(config ClientConfig) *Client {
},
}

conn, bufferSize, err := dial(config.Address, config.BufferSize)
w, err := newWriter(config.Address)
if err != nil {
log.Printf("stats/datadog: %s", err)
c.err = err
w = &noopWriter{}
}

c.conn, c.err, c.bufferSize = conn, err, bufferSize
c.buffer.BufferSize = bufferSize
newBufSize, err := w.CalcBufferSize(config.BufferSize)
if err != nil {
log.Printf("stats/datadog: unable to calc writer's buffer size. Defaulting to a buffer of size %d - %v\n", DefaultBufferSize, err)
newBufSize = DefaultBufferSize
}

c.bufferSize = newBufSize
c.buffer.Serializer = &c.serializer
log.Printf("stats/datadog: sending metrics with a buffer of size %d B", bufferSize)
c.serializer.conn = w
log.Printf("stats/datadog: sending metrics with a buffer of size %d B", newBufSize)
return c
}

Expand All @@ -140,18 +152,7 @@ func (c *Client) Close() error {
return c.err
}

func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error) {
var f *os.File

if conn, err = net.Dial("udp", address); err != nil {
return
}

if f, err = conn.(*net.UDPConn).File(); err != nil {
conn.Close()
return
}
defer f.Close()
func bufSizeFromFD(f *os.File, sizehint int) (bufsize int, err error) {
fd := int(f.Fd())

// The kernel refuses to send UDP datagrams that are larger than the size of
Expand All @@ -160,7 +161,6 @@ func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error)
// to accept larger datagrams, or fallback to the default socket buffer size
// if it failed.
if bufsize, err = unix.GetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_SNDBUF); err != nil {
conn.Close()
return
}

Expand Down Expand Up @@ -198,3 +198,44 @@ func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error)
_ = unix.SetNonblock(fd, true)
return
}

type ddWriter interface {
io.WriteCloser
CalcBufferSize(desiredBufSize int) (int, error)
}

func newWriter(addr string) (ddWriter, error) {
if strings.HasPrefix(addr, "unixgram://") ||
strings.HasPrefix(addr, "udp://") {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
switch u.Scheme {
case "unixgram":
return newUDSWriter(u.Path)
case "udp":
return newUDPWriter(u.Path)
}
}
// default assume addr host:port to use UDP
return newUDPWriter(addr)
}

// noopWriter is a writer that does nothing.
type noopWriter struct{}

// Write writes nothing.
func (w *noopWriter) Write(_ []byte) (int, error) {
return 0, nil
}

// Close is a noop close.
func (w *noopWriter) Close() error {
return nil
}

// CalcBufferSize returns the sizehint.
func (w *noopWriter) CalcBufferSize(sizehint int) (int, error) {
return sizehint, nil
}
79 changes: 75 additions & 4 deletions datadog/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,28 @@ func TestClientWithDistributionPrefixes(t *testing.T) {
}
}

func TestClient_UDS(t *testing.T) {
client := NewClient("unixgram://do-not-exist")

for i := 0; i != 1000; i++ {
client.HandleMeasures(time.Time{}, stats.Measure{
Name: "request",
Fields: []stats.Field{
{Name: "count", Value: stats.ValueOf(5)},
{Name: "rtt", Value: stats.ValueOf(100 * time.Millisecond)},
},
Tags: []stats.Tag{
stats.T("answer", "42"),
stats.T("hello", "world"),
},
})
}

if err := client.Close(); err != nil {
t.Error(err)
}
}

func TestClientWithUseDistributions(t *testing.T) {
// Start a goroutine listening for packets and giving them back on packets chan
packets := make(chan []byte)
Expand All @@ -87,14 +109,25 @@ func TestClientWithUseDistributions(t *testing.T) {
client.Flush()

expectedPacket1 := "request.count:5|c|#answer:42,hello:world\nrequest.dist_rtt:0.1|d|#answer:42,hello:world\n"
assert.EqualValues(t, expectedPacket1, string(<-packets))
select {
case packet := <-packets:
fmt.Println("receive packet", packet)
assert.EqualValues(t, expectedPacket1, string(packet))
case <-time.After(2 * time.Second):
t.Fatal("no response after 2 seconds")
}

client.useDistributions = false
client.HandleMeasures(time.Time{}, testMeasure)
client.Flush()

expectedPacket2 := "request.count:5|c|#answer:42,hello:world\nrequest.dist_rtt:0.1|h|#answer:42,hello:world\n"
assert.EqualValues(t, expectedPacket2, string(<-packets))
select {
case packet := <-packets:
assert.EqualValues(t, expectedPacket2, string(packet))
case <-time.After(2 * time.Second):
t.Fatal("no response after 2 seconds")
}

if err := client.Close(); err != nil {
t.Error(err)
Expand All @@ -117,7 +150,7 @@ main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_
count := int32(0)
expect := int32(strings.Count(data, "\n"))

addr, closer := startTestServer(t, HandlerFunc(func(_ Metric, _ net.Addr) {
addr, closer := startUDPTestServer(t, HandlerFunc(func(_ Metric, _ net.Addr) {
atomic.AddInt32(&count, 1)
}))
defer closer.Close()
Expand All @@ -135,6 +168,40 @@ main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_
}
}

func TestClientWriteLargeMetrics_UDS(t *testing.T) {
const data = `main.http.error.count:0|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity
main.http.message.count:1|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.header.size:2|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.header.bytes:240|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.body.bytes:0|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.count:1|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
main.http.message.header.size:1|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
main.http.message.header.bytes:70|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
main.http.message.body.bytes:839|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
`

count := int32(0)
expect := int32(strings.Count(data, "\n"))

addr, closer := startUDSTestServer(t, HandlerFunc(func(_ Metric, _ net.Addr) {
atomic.AddInt32(&count, 1)
}))
defer closer.Close()

client := NewClient("unixgram://" + addr)

if _, err := client.Write([]byte(data)); err != nil {
t.Error(err)
}

time.Sleep(100 * time.Millisecond)

if n := atomic.LoadInt32(&count); n != expect {
t.Error("bad metric count:", n)
}
}

func BenchmarkClient(b *testing.B) {
log.SetOutput(io.Discard)

Expand Down Expand Up @@ -180,15 +247,19 @@ func isClosedNetworkConnectionErr(err error) bool {
// startUDPListener starts a goroutine listening for UDP packets on 127.0.0.1 and an available port.
// The address listened to is returned as `addr`. The payloads of packets received are copied to `packets`.
func startUDPListener(t *testing.T, packets chan []byte) (addr string, closer io.Closer) {
conn, err := net.ListenPacket("udp", "127.0.0.1:0") // :0 chooses an available port
t.Helper()
conn, err := net.ListenUDP("udp", &net.UDPAddr{Port: 0, IP: net.ParseIP("127.0.0.1")}) // :0 chooses an available port
if err != nil {
t.Fatal(err)
}

fmt.Println("starting UDP goroutine")
go func() {
for {
packetBytes := make([]byte, 1024)
fmt.Println("call conn.ReadFrom")
n, _, err := conn.ReadFrom(packetBytes)
fmt.Println("read", n, err)
if n > 0 {
packets <- packetBytes[:n]
}
Expand Down
3 changes: 1 addition & 2 deletions datadog/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"io"
"log"
"math"
"net"
"strconv"
"strings"
"time"
Expand All @@ -16,7 +15,7 @@ import (
// Datagram format: https://docs.datadoghq.com/developers/dogstatsd/datagram_shell

type serializer struct {
conn net.Conn
conn io.WriteCloser
bufferSize int
filters map[string]struct{}
distPrefixes []string
Expand Down
70 changes: 68 additions & 2 deletions datadog/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package datadog
import (
"io"
"net"
"os"
"path/filepath"
"sort"
"sync"
"sync/atomic"
Expand All @@ -21,7 +23,7 @@ func TestServer(t *testing.T) {
seenGauges := make([]Metric, 0)
var mu sync.Mutex

addr, closer := startTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) {
addr, closer := startUDPTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) {
switch m.Name {
case "datadog.test.A":
atomic.AddUint32(&a, uint32(m.Value))
Expand Down Expand Up @@ -94,7 +96,7 @@ func TestServer(t *testing.T) {
}
}

func startTestServer(t *testing.T, handler Handler) (addr string, closer io.Closer) {
func startUDPTestServer(t *testing.T, handler Handler) (addr string, closer io.Closer) {
conn, err := net.ListenPacket("udp", "127.0.0.1:0")
if err != nil {
t.Error(err)
Expand All @@ -105,3 +107,67 @@ func startTestServer(t *testing.T, handler Handler) (addr string, closer io.Clos

return conn.LocalAddr().String(), conn
}

// startUDSTestServerWithSocketFile starts a UDS server with a given socket file.
func startUDSTestServerWithSocketFile(t *testing.T, socketPath string, handler Handler) (closer io.Closer) {
udsAddr, err := net.ResolveUnixAddr("unixgram", socketPath)
if err != nil {
t.Error(err)
t.FailNow()
}

conn, err := net.ListenUnixgram("unixgram", udsAddr)
if err != nil {
t.Error(err)
t.FailNow()
}

go Serve(conn, handler)

return &testUnixgramServer{
UnixConn: conn,
pathToDelete: socketPath,
}
}

// startUDSTestServer starts a UDS server with a random socket file internally generated.
func startUDSTestServer(t *testing.T, handler Handler) (socketPath string, closer io.Closer) {
// generate a random dir
dir, err := os.MkdirTemp("", "socket")
if err != nil {
t.Error(err)
t.FailNow()
}

socketPath = filepath.Join(dir, "dsd.socket")

udsAddr, err := net.ResolveUnixAddr("unixgram", socketPath)
if err != nil {
t.Error(err)
t.FailNow()
}

conn, err := net.ListenUnixgram("unixgram", udsAddr)
if err != nil {
t.Error(err)
t.FailNow()
}

ts := testUnixgramServer{
UnixConn: conn,
pathToDelete: dir, // so we delete any tmp dir we created
}

go Serve(conn, handler)
return socketPath, &ts
}

type testUnixgramServer struct {
*net.UnixConn
pathToDelete string
}

func (ts *testUnixgramServer) Close() error {
os.RemoveAll(ts.pathToDelete) // clean up
return ts.UnixConn.Close()
}
Loading
Loading