From 81ee94e1c74878933cdf18e9b501e84a0a1183d9 Mon Sep 17 00:00:00 2001 From: Vilsol Date: Tue, 7 Dec 2021 19:22:15 +0200 Subject: [PATCH] fix: increase taskq timeout, add tracing --- go.mod | 1 + go.sum | 4 ++++ redis/jobs/jobs.go | 15 ++++++++++++--- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 9e63a91f..46232032 100755 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/swaggo/echo-swagger v1.1.4 github.com/swaggo/swag v1.7.6 github.com/vektah/gqlparser/v2 v2.2.0 + github.com/vmihailenco/taskq/extra/taskqotel/v3 v3.2.8 github.com/vmihailenco/taskq/v3 v3.2.8 github.com/xeipuuv/gojsonschema v1.2.0 go.opentelemetry.io/contrib v1.2.0 diff --git a/go.sum b/go.sum index cf14b97f..ffdff194 100755 --- a/go.sum +++ b/go.sum @@ -732,6 +732,8 @@ github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9 github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/vmihailenco/taskq/extra/taskqotel/v3 v3.2.8 h1:keHd+7LCPUPyQMb+DsO/OiZJuMoBEsOiBdONm+1nmPI= +github.com/vmihailenco/taskq/extra/taskqotel/v3 v3.2.8/go.mod h1:svFuw6AcsvxQdwFiZF0BYk2XSfgXy1tSWE6L6sOJVeo= github.com/vmihailenco/taskq/v3 v3.2.8 h1:Smrz4Fhqi+29fALjcjlETACVbzlra/MbmFY9YKfvjQI= github.com/vmihailenco/taskq/v3 v3.2.8/go.mod h1:IFuypxi7Y0h+PcactlQOPf92Ssxg0FWxQZ8ptxYW/Zk= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= @@ -772,6 +774,7 @@ go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0 go.opentelemetry.io/contrib/propagators/b3 v1.2.0 h1:+zQjl3DBSOle9GEhHuhqzDUKtYcVSfbHSNv24hsoOJ0= go.opentelemetry.io/contrib/propagators/b3 v1.2.0/go.mod h1:kO8hNKCfa1YmQJ0lM7pzfJGvbXEipn/S7afbOfaw2Kc= go.opentelemetry.io/otel v0.11.0/go.mod h1:G8UCk+KooF2HLkgo8RHX9epABH/aRGYET7gQOqBVdB0= +go.opentelemetry.io/otel v1.0.0-RC2/go.mod h1:w1thVQ7qbAy8MHb0IFj8a5Q2QU0l2ksf8u/CN8m3NOM= go.opentelemetry.io/otel v1.2.0 h1:YOQDvxO1FayUcT9MIhJhgMyNO1WqoduiyvQHzGN0kUQ= go.opentelemetry.io/otel v1.2.0/go.mod h1:aT17Fk0Z1Nor9e0uisf98LrntPGMnk4frBO9+dkf69I= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.2.0 h1:xzbcGykysUh776gzD1LUPsNNHKWN0kQWDnJhn1ddUuk= @@ -780,6 +783,7 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.2.0 h1:j/jXNz go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.2.0/go.mod h1:k5GnE4m4Jyy2DNh6UAzG6Nml51nuqQyszV7O1ksQAnE= go.opentelemetry.io/otel/sdk v1.2.0 h1:wKN260u4DesJYhyjxDa7LRFkuhH7ncEVKU37LWcyNIo= go.opentelemetry.io/otel/sdk v1.2.0/go.mod h1:jNN8QtpvbsKhgaC6V5lHiejMoKD+V8uadoSafgHPx1U= +go.opentelemetry.io/otel/trace v1.0.0-RC2/go.mod h1:JPQ+z6nNw9mqEGT8o3eoPTdnNI+Aj5JcxEsVGREIAy4= go.opentelemetry.io/otel/trace v1.2.0 h1:Ys3iqbqZhcf28hHzrm5WAquMkDHNZTUkw7KHbuNjej0= go.opentelemetry.io/otel/trace v1.2.0/go.mod h1:N5FLswTubnxKxOJHM7XZC074qpeEdLy3CgAVsdMucK0= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= diff --git a/redis/jobs/jobs.go b/redis/jobs/jobs.go index 3c6c29a1..6b49386e 100644 --- a/redis/jobs/jobs.go +++ b/redis/jobs/jobs.go @@ -4,11 +4,13 @@ import ( "context" "encoding/json" "fmt" + "time" - "github.com/satisfactorymodding/smr-api/redis/jobs/tasks" + "github.com/vmihailenco/taskq/extra/taskqotel/v3" "github.com/go-redis/redis/v8" "github.com/rs/zerolog/log" + "github.com/satisfactorymodding/smr-api/redis/jobs/tasks" "github.com/spf13/viper" "github.com/vmihailenco/taskq/v3" "github.com/vmihailenco/taskq/v3/redisq" @@ -28,8 +30,15 @@ func InitializeJobs(ctx context.Context) { QueueFactory := redisq.NewFactory() queue = QueueFactory.RegisterQueue(&taskq.QueueOptions{ - Name: "api-worker", - Redis: connection, + Name: "api-worker", + Redis: connection, + ReservationTimeout: time.Hour, + }) + + QueueFactory.Range(func(q taskq.Queue) bool { + consumer := q.Consumer() + consumer.AddHook(&taskqotel.OpenTelemetryHook{}) + return true }) if err := QueueFactory.StartConsumers(ctx); err != nil {