From c8fb8b8889f1f8cbb8d496376c6062f7a5e9c222 Mon Sep 17 00:00:00 2001 From: Adam Shannon Date: Thu, 2 Nov 2023 14:24:29 -0500 Subject: [PATCH] feat: add base, sql, and telemetry from moovfinancial/go-observability --- build/log.go | 40 ++ build/log_test.go | 15 + go.mod | 37 +- go.sum | 85 ++-- makefile | 2 +- sql/db.go | 177 +++++++++ sql/sql.go | 171 ++++++++ sql/sql_test.go | 348 +++++++++++++++++ sql/stmt.go | 133 +++++++ sql/tx.go | 153 ++++++++ telemetry/README.md | 125 ++++++ telemetry/attributes.go | 228 +++++++++++ telemetry/attributes_test.go | 738 +++++++++++++++++++++++++++++++++++ telemetry/collector.go | 32 ++ telemetry/config.go | 160 ++++++++ telemetry/config_test.go | 93 +++++ telemetry/env.go | 22 ++ telemetry/exporter.go | 79 ++++ telemetry/honey.go | 49 +++ telemetry/linked.go | 91 +++++ telemetry/stdout.go | 24 ++ telemetry/tracer.go | 49 +++ telemetry/tracer_test.go | 51 +++ 23 files changed, 2854 insertions(+), 48 deletions(-) create mode 100644 build/log.go create mode 100644 build/log_test.go create mode 100644 sql/db.go create mode 100644 sql/sql.go create mode 100644 sql/sql_test.go create mode 100644 sql/stmt.go create mode 100644 sql/tx.go create mode 100644 telemetry/README.md create mode 100644 telemetry/attributes.go create mode 100644 telemetry/attributes_test.go create mode 100644 telemetry/collector.go create mode 100644 telemetry/config.go create mode 100644 telemetry/config_test.go create mode 100644 telemetry/env.go create mode 100644 telemetry/exporter.go create mode 100644 telemetry/honey.go create mode 100644 telemetry/linked.go create mode 100644 telemetry/stdout.go create mode 100644 telemetry/tracer.go create mode 100644 telemetry/tracer_test.go diff --git a/build/log.go b/build/log.go new file mode 100644 index 00000000..804f91c2 --- /dev/null +++ b/build/log.go @@ -0,0 +1,40 @@ +package build + +import ( + "runtime/debug" + "strings" + + "github.com/moov-io/base/log" +) + +func Log(logger log.Logger) { + info, ok := debug.ReadBuildInfo() + if !ok { + logger.Error().Log("unable to read build info, pleasure ensure go module support") + } + + logger = logger.With(log.Fields{ + "build_path": log.String(info.Path), + "build_go_version": log.String(info.GoVersion), + }) + + for _, mod := range info.Deps { + mod = runningModule(mod) + + if strings.Contains(strings.ToLower(mod.Path), "/moov") { + logger.With(log.Fields{ + "build_mod_path": log.String(mod.Path), + "build_mod_version": log.String(mod.Version), + }).Log("") + } + } +} + +// Recurse through all the replaces to find whats actually running +func runningModule(mod *debug.Module) *debug.Module { + if mod.Replace != nil { + return runningModule(mod.Replace) + } else { + return mod + } +} diff --git a/build/log_test.go b/build/log_test.go new file mode 100644 index 00000000..dcb59e70 --- /dev/null +++ b/build/log_test.go @@ -0,0 +1,15 @@ +package build_test + +import ( + "testing" + + "github.com/moov-io/base/build" + "github.com/moov-io/base/log" +) + +func Test_LogDeps(t *testing.T) { + _, logger := log.NewBufferLogger() + + // Running it purely to make sure it doesn't panic as it requires a compiled binary to work. + build.Log(logger) +} diff --git a/go.mod b/go.mod index 10368788..cb844d6c 100644 --- a/go.mod +++ b/go.mod @@ -19,16 +19,23 @@ require ( github.com/rickar/cal/v2 v2.1.13 github.com/spf13/viper v1.17.0 github.com/stretchr/testify v1.8.4 + go.opentelemetry.io/otel v1.19.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0 + go.opentelemetry.io/otel/sdk v1.19.0 + go.opentelemetry.io/otel/trace v1.19.0 google.golang.org/grpc v1.59.0 ) require ( - cloud.google.com/go v0.110.8 // indirect - cloud.google.com/go/compute v1.23.1 // indirect + cloud.google.com/go v0.110.10 // indirect + cloud.google.com/go/compute v1.23.3 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect - cloud.google.com/go/iam v1.1.3 // indirect - cloud.google.com/go/longrunning v0.5.2 // indirect + cloud.google.com/go/iam v1.1.5 // indirect + cloud.google.com/go/longrunning v0.5.4 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe // indirect @@ -36,23 +43,25 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/envoyproxy/go-control-plane v0.11.1 // indirect github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect - github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-logfmt/logfmt v0.6.0 // indirect + github.com/go-logr/logr v1.3.0 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/gobuffalo/here v0.6.7 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/google/go-cmp v0.6.0 // indirect github.com/google/s2a-go v0.1.7 // indirect - github.com/googleapis/enterprise-certificate-proxy v0.3.1 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/magiconair/properties v1.8.7 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.44.0 // indirect + github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/sagikazarmark/locafero v0.3.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect @@ -62,6 +71,8 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.6.0 // indirect go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/otel/metric v1.19.0 // indirect + go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.14.0 // indirect @@ -72,11 +83,11 @@ require ( golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect - google.golang.org/api v0.147.0 // indirect + google.golang.org/api v0.149.0 // indirect google.golang.org/appengine v1.6.8 // indirect - google.golang.org/genproto v0.0.0-20231012201019-e917dd12ba7a // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20231012201019-e917dd12ba7a // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a // indirect + google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20231030173426-d783a09b4405 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index a2b7dbb1..d25eb820 100644 --- a/go.sum +++ b/go.sum @@ -17,30 +17,28 @@ cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHOb cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKPI= cloud.google.com/go v0.74.0/go.mod h1:VV1xSbzvo+9QJOxLDaJfTjx5e+MePCpCWwvftOeQmWk= cloud.google.com/go v0.75.0/go.mod h1:VGuuCn7PG0dwsd5XPVm2Mm3wlh3EL55/79EKB6hlPTY= -cloud.google.com/go v0.110.8 h1:tyNdfIxjzaWctIiLYOTalaLKZ17SI44SKFW26QbOhME= -cloud.google.com/go v0.110.8/go.mod h1:Iz8AkXJf1qmxC3Oxoep8R1T36w8B92yU29PcBhHO5fk= +cloud.google.com/go v0.110.10 h1:LXy9GEO+timppncPIAZoOj3l58LIU9k+kn48AN7IO3Y= +cloud.google.com/go v0.110.10/go.mod h1:v1OoFqYxiBkUrruItNM3eT4lLByNjxmJSV/xDKJNnic= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg= cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc= cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ= -cloud.google.com/go/compute v1.23.1 h1:V97tBoDaZHb6leicZ1G6DLK2BAaZLJ/7+9BB/En3hR0= -cloud.google.com/go/compute v1.23.1/go.mod h1:CqB3xpmPKKt3OJpW2ndFIXnA9A4xAy/F3Xp1ixncW78= +cloud.google.com/go/compute v1.23.3 h1:6sVlXXBmbd7jNX0Ipq0trII3e4n1/MsADLK6a+aiVlk= +cloud.google.com/go/compute v1.23.3/go.mod h1:VCgBUoMnIVIR0CscqQiPJLAG25E3ZRZMzcFZeQ+h8CI= cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= -cloud.google.com/go/iam v1.1.3 h1:18tKG7DzydKWUnLjonWcJO6wjSCAtzh4GcRKlH/Hrzc= -cloud.google.com/go/iam v1.1.3/go.mod h1:3khUlaBXfPKKe7huYgEpDn6FtgRyMEqbkvBxrQyY5SE= -cloud.google.com/go/longrunning v0.5.2 h1:u+oFqfEwwU7F9dIELigxbe0XVnBAo9wqMuQLA50CZ5k= -cloud.google.com/go/longrunning v0.5.2/go.mod h1:nqo6DQbNV2pXhGDbDMoN2bWz68MjZUzqv2YttZiveCs= +cloud.google.com/go/iam v1.1.5 h1:1jTsCu4bcsNsE4iiqNT5SHwrDRCfRmIaaaVFhRveTJI= +cloud.google.com/go/iam v1.1.5/go.mod h1:rB6P/Ic3mykPbFio+vo7403drjlgvoWfYpJhMXEbzv8= +cloud.google.com/go/longrunning v0.5.4 h1:w8xEcbZodnA2BbW6sVirkkoC+1gP8wS57EUUgGS0GVg= +cloud.google.com/go/longrunning v0.5.4/go.mod h1:zqNVncI0BOP8ST6XQD1+VcvuShMmq7+xFSzOL++V0dI= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU= -cloud.google.com/go/spanner v1.50.0 h1:QrJFOpaxCXdXF+GkiruLz642PHxkdj68PbbnLw3O2Zw= -cloud.google.com/go/spanner v1.50.0/go.mod h1:eGj9mQGK8+hkgSVbHNQ06pQ4oS+cyc4tXXd6Dif1KoM= cloud.google.com/go/spanner v1.51.0 h1:l3exhhsVMKsx1E7Xd1QajYSvHmI1KZoWPW5tRxIIdvQ= cloud.google.com/go/spanner v1.51.0/go.mod h1:c5KNo5LQ1X5tJwma9rSQZsXNBDNvj4/n8BVc3LNahq0= cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= @@ -57,6 +55,8 @@ github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migc github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= @@ -94,8 +94,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA= github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= -github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= -github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -105,6 +105,11 @@ github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= +github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/gobuffalo/here v0.6.0/go.mod h1:wAG085dHOYqUpf+Ap+WOdrPTp5IYcDAs/x7PLa8Y5fM= @@ -114,6 +119,7 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/golang-migrate/migrate/v4 v4.16.2 h1:8coYbMKUyInrFk1lfGfRovTLAW7PhWp8qQDT2iKfuoA= github.com/golang-migrate/migrate/v4 v4.16.2/go.mod h1:pfcJX4nPHaVdc5nmdCikFBWtm+UBpiZjRNNsyBbp0/o= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.1.2 h1:DVjP2PbBOzHyzA+dn3WhHIq4NdVu3Q+pvivFICf/7fo= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -158,7 +164,6 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -176,12 +181,10 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4 github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= -github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/googleapis/enterprise-certificate-proxy v0.3.1 h1:SBWmZhjUDRorQxrN0nwzf+AHBxnbFjViHQS4P0yVpmQ= -github.com/googleapis/enterprise-certificate-proxy v0.3.1/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= +github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= +github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= @@ -191,6 +194,8 @@ github.com/googleapis/go-sql-spanner v1.1.1/go.mod h1:e12AKZmltQH/2XGqR/2SAPWPKs github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 h1:RtRsiaGvWxcwd8y3BiRZxsylPT8hLWZ5SPcfI+3IDNk= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0/go.mod h1:TzP6duP4Py2pHLVPPQp42aoYI92+PCrVotyR5e8Vqlk= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -212,16 +217,14 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8= -github.com/madflojo/testcerts v1.1.0 h1:kopRnZB2jH1yKhC2d27+GVQLDy/fL9/65UrkpJkF4GA= -github.com/madflojo/testcerts v1.1.0/go.mod h1:G+ucVds7Pj79qA9/ue9FygnXiBCm622IdzKWna621io= github.com/madflojo/testcerts v1.1.1 h1:YsSHWV79nMNZK0mJtwXjKoYHjJEbLPFefR8TxmmWupY= github.com/madflojo/testcerts v1.1.1/go.mod h1:MW8sh39gLnkKh4K0Nc55AyHEDl9l/FBLDUsQhpmkuo0= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/markbates/pkger v0.17.1 h1:/MKEtWqtc0mZvu9OinB9UzVN9iYCwLWuyUv4Bw+PCno= github.com/markbates/pkger v0.17.1/go.mod h1:0JoVlrol20BSywW79rN3kdFFsE5xYM+rSCQDXbLhiuI= -github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= -github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= +github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= @@ -241,8 +244,8 @@ github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+L github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= -github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= -github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= +github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= +github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/rickar/cal/v2 v2.1.13 h1:FENBPXxDPyL1OWGf9ZdpWGcEiGoSjt0UZED8VOxvK0c= @@ -290,8 +293,25 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= +go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= +go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 h1:3d+S281UTjM+AbF31XSOYn1qXn3BgIdWl8HNEpx08Jk= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0/go.mod h1:0+KuTDyKL4gjKCF75pHOX4wuzYDUZYfAQdSu43o+Z2I= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0 h1:Nw7Dv4lwvGrI68+wULbcq7su9K2cebeCUrDjVrUJHxM= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0/go.mod h1:1MsF6Y7gTqosgoZvHlzcaaM8DIMNZgJh87ykokoNH7Y= +go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE= +go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= +go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o= +go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A= +go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= +go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= +go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= +go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -438,7 +458,6 @@ golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -531,8 +550,8 @@ google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz513 google.golang.org/api v0.35.0/go.mod h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg= google.golang.org/api v0.36.0/go.mod h1:+z5ficQTmoYpPn8LCUNVpK5I7hwkpjbcgqA7I34qYtE= google.golang.org/api v0.40.0/go.mod h1:fYKFpnQN0DsDSKRVRcQSDQNtqWPfM9i+zNPxepjRCQ8= -google.golang.org/api v0.147.0 h1:Can3FaQo9LlVqxJCodNmeZW/ib3/qKAY3rFeXiHo5gc= -google.golang.org/api v0.147.0/go.mod h1:pQ/9j83DcmPd/5C9e2nFOdjjNkDZ1G+zkbK2uvdkJMs= +google.golang.org/api v0.149.0 h1:b2CqT6kG+zqJIVKRQ3ELJVLN1PwHZ6DJ3dW8yl82rgY= +google.golang.org/api v0.149.0/go.mod h1:Mwn1B7JTXrzXtnvmzQE2BD6bYZQ8DShKZDZbeN9I7qI= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -578,12 +597,12 @@ google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= -google.golang.org/genproto v0.0.0-20231012201019-e917dd12ba7a h1:fwgW9j3vHirt4ObdHoYNwuO24BEZjSzbh+zPaNWoiY8= -google.golang.org/genproto v0.0.0-20231012201019-e917dd12ba7a/go.mod h1:EMfReVxb80Dq1hhioy0sOsY9jCE46YDgHlJ7fWVUWRE= -google.golang.org/genproto/googleapis/api v0.0.0-20231012201019-e917dd12ba7a h1:myvhA4is3vrit1a6NZCWBIwN0kNEnX21DJOJX/NvIfI= -google.golang.org/genproto/googleapis/api v0.0.0-20231012201019-e917dd12ba7a/go.mod h1:SUBoKXbI1Efip18FClrQVGjWcyd0QZd8KkvdP34t7ww= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a h1:a2MQQVoTo96JC9PMGtGBymLp7+/RzpFc2yX/9WfFg1c= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a/go.mod h1:4cYg8o5yUbm77w8ZX00LhMVNl/YVBFJRYWDc0uYWMs0= +google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405 h1:I6WNifs6pF9tNdSob2W24JtyxIYjzFB9qDlpUC76q+U= +google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405/go.mod h1:3WDQMjmJk36UQhjQ89emUzb1mdaHcPeeAh4SCBKznB4= +google.golang.org/genproto/googleapis/api v0.0.0-20231030173426-d783a09b4405 h1:HJMDndgxest5n2y77fnErkM62iUsptE/H8p0dC2Huo4= +google.golang.org/genproto/googleapis/api v0.0.0-20231030173426-d783a09b4405/go.mod h1:oT32Z4o8Zv2xPQTg0pbVaPr0MPOH6f14RgXt7zfIpwg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 h1:AB/lmRny7e2pLhFEYIbl5qkDAUt2h0ZRO4wGPhZf+ik= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405/go.mod h1:67X1fPuzjcrkymZzZV1vvkFeTn2Rvc6lYF9MYFGCcwE= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -600,8 +619,6 @@ google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8= google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= -google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ= -google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/makefile b/makefile index 51b6b551..905bd5eb 100644 --- a/makefile +++ b/makefile @@ -9,7 +9,7 @@ ifeq ($(OS),Windows_NT) else @wget -O lint-project.sh https://raw.githubusercontent.com/moov-io/infra/master/go/lint-project.sh @chmod +x ./lint-project.sh - GOCYCLO_LIMIT=26 COVER_THRESHOLD=50.0 GOLANGCI_LINTERS=gosec ./lint-project.sh + COVER_THRESHOLD=80.0 GOLANGCI_LINTERS=gosec ./lint-project.sh endif .PHONY: clean diff --git a/sql/db.go b/sql/db.go new file mode 100644 index 00000000..baeea45d --- /dev/null +++ b/sql/db.go @@ -0,0 +1,177 @@ +package sql + +import ( + "context" + gosql "database/sql" + "time" + + "github.com/moov-io/base/log" +) + +type DB struct { + *gosql.DB + + logger log.Logger + slowQueryThresholdMs int64 + + id string + stopTimer context.CancelFunc +} + +func ObserveDB(innerDB *gosql.DB, logger log.Logger, id string) (*DB, error) { + cancel := MonitorSQLDriver(innerDB, id) + + return &DB{ + DB: innerDB, + id: id, + stopTimer: cancel, + logger: logger, + + slowQueryThresholdMs: (time.Second * 2).Milliseconds(), + }, nil +} + +func (w *DB) lazyLogger() log.Logger { + return w.logger +} + +func (w *DB) start(op string, qry string, args int) func() int64 { + return MeasureQuery(w.lazyLogger, w.slowQueryThresholdMs, w.id, op, qry, args) +} + +func (w *DB) error(err error) error { + return MeasureError(w.id, err) +} + +func (w *DB) Close() error { + return w.DB.Close() +} + +func (w *DB) SetSlowQueryThreshold(d time.Duration) { + w.slowQueryThresholdMs = d.Milliseconds() +} + +func (w *DB) Prepare(query string) (*Stmt, error) { + done := w.start("prepare", query, 0) + defer done() + + return newStmt(context.Background(), w.logger, w.DB, query, w.id, w.slowQueryThresholdMs) +} + +func (w *DB) PrepareContext(ctx context.Context, query string) (*Stmt, error) { + done := w.start("prepare", query, 0) + defer done() + + return newStmt(ctx, w.logger, w.DB, query, w.id, w.slowQueryThresholdMs) +} + +func (w *DB) Exec(query string, args ...interface{}) (gosql.Result, error) { + done := w.start("exec", query, len(args)) + defer done() + + r, err := w.DB.Exec(query, args...) + return r, w.error(err) +} + +func (w *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (gosql.Result, error) { + done := w.start("exec", query, len(args)) + ctx, end := span(ctx, w.id, "exec", query, len(args)) + defer func() { + end() + done() + }() + + r, err := w.DB.ExecContext(ctx, query, args...) + return r, w.error(err) +} + +func (w *DB) Query(query string, args ...interface{}) (*gosql.Rows, error) { + done := w.start("query", query, len(args)) + defer done() + + r, err := w.DB.Query(query, args...) + return r, w.error(err) +} + +func (w *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*gosql.Rows, error) { + done := w.start("query", query, len(args)) + ctx, end := span(ctx, w.id, "query", query, len(args)) + defer func() { + end() + done() + }() + + r, err := w.DB.QueryContext(ctx, query, args...) + return r, w.error(err) +} + +func (w *DB) QueryRow(query string, args ...interface{}) *gosql.Row { + done := w.start("query-row", query, len(args)) + defer done() + + r := w.DB.QueryRow(query, args...) + w.error(r.Err()) + + return r +} + +func (w *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *gosql.Row { + done := w.start("query-row", query, len(args)) + ctx, end := span(ctx, w.id, "query-row", query, len(args)) + defer func() { + end() + done() + }() + + r := w.DB.QueryRowContext(ctx, query, args...) + w.error(r.Err()) + + return r +} + +func (w *DB) Begin() (*Tx, error) { + t, err := w.DB.Begin() + if err != nil { + return nil, w.error(err) + } + + tx := &Tx{ + Tx: t, + logger: w.logger, + id: w.id, + ctx: context.Background(), + slowQueryThresholdMs: w.slowQueryThresholdMs, + } + + tx.done = MeasureQuery(tx.lazyLogger, w.slowQueryThresholdMs, tx.id, "tx", "Transaction", 0) + + return tx, nil +} + +type TxOptions = gosql.TxOptions + +func (w *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) { + ctx, end := span(ctx, w.id, "tx", "BEGIN TRANSACTION", 0) + + t, err := w.DB.BeginTx(ctx, opts) + if err != nil { + return nil, w.error(err) + } + + tx := &Tx{ + Tx: t, + logger: w.logger, + id: w.id, + ctx: ctx, + slowQueryThresholdMs: w.slowQueryThresholdMs, + } + + done := MeasureQuery(tx.lazyLogger, w.slowQueryThresholdMs, tx.id, "tx", "Transaction", 0) + + tx.done = func() int64 { + end() + return done() + } + + return tx, nil +} diff --git a/sql/sql.go b/sql/sql.go new file mode 100644 index 00000000..498ee967 --- /dev/null +++ b/sql/sql.go @@ -0,0 +1,171 @@ +package sql + +import ( + "context" + "database/sql" + "strings" + "sync" + "time" + + "github.com/moov-io/base/log" + "github.com/moov-io/base/telemetry" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +var ( + statusLock = &sync.Mutex{} + + sqlConnections = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "sql_connections", + Help: "How many MySQL connections and what status they're in.", + }, []string{"state", "id"}) + + sqlConnectionsCounters = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "sql_connections_counters", + Help: `Counters specific to the sql connections. + wait_count: The total number of connections waited for. + wait_duration: The total time blocked waiting for a new connection. + max_idle_closed: The total number of connections closed due to SetMaxIdleConns. + max_idle_time_closed: The total number of connections closed due to SetConnMaxIdleTime. + max_lifetime_closed: The total number of connections closed due to SetConnMaxLifetime. + `, + }, []string{"counter", "id"}) + + sqlQueries = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "sql_queries", + Help: `Histogram that measures the time in milliseconds queries take`, + Buckets: []float64{10, 25, 50, 100, 250, 500, 1000, 2500, 5000}, + }, []string{"operation", "id"}) + + sqlErrors = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "sql_errors", + Help: `Histogram that measures the time in milliseconds queries take`, + }, []string{"id"}) + + // Adding in aliases for the usual error cases + ErrNoRows = sql.ErrNoRows + ErrConnDone = sql.ErrConnDone + ErrTxDone = sql.ErrTxDone +) + +func MonitorSQLDriver(db *sql.DB, id string) context.CancelFunc { + ctx, cancel := context.WithCancel(context.Background()) + + // Setup metrics after the database is setup + go func(db *sql.DB, id string) { + t := time.NewTicker(60 * time.Second) + for { + select { + case <-ctx.Done(): + return + case <-t.C: + MeasureStats(db, id) + } + } + }(db, id) + + return cancel +} + +func MeasureStats(db *sql.DB, id string) error { + statusLock.Lock() + defer statusLock.Unlock() + + stats := db.Stats() + + sqlConnections.With(prometheus.Labels{"state": "idle", "id": id}).Set(float64(stats.Idle)) + sqlConnections.With(prometheus.Labels{"state": "inuse", "id": id}).Set(float64(stats.InUse)) + sqlConnections.With(prometheus.Labels{"state": "open", "id": id}).Set(float64(stats.OpenConnections)) + + sqlConnectionsCounters.With(prometheus.Labels{"counter": "wait_count", "id": id}).Set(float64(stats.WaitCount)) + sqlConnectionsCounters.With(prometheus.Labels{"counter": "wait_ms", "id": id}).Set(float64(stats.WaitDuration.Milliseconds())) + sqlConnectionsCounters.With(prometheus.Labels{"counter": "max_idle_closed", "id": id}).Set(float64(stats.MaxIdleClosed)) + sqlConnectionsCounters.With(prometheus.Labels{"counter": "max_idle_time_closed", "id": id}).Set(float64(stats.MaxIdleTimeClosed)) + sqlConnectionsCounters.With(prometheus.Labels{"counter": "max_lifetime_closed", "id": id}).Set(float64(stats.MaxLifetimeClosed)) + + return nil +} + +type LazyLogger func() log.Logger + +func MeasureQuery(logger LazyLogger, slowQueryThresholdMs int64, id string, op string, qry string, args int) func() int64 { + s := time.Now().UnixMilli() + + once := sync.Once{} + + return func() int64 { + d := int64(-1) + + once.Do(func() { + d = time.Now().UnixMilli() - s + + sqlQueries.With(prometheus.Labels{"id": id, "operation": op}).Observe(float64(d)) + + if d >= slowQueryThresholdMs { + logger().Warn().With(log.Fields{ + "query": log.String(CleanQuery(qry)), + "query_id": log.String(id), + "query_op": log.String(op), + "query_time_ms": log.Int64(d), + "query_args": log.Int(args), + }).Log("slow query detected") + } + + // Lazy loggers could self reference, so lets nil it out. + logger = nil + }) + + return d + } +} + +func MeasureError(id string, err error) error { + if err != nil && err != ErrNoRows { + sqlErrors.With(prometheus.Labels{"id": id}).Inc() + } + return err +} + +func CleanQuery(s string) string { + cleaner := strings.ReplaceAll(s, "\n", " ") + cleaner = strings.ReplaceAll(cleaner, "\t", " ") + cleaner = strings.Trim(cleaner, "\n\t ") + + for { + spaces := strings.ReplaceAll(cleaner, " ", " ") + + // Check if it didn't change after the last replace + if spaces == cleaner { + break + } + + cleaner = spaces + } + + return cleaner +} + +func span(ctx context.Context, id string, op string, query string, args int) (context.Context, func()) { + start := time.Now() + + ctx, span := telemetry.StartSpan(ctx, "sql "+op, + trace.WithSpanKind(trace.SpanKindInternal), + trace.WithAttributes( + attribute.String("sql.query", CleanQuery(query)), + attribute.String("sql.query_id", id), + attribute.String("sql.query_op", op), + attribute.Int("sql.query_args", args), + ), + ) + + return ctx, func() { + took := time.Since(start) + span.SetAttributes(attribute.Int64("sql.query_time_ms", took.Milliseconds())) + + span.End() + } +} diff --git a/sql/sql_test.go b/sql/sql_test.go new file mode 100644 index 00000000..62e3e42c --- /dev/null +++ b/sql/sql_test.go @@ -0,0 +1,348 @@ +package sql_test + +import ( + "context" + gosql "database/sql" + "errors" + "fmt" + "testing" + "time" + + "github.com/moov-io/base/log" + "github.com/moov-io/base/sql" + + _ "github.com/go-sql-driver/mysql" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_SQL_Connect(t *testing.T) { + a := assert.New(t) + + db, _ := ConnectTestDB(t) + a.NotNil(db) +} + +func Test_SQL_Prepare(t *testing.T) { + a := assert.New(t) + + db, _ := ConnectTestDB(t) + a.NotNil(db) + + sql := "INSERT INTO moov.test(id, value) VALUES (?, ?)" + istmt, err := db.PrepareContext(context.Background(), sql) + a.NoError(err) + t.Cleanup(func() { a.NoError(istmt.Close()) }) + + first := uuid.NewString() + res, err := istmt.Exec(first, uuid.NewString()) + a.NoError(err) + n, err := res.RowsAffected() + a.NoError(err) + a.Equal(int64(1), n) + + second := uuid.NewString() + res, err = istmt.ExecContext(context.Background(), second, uuid.NewString()) + a.NoError(err) + n, err = res.RowsAffected() + a.NoError(err) + a.Equal(int64(1), n) + + sql = "SELECT * FROM moov.test WHERE id = ? LIMIT 1" + sstmt, err := db.Prepare(sql) + a.NoError(err) + t.Cleanup(func() { a.NoError(sstmt.Close()) }) + + rows, err := sstmt.Query(first) + a.NoError(err) + a.NoError(rows.Err()) + t.Cleanup(func() { a.NoError(rows.Close()) }) + + row := sstmt.QueryRow(first) + a.NoError(row.Err()) + + rows2, err := sstmt.QueryContext(context.Background(), second) + a.NoError(err) + a.NoError(rows2.Err()) + t.Cleanup(func() { a.NoError(rows2.Close()) }) + + row = sstmt.QueryRowContext(context.Background(), second) + a.NoError(row.Err()) +} + +func Test_SQL_Exec(t *testing.T) { + a := assert.New(t) + + db, _ := ConnectTestDB(t) + a.NotNil(db) + + sql := "INSERT INTO moov.test(id, value) VALUES (?, ?)" + + _, err := db.Exec(sql, uuid.NewString(), uuid.NewString()) + a.NoError(err) + + _, err = db.ExecContext(context.Background(), sql, uuid.NewString(), uuid.NewString()) + a.NoError(err) + + tx, err := db.Begin() + a.NoError(err) + + _, err = tx.Exec(sql, uuid.NewString(), uuid.NewString()) + a.NoError(err) + + _, err = tx.ExecContext(context.Background(), sql, uuid.NewString(), uuid.NewString()) + a.NoError(err) + + err = tx.Rollback() + a.NoError(err) +} + +func Test_SQL_Query(t *testing.T) { + a := assert.New(t) + + db, _ := ConnectTestDB(t) + a.NotNil(db) + + id := AddRecord(t, db) + + sql := "SELECT * FROM moov.test WHERE id = ? LIMIT 1" + + r, err := db.Query(sql, id) + a.NoError(err) + a.NoError(r.Err()) + defer r.Close() + + r, err = db.QueryContext(context.Background(), sql, id) + a.NoError(err) + a.NoError(r.Err()) + defer r.Close() + + row := db.QueryRow(sql, id) + a.NoError(row.Err()) + + row = db.QueryRowContext(context.Background(), sql, id) + a.NoError(row.Err()) +} + +func Test_SQL_Query_Tx(t *testing.T) { + a := assert.New(t) + + db, _ := ConnectTestDB(t) + a.NotNil(db) + + id := AddRecord(t, db) + + sql := "SELECT * FROM moov.test WHERE id = ? LIMIT 1" + + tx, err := db.BeginTx(context.Background(), &gosql.TxOptions{}) + a.NoError(err) + + r, err := tx.Query(sql, id) + a.NoError(err) + a.NoError(r.Err()) + defer r.Close() + r.Close() + + r, err = tx.QueryContext(context.Background(), sql, id) + a.NoError(err) + a.NoError(r.Err()) + defer r.Close() + r.Close() + + err = tx.Commit() + a.NoError(err) +} + +func Test_SQL_Query_Tx_Row(t *testing.T) { + a := assert.New(t) + + db, logBuilder := ConnectTestDB(t) + a.NotNil(db) + + id := AddRecord(t, db) + + // to be able to run multiple queries we have to dump the scanned value + dump := "" + + sql := "SELECT * FROM moov.test WHERE id = ? LIMIT 1" + + tx, err := db.BeginTx(context.Background(), &gosql.TxOptions{}) + a.NoError(err) + + row := tx.QueryRow(sql, id) + row.Scan(&dump) + a.NoError(row.Err()) + + row = tx.QueryRow(sql, id) + row.Scan(&dump) + a.NoError(row.Err()) + + err = tx.Commit() + a.NoError(err) + + logs := logBuilder.String() + a.Contains(logs, "0_query") + a.Contains(logs, "0_query_op") + a.Contains(logs, "0_query_time_ms") + a.Contains(logs, "1_query") + a.Contains(logs, "1_query_op") + a.Contains(logs, "1_query_time_ms") +} + +func Test_SQL_Query_Tx_RowCtx(t *testing.T) { + a := assert.New(t) + + db, _ := ConnectTestDB(t) + + db.SetSlowQueryThreshold(0 * time.Millisecond) + + a.NotNil(db) + + id := AddRecord(t, db) + + sql := "SELECT * FROM moov.test WHERE id = ? LIMIT 1" + + tx, err := db.BeginTx(context.Background(), &gosql.TxOptions{}) + a.NoError(err) + + row := tx.QueryRowContext(context.Background(), sql, id) + a.NoError(row.Err()) + + err = tx.Commit() + a.NoError(err) +} + +func Test_SQL_Create(t *testing.T) { + a := assert.New(t) + + db, err := sql.ObserveDB(&gosql.DB{}, log.NewNopLogger(), "test1") + a.NoError(err) + a.NotNil(db) +} + +func Test_SQL_Monitor(t *testing.T) { + a := assert.New(t) + + a.NoError(sql.MeasureStats(&gosql.DB{}, "test1")) +} + +func Test_SQL_Monitor_Query(t *testing.T) { + done := sql.MeasureQuery(LazyNopLogger, time.Minute.Milliseconds(), "1", "tx", "select * from test", 0) + done() + + t.Run("slow query", func(t *testing.T) { + threshold := time.Second.Milliseconds() + require.Equal(t, int64(1000), threshold) + + buf, logger := log.NewBufferLogger() + lazyLogger := func() log.Logger { + return logger + } + + done = sql.MeasureQuery(lazyLogger, threshold, "2", "exec", "delete from things;", 0) + time.Sleep(250 * time.Millisecond) + done() + + fmt.Printf("\n\n%s\n", buf.String()) + buf.Reset() + + done = sql.MeasureQuery(lazyLogger, threshold, "2", "exec", "delete from things;", 0) + time.Sleep(900 * time.Millisecond) + done() + + done = sql.MeasureQuery(lazyLogger, threshold, "2", "exec", "delete from things;", 0) + time.Sleep(2 * time.Second) + done() + + fmt.Printf("\n\n%s\n", buf.String()) + }) +} + +func Test_SQL_Monitor_Error(t *testing.T) { + sql.MeasureError("1", errors.New("error!")) +} + +func LazyNopLogger() log.Logger { + return log.NewNopLogger() +} + +func ConnectTestDB(t *testing.T) (*sql.DB, *log.BufferedLogger) { + t.Helper() + open := func() (*gosql.DB, error) { + db, err := gosql.Open("mysql", "moov:moov@tcp(localhost:3306)/") + if err != nil { + return nil, err + } + + if err := db.Ping(); err != nil { + db.Close() + return nil, err + } + + return db, nil + } + + db, err := open() + for i := 0; err != nil && i < 22; i++ { + time.Sleep(time.Second * 1) + db, err = open() + } + if err != nil { + t.Fatal(err) + } + + lines, logger := log.NewBufferLogger() + + odb, err := sql.ObserveDB(db, logger, "test") + if err != nil { + t.Fatal(err) + } + + odb.SetSlowQueryThreshold(0) + + t.Cleanup(func() { + db.Close() + }) + + createTable := ` + CREATE TABLE IF NOT EXISTS moov.test ( + id VARCHAR(36) NOT NULL, + value VARCHAR(255), + + CONSTRAINT connection_pk PRIMARY KEY (id) + ) + ` + + _, err = odb.Exec(createTable) + if err != nil { + t.Fatal(err) + } + + return odb, lines +} + +func AddRecord(t *testing.T, db *sql.DB) string { + t.Helper() + // Add a record + id := uuid.NewString() + sql := "INSERT INTO moov.test(id, value) VALUES (?, ?)" + _, err := db.Exec(sql, id, uuid.NewString()) + if err != nil { + assert.NoError(t, err) + } + + return id +} + +func Test_CleanQuery(t *testing.T) { + query := ` + SELECT * + FROM sometable + WHERE sometable.field = ? + ` + + cleaned := sql.CleanQuery(query) + + assert.Equal(t, `SELECT * FROM sometable WHERE sometable.field = ?`, cleaned) +} diff --git a/sql/stmt.go b/sql/stmt.go new file mode 100644 index 00000000..93ba454f --- /dev/null +++ b/sql/stmt.go @@ -0,0 +1,133 @@ +package sql + +import ( + "context" + gosql "database/sql" + + "github.com/moov-io/base/log" +) + +type Stmt struct { + logger log.Logger + + id string + + slowQueryThresholdMs int64 + + query string + ss *gosql.Stmt +} + +func newStmt(ctx context.Context, logger log.Logger, db *gosql.DB, query, id string, slowQueryThresholdMs int64) (*Stmt, error) { + // This statement is closed by (*Stmt).Close() and the responsibility of callers. + // We want to keep the *gosql.Stmt alive + ss, err := db.PrepareContext(ctx, query) + if err != nil { + return nil, err + } + return newWrappedStmt(logger, ss, query, id, slowQueryThresholdMs) +} + +func newTxStmt(ctx context.Context, logger log.Logger, tx *gosql.Tx, query, id string, slowQueryThresholdMs int64) (*Stmt, error) { + // This statement is closed by (*Stmt).Close() and the responsibility of callers. + // We want to keep the *gosql.Stmt alive + ss, err := tx.PrepareContext(ctx, query) + if err != nil { + return nil, err + } + return newWrappedStmt(logger, ss, query, id, slowQueryThresholdMs) +} + +func newWrappedStmt(logger log.Logger, ss *gosql.Stmt, query, id string, slowQueryThresholdMs int64) (*Stmt, error) { + return &Stmt{ + logger: logger, + id: id, + query: query, + ss: ss, + + slowQueryThresholdMs: slowQueryThresholdMs, + }, nil +} + +func (s *Stmt) lazyLogger() log.Logger { + return s.logger +} + +func (s *Stmt) start(op string, qry string, args int) func() int64 { + return MeasureQuery(s.lazyLogger, s.slowQueryThresholdMs, s.id, op, qry, args) +} + +func (s *Stmt) error(err error) error { + return MeasureError(s.id, err) +} + +func (s *Stmt) Close() error { + if s != nil && s.ss != nil { + return s.ss.Close() + } + return nil +} + +func (s *Stmt) Exec(args ...any) (gosql.Result, error) { + done := s.start("exec", s.query, len(args)) + defer done() + + r, err := s.ss.Exec(args...) + return r, s.error(err) +} + +func (s *Stmt) ExecContext(ctx context.Context, args ...any) (gosql.Result, error) { + done := s.start("exec", s.query, len(args)) + ctx, end := span(ctx, s.id, "exec", s.query, len(args)) + defer func() { + end() + done() + }() + + r, err := s.ss.ExecContext(ctx, args...) + return r, s.error(err) +} + +func (s *Stmt) Query(args ...any) (*gosql.Rows, error) { + done := s.start("query", s.query, len(args)) + defer done() + + r, err := s.ss.Query(args...) + return r, s.error(err) +} + +func (s *Stmt) QueryContext(ctx context.Context, args ...any) (*gosql.Rows, error) { + done := s.start("query", s.query, len(args)) + ctx, end := span(ctx, s.id, "query", s.query, len(args)) + defer func() { + end() + done() + }() + + r, err := s.ss.QueryContext(ctx, args...) + return r, s.error(err) +} + +func (s *Stmt) QueryRow(args ...any) *gosql.Row { + done := s.start("query-row", s.query, len(args)) + defer done() + + r := s.ss.QueryRow(args...) + s.error(r.Err()) + + return r +} + +func (s *Stmt) QueryRowContext(ctx context.Context, args ...any) *gosql.Row { + done := s.start("query-row", s.query, len(args)) + ctx, end := span(ctx, s.id, "query-row", s.query, len(args)) + defer func() { + end() + done() + }() + + r := s.ss.QueryRowContext(ctx, args...) + s.error(r.Err()) + + return r +} diff --git a/sql/tx.go b/sql/tx.go new file mode 100644 index 00000000..abe81bcf --- /dev/null +++ b/sql/tx.go @@ -0,0 +1,153 @@ +package sql + +import ( + "context" + "database/sql" + gosql "database/sql" + "fmt" + "time" + + "github.com/moov-io/base/log" +) + +type Tx struct { + *gosql.Tx + + logger log.Logger + + id string + done func() int64 + ctx context.Context + + slowQueryThresholdMs int64 + + queries []ranQuery +} + +type ranQuery struct { + op string + qry string + dur int64 + args int +} + +func (w *Tx) lazyLogger() log.Logger { + return w.logger +} + +func (w *Tx) Context() context.Context { + return w.ctx +} + +func (w *Tx) start(op string, query string, args int) func() int64 { + _, end := span(w.ctx, w.id, op, query, args) + + s := time.Now().UnixMilli() + return func() int64 { + end() + d := time.Now().UnixMilli() - s + + w.queries = append(w.queries, ranQuery{ + op: op, + qry: query, + dur: d, + args: args, + }) + + return d + } +} + +func (w *Tx) error(err error) error { + return MeasureError(w.id, err) +} + +func (w *Tx) Commit() error { + defer w.finished() + return w.error(w.Tx.Commit()) +} + +func (w *Tx) Rollback() error { + defer w.finished() + return w.error(w.Tx.Rollback()) +} + +func (w *Tx) finished() { + w.logger = w.logger.With(log.Fields{ + "query_id": log.String(w.id), + "query_cnt": log.Int(len(w.queries)), + }) + + for i, q := range w.queries { + if i < 7 { + pre := fmt.Sprintf("%d_", i) + w.logger = w.logger.With(log.Fields{ + pre + "query": log.String(CleanQuery(q.qry)), + pre + "query_op": log.String(q.op), + pre + "query_time_ms": log.Int64(q.dur), + pre + "query_args": log.Int(q.args), + }) + } + } + + w.done() +} + +func (w *Tx) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) { + done := w.start("exec", query, len(args)) + defer done() + + r, err := w.Tx.ExecContext(ctx, query, args...) + return r, w.error(err) +} + +func (w *Tx) Exec(query string, args ...interface{}) (sql.Result, error) { + done := w.start("exec", query, len(args)) + defer done() + + r, err := w.Tx.Exec(query, args...) + return r, w.error(err) +} + +func (w *Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) { + done := w.start("query", query, len(args)) + defer done() + + r, err := w.Tx.QueryContext(ctx, query, args...) + return r, w.error(err) +} + +func (w *Tx) Query(query string, args ...interface{}) (*sql.Rows, error) { + done := w.start("query", query, len(args)) + defer done() + + r, err := w.Tx.Query(query, args...) + return r, w.error(err) +} + +func (w *Tx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row { + done := w.start("query-row", query, len(args)) + defer done() + + r := w.Tx.QueryRowContext(ctx, query, args...) + w.error(r.Err()) + + return r +} + +func (w *Tx) QueryRow(query string, args ...interface{}) *sql.Row { + done := w.start("query-row", query, len(args)) + defer done() + + r := w.Tx.QueryRow(query, args...) + w.error(r.Err()) + + return r +} + +func (w *Tx) PrepareContext(ctx context.Context, query string) (*Stmt, error) { + done := w.start("prepare", query, 0) + defer done() + + return newTxStmt(ctx, w.logger, w.Tx, query, w.id, w.slowQueryThresholdMs) +} diff --git a/telemetry/README.md b/telemetry/README.md new file mode 100644 index 00000000..2af52210 --- /dev/null +++ b/telemetry/README.md @@ -0,0 +1,125 @@ +# Telemetry + +This package acts to wire up the various components of OpenTelemetry transparently to Moov developers. +As such, basic understanding of how OpenTelemetry project works is critical to understanding of this library. +The documentation can be found here: https://opentelemetry.io/docs/instrumentation/go/ + +------------------ +### OpenTelemetry Purpose +To allow users, especially of distributed systems, to track execution through the stack via a single Trace that is composed of a series of Spans. +The Trace will persist from ingress in the system until processing is complete. Traces are seperated by spans, tht are typically started and stopped at microservice boundaries but can also be started/stopped anywhere the client chooses. Spans started within other spans have a praent child relationship to the span than they were derived from. + +OpenTelemetry not only lets you trace execution, allowing you to delineate functional locks of the overarching execution via spans, but also allows you to include meta-data such as attibutes and baggage (both covered below) to ensure the metrics being generated contain adequate information to properly query from. + +------------------ +### OpenTelemetry Components + +**Attributes**: Key/value pairs associated with a span, can be added at any time during execution. Attaching attributes to span is one of the most common ways we as Moov developers will add tracing information. + +**Context**: While technically not an OpenTelemetry construct, it is very important to understand that this is how spans are propagated (they are embedded inside the context, and passed across microservice boundaries via baggage). + +**Baggage**: A bucket for metadata that is passed along the entire Trace. Library warns that any data you pass in this you should expect to be visible and to be careful. Can somewhat be thought of Attributes associated with a Trace instead of a Span. Baggage is the means that transfers span information allowing it to continue through all the microservices. Baggage is added to all internal API requests and to the producing of events. + +**Exporter**: Sends spans (that have been sent to it via a batch processor) to the system to be recorded. Two most common variants are the stdoutexporter and the gRPC exporter. Only used by this code base's boilerplate. + +**Instrumentation Libraries**: Pre-made libraries that provide quality of life instrumentation, these typically cover auxiliary libraries such as kafka or net/http. + +**Propagators**: Establishes the technique to pass Spans across microservice boundaries. Example below allows the Baggage to be included in the context that is marshalled/unmarshalled across boundaries. Only used by this code base's boilerplate. + +**Resource**: Represents metadata about the physical device the code is executing on. Only used by this code base's boilerplate. Example below: + +**Sampler**: Determines what percent of spans are recorded. You can always send, never send, or any ratio between. Only used by this code base's boilerplate. + +**Span**: The main unit of work in OpenTelemetry, and the API Moov developers need to be most familiar with, spans delineates a functional block of execution. Creating a span from a tracer accepts a ctx, if that context already contains a span the new span becomes the child of that span. Contains an API that allows adding attributes, span status, and error details. +``` +oneotel.Tracer(packageName).Start(ctx, “SpanName”) +``` + +**SpanProcessor**: Receives span information and is the pipeline that provides it to the exporter. It is configured within the TraceProvider, multiple SpanProcessors can be configured. BatchSpanProcessor which sends telemetry in batches and should be used for production applications (use NewBatchSpanProcessor). SimpleSpanProcessor is a non-production debugging processor (use NewSimpleSpanProcessor) + +**Tracer**: Created from its factory object TracerProvider and provided a name. By OpenTelemetry standards this name should match the package name, but we do not follow this explicitly. Tracers are factories for Spans, and the main purpose of the Tracer is to associate the spans it spawns with the correct package name. + +**TracerProvider**: The "glue" of OpenTelemetry, you define a factory and provide it with Sampler, Batcher, A factory of Tracers, defined only one time in an application. Accessed globally via the following: +otel.SetTracerProvider(tp) +otel.GetTracerProvider() + +------------------ +### OpenTelemetry "Putting it all Together" + +To configure a system to generate and export spans the following steps must be done: + +1. Define your exporter, where do you want your data to go? In this package, honey.go, stdout.go and collector.go all store methods to create Exporters. The beginning blocks of logic in SetupTelemetry determines which one should be used based on configuration values. Honeycomb exporter shown as example: +```go +opts := []otlptracegrpc.Option{ + otlptracegrpc.WithCompressor("gzip"), + otlptracegrpc.WithEndpoint(config.URL), + otlptracegrpc.WithHeaders(map[string]string{ + "x-honeycomb-team": config.Team, + }), + otlptracegrpc.WithTLSCredentials(credentials.NewClientTLSFromCert(nil, "")), +} + +client := otlptracegrpc.NewClient(opts...) +return otlptrace.New(ctx, client) +``` +2. Define your TracerProvider, typically this will require you to define the other constructs TracerProvider uses at the same time: +```go +resource := resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String(config.ServiceName), + semconv.ServiceVersionKey.String(version), +) + +return trace.NewTracerProvider( + trace.WithSampler(trace.AlwaysSample()), + trace.WithBatcher(exp, + trace.WithMaxQueueSize(3*trace.DefaultMaxQueueSize), + trace.WithMaxExportBatchSize(3*trace.DefaultMaxExportBatchSize), + trace.WithBatchTimeout(5*time.Second), + ), + trace.WithResource(resource), +) +``` +3. Globally set the defined TracerProvider: +```go +otel.SetTracerProvider(tp) +``` +4. Allow Trace information to be propagated across microservice boundaries: +```go +otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, +)) +``` +5. Acquire a Tracer from the TracerProvider, and start a Span wrapping your desired block of execution* +[IMPORTANT NOTE]: Our libraries (usually) automatically start spans for you: + 1. Kakfa Consumers: [start a span for processing each consumed event](https://github.com/moovfinancial/events/blob/82ed357686f9bc920568274828a36dffc1f37fb4/go/consumer/processor_event.go#L137). + 2. Kafka Producers: [start a span for producing each event](https://github.com/moovfinancial/events/blob/ef809bc0d63f3ddec07f39da47edad3e39145dab/go/producer/producer.go#L170) + 3. HTTPS Endpoints: [start a Span for each request](https://github.com/moovfinancial/go-zero-trust/blob/v2/pkg/middleware/middleware.go#L50) and are protected by the go-zero-trust-middleware. +6. Add attributes, state code, or error messages at any location during a span, acquiring the span from the context if needed: +```go +span := telemetry.SpanFromContext(r.Context()) +attributes := []attribute.KeyValue{ + attribute.String("account_id", accountID), + attribute.String("mode", claims.CallingAccountMode.String()), +} +if claims.CallingAccountID != nil { + attributes = append(attributes, attribute.String("mode", *claims.CallingAccountID)) +} +span.SetAttributes(attributes...) +``` +------------------ +#### Example of how HTTPS endpoints gain tracing via Zero-Trust library +```go + if env.ZeroTrustMiddleware == nil { + gatewayMiddleware, err := middleware.NewServerFromConfig(env.Logger, env.TimeService, env.Config.Gateway) + if err != nil { + return nil, env.Logger.Fatal().LogErrorf("failed to startup Gateway middleware: %w", err).Err() + } + env.ZeroTrustMiddleware = gatewayMiddleware.Handler + } +``` +followed by attaching it to your router of choice: +```go + env.PublicRouter.Use(env.ZeroTrustMiddleware) +``` diff --git a/telemetry/attributes.go b/telemetry/attributes.go new file mode 100644 index 00000000..c159c06d --- /dev/null +++ b/telemetry/attributes.go @@ -0,0 +1,228 @@ +package telemetry + +import ( + "fmt" + "reflect" + "strings" + "time" + + "go.opentelemetry.io/otel/attribute" +) + +const DropSpanKey = "span.drop" + +// DropSpan informs the sampler to skip this event if theirs no links tied to it. +func DropSpan() attribute.KeyValue { + return attribute.Bool(DropSpanKey, true) +} + +const MoovKnownIssueKey = "moov.known_issue" + +// AttributeMoovKnownIssue is an attribute to mark a trace as a previously observed issue. +// IMPORTANT: if a trace has this attribute it will NOT fire a critical PD alert defined in +// https://github.com/moovfinancial/infra/blob/master/terraform-modules/apps/go-service/honeycomb.tf#L42 +func AttributeMoovKnownIssue() attribute.KeyValue { + return attribute.Bool(MoovKnownIssueKey, true) +} + +// StructAttributes creates an attribute.KeyValue for each field in the struct that has an "otel" tag defined. +// Nested structs will also be included, with attribute names formatted as "parent_attribute.nested_field_attribute". +func StructAttributes(s interface{}) (kv []attribute.KeyValue) { + rVal := reflect.ValueOf(s) + if !rVal.IsValid() { + return kv // ignore values that can't be handled by reflection + } + + return structAttributes(rVal, "") // no prefix for top level +} + +func structAttributes(rVal reflect.Value, prefix string) (kv []attribute.KeyValue) { + defer func() { // recover from panics + if recovered := recover(); recovered != nil { + return + } + }() + + // if rVal is an interface or pointer, get the underlying value + if rVal.Kind() == reflect.Interface || rVal.Kind() == reflect.Pointer { + if rVal.IsNil() { + return kv + } + // get the underlying value from this interface or pointer + rVal = rVal.Elem() + } + + if rVal.Kind() != reflect.Struct { + return kv // only structs have tags to parse + } + + for i := 0; i < rVal.NumField(); i++ { + field := rVal.Type().Field(i) + // skip non-exported fields + if !field.IsExported() && !field.Anonymous { + continue // exported fields only + } + + otelTag := parseTag(prefix, field.Tag) + if otelTag == nil { + continue + } + + kv = append(kv, createAttributes(rVal.Field(i), otelTag)...) + } + + return kv +} + +type otelTag struct { + attributeName string + omitEmpty bool +} + +func parseTag(prefix string, stag reflect.StructTag) *otelTag { + tagParts := strings.Split(stag.Get(AttributeTag), ",") + attributeName := tagParts[0] // strings.Split is guaranteed to always return at least 1 element + if attributeName == "" { + return nil + } + + if prefix != "" { + attributeName = fmt.Sprintf("%s.%s", prefix, attributeName) + } + + omitEmpty := false + if len(tagParts) > 1 && tagParts[1] == "omitempty" { + omitEmpty = true + } + + return &otelTag{ + attributeName: attributeName, + omitEmpty: omitEmpty, + } +} + +type stringValuer interface { + Value() string +} + +type intValuer interface { + Value() int +} + +func createAttributes(val reflect.Value, tag *otelTag) (kv []attribute.KeyValue) { + if tag.omitEmpty && val.IsZero() { + return kv + } + + switch val.Kind() { + case reflect.String: + kv = append(kv, attribute.String(tag.attributeName, val.String())) + case reflect.Bool: + kv = append(kv, attribute.Bool(tag.attributeName, val.Bool())) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + kv = append(kv, attribute.Int64(tag.attributeName, val.Int())) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + kv = append(kv, attribute.Int64(tag.attributeName, int64(val.Uint()))) + case reflect.Float32, reflect.Float64: + kv = append(kv, attribute.Float64(tag.attributeName, val.Float())) + case reflect.Complex64, reflect.Complex128: + complexVal := val.Complex() + kv = append(kv, + attribute.Float64(tag.attributeName+"_real", real(complexVal)), + attribute.Float64(tag.attributeName+"_imag", imag(complexVal)), + ) + case reflect.Map: + // check map key type + if val.Len() == 0 || len(val.MapKeys()) == 0 { + break + } + keyKind := val.MapKeys()[0].Kind() + if keyKind == reflect.Pointer { + keyKind = val.MapKeys()[0].Elem().Kind() + } + + if !supportedMapKeyKinds[keyKind] { + break + } + + mapIter := val.MapRange() + count := 0 + for mapIter.Next() { + // only support simple map key types + if count == MaxArrayAttributes { + break + } + + key := mapIter.Key() + if key.Kind() == reflect.Pointer { + key = key.Elem() + } + + kv = append(kv, createAttributes(mapIter.Value(), &otelTag{ + attributeName: fmt.Sprintf("%s.%v", tag.attributeName, key.Interface()), + omitEmpty: false, + })...) + count++ + } + case reflect.Array, reflect.Slice: + for i := 0; i < val.Len(); i++ { + if i == MaxArrayAttributes { + break + } + + kv = append(kv, createAttributes(val.Index(i), &otelTag{ + attributeName: fmt.Sprintf("%s.%d", tag.attributeName, i), + omitEmpty: false, + })...) + } + case reflect.Struct: + // if this is a non-zero time.Time, format as string and append to attributes + if t, ok := val.Interface().(time.Time); ok && !t.IsZero() { + kv = append(kv, attribute.String(tag.attributeName, t.Format(time.RFC3339))) + } else if t, ok := val.Interface().(stringValuer); ok { + stringValue := t.Value() + kv = append(kv, attribute.String(tag.attributeName, stringValue)) + } else if t, ok := val.Interface().(intValuer); ok { + intValue := t.Value() + kv = append(kv, attribute.Int(tag.attributeName, intValue)) + } else { // otherwise recursively handle the struct + kv = append(kv, structAttributes(val, tag.attributeName)...) + } + case reflect.Pointer: + // before we check the value element, see if the pointer implements any of + // our known interfaces + if t, ok := val.Interface().(stringValuer); ok { + stringValue := t.Value() + kv = append(kv, attribute.String(tag.attributeName, stringValue)) + } else if t, ok := val.Interface().(intValuer); ok { + intValue := t.Value() + kv = append(kv, attribute.Int(tag.attributeName, intValue)) + } else { + kv = append(kv, createAttributes(val.Elem(), tag)...) + } + case reflect.Chan, reflect.Func, reflect.Interface, reflect.Uintptr, + reflect.UnsafePointer, reflect.Invalid: + return // not supported + } + + return kv +} + +// supportedMapKeyKinds defines the map key types that are supported for attribute names. +// Only types that can be easily represented as strings are allowed to be used in attribute names. +var supportedMapKeyKinds = map[reflect.Kind]bool{ + reflect.Bool: true, + reflect.Int: true, + reflect.Int8: true, + reflect.Int16: true, + reflect.Int32: true, + reflect.Int64: true, + reflect.Uint: true, + reflect.Uint8: true, + reflect.Uint16: true, + reflect.Uint32: true, + reflect.Uint64: true, + reflect.Float32: true, + reflect.Float64: true, + reflect.String: true, +} diff --git a/telemetry/attributes_test.go b/telemetry/attributes_test.go new file mode 100644 index 00000000..5b5df927 --- /dev/null +++ b/telemetry/attributes_test.go @@ -0,0 +1,738 @@ +package telemetry_test + +import ( + "encoding/json" + "encoding/xml" + "fmt" + "math" + "testing" + "time" + + "github.com/moov-io/base/telemetry" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" +) + +func TestStructAttributes_inputTypes(t *testing.T) { + type s struct { + Field string `otel:"field,omitempty"` + } + + // nil pointer to struct doesn't cause panic + var m *s + require.NotPanics(t, func() { + require.Empty(t, telemetry.StructAttributes(m)) + }) + + // non-struct types don't cause panic + require.NotPanics(t, func() { + require.Empty(t, telemetry.StructAttributes("abc")) + }) + + // zero value struct ignored + require.NotPanics(t, func() { + require.Empty(t, telemetry.StructAttributes(s{})) + require.Empty(t, telemetry.StructAttributes(&s{})) + }) + + // empty interfaces don't cause panic + require.NotPanics(t, func() { + var m interface{} + require.Empty(t, telemetry.StructAttributes(m)) + require.Empty(t, telemetry.StructAttributes(&m)) + }) +} + +func TestStructAttributes_strings(t *testing.T) { + str := "string" + type foo struct { + Field string `otel:"field"` + FieldPtr *string `otel:"field_ptr,omitempty"` + } + + // full model + require.Equal(t, + []attribute.KeyValue{ + attribute.String("field", "string"), + attribute.String("field_ptr", "string"), + }, + telemetry.StructAttributes(foo{Field: "string", FieldPtr: &str}), + ) + + // omitempty (nil ptr) + require.Equal(t, + []attribute.KeyValue{ + attribute.String("field", "string"), + }, + telemetry.StructAttributes(foo{Field: "string"}), + ) + + // omitempty (ptr to empty string) + emptyStr := "" + require.Equal(t, + []attribute.KeyValue{ + attribute.String("field", "string"), + }, + telemetry.StructAttributes(foo{ + Field: "string", + FieldPtr: &emptyStr, + }), + ) + + // includes empty string + got := telemetry.StructAttributes(foo{FieldPtr: &str}) + require.Equal(t, + []attribute.KeyValue{ + attribute.String("field", ""), + attribute.String("field_ptr", "string"), + }, + got, + ) +} + +type testString struct { + s string +} + +func (t testString) Value() string { + return t.s +} + +type testInt struct { + i int +} + +func (t testInt) Value() int { + return t.i +} + +type testStrPtr struct { + s string +} + +func (t *testStrPtr) Value() string { + return t.s +} + +type testIntPtr struct { + i int +} + +func (t *testIntPtr) Value() int { + return t.i +} + +func TestStructAttributes_stringValuerintValuer(t *testing.T) { + type test struct { + String testString `otel:"custom_string"` + StringPtr *testStrPtr `otel:"custom_string_ptr"` + Str *string `otel:"str_ptr"` + Number testInt `otel:"custom_number"` + NumberPtr *testIntPtr `otel:"custom_number_ptr"` + } + + sVal := "CustomValue" + intVal := 123 + + require.Equal(t, + []attribute.KeyValue{ + attribute.String("custom_string", sVal), + attribute.String("custom_string_ptr", sVal), + attribute.String("str_ptr", sVal), + attribute.Int("custom_number", intVal), + attribute.Int("custom_number_ptr", intVal), + }, + telemetry.StructAttributes(test{ + String: testString{sVal}, + StringPtr: &testStrPtr{sVal}, + Str: &sVal, + Number: testInt{intVal}, + NumberPtr: &testIntPtr{intVal}, + }), + ) +} + +func TestStructAttributes_int(t *testing.T) { + type foo struct { + Int int `otel:"int"` + IntPtr *int `otel:"int_ptr"` + Int8 int8 `otel:"int8"` + Int16 int16 `otel:"int16"` + Int32 int32 `otel:"int32"` + Int64 int64 `otel:"int64"` + OmitEmptyInt int `otel:"omit_empty_int,omitempty"` + } + intVal := 4 + + m := foo{ + Int: math.MaxInt, + IntPtr: &intVal, + Int8: math.MinInt8, + Int16: math.MaxInt16, + Int32: math.MaxInt32, + Int64: math.MaxInt64, + OmitEmptyInt: 0, // should be excluded + } + + require.ElementsMatch(t, + []attribute.KeyValue{ + attribute.Int64("int", math.MaxInt), + attribute.Int64("int_ptr", 4), + attribute.Int64("int8", math.MinInt8), + attribute.Int64("int16", math.MaxInt16), + attribute.Int64("int32", math.MaxInt32), + attribute.Int64("int64", math.MaxInt64), + }, + telemetry.StructAttributes(m), + ) +} + +func TestStructAttributes_uint(t *testing.T) { + type foo struct { + Uint uint `otel:"uint"` + UintPtr *uint `otel:"uint_ptr"` + Uint8 uint8 `otel:"uint8"` + Uint16 uint16 `otel:"uint16"` + Uint32 uint32 `otel:"uint32"` + Uint64 uint64 `otel:"uint64"` + OmitEmptyUint uint `otel:"omit_empty_uint,omitempty"` + } + + uintVal := uint(123) + + m := foo{ + Uint: 123, + UintPtr: &uintVal, + Uint8: 123, + Uint16: 123, + Uint32: 123, + Uint64: 123, + OmitEmptyUint: 0, // should be excluded + } + + require.ElementsMatch(t, + []attribute.KeyValue{ + attribute.Int64("uint", 123), + attribute.Int64("uint_ptr", 123), + attribute.Int64("uint8", 123), + attribute.Int64("uint16", 123), + attribute.Int64("uint32", 123), + attribute.Int64("uint64", 123), + }, + telemetry.StructAttributes(m), + ) +} + +func TestStructAttributes_float(t *testing.T) { + type foo struct { + Float32 float32 `otel:"float32"` + Float64 float64 `otel:"float64"` + FloatPtr *float64 `otel:"float_ptr"` + OmitEmptyFloat float32 `otel:"omit_empty_float,omitempty"` + } + + floatVal := 123.45 + + m := foo{ + Float32: 123.45, + Float64: -583.43, + FloatPtr: &floatVal, + OmitEmptyFloat: 0, + } + + require.ElementsMatch(t, + []attribute.KeyValue{ + attribute.Float64("float32", float64(m.Float32)), + attribute.Float64("float64", m.Float64), + attribute.Float64("float_ptr", floatVal), + }, + telemetry.StructAttributes(m), + ) +} + +func TestStructAttributes_complexNumbers(t *testing.T) { + type foo struct { + Complex64 complex64 `otel:"complex64"` + Complex128 complex128 `otel:"complex128"` + ComplexPtr *complex128 `otel:"complex_ptr"` + OmitEmptyComplex complex64 `otel:"omit_empty_complex,omitempty"` + } + + complexVal := complex(4, 6) + + m := foo{ + Complex64: complex64(complex(58.3, 12)), + Complex128: complex(2355.3, 3.2), + ComplexPtr: &complexVal, + OmitEmptyComplex: 0, + } + + require.ElementsMatch(t, + []attribute.KeyValue{ + attribute.Float64("complex64_real", float64(real(m.Complex64))), + attribute.Float64("complex64_imag", float64(imag(m.Complex64))), + attribute.Float64("complex128_real", real(m.Complex128)), + attribute.Float64("complex128_imag", imag(m.Complex128)), + attribute.Float64("complex_ptr_real", 4), + attribute.Float64("complex_ptr_imag", 6), + }, + telemetry.StructAttributes(m), + ) +} + +func TestStructAttributes_sliceAndArray(t *testing.T) { + type bar struct { + Field string `otel:"field,omitempty"` + Slice []string `otel:"slice"` + } + type foo struct { + Slice []int `otel:"slice"` + SlicePtr []*int `otel:"slice_ptr"` + StructSlice []bar `otel:"struct_slice"` + StructPtrSlice []*bar `otel:"struct_slice_ptr"` + Array [3]string `otel:"array"` + OmitEmptySlice []string `otel:"omit_empty_slice,omitempty"` + OmitEmptyArray [3]string `otel:"omit_empty_array,omitempty"` + } + intVal := 438 + + // full model + m := foo{ + Slice: []int{43, 82, -4}, + SlicePtr: []*int{&intVal, &intVal}, + StructSlice: []bar{ + { + Field: "field", + Slice: []string{"foo", "bar"}, + }, + { + Field: "field1", + Slice: []string{"foo1", "bar1"}, + }, + }, + StructPtrSlice: []*bar{ + { + Field: "fieldPtr", + Slice: []string{"fooPtr", "barPtr"}, + }, + { + Field: "", // omitempty + Slice: []string{"fooPtr1", "barPtr1"}, + }, + }, + Array: [3]string{"1", "2", "3"}, + OmitEmptySlice: []string{}, // should be omitted + OmitEmptyArray: [3]string{}, // should be omitted + } + require.Equal(t, + []attribute.KeyValue{ + // foo.Slice + attribute.Int64("slice.0", 43), + attribute.Int64("slice.1", 82), + attribute.Int64("slice.2", -4), + + // foo.SlicePtr + attribute.Int64("slice_ptr.0", int64(intVal)), + attribute.Int64("slice_ptr.1", int64(intVal)), + + // foo.StructSlice + attribute.String("struct_slice.0.field", "field"), + attribute.String("struct_slice.0.slice.0", "foo"), + attribute.String("struct_slice.0.slice.1", "bar"), + attribute.String("struct_slice.1.field", "field1"), + attribute.String("struct_slice.1.slice.0", "foo1"), + attribute.String("struct_slice.1.slice.1", "bar1"), + + // foo.StructPtrSlice + attribute.String("struct_slice_ptr.0.field", "fieldPtr"), + attribute.String("struct_slice_ptr.0.slice.0", "fooPtr"), + attribute.String("struct_slice_ptr.0.slice.1", "barPtr"), + attribute.String("struct_slice_ptr.1.slice.0", "fooPtr1"), + attribute.String("struct_slice_ptr.1.slice.1", "barPtr1"), + + // foo.Array + attribute.String("array.0", "1"), + attribute.String("array.1", "2"), + attribute.String("array.2", "3"), + }, + telemetry.StructAttributes(&m), + ) +} + +func TestStructAttributes_maps(t *testing.T) { + type foo struct { + Map map[string]string `otel:"map"` + IgnoredMap map[string]string + } + m := foo{ + Map: map[string]string{ + "key1": "val1", + "key2": "val2", + }, + IgnoredMap: map[string]string{ + "ignoredKey1": "ignoredVal1", + "ignoredKey2": "ignoredVal2", + }, + } + + got := telemetry.StructAttributes(m) + require.Len(t, got, 2) + require.Contains(t, got, attribute.String("map.key1", "val1")) + require.Contains(t, got, attribute.String("map.key2", "val2")) + + gotPtr := telemetry.StructAttributes(&m) + require.ElementsMatch(t, got, gotPtr) + + // map of structs + type s struct { + Field string `otel:"field,omitempty"` + AnotherField string `otel:"another_field,omitempty"` + IgnoredField string + Struct struct { + StructField []int `otel:"struct_field,omitempty"` + } `otel:"struct,omitempty"` + } + type fooTheSecond struct { + Map map[int]s `otel:"map"` + } + m1 := fooTheSecond{ + Map: map[int]s{ + 0: { // make sure attributes are created for map of structs + Field: "found me", + AnotherField: "and me", + IgnoredField: "not me", + }, + 1: { // make sure omitempty is still honored + Field: "", + AnotherField: "", + IgnoredField: "", + Struct: struct { + StructField []int `otel:"struct_field,omitempty"` + }{[]int{4, 564, -4}}, + }, + }, + } + + got1 := telemetry.StructAttributes(m1) + require.Len(t, got1, 5) + require.Contains(t, got1, attribute.String("map.0.field", "found me")) + require.Contains(t, got1, attribute.String("map.0.another_field", "and me")) + require.Contains(t, got1, attribute.Int("map.1.struct.struct_field.0", 4)) + require.Contains(t, got1, attribute.Int("map.1.struct.struct_field.1", 564)) + require.Contains(t, got1, attribute.Int("map.1.struct.struct_field.2", -4)) + + // no attributes for empty struct containing maps + require.Empty(t, telemetry.StructAttributes(fooTheSecond{})) +} + +func TestStructAttributes_bool(t *testing.T) { + type foo struct { + Bool bool `otel:"bool"` + BoolPtr *bool `otel:"bool_ptr"` + OmitEmptyBool bool `otel:"omit_empty_bool,omitempty"` + } + boolVal := false + + require.Equal(t, + []attribute.KeyValue{ + attribute.Bool("bool", false), + attribute.Bool("bool_ptr", false), + }, + telemetry.StructAttributes(foo{Bool: false, BoolPtr: &boolVal}), + ) +} + +func TestStructAttributes_time(t *testing.T) { + type foo struct { + Time time.Time `otel:"time"` + TimePtr *time.Time `otel:"time_ptr"` + } + + // handles non-zero time and ptr to time + current := time.Now() + m := foo{ + Time: current, + TimePtr: ¤t, + } + got := telemetry.StructAttributes(m) + require.ElementsMatch(t, + []attribute.KeyValue{ + attribute.String("time", current.Format(time.RFC3339)), + attribute.String("time_ptr", current.Format(time.RFC3339)), + }, + got, + ) + // sanity check with ptr to struct + require.Equal(t, got, telemetry.StructAttributes(&m)) + + // handles zero value time and nil ptr + require.Empty(t, telemetry.StructAttributes(foo{})) + require.Empty(t, telemetry.StructAttributes(&foo{})) +} + +func TestStructAttributes_fieldsWithMultipleTags(t *testing.T) { + // make sure none of the tags interfere with each other + type foo struct { + Field string `json:"field" xml:"field" otel:"field"` + } + m := foo{ + Field: "value", + } + + require.Equal(t, + []attribute.KeyValue{attribute.String("field", "value")}, + telemetry.StructAttributes(m), + ) + + // sanity check json and xml to ensure the tags all still work as expected + js, err := json.Marshal(m) + require.NoError(t, err) + require.Equal(t, `{"field":"value"}`, string(js)) + + x, err := xml.Marshal(m) + require.NoError(t, err) + require.Equal(t, `value`, string(x)) +} + +func TestStructAttributes_maxLengthHonored(t *testing.T) { + type foo struct { + Map map[int]string `otel:"map"` + List []int `otel:"list"` + } + + numElements := telemetry.MaxArrayAttributes + 5 + + m := foo{ + Map: make(map[int]string), + List: make([]int, numElements), + } + for i := 0; i < numElements; i++ { + m.Map[i] = "bar" + m.List[i] = i + } + require.Len(t, m.Map, numElements) + require.Len(t, m.List, numElements) + + require.Len(t, telemetry.StructAttributes(m), telemetry.MaxArrayAttributes*2) // include map+list +} + +func TestStructAttributes_noAttributesForUntaggedFields(t *testing.T) { + type foo struct { + NoTag struct { + Field string + } + } + m := foo{ + NoTag: struct { + Field string + }{ + Field: "not included", + }, + } + require.Empty(t, telemetry.StructAttributes(m)) +} + +func TestStructAttributes_supportedDataTypes(t *testing.T) { + type nested struct { + NestedField complex128 `otel:"nested_field"` + Nestception struct { + NestceptionField string `otel:"nestception_field,omitempty"` + } `otel:"nestception,omitempty"` + } + type Embedded struct { + EmbeddedField string `otel:"embedded_field"` + } + + type model struct { + IgnoreMe string // no attribute should be added for this + ignoreMe string `otel:"ignore_me"` // no attribute should be added for this + BoolField bool `otel:"bool_field"` + BoolPtrField *bool `otel:"bool_ptr_field"` + IntField int `otel:"int_field"` + EmptyIntField int `otel:"empty_int_field"` + StrField string `otel:"str_field"` + StrPtrField *string `otel:"str_ptr_field"` + UintField uint32 `otel:"uint_field"` + MapStrInt *map[string]int `otel:"map_str_int"` + MapUintPtr map[uint32]*string `otel:"map_uint_ptr"` + SliceField []float64 `otel:"slice_field"` + SlicePtrField []*string `otel:"slice_ptr_field"` + FloatField float64 `otel:"float_field"` + AnonymousStruct struct { + Field1 string `otel:"field_1"` + Ignored string + } `otel:"anonymous_struct"` + Nested nested `otel:"nested_struct"` + *Embedded `otel:"embedded_struct"` + OmitMe string `otel:"omit_me,omitempty"` + FuncField func() `otel:"func_field"` // should be ignored even though it has an attribute tag + EmptyInterface interface{} `otel:"empty_interface"` // should be ignored even though it has an attribute tag + } + + boolVal := true + strVal := "strPtrField" + strVal1 := "strPtrField1" + m := model{ + IgnoreMe: "nothing to see here", + ignoreMe: "nothing to see here", + BoolField: true, + BoolPtrField: &boolVal, + StrField: "strField", + StrPtrField: &strVal, + IntField: 3, + UintField: uint32(53), + MapStrInt: &map[string]int{ + "foo": 21, + "bar": -5, + }, + MapUintPtr: map[uint32]*string{ + uint32(43): &strVal, + uint32(584459): &strVal, + }, + SliceField: []float64{458.32, -5483.6482}, + SlicePtrField: []*string{&strVal, &strVal1}, + FloatField: 4.483, + Nested: nested{ + NestedField: complex(3, 5), + }, + Embedded: &Embedded{EmbeddedField: "embedded"}, + FuncField: func() { + fmt.Println("hi") + }, + EmptyInterface: "hi", + } + m.AnonymousStruct.Field1 = "foo" + m.AnonymousStruct.Ignored = "foo" + m.Nested.Nestception.NestceptionField = "all the nesting" + + wantAttrs := []attribute.KeyValue{ + attribute.Bool("bool_field", true), + attribute.Bool("bool_ptr_field", true), + attribute.Int("int_field", 3), + attribute.Int("empty_int_field", 0), + attribute.String("str_field", "strField"), + attribute.String("str_ptr_field", "strPtrField"), + attribute.Int64("uint_field", 53), + attribute.Int64("map_str_int.foo", 21), + attribute.Int64("map_str_int.bar", -5), + attribute.String("map_uint_ptr.43", "strPtrField"), + attribute.String("map_uint_ptr.584459", "strPtrField"), + attribute.Float64("slice_field.0", 458.32), + attribute.Float64("slice_field.1", -5483.6482), + attribute.String("slice_ptr_field.0", "strPtrField"), + attribute.String("slice_ptr_field.1", "strPtrField1"), + attribute.Float64("float_field", 4.483), + attribute.String("anonymous_struct.field_1", "foo"), + attribute.Float64("nested_struct.nested_field_real", 3), + attribute.Float64("nested_struct.nested_field_imag", 5), + attribute.String("nested_struct.nestception.nestception_field", "all the nesting"), + attribute.String("embedded_struct.embedded_field", "embedded"), + } + + require.ElementsMatch(t, wantAttrs, telemetry.StructAttributes(m)) + require.ElementsMatch(t, wantAttrs, telemetry.StructAttributes(&m)) +} + +func TestStructAttributes_nestedStructs(t *testing.T) { + type s3 struct { + Field string `otel:"field"` + } + type s2 struct { + Field string `otel:"field"` + S3Struct s3 `otel:"s3"` + OmitS3Struct s3 `otel:"empty_s3,omitempty"` + } + type s1 struct { + Field []*s3 `otel:"s3_slice"` + S2Struct *s2 `otel:"s2"` + } + type s struct { + S1Struct s1 `otel:"s1,omitempty"` + } + + m := s{ + S1Struct: s1{ + Field: []*s3{ + { + Field: "first entry", + }, + { + Field: "second entry", + }, + }, + S2Struct: &s2{ + Field: "s2Field", + S3Struct: s3{ + Field: "s3Field", + }, + OmitS3Struct: s3{}, // should omit zero value + }, + }, + } + + got := telemetry.StructAttributes(&m) + + require.Contains(t, got, attribute.String("s1.s3_slice.0.field", "first entry")) + require.Contains(t, got, attribute.String("s1.s3_slice.1.field", "second entry")) + require.Contains(t, got, attribute.String("s1.s2.field", "s2Field")) + require.Contains(t, got, attribute.String("s1.s2.s3.field", "s3Field")) + require.Len(t, got, 4) + + // no attributes for zero value struct containing nested structs + require.Empty(t, telemetry.StructAttributes(s{})) +} + +func TestStructAttributes_allowedMapKeys(t *testing.T) { + type structKey struct { + Field string `otel:"field"` + } + stringKey := "stringKey" + var interfaceKey interface{} = "key" + type s struct { + // supported + BoolMap map[bool]string `otel:"bool_map"` + UintMap map[uint32]string `otel:"uint_map"` + FloatMap map[float64]string `otel:"float_map"` + StringMap map[string]string `otel:"string_map"` + PointerStringMap map[*string]string `otel:"pointer_string_map"` + + // not supported + StructMap map[structKey]string `otel:"struct_map"` + PointerStructMap map[*structKey]string `otel:"pointer_struct_map"` + InterfaceMap map[interface{}]string `otel:"interface_map"` + } + + got := telemetry.StructAttributes(s{ + BoolMap: map[bool]string{ + true: "true", + }, + UintMap: map[uint32]string{ + 8: "uint", + }, + FloatMap: map[float64]string{ + 4.95: "float", + }, + StringMap: map[string]string{ + "string": "string", + }, + PointerStringMap: map[*string]string{ + &stringKey: "string", + }, + StructMap: map[structKey]string{ + {Field: "ignore_this_key"}: "ignore me", + }, + PointerStructMap: map[*structKey]string{ + {Field: "ignore_this_key"}: "ignore me", + }, + InterfaceMap: map[interface{}]string{ + interfaceKey: "string", + }, + }) + require.Len(t, got, 5) + require.Contains(t, got, attribute.String("bool_map.true", "true")) + require.Contains(t, got, attribute.String("uint_map.8", "uint")) + require.Contains(t, got, attribute.String("float_map.4.95", "float")) + require.Contains(t, got, attribute.String("string_map.string", "string")) + require.Contains(t, got, attribute.String("pointer_string_map.stringKey", "string")) +} diff --git a/telemetry/collector.go b/telemetry/collector.go new file mode 100644 index 00000000..5efbf9bf --- /dev/null +++ b/telemetry/collector.go @@ -0,0 +1,32 @@ +package telemetry + +import ( + "context" + + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + + // Add in gzip + "google.golang.org/grpc/credentials" + _ "google.golang.org/grpc/encoding/gzip" +) + +type OtelConfig struct { + Host string + TLS bool +} + +func newOpenTelementyCollectorExporter(ctx context.Context, config OtelConfig) (*otlptrace.Exporter, error) { + opts := []otlptracegrpc.Option{ + otlptracegrpc.WithEndpoint(config.Host), + } + + if config.TLS { + opts = append(opts, otlptracegrpc.WithTLSCredentials(credentials.NewClientTLSFromCert(nil, ""))) + } else { + opts = append(opts, otlptracegrpc.WithInsecure()) + } + + client := otlptracegrpc.NewClient(opts...) + return otlptrace.New(ctx, client) +} diff --git a/telemetry/config.go b/telemetry/config.go new file mode 100644 index 00000000..64efc177 --- /dev/null +++ b/telemetry/config.go @@ -0,0 +1,160 @@ +package telemetry + +import ( + "context" + "io" + "os" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.7.0" + "go.opentelemetry.io/otel/trace" +) + +type Config struct { + ServiceName string + ServiceNamespace *string + Stdout bool + OpenTelemetryCollector *OtelConfig + Honeycomb *HoneycombConfig + + // Allows for testing of the output of telemetry without affecting use with config files + testWriter io.Writer +} + +// Allows for testing where the output of the traces are sent to a io.Writer instance. +func TestConfig(w io.Writer) Config { + return Config{ + ServiceName: "test-service", + testWriter: w, + } +} + +type ShutdownFunc func() error + +var NoopShutdown ShutdownFunc = func() error { + return nil +} + +func SetupTelemetry(ctx context.Context, config Config, version string) (ShutdownFunc, error) { + var ( + err error + exp tracesdk.SpanExporter + ) + + if config.testWriter != nil { + exp, err = newJsonExporter(config.testWriter) + if err != nil { + return NoopShutdown, err + } + + } else if isOtelEnvironmentSet() { + exp, err = newOtelExporterFromEnvironment(ctx) + if err != nil { + return NoopShutdown, err + } + + } else if isHoneycombEnvironmentSet() { + exp, err = newHoneycombExporterFromEnvironment(ctx) + if err != nil { + return NoopShutdown, err + } + + } else if config.Stdout { + exp, err = newStdoutExporter() + if err != nil { + return NoopShutdown, err + } + + } else if config.OpenTelemetryCollector != nil { + exp, err = newOpenTelementyCollectorExporter(ctx, *config.OpenTelemetryCollector) + if err != nil { + return NoopShutdown, err + } + + } else if config.Honeycomb != nil { + exp, err = newHoneycombExporterFromConfig(ctx, *config.Honeycomb) + if err != nil { + return NoopShutdown, err + } + } + + // Make sure something is set for the exporter + if exp == nil { + exp, err = newDiscardExporter() + if err != nil { + return NoopShutdown, err + } + } + + tp := newTraceProvider(exp, config, version) + + otel.SetTracerProvider(tp) + + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + )) + + return func() error { + ctx := context.Background() + tp.ForceFlush(ctx) + return tp.Shutdown(ctx) + }, nil +} + +func newTraceProvider(exp tracesdk.SpanExporter, config Config, version string) TracerProvider { + if config.ServiceName == "" { + config.ServiceName = os.Getenv("MOOV_SERVICE_NAME") + } + + if config.ServiceNamespace == nil || *config.ServiceNamespace == "" { + ns := os.Getenv("MOOV_SERVICE_NAMESPACE") + config.ServiceNamespace = &ns + } + + // Wrap it so we can filter out useless traces from consuming + exp = NewFilteredExporter(exp) + + batcher := tracesdk.WithBatcher(exp, + tracesdk.WithMaxQueueSize(3*tracesdk.DefaultMaxQueueSize), + tracesdk.WithMaxExportBatchSize(3*tracesdk.DefaultMaxExportBatchSize), + tracesdk.WithBatchTimeout(5*time.Second), + ) + + // If we're using the testWriter we want to make sure its not buffering anything in the background + if config.testWriter != nil { + batcher = tracesdk.WithSyncer(exp) + } + + resource := resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String(config.ServiceName), + semconv.ServiceVersionKey.String(version), + semconv.ServiceNamespaceKey.String(*config.ServiceNamespace), + ) + + tp := tracesdk.NewTracerProvider( + tracesdk.WithSampler(tracesdk.ParentBased(tracesdk.AlwaysSample())), + batcher, + tracesdk.WithResource(resource), + ) + + return &tracerProvider{ + TracerProvider: tp, + } +} + +type TracerProvider interface { + trace.TracerProvider + + ForceFlush(ctx context.Context) error + Shutdown(ctx context.Context) error +} + +type tracerProvider struct { + *tracesdk.TracerProvider +} diff --git a/telemetry/config_test.go b/telemetry/config_test.go new file mode 100644 index 00000000..1adeb81f --- /dev/null +++ b/telemetry/config_test.go @@ -0,0 +1,93 @@ +package telemetry_test + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/moov-io/base/telemetry" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" +) + +func Test_Setup_Honey(t *testing.T) { + shutdown, err := telemetry.SetupTelemetry(context.Background(), telemetry.Config{ + ServiceName: "test", + Honeycomb: &telemetry.HoneycombConfig{ + URL: "api.honeycomb.io:443", + Team: "HoneycombAPIKey", + }, + }, "v0.0.1") + + require.NoError(t, err) + + err = shutdown() + require.NoError(t, err) +} + +func Test_Setup_Otel(t *testing.T) { + shutdown, err := telemetry.SetupTelemetry(context.Background(), telemetry.Config{ + ServiceName: "test", + OpenTelemetryCollector: &telemetry.OtelConfig{ + Host: "collector", + }, + }, "v0.0.1") + + require.NoError(t, err) + + err = shutdown() + require.NoError(t, err) +} + +func Test_Setup_Stdout(t *testing.T) { + shutdown, err := telemetry.SetupTelemetry(context.Background(), telemetry.Config{ + ServiceName: "test", + Stdout: true, + }, "v0.0.1") + + require.NoError(t, err) + + _, spn := telemetry.StartSpan(context.Background(), "test") + spn.AddEvent("added an event!") + spn.End() + + err = shutdown() + require.NoError(t, err) +} + +func Test_Keeping_Consumers(t *testing.T) { + buf := setupTelemetry(t) + + _, spn := telemetry.StartSpan(context.Background(), "consuming", trace.WithSpanKind(trace.SpanKindConsumer)) + time.Sleep(5 * time.Millisecond) + spn.End() + + require.Contains(t, buf.String(), `"Name":"consuming"`) +} + +func Test_Dropping_Empty_Consumers(t *testing.T) { + buf := setupTelemetry(t) + + _, spn := telemetry.StartSpan(context.Background(), "consuming", trace.WithSpanKind(trace.SpanKindConsumer)) + // instantaneously returns + spn.End() + + require.NotContains(t, buf.String(), "consuming") +} + +func setupTelemetry(t *testing.T) *bytes.Buffer { + t.Helper() + buf := &bytes.Buffer{} + config := telemetry.TestConfig(buf) + + shutdown, err := telemetry.SetupTelemetry(context.Background(), config, "v0.0.1") + t.Cleanup(func() { + shutdown() + }) + + require.NoError(t, err) + + return buf +} diff --git a/telemetry/env.go b/telemetry/env.go new file mode 100644 index 00000000..5467539d --- /dev/null +++ b/telemetry/env.go @@ -0,0 +1,22 @@ +package telemetry + +import ( + "context" + "os" + + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" +) + +func isOtelEnvironmentSet() bool { + return os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") != "" +} + +// Creates a exporter thats completely built by environment flags. +// References: +// - https://opentelemetry.io/docs/specs/otel/protocol/exporter/ +// - https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/ +func newOtelExporterFromEnvironment(ctx context.Context) (*otlptrace.Exporter, error) { + client := otlptracegrpc.NewClient() + return otlptrace.New(ctx, client) +} diff --git a/telemetry/exporter.go b/telemetry/exporter.go new file mode 100644 index 00000000..b80a8b06 --- /dev/null +++ b/telemetry/exporter.go @@ -0,0 +1,79 @@ +package telemetry + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/codes" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" +) + +var _ tracesdk.SpanExporter = &filteredExporter{} + +func NewFilteredExporter(inner tracesdk.SpanExporter) tracesdk.SpanExporter { + return &filteredExporter{inner: inner} +} + +type filteredExporter struct { + inner tracesdk.SpanExporter +} + +func (fe *filteredExporter) Shutdown(ctx context.Context) error { + return fe.inner.Shutdown(ctx) +} + +func (fe *filteredExporter) ExportSpans(ctx context.Context, spans []tracesdk.ReadOnlySpan) error { + in := []tracesdk.ReadOnlySpan{} + + for _, span := range spans { + if fe.AlwaysInclude(span) { + in = append(in, span) + continue + } + + if HasSpanDrop(span) { + continue + } + + if IsEmptyConsume(span) { + continue + } + + in = append(in, span) + } + + return fe.inner.ExportSpans(ctx, in) +} + +func (fe *filteredExporter) AlwaysInclude(s tracesdk.ReadOnlySpan) bool { + return len(s.Links()) > 0 || + len(s.Events()) > 0 || + s.ChildSpanCount() > 0 || + s.Status().Code == codes.Error +} + +// Allows for services to just flag a span to be dropped. +func HasSpanDrop(s tracesdk.ReadOnlySpan) bool { + for _, attr := range s.Attributes() { + if attr.Key == DropSpanKey && attr.Value.AsBool() { + return true + } + } + + return false +} + +// Detects if its an event that was consumed but ignored. +// These can cause a lot of cluttering in the traces and we want to filter them out. +func IsEmptyConsume(s tracesdk.ReadOnlySpan) bool { + if s.SpanKind() == trace.SpanKindConsumer { + + // If it took less than a millisecond and has no child spans, the event was most likely ignored... + if s.EndTime().Sub(s.StartTime()) < time.Millisecond { + return true + } + } + + return false +} diff --git a/telemetry/honey.go b/telemetry/honey.go new file mode 100644 index 00000000..66a88bd7 --- /dev/null +++ b/telemetry/honey.go @@ -0,0 +1,49 @@ +package telemetry + +import ( + "context" + "os" + + "google.golang.org/grpc/credentials" + + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + + // Add in gzip + _ "google.golang.org/grpc/encoding/gzip" +) + +type HoneycombConfig struct { + URL string + Team string +} + +func newHoneycombExporterFromConfig(ctx context.Context, config HoneycombConfig) (*otlptrace.Exporter, error) { + return newHoneycombExporter(ctx, config.URL, config.Team) +} + +func isHoneycombEnvironmentSet() bool { + return os.Getenv("HONEYCOMB_API_KEY") != "" +} + +func newHoneycombExporterFromEnvironment(ctx context.Context) (*otlptrace.Exporter, error) { + return newHoneycombExporter(ctx, "api.honeycomb.io:443", os.Getenv("HONEYCOMB_API_KEY")) +} + +func newHoneycombExporter(ctx context.Context, endpoint string, team string) (*otlptrace.Exporter, error) { + // Configuration to export data to Honeycomb: + // + // 1. The Honeycomb endpoint + // 2. Your API key, set as the x-honeycomb-team header + opts := []otlptracegrpc.Option{ + otlptracegrpc.WithCompressor("gzip"), + otlptracegrpc.WithEndpoint(endpoint), + otlptracegrpc.WithHeaders(map[string]string{ + "x-honeycomb-team": team, + }), + otlptracegrpc.WithTLSCredentials(credentials.NewClientTLSFromCert(nil, "")), + } + + client := otlptracegrpc.NewClient(opts...) + return otlptrace.New(ctx, client) +} diff --git a/telemetry/linked.go b/telemetry/linked.go new file mode 100644 index 00000000..cb540b21 --- /dev/null +++ b/telemetry/linked.go @@ -0,0 +1,91 @@ +package telemetry + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +// StartLinkedRootSpan starts a new root span where the parent and child spans share links to each other. This +// is particularly useful in batch processing applications where separate spans are wanted for each subprocess +// in the batch, but without cluttering the parent span. +func StartLinkedRootSpan(ctx context.Context, name string, options ...trace.SpanStartOption) *LinkedSpan { + + // new root for the children + childOpts := append([]trace.SpanStartOption{ + trace.WithNewRoot(), + trace.WithLinks(trace.LinkFromContext(ctx, attribute.String("link.name", "parent"))), // link to parent from child + }, options...) + + childCtx, childSpan := StartSpan(ctx, name, childOpts...) + + // start a new span on the parent and link to the child span from the parent one. + parentOpts := append([]trace.SpanStartOption{ + trace.WithLinks(trace.LinkFromContext(childCtx, attribute.String("link.name", "child"))), // link to parent from child + }, options...) + + parentCtx, parentSpan := StartSpan(ctx, name, parentOpts...) + + return &LinkedSpan{ + childCtx: childCtx, + childSpan: childSpan, + parentCtx: parentCtx, + parentSpan: parentSpan, + } +} + +type LinkedSpan struct { + childCtx context.Context + childSpan trace.Span + + parentCtx context.Context + parentSpan trace.Span +} + +func (l *LinkedSpan) End(options ...trace.SpanEndOption) { + l.childSpan.End(options...) + l.parentSpan.End(options...) +} + +func (l *LinkedSpan) AddEvent(name string, options ...trace.EventOption) { + l.childSpan.AddEvent(name, options...) + l.parentSpan.AddEvent(name, options...) +} + +func (l *LinkedSpan) RecordError(err error, options ...trace.EventOption) { + l.childSpan.RecordError(err, options...) + l.parentSpan.RecordError(err, options...) +} + +func (l *LinkedSpan) SetStatus(code codes.Code, description string) { + l.childSpan.SetStatus(code, description) + l.parentSpan.SetStatus(code, description) +} + +func (l *LinkedSpan) SetAttributes(kv ...attribute.KeyValue) { + l.childSpan.SetAttributes(kv...) + l.parentSpan.SetAttributes(kv...) +} + +func (l *LinkedSpan) SetName(name string) { + l.childSpan.SetName(name) + l.parentSpan.SetName(name) +} + +func (l *LinkedSpan) ChildSpan() trace.Span { + return l.childSpan +} + +func (l *LinkedSpan) ChildContext() context.Context { + return l.childCtx +} + +func (l *LinkedSpan) ParentSpan() trace.Span { + return l.parentSpan +} + +func (l *LinkedSpan) ParentContext() context.Context { + return l.parentCtx +} diff --git a/telemetry/stdout.go b/telemetry/stdout.go new file mode 100644 index 00000000..dcd8338f --- /dev/null +++ b/telemetry/stdout.go @@ -0,0 +1,24 @@ +package telemetry + +import ( + "io" + "os" + + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/sdk/trace" +) + +func newDiscardExporter() (trace.SpanExporter, error) { + return newJsonExporter(io.Discard) +} + +func newStdoutExporter() (trace.SpanExporter, error) { + return newJsonExporter(os.Stdout) +} + +// newExporter returns a console exporter. +func newJsonExporter(w io.Writer) (trace.SpanExporter, error) { + return stdouttrace.New( + stdouttrace.WithWriter(w), + ) +} diff --git a/telemetry/tracer.go b/telemetry/tracer.go new file mode 100644 index 00000000..68f559a1 --- /dev/null +++ b/telemetry/tracer.go @@ -0,0 +1,49 @@ +package telemetry + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +const ( + InstrumentationName = "moov.io" + AttributeTag = "otel" + MaxArrayAttributes = 10 +) + +func GetTracer(opts ...trace.TracerOption) trace.Tracer { + return otel.GetTracerProvider().Tracer(InstrumentationName, opts...) +} + +func StartSpan(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + if ctx == nil { + ctx = context.Background() + } + return GetTracer().Start(ctx, spanName, opts...) +} + +func SpanFromContext(ctx context.Context) trace.Span { + return trace.SpanFromContext(ctx) +} + +func AddEvent(ctx context.Context, name string, options ...trace.EventOption) { + SpanFromContext(ctx).AddEvent(name, options...) +} + +// RecordError will record err as an exception span event for this span. It will also +// return the err passed in. +func RecordError(ctx context.Context, err error, options ...trace.EventOption) error { + options = append(options, trace.WithStackTrace(true)) + SpanFromContext(ctx).RecordError(err, options...) + return err +} + +// SetAttributes sets kv as attributes of the Span. If a key from kv +// already exists for an attribute of the Span it will be overwritten with +// the value contained in kv. +func SetAttributes(ctx context.Context, kv ...attribute.KeyValue) { + SpanFromContext(ctx).SetAttributes(kv...) +} diff --git a/telemetry/tracer_test.go b/telemetry/tracer_test.go new file mode 100644 index 00000000..d2b8ca5a --- /dev/null +++ b/telemetry/tracer_test.go @@ -0,0 +1,51 @@ +package telemetry_test + +import ( + "context" + "fmt" + "testing" + + "github.com/moov-io/base/telemetry" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/trace" +) + +func TestStartSpan__NoPanic(t *testing.T) { + ctx, span := telemetry.StartSpan(nil, "no-panics") //nolint:staticcheck + require.NotNil(t, ctx) + require.NotNil(t, span) +} + +func TestSpan_SetAttributes(t *testing.T) { + var conf telemetry.Config + shutdown, err := telemetry.SetupTelemetry(context.Background(), conf, "v0.0.0") + require.NoError(t, err) + t.Cleanup(func() { shutdown() }) + + ctx, span := telemetry.StartSpan(context.Background(), "set-attributes") + defer span.End() + + // First Set + span.SetAttributes(attribute.String("kafka.topic", "test.cmd.v1")) + + // Second Set + span.SetAttributes(attribute.String("event.type", "my-favorite-event")) + + // Verify the attributes which are set + ss := telemetry.SpanFromContext(ctx) + require.Equal(t, "*trace.recordingSpan", fmt.Sprintf("%T", ss)) + ro, ok := ss.(trace.ReadOnlySpan) + require.True(t, ok) + + attrs := ro.Attributes() + for i := range attrs { + switch attrs[i].Key { + case "kafka.topic", "event.type": + // do nothing + default: + t.Errorf("attribute[%d]=%#v\n", i, attrs[i]) + } + } +}