From 0788b5f9cdfb86a143a53d2fd63465a49d4b8b59 Mon Sep 17 00:00:00 2001 From: Saman Mahdanian <46444723+SamMHD@users.noreply.github.com> Date: Wed, 28 Feb 2024 01:06:41 +0330 Subject: [PATCH] Feat/add opentelemetry propagation (#42) * refactor tracing package and usage * reading parent context from Check request headers * fix tests * fixed nil ref to finalResponse --- internal/tracing/tracing.go | 30 ++++++++++++++-- pkg/auth/authenticator.go | 69 ++++++++++++++++++++++--------------- pkg/auth/metrics.go | 2 +- pkg/auth/metrics_test.go | 6 ++-- 4 files changed, 74 insertions(+), 33 deletions(-) diff --git a/internal/tracing/tracing.go b/internal/tracing/tracing.go index 20c60b7..d1adafb 100644 --- a/internal/tracing/tracing.go +++ b/internal/tracing/tracing.go @@ -3,12 +3,15 @@ package tracing import ( "context" "fmt" + "net/http" "time" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" tracesdk "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" @@ -20,6 +23,7 @@ const ( ServiceName = "cerberus" HTTPTracingProvider = "http-tracing-provider" GRPCTracingProvider = "grpc-tracing-provider" + TimeFormat = time.RFC3339Nano ) var cerberusTracer trace.Tracer @@ -61,8 +65,30 @@ func SetTracingProvider(provider string, samplingRation float64, timeout float64 return } -func StartSpan(ctx context.Context, spanName string) (context.Context, trace.Span) { - return cerberusTracer.Start(ctx, spanName) +func StartSpan(ctx context.Context, spanName string, extraAttrs ...attribute.KeyValue) (context.Context, trace.Span) { + newCtx, span := cerberusTracer.Start(ctx, spanName, + trace.WithSpanKind(trace.SpanKindServer), + ) + extraAttrs = append(extraAttrs, + attribute.String("start-time", time.Now().Format(TimeFormat)), + ) + span.SetAttributes(extraAttrs...) + return newCtx, span +} + +func EndSpan(span trace.Span, start_time time.Time, extraAttrs ...attribute.KeyValue) { + extraAttrs = append(extraAttrs, + attribute.String("end-time", time.Now().Format(TimeFormat)), + attribute.Float64("duration_seconds", time.Since(start_time).Seconds()), + ) + span.SetAttributes(extraAttrs...) + span.End() +} + +func ReadParentSpanFromRequest(ctx context.Context, req http.Request) context.Context { + return otel.GetTextMapPropagator().Extract( + ctx, propagation.HeaderCarrier(req.Header), + ) } func Tracer() *trace.Tracer { diff --git a/pkg/auth/authenticator.go b/pkg/auth/authenticator.go index ddf6f93..d0c11ae 100644 --- a/pkg/auth/authenticator.go +++ b/pkg/auth/authenticator.go @@ -4,6 +4,7 @@ import ( "context" "net/http" "net/url" + "strings" "sync" "time" @@ -12,6 +13,7 @@ import ( "github.com/snapp-incubator/Cerberus/api/v1alpha1" "github.com/snapp-incubator/Cerberus/internal/tracing" "go.opentelemetry.io/otel/attribute" + otelcodes "go.opentelemetry.io/otel/codes" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -137,21 +139,32 @@ func toExtraHeaders(headers CerberusExtraHeaders) ExtraHeaders { } // Check is the function which is used to Authenticate and Respond to gRPC envoy.CheckRequest -func (a *Authenticator) Check(ctx context.Context, request *Request) (*Response, error) { +func (a *Authenticator) Check(ctx context.Context, request *Request) (finalResponse *Response, err error) { + start_time := time.Now() wsvc, ns, reason := readRequestContext(request) // generate opentelemetry span with given parameters - ctx, span := tracing.StartSpan(ctx, "CheckFunction") - defer func() { - span.SetAttributes( - attribute.String("cerberus-reason", string(reason)), - ) - span.End() - }() - span.SetAttributes( + parentCtx := tracing.ReadParentSpanFromRequest(ctx, request.Request) + ctx, span := tracing.StartSpan(parentCtx, "CheckFunction", attribute.String("webservice", wsvc), attribute.String("namespace", ns), ) + defer func() { + extraAttrs := []attribute.KeyValue{ + attribute.String("cerberus-reason", string(reason)), + } + if finalResponse != nil { + extraAttrs = append(extraAttrs, + attribute.Bool("final-response-ok", finalResponse.Allow), + ) + for k, v := range finalResponse.Response.Header { + extraAttrs = append(extraAttrs, + attribute.String("final-extra-headers-"+k, strings.Join(v, ",")), + ) + } + } + tracing.EndSpan(span, start_time, extraAttrs...) + }() if reason != "" { return generateResponse(reason, nil), nil @@ -178,12 +191,12 @@ func (a *Authenticator) Check(ctx context.Context, request *Request) (*Response, } } - var err error if reason == CerberusReasonUpstreamAuthTimeout || reason == CerberusReasonUpstreamAuthFailed { err = status.Error(codes.DeadlineExceeded, "Timeout exceeded") } - return generateResponse(reason, extraHeaders), err + finalResponse = generateResponse(reason, extraHeaders) + return } func readRequestContext(request *Request) (wsvc string, ns string, reason CerberusReason) { @@ -290,19 +303,18 @@ func processResponseError(err error) CerberusReason { // checkServiceUpstreamAuth function is designed to validate the request through // the upstream authentication for a given webservice func (a *Authenticator) checkServiceUpstreamAuth(service WebservicesCacheEntry, request *Request, extraHeaders *ExtraHeaders, ctx context.Context) (reason CerberusReason) { + start_time := time.Now() downstreamDeadline, hasDownstreamDeadline := ctx.Deadline() - serviceUpstreamAuthCalls.With(AddWithDownstreamDeadline(nil, hasDownstreamDeadline)).Inc() + serviceUpstreamAuthCalls.With(AddWithDownstreamDeadlineLabel(nil, hasDownstreamDeadline)).Inc() - _, span := tracing.StartSpan(ctx, "upstream-auth") + _, span := tracing.StartSpan(ctx, "cerberus-upstream-auth", + attribute.String("upstream-auth-address", service.Spec.UpstreamHttpAuth.Address), + ) defer func() { - span.SetAttributes( - attribute.String("upstream-auth-cerberus-reason", string(reason)), + tracing.EndSpan(span, start_time, + attribute.String("cerberus-reason", string(reason)), ) - span.End() }() - span.SetAttributes( - attribute.String("upstream-auth-address", service.Spec.UpstreamHttpAuth.Address), - ) if reason := validateUpstreamAuthRequest(service); reason != "" { return reason @@ -318,17 +330,20 @@ func (a *Authenticator) checkServiceUpstreamAuth(service WebservicesCacheEntry, resp, err := a.httpClient.Do(req) reqDuration := time.Since(reqStart) - if reason := processResponseError(err); reason != "" { - return reason - } - - labels := AddWithDownstreamDeadline(AddStatusLabel(nil, resp.StatusCode), hasDownstreamDeadline) - upstreamAuthRequestDuration.With(labels).Observe(reqDuration.Seconds()) - span.SetAttributes( - attribute.Float64("upstream-auth-rtt-seconds", reqDuration.Seconds()), + attribute.String("upstream-http-request-start", reqStart.Format(tracing.TimeFormat)), + attribute.String("upstream-http-request-end", time.Now().Format(tracing.TimeFormat)), + attribute.Float64("upstream-http-request-rtt-seconds", time.Since(reqStart).Seconds()), attribute.Int("upstream-auth-status-code", resp.StatusCode), ) + labels := AddWithDownstreamDeadlineLabel(AddStatusLabel(nil, resp.StatusCode), hasDownstreamDeadline) + upstreamAuthRequestDuration.With(labels).Observe(reqDuration.Seconds()) + + if reason := processResponseError(err); reason != "" { + span.RecordError(err) + span.SetStatus(otelcodes.Error, "upstream auth http request faild") + return reason + } if resp.StatusCode != http.StatusOK { return CerberusReasonUnauthorized diff --git a/pkg/auth/metrics.go b/pkg/auth/metrics.go index df91d3f..f26c39e 100644 --- a/pkg/auth/metrics.go +++ b/pkg/auth/metrics.go @@ -173,7 +173,7 @@ func AddUpstreamAuthLabel(labels prometheus.Labels, hasUpstreamAuth string) prom return labels } -func AddWithDownstreamDeadline(labels prometheus.Labels, hasDeadline bool) prometheus.Labels { +func AddWithDownstreamDeadlineLabel(labels prometheus.Labels, hasDeadline bool) prometheus.Labels { if labels == nil { labels = prometheus.Labels{} } diff --git a/pkg/auth/metrics_test.go b/pkg/auth/metrics_test.go index 525736b..0e52869 100644 --- a/pkg/auth/metrics_test.go +++ b/pkg/auth/metrics_test.go @@ -66,18 +66,18 @@ func TestAddUpstreamAuthLabel(t *testing.T) { } func TestAddWithDownstreamDeadline(t *testing.T) { // Test case 1: With downstream deadline - labels := AddWithDownstreamDeadline(nil, true) + labels := AddWithDownstreamDeadlineLabel(nil, true) assert.NotNil(t, labels, "Labels should not be nil") assert.Equal(t, "true", labels[WithDownstreamDeadlineLabel], "WithDownstreamDeadlineLabel should be true") // Test case 2: Without downstream deadline - labels = AddWithDownstreamDeadline(nil, false) + labels = AddWithDownstreamDeadlineLabel(nil, false) assert.NotNil(t, labels, "Labels should not be nil") assert.Equal(t, "false", labels[WithDownstreamDeadlineLabel], "WithDownstreamDeadlineLabel should be false") // Test case 3: Existing labels existingLabels := prometheus.Labels{"existing": "label"} - labels = AddWithDownstreamDeadline(existingLabels, true) + labels = AddWithDownstreamDeadlineLabel(existingLabels, true) assert.NotNil(t, labels, "Labels should not be nil") assert.Equal(t, "true", labels[WithDownstreamDeadlineLabel], "WithDownstreamDeadlineLabel should be true") assert.Equal(t, "label", labels["existing"], "Existing label should remain unchanged")