Skip to content

Commit

Permalink
feat(faro/receiver): propagate request metadata to downstream consume…
Browse files Browse the repository at this point in the history
…rs (#6515)

Signed-off-by: hainenber <[email protected]>
  • Loading branch information
hainenber authored Apr 4, 2024
1 parent 644fa14 commit 2e95228
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 7 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Main (unreleased)

- Add automatic conversion for `legacy_positions_file` in component `loki.source.file`. (@mattdurham)

- Propagate request metadata for `faro.receiver` to downstream components. (@hainenber)

### Features

- A new `loki.rules.kubernetes` component that discovers `PrometheusRule` Kubernetes resources and loads them into a Loki Ruler instance. (@EStork09)
Expand All @@ -75,7 +77,7 @@ Main (unreleased)
whenever that argument is explicitly configured. This issue only affected a
small subset of arguments across 15 components. (@erikbaranowski, @rfratto)

- Fix panic when fanning out to invalid receivers. (@hainenber)
- Fix panic when fanning out to invalid receivers. (@hainenber)

- Fix a bug where a panic could occur when reloading custom components. (@wildum)

Expand Down
1 change: 1 addition & 0 deletions docs/sources/flow/reference/components/faro.receiver.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ Name | Type | Description | Default | Required
`cors_allowed_origins` | `list(string)` | Origins for which cross-origin requests are permitted. | `[]` | no
`api_key` | `secret` | Optional API key to validate client requests with. | `""` | no
`max_allowed_payload_size` | `string` | Maximum size (in bytes) for client requests. | `"5MiB"` | no
`include_metadata` | `boolean` | Propagate incoming connection metadata to downstream consumers. | `false` | no

By default, telemetry data is only accepted from applications on the same local
network as the browser. To accept telemetry data from a wider set of clients,
Expand Down
3 changes: 2 additions & 1 deletion internal/component/faro/receiver/arguments.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type ServerArguments struct {
APIKey rivertypes.Secret `river:"api_key,attr,optional"`
MaxAllowedPayloadSize units.Base2Bytes `river:"max_allowed_payload_size,attr,optional"`

RateLimiting RateLimitingArguments `river:"rate_limiting,block,optional"`
RateLimiting RateLimitingArguments `river:"rate_limiting,block,optional"`
IncludeMetadata bool `river:"include_metadata,attr,optional"`
}

func (s *ServerArguments) SetToDefault() {
Expand Down
8 changes: 8 additions & 0 deletions internal/component/faro/receiver/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/agent/internal/flow/logging/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/cors"
"go.opentelemetry.io/collector/client"
"golang.org/x/time/rate"
)

Expand Down Expand Up @@ -80,6 +81,13 @@ func (h *handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
h.argsMut.RLock()
defer h.argsMut.RUnlock()

// Propagate request headers as metadata
if h.args.IncludeMetadata {
cl := client.FromContext(req.Context())
cl.Metadata = client.NewMetadata(req.Header.Clone())
req = req.WithContext(client.NewContext(req.Context(), cl))
}

if h.cors != nil {
h.cors.ServeHTTP(rw, req, h.handleRequest)
} else {
Expand Down
166 changes: 166 additions & 0 deletions internal/component/faro/receiver/receiver_otelcol_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
//go:build !race

package receiver

import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/grafana/agent/internal/component/otelcol"
"github.com/grafana/agent/internal/component/otelcol/auth"
"github.com/grafana/agent/internal/component/otelcol/auth/headers"
otlphttp "github.com/grafana/agent/internal/component/otelcol/exporter/otlphttp"
"github.com/grafana/agent/internal/flow/componenttest"
"github.com/grafana/agent/internal/util"
"github.com/phayes/freeport"
"github.com/stretchr/testify/require"
)

func TestWithOtelcolConsumer(t *testing.T) {
ctx := componenttest.TestContext(t)

faroReceiver, err := componenttest.NewControllerFromID(
util.TestLogger(t),
"faro.receiver",
)
require.NoError(t, err)
faroReceiverPort, err := freeport.GetFreePort()
require.NoError(t, err)

otelcolAuthHeader, err := componenttest.NewControllerFromID(
util.TestLogger(t),
"otelcol.auth.headers",
)
require.NoError(t, err)

otelcolExporter, err := componenttest.NewControllerFromID(
util.TestLogger(t),
"otelcol.exporter.otlphttp",
)
require.NoError(t, err)

doneChan := make(chan struct{})
finalOtelServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "TENANTID", r.Header.Get("X-Scope-OrgId"))
close(doneChan)
w.WriteHeader(http.StatusOK)
}))
defer finalOtelServer.Close()

