Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(faro/receiver): propagate request metadata to downstream consumers #6515

Merged
merged 7 commits into from
Apr 4, 2024
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Main (unreleased)

- Add automatic conversion for `legacy_positions_file` in component `loki.source.file`. (@mattdurham)

- Propagate request metadata for `faro.receiver` to downstream components. (@hainenber)

### Features

- A new `loki.rules.kubernetes` component that discovers `PrometheusRule` Kubernetes resources and loads them into a Loki Ruler instance. (@EStork09)
Expand All @@ -75,7 +77,7 @@ Main (unreleased)
whenever that argument is explicitly configured. This issue only affected a
small subset of arguments across 15 components. (@erikbaranowski, @rfratto)

- Fix panic when fanning out to invalid receivers. (@hainenber)
- Fix panic when fanning out to invalid receivers. (@hainenber)

- Fix a bug where a panic could occur when reloading custom components. (@wildum)

Expand Down
1 change: 1 addition & 0 deletions docs/sources/flow/reference/components/faro.receiver.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ Name | Type | Description | Default | Required
`cors_allowed_origins` | `list(string)` | Origins for which cross-origin requests are permitted. | `[]` | no
`api_key` | `secret` | Optional API key to validate client requests with. | `""` | no
`max_allowed_payload_size` | `string` | Maximum size (in bytes) for client requests. | `"5MiB"` | no
`include_metadata` | `boolean` | Propagate incoming connection metadata to downstream consumers. | `false` | no

By default, telemetry data is only accepted from applications on the same local
network as the browser. To accept telemetry data from a wider set of clients,
Expand Down
3 changes: 2 additions & 1 deletion internal/component/faro/receiver/arguments.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type ServerArguments struct {
APIKey rivertypes.Secret `river:"api_key,attr,optional"`
MaxAllowedPayloadSize units.Base2Bytes `river:"max_allowed_payload_size,attr,optional"`

RateLimiting RateLimitingArguments `river:"rate_limiting,block,optional"`
RateLimiting RateLimitingArguments `river:"rate_limiting,block,optional"`
IncludeMetadata bool `river:"include_metadata,attr,optional"`
}

func (s *ServerArguments) SetToDefault() {
Expand Down
8 changes: 8 additions & 0 deletions internal/component/faro/receiver/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/agent/internal/flow/logging/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/cors"
"go.opentelemetry.io/collector/client"
"golang.org/x/time/rate"
)

Expand Down Expand Up @@ -80,6 +81,13 @@ func (h *handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
h.argsMut.RLock()
defer h.argsMut.RUnlock()

// Propagate request headers as metadata
if h.args.IncludeMetadata {
cl := client.FromContext(req.Context())
cl.Metadata = client.NewMetadata(req.Header.Clone())
req = req.WithContext(client.NewContext(req.Context(), cl))
}

if h.cors != nil {
h.cors.ServeHTTP(rw, req, h.handleRequest)
} else {
Expand Down
166 changes: 166 additions & 0 deletions internal/component/faro/receiver/receiver_otelcol_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
//go:build !race

package receiver

import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/grafana/agent/internal/component/otelcol"
"github.com/grafana/agent/internal/component/otelcol/auth"
"github.com/grafana/agent/internal/component/otelcol/auth/headers"
otlphttp "github.com/grafana/agent/internal/component/otelcol/exporter/otlphttp"
"github.com/grafana/agent/internal/flow/componenttest"
"github.com/grafana/agent/internal/util"
"github.com/phayes/freeport"
"github.com/stretchr/testify/require"
)

func TestWithOtelcolConsumer(t *testing.T) {
ctx := componenttest.TestContext(t)

faroReceiver, err := componenttest.NewControllerFromID(
util.TestLogger(t),
"faro.receiver",
)
require.NoError(t, err)
faroReceiverPort, err := freeport.GetFreePort()
require.NoError(t, err)

otelcolAuthHeader, err := componenttest.NewControllerFromID(
util.TestLogger(t),
"otelcol.auth.headers",
)
require.NoError(t, err)

otelcolExporter, err := componenttest.NewControllerFromID(
util.TestLogger(t),
"otelcol.exporter.otlphttp",
)
require.NoError(t, err)

doneChan := make(chan struct{})
finalOtelServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "TENANTID", r.Header.Get("X-Scope-OrgId"))
close(doneChan)
w.WriteHeader(http.StatusOK)
}))
defer finalOtelServer.Close()

