Skip to content

Commit

Permalink
Feat/add opentelemetry propagation (#42)
Browse files Browse the repository at this point in the history
* refactor tracing package and usage

* reading parent context from Check request headers

* fix tests

* fixed nil ref to finalResponse
  • Loading branch information
SamMHD authored Feb 27, 2024
1 parent bafd0ab commit 0788b5f
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 33 deletions.
30 changes: 28 additions & 2 deletions internal/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package tracing
import (
"context"
"fmt"
"net/http"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
Expand All @@ -20,6 +23,7 @@ const (
ServiceName = "cerberus"
HTTPTracingProvider = "http-tracing-provider"
GRPCTracingProvider = "grpc-tracing-provider"
TimeFormat = time.RFC3339Nano
)

var cerberusTracer trace.Tracer
Expand Down Expand Up @@ -61,8 +65,30 @@ func SetTracingProvider(provider string, samplingRation float64, timeout float64
return
}

func StartSpan(ctx context.Context, spanName string) (context.Context, trace.Span) {
return cerberusTracer.Start(ctx, spanName)
func StartSpan(ctx context.Context, spanName string, extraAttrs ...attribute.KeyValue) (context.Context, trace.Span) {
newCtx, span := cerberusTracer.Start(ctx, spanName,
trace.WithSpanKind(trace.SpanKindServer),
)
extraAttrs = append(extraAttrs,
attribute.String("start-time", time.Now().Format(TimeFormat)),
)
span.SetAttributes(extraAttrs...)
return newCtx, span
}

func EndSpan(span trace.Span, start_time time.Time, extraAttrs ...attribute.KeyValue) {
extraAttrs = append(extraAttrs,
attribute.String("end-time", time.Now().Format(TimeFormat)),
attribute.Float64("duration_seconds", time.Since(start_time).Seconds()),
)
span.SetAttributes(extraAttrs...)
span.End()
}

func ReadParentSpanFromRequest(ctx context.Context, req http.Request) context.Context {
return otel.GetTextMapPropagator().Extract(
ctx, propagation.HeaderCarrier(req.Header),
)
}

func Tracer() *trace.Tracer {
Expand Down
69 changes: 42 additions & 27 deletions pkg/auth/authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net/http"
"net/url"
"strings"
"sync"
"time"

Expand All @@ -12,6 +13,7 @@ import (
"github.com/snapp-incubator/Cerberus/api/v1alpha1"
"github.com/snapp-incubator/Cerberus/internal/tracing"
"go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -137,21 +139,32 @@ func toExtraHeaders(headers CerberusExtraHeaders) ExtraHeaders {
}

// Check is the function which is used to Authenticate and Respond to gRPC envoy.CheckRequest
func (a *Authenticator) Check(ctx context.Context, request *Request) (*Response, error) {
func (a *Authenticator) Check(ctx context.Context, request *Request) (finalResponse *Response, err error) {
start_time := time.Now()
wsvc, ns, reason := readRequestContext(request)

// generate opentelemetry span with given parameters
ctx, span := tracing.StartSpan(ctx, "CheckFunction")
defer func() {
span.SetAttributes(
attribute.String("cerberus-reason", string(reason)),
)
span.End()
}()
span.SetAttributes(
parentCtx := tracing.ReadParentSpanFromRequest(ctx, request.Request)
ctx, span := tracing.StartSpan(parentCtx, "CheckFunction",
attribute.String("webservice", wsvc),
attribute.String("namespace", ns),
)
defer func() {
extraAttrs := []attribute.KeyValue{
attribute.String("cerberus-reason", string(reason)),
}
if finalResponse != nil {
extraAttrs = append(extraAttrs,
attribute.Bool("final-response-ok", finalResponse.Allow),
)
for k, v := range finalResponse.Response.Header {
extraAttrs = append(extraAttrs,
attribute.String("final-extra-headers-"+k, strings.Join(v, ",")),
)
}
}
tracing.EndSpan(span, start_time, extraAttrs...)
}()

if reason != "" {
return generateResponse(reason, nil), nil
Expand All @@ -178,12 +191,12 @@ func (a *Authenticator) Check(ctx context.Context, request *Request) (*Response,
}
}

var err error
if reason == CerberusReasonUpstreamAuthTimeout || reason == CerberusReasonUpstreamAuthFailed {
err = status.Error(codes.DeadlineExceeded, "Timeout exceeded")
}

return generateResponse(reason, extraHeaders), err
finalResponse = generateResponse(reason, extraHeaders)
return
}

func readRequestContext(request *Request) (wsvc string, ns string, reason CerberusReason) {
Expand Down Expand Up @@ -290,19 +303,18 @@ func processResponseError(err error) CerberusReason {
// checkServiceUpstreamAuth function is designed to validate the request through
// the upstream authentication for a given webservice
func (a *Authenticator) checkServiceUpstreamAuth(service WebservicesCacheEntry, request *Request, extraHeaders *ExtraHeaders, ctx context.Context) (reason CerberusReason) {
start_time := time.Now()
downstreamDeadline, hasDownstreamDeadline := ctx.Deadline()
serviceUpstreamAuthCalls.With(AddWithDownstreamDeadline(nil, hasDownstreamDeadline)).Inc()
serviceUpstreamAuthCalls.With(AddWithDownstreamDeadlineLabel(nil, hasDownstreamDeadline)).Inc()

_, span := tracing.StartSpan(ctx, "upstream-auth")
_, span := tracing.StartSpan(ctx, "cerberus-upstream-auth",
attribute.String("upstream-auth-address", service.Spec.UpstreamHttpAuth.Address),
)
defer func() {
span.SetAttributes(
attribute.String("upstream-auth-cerberus-reason", string(reason)),
tracing.EndSpan(span, start_time,
attribute.String("cerberus-reason", string(reason)),
)
span.End()
}()
span.SetAttributes(
attribute.String("upstream-auth-address", service.Spec.UpstreamHttpAuth.Address),
)

if reason := validateUpstreamAuthRequest(service); reason != "" {
return reason
Expand All @@ -318,17 +330,20 @@ func (a *Authenticator) checkServiceUpstreamAuth(service WebservicesCacheEntry,
resp, err := a.httpClient.Do(req)
reqDuration := time.Since(reqStart)

if reason := processResponseError(err); reason != "" {
return reason
}

labels := AddWithDownstreamDeadline(AddStatusLabel(nil, resp.StatusCode), hasDownstreamDeadline)
upstreamAuthRequestDuration.With(labels).Observe(reqDuration.Seconds())

span.SetAttributes(
attribute.Float64("upstream-auth-rtt-seconds", reqDuration.Seconds()),
attribute.String("upstream-http-request-start", reqStart.Format(tracing.TimeFormat)),
attribute.String("upstream-http-request-end", time.Now().Format(tracing.TimeFormat)),
attribute.Float64("upstream-http-request-rtt-seconds", time.Since(reqStart).Seconds()),
attribute.Int("upstream-auth-status-code", resp.StatusCode),
)
labels := AddWithDownstreamDeadlineLabel(AddStatusLabel(nil, resp.StatusCode), hasDownstreamDeadline)
upstreamAuthRequestDuration.With(labels).Observe(reqDuration.Seconds())

if reason := processResponseError(err); reason != "" {
span.RecordError(err)
span.SetStatus(otelcodes.Error, "upstream auth http request faild")
return reason
}

if resp.StatusCode != http.StatusOK {
return CerberusReasonUnauthorized
Expand Down
2 changes: 1 addition & 1 deletion pkg/auth/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func AddUpstreamAuthLabel(labels prometheus.Labels, hasUpstreamAuth string) prom
return labels
}

func AddWithDownstreamDeadline(labels prometheus.Labels, hasDeadline bool) prometheus.Labels {
func AddWithDownstreamDeadlineLabel(labels prometheus.Labels, hasDeadline bool) prometheus.Labels {
if labels == nil {
labels = prometheus.Labels{}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/auth/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,18 @@ func TestAddUpstreamAuthLabel(t *testing.T) {
}
func TestAddWithDownstreamDeadline(t *testing.T) {
// Test case 1: With downstream deadline
labels := AddWithDownstreamDeadline(nil, true)
labels := AddWithDownstreamDeadlineLabel(nil, true)
assert.NotNil(t, labels, "Labels should not be nil")
assert.Equal(t, "true", labels[WithDownstreamDeadlineLabel], "WithDownstreamDeadlineLabel should be true")

// Test case 2: Without downstream deadline
labels = AddWithDownstreamDeadline(nil, false)
labels = AddWithDownstreamDeadlineLabel(nil, false)
assert.NotNil(t, labels, "Labels should not be nil")
assert.Equal(t, "false", labels[WithDownstreamDeadlineLabel], "WithDownstreamDeadlineLabel should be false")

// Test case 3: Existing labels
existingLabels := prometheus.Labels{"existing": "label"}
labels = AddWithDownstreamDeadline(existingLabels, true)
labels = AddWithDownstreamDeadlineLabel(existingLabels, true)
assert.NotNil(t, labels, "Labels should not be nil")
assert.Equal(t, "true", labels[WithDownstreamDeadlineLabel], "WithDownstreamDeadlineLabel should be true")
assert.Equal(t, "label", labels["existing"], "Existing label should remain unchanged")
Expand Down

0 comments on commit 0788b5f

Please sign in to comment.