tenantId := "Tenant-Id"
go func() {
err := otelcolAuthHeader.Run(ctx, headers.Arguments{
Headers: []headers.Header{
{
Key: "X-Scope-OrgId",
FromContext: &tenantId,
Action: headers.ActionUpsert,
},
},
})
require.NoError(t, err)
}()

require.NoError(t, otelcolAuthHeader.WaitRunning(time.Second), "otelco.auth.headers never started")
require.NoError(t, otelcolAuthHeader.WaitExports(time.Second), "otelco.auth.headers never exported anything")
otelcolAuthHeaderExport, ok := otelcolAuthHeader.Exports().(auth.Exports)
require.True(t, ok)

go func() {
err := otelcolExporter.Run(ctx, otlphttp.Arguments{
Client: otlphttp.HTTPClientArguments(otelcol.HTTPClientArguments{
Endpoint: finalOtelServer.URL,
Auth: &otelcolAuthHeaderExport.Handler,
TLS: otelcol.TLSClientArguments{
Insecure: true,
InsecureSkipVerify: true,
},
}),
Encoding: otlphttp.EncodingJSON,
})
require.NoError(t, err)
}()

require.NoError(t, otelcolExporter.WaitRunning(time.Second), "otelco.exporter.otlphttp never started")
require.NoError(t, otelcolExporter.WaitExports(time.Second), "otelco.exporter.otlphttp never exported anything")
otelcolExporterExport, ok := otelcolExporter.Exports().(otelcol.ConsumerExports)
require.True(t, ok)

go func() {
err := faroReceiver.Run(ctx, Arguments{
LogLabels: map[string]string{
"foo": "bar",
},

Server: ServerArguments{
Host: "127.0.0.1",
Port: faroReceiverPort,
IncludeMetadata: true,
},

Output: OutputArguments{
Traces: []otelcol.Consumer{otelcolExporterExport.Input},
},
})
require.NoError(t, err)
}()

// Wait for the server to be running.
util.Eventually(t, func(t require.TestingT) {
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/-/ready", faroReceiverPort))
require.NoError(t, err)
defer resp.Body.Close()

require.Equal(t, http.StatusOK, resp.StatusCode)
})

// Send a sample payload to the server.
req, err := http.NewRequest(
"POST",
fmt.Sprintf("http://localhost:%d/collect", faroReceiverPort),
strings.NewReader(`{
"traces": {
"resourceSpans": [{
"scope_spans": [{
"spans": [{
"name": "TestSpan"
}]
}]
}]
},
"logs": [{
"message": "hello, world",
"level": "info",
"context": {"env": "dev"},
"timestamp": "2021-01-01T00:00:00Z",
"trace": {
"trace_id": "0",
"span_id": "0"
}
}],
"exceptions": [],
"measurements": [],
"meta": {}
}`),
)
require.NoError(t, err)

req.Header.Add(tenantId, "TENANTID")
req.Header.Add("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err)
defer resp.Body.Close()

require.Equal(t, http.StatusAccepted, resp.StatusCode)
select {
case <-doneChan:
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for updates to finish")
}
}
17 changes: 12 additions & 5 deletions internal/component/faro/receiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ func Test(t *testing.T) {
},

Server: ServerArguments{
Host: "127.0.0.1",
Port: freePort,
Host: "127.0.0.1",
Port: freePort,
IncludeMetadata: true,
},

Output: OutputArguments{
Expand All @@ -62,9 +63,9 @@ func Test(t *testing.T) {
})

// Send a sample payload to the server.
resp, err := http.Post(
req, err := http.NewRequest(
"POST",
fmt.Sprintf("http://localhost:%d/collect", freePort),
"application/json",
strings.NewReader(`{
"traces": {
"resourceSpans": []
Expand All @@ -85,6 +86,13 @@ func Test(t *testing.T) {
}`),
)
require.NoError(t, err)

req.Header.Add("Tenant-Id", "TENANTID")
req.Header.Add("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err)
defer resp.Body.Close()

require.Equal(t, http.StatusAccepted, resp.StatusCode)
Expand Down Expand Up @@ -124,7 +132,6 @@ func newFakeLogsReceiver(t *testing.T) *fakeLogsReceiver {
case <-ctx.Done():
return
case ent := <-lr.Chan():

lr.entriesMut.Lock()
lr.entries = append(lr.entries, loki.Entry{
Labels: ent.Labels,
Expand Down

0 comments on commit 2e95228

Please sign in to comment.