tenantId := "Tenant-Id"
go func() {
err := otelcolAuthHeader.Run(ctx, headers.Arguments{
Headers: []headers.Header{
{
Key: "X-Scope-OrgId",
FromContext: &tenantId,
Action: headers.ActionUpsert,
},
},
})
require.NoError(t, err)
}()

require.NoError(t, otelcolAuthHeader.WaitRunning(time.Second), "otelco.auth.headers never started")
require.NoError(t, otelcolAuthHeader.WaitExports(time.Second), "otelco.auth.headers never exported anything")
otelcolAuthHeaderExport, ok := otelcolAuthHeader.Exports().(auth.Exports)
require.True(t, ok)

go func() {
err := otelcolExporter.Run(ctx, otlphttp.Arguments{
Client: otlphttp.HTTPClientArguments(otelcol.HTTPClientArguments{
Endpoint: finalOtelServer.URL,
Auth: &otelcolAuthHeaderExport.Handler,
TLS: otelcol.TLSClientArguments{
Insecure: true,
InsecureSkipVerify: true,
},
}),
Encoding: otlphttp.EncodingJSON,
})
require.NoError(t, err)
}()

require.NoError(t, otelcolExporter.WaitRunning(time.Second), "otelco.exporter.otlphttp never started")
require.NoError(t, otelcolExporter.WaitExports(time.Second), "otelco.exporter.otlphttp never exported anything")
otelcolExporterExport, ok := otelcolExporter.Exports().(otelcol.ConsumerExports)
require.True(t, ok)

go func() {
err := faroReceiver.Run(ctx, Arguments{
LogLabels: map[string]string{
"foo": "bar",
},

Server: ServerArguments{
Host: "127.0.0.1",
Port: faroReceiverPort,
IncludeMetadata: true,
},

Output: OutputArguments{
Traces: []otelcol.Consumer{otelcolExporterExport.Input},
},
})
require.NoError(t, err)
}()

// Wait for the server to be running.
util.Eventually(t, func(t require.TestingT) {
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/-/ready", faroReceiverPort))
require.NoError(t, err)
defer resp.Body.Close()

require.Equal(t, http.StatusOK, resp.StatusCode)
})

// Send a sample payload to the server.
req, err := http.NewRequest(
"POST",
fmt.Sprintf("http://localhost:%d/collect", faroReceiverPort),
strings.NewReader(`{
"traces": {
"resourceSpans": [{
"scope_spans": [{
"spans": [{
"name": "TestSpan"
}]
}]
}]
},
"logs": [{
"message": "hello, world",
"level": "info",
"context": {"env": "dev"},
"timestamp": "2021-01-01T00:00:00Z",
"trace": {
"trace_id": "0",
"span_id": "0"
}
}],
"exceptions": [],
"measurements": [],
"meta": {}
}`),
)
require.NoError(t, err)

req.Header.Add(tenantId, "TENANTID")
req.Header.Add("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err)
defer resp.Body.Close()

require.Equal(t, http.StatusAccepted, resp.StatusCode)
select {
case <-doneChan:
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for updates to finish")
}
}
17 changes: 12 additions & 5 deletions internal/component/faro/receiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ func Test(t *testing.T) {
},

Server: ServerArguments{
Host: "127.0.0.1",
Port: freePort,
Host: "127.0.0.1",
Port: freePort,
IncludeMetadata: true,
},

Output: OutputArguments{
Expand All @@ -62,9 +63,9 @@ func Test(t *testing.T) {
})

// Send a sample payload to the server.
resp, err := http.Post(
req, err := http.NewRequest(
"POST",
fmt.Sprintf("http://localhost:%d/collect", freePort),
"application/json",
strings.NewReader(`{
"traces": {
"resourceSpans": []
Expand All @@ -85,6 +86,13 @@ func Test(t *testing.T) {
}`),
)
require.NoError(t, err)

req.Header.Add("Tenant-Id", "TENANTID")
req.Header.Add("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err)
defer resp.Body.Close()

require.Equal(t, http.StatusAccepted, resp.StatusCode)
Expand Down Expand Up @@ -124,7 +132,6 @@ func newFakeLogsReceiver(t *testing.T) *fakeLogsReceiver {
case <-ctx.Done():
return
case ent := <-lr.Chan():

lr.entriesMut.Lock()
lr.entries = append(lr.entries, loki.Entry{
Labels: ent.Labels,
Expand Down
Loading