Skip to content

Commit

Permalink
Add support for x scope org id header in loki source api (#1805)
Browse files Browse the repository at this point in the history
* feat(loki.source.api): Add support for tenant extraction from header

* Update CHANGELOG.md

* add tests and fix syntax errors

* rename userID to tenantID

* add documentation on header usage

* Update docs/sources/reference/components/loki/loki.source.api.md

Co-authored-by: Clayton Cornell <[email protected]>

* Update docs/sources/reference/components/loki/loki.source.api.md

---------

Co-authored-by: Clayton Cornell <[email protected]>
  • Loading branch information
QuentinBisson and clayton-cornell authored Oct 2, 2024
1 parent 67409b5 commit 35e256f
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Main (unreleased)

- Add the function `path_join` to the stdlib. (@wildum)
- Add support to `loki.source.syslog` for the RFC3164 format ("BSD syslog"). (@sushain97)
- Add support to `loki.source.api` for extracting the tenant from the HTTP `X-Scope-OrgID` header. (@QuentinBisson)
- (_Experimental_) Add a `loki.secretfilter` component to redact secrets from collected logs.

### Enhancements
Expand Down
8 changes: 8 additions & 0 deletions docs/sources/reference/components/loki/loki.source.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ loki.source.api "loki_push_api" {
}
```

### Technical details

`loki.source.api` filters out all labels that start with `__`, for example, `__tenant_id__`.

To set the tenant ID, either make sure the `X-Scope-OrgID` header is present on the request or use the [`loki.process`][loki.process] component.

[loki.process]: ../loki.process/

<!-- START GENERATED COMPATIBLE COMPONENTS -->

## Compatible components
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/go-kit/log"
"github.com/gorilla/mux"
"github.com/grafana/dskit/tenant"
"github.com/grafana/dskit/user"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
util_log "github.com/grafana/loki/v3/pkg/util/log"
Expand All @@ -22,6 +23,7 @@ import (
promql_parser "github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/component/common/loki/client"
fnet "github.com/grafana/alloy/internal/component/common/net"
frelabel "github.com/grafana/alloy/internal/component/common/relabel"
"github.com/grafana/alloy/internal/runtime/logging/level"
Expand Down Expand Up @@ -64,21 +66,38 @@ func (s *PushAPIServer) Run() error {
level.Info(s.logger).Log("msg", "starting push API server")

err := s.server.MountAndRun(func(router *mux.Router) {

// Extract the tenant ID from the request and add it to the context.
tenantHeaderExtractor := func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, ctx, _ := user.ExtractOrgIDFromHTTPRequest(r)
next.ServeHTTP(w, r.WithContext(ctx))
})
}

// This redirecting is so we can avoid breaking changes where we originally implemented it with
// the loki prefix.
router.Path("/api/v1/push").Methods("POST").Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = "/loki/api/v1/push"
r.RequestURI = "/loki/api/v1/push"
s.handleLoki(w, r)
}))
router.Path("/api/v1/raw").Methods("POST").Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = "/loki/api/v1/raw"
r.RequestURI = "/loki/api/v1/raw"
s.handlePlaintext(w, r)
}))
router.Path("/api/v1/push").Methods("POST").Handler(
tenantHeaderExtractor(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = "/loki/api/v1/push"
r.RequestURI = "/loki/api/v1/push"
s.handleLoki(w, r)
}),
),
)
router.Path("/api/v1/raw").Methods("POST").Handler(
tenantHeaderExtractor(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = "/loki/api/v1/raw"
r.RequestURI = "/loki/api/v1/raw"
s.handlePlaintext(w, r)
}),
),
)
router.Path("/ready").Methods("GET").Handler(http.HandlerFunc(s.ready))
router.Path("/loki/api/v1/push").Methods("POST").Handler(http.HandlerFunc(s.handleLoki))
router.Path("/loki/api/v1/raw").Methods("POST").Handler(http.HandlerFunc(s.handlePlaintext))
router.Path("/loki/api/v1/push").Methods("POST").Handler(tenantHeaderExtractor(http.HandlerFunc(s.handleLoki)))
router.Path("/loki/api/v1/raw").Methods("POST").Handler(tenantHeaderExtractor(http.HandlerFunc(s.handlePlaintext)))
})
return err
}
Expand Down Expand Up @@ -137,10 +156,10 @@ func (s *PushAPIServer) getRelabelRules() []*relabel.Config {
// Only the HTTP handler functions are copied to allow for Alloy-specific server configuration and lifecycle management.
func (s *PushAPIServer) handleLoki(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := tenant.TenantID(r.Context())
tenantID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(
logger,
userID,
tenantID,
r,
nil, // tenants retention
nil, // limits
Expand Down Expand Up @@ -190,6 +209,11 @@ func (s *PushAPIServer) handleLoki(w http.ResponseWriter, r *http.Request) {
filtered[model.LabelName(processed[i].Name)] = model.LabelValue(processed[i].Value)
}

// Add tenant ID to the filtered labels if it is set
if tenantID != "" {
filtered[model.LabelName(client.ReservedLabelTenantID)] = model.LabelValue(tenantID)
}

for _, entry := range stream.Entries {
e := loki.Entry{
Labels: filtered.Clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,98 @@ regex = "dropme"
pt.Shutdown()
}

// TestLokiPushTargetWithXScopeOrgIDHeader pushes 100 entries to /api/v1/push
// with the X-Scope-OrgID header set to "tenant1" and checks that the entries
// handed to the test handler carry that value in the __tenant_id__ label,
// in addition to the usual relabeling, structured metadata and timestamp
// expectations.
func TestLokiPushTargetWithXScopeOrgIDHeader(t *testing.T) {
	const total = 100

	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
	pt, port, eh := createPushServer(t, logger)

	pt.SetLabels(model.LabelSet{
		"pushserver": "pushserver1",
		"dropme":     "label",
	})
	pt.SetKeepTimestamp(true)

	// Relabel rule that removes the "dropme" label set above.
	var dropRule frelabel.Config
	ruleSrc := `
action = "labeldrop"
regex = "dropme"
`
	require.NoError(t, syntax.Unmarshal([]byte(ruleSrc), &dropRule))
	pt.SetRelabelRules(frelabel.Rules{&dropRule})

	// Client that pushes to the server and attaches the tenant header.
	var pushURL flagext.URLValue
	require.NoError(t, pushURL.Set("http://"+localhost+":"+strconv.Itoa(port)+"/api/v1/push"))

	clientCfg := client.Config{
		URL:       pushURL,
		Timeout:   time.Second,
		BatchWait: time.Second,
		BatchSize: 100 * 1024,
		Headers:   map[string]string{"X-Scope-OrgID": "tenant1"},
	}
	metrics := client.NewMetrics(prometheus.DefaultRegisterer)
	pushClient, err := client.New(metrics, clientCfg, 0, 0, false, logger)
	require.NoError(t, err)
	defer pushClient.Stop()

	// Queue the entries; the "__"-prefixed label is expected to be filtered out.
	streamLabels := model.LabelSet{
		"stream":             "stream1",
		"__anotherdroplabel": "dropme",
	}
	for n := 0; n < total; n++ {
		seq := strconv.Itoa(n)
		pushClient.Chan() <- loki.Entry{
			Labels: streamLabels,
			Entry: logproto.Entry{
				Timestamp: time.Unix(int64(n), 0),
				Line:      "line" + seq,
				StructuredMetadata: push.LabelsAdapter{
					{Name: "i", Value: seq},
					{Name: "anotherMetaData", Value: "val"},
				},
			},
		}
	}

	// Poll the test handler, 1ms at a time for at most 10000 rounds, until
	// every entry has shown up.
	for attempt := 0; len(eh.Received()) != total && attempt < 10000; attempt++ {
		time.Sleep(time.Millisecond)
	}

	// Fail here if the polling above ran out of attempts.
	require.Equal(t, total, len(eh.Received()))
	received := eh.Received()

	// First entry: relabel rules applied and the tenant label injected.
	wantLabels := model.LabelSet{
		"pushserver":    "pushserver1",
		"stream":        "stream1",
		"__tenant_id__": "tenant1",
	}
	require.Equal(t, wantLabels, received[0].Labels)

	// First entry: structured metadata arrived as sent.
	wantMetadata := push.LabelsAdapter{
		{Name: "i", Value: strconv.Itoa(0)},
		{Name: "anotherMetaData", Value: "val"},
	}
	require.Equal(t, wantMetadata, received[0].StructuredMetadata)

	// Keep-timestamp is enabled, so the last entry keeps the timestamp it was sent with.
	require.Equal(t, time.Unix(total-1, 0).Unix(), received[total-1].Timestamp.Unix())

	pt.Shutdown()
}

func TestPlaintextPushTarget(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
Expand Down Expand Up @@ -267,6 +359,85 @@ func TestPlaintextPushTarget(t *testing.T) {
pt.Shutdown()
}

// TestPlaintextPushTargetWithXScopeOrgIDHeader posts 100 plaintext lines to
// /api/v1/raw with the X-Scope-OrgID header set and verifies that all of them
// reach the test handler carrying exactly the labels configured on the server.
func TestPlaintextPushTargetWithXScopeOrgIDHeader(t *testing.T) {
	w := log.NewSyncWriter(os.Stderr)
	logger := log.NewLogfmtLogger(w)

	// Fake downstream client that records the entries forwarded by the server.
	eh := fake.NewClient(func() {})
	defer eh.Stop()

	// Get a randomly available port by opening and closing a TCP socket.
	addr, err := net.ResolveTCPAddr("tcp", localhost+":0")
	require.NoError(t, err)
	l, err := net.ListenTCP("tcp", addr)
	require.NoError(t, err)
	port := l.Addr().(*net.TCPAddr).Port
	err = l.Close()
	require.NoError(t, err)

	serverConfig := &fnet.ServerConfig{
		HTTP: &fnet.HTTPConfig{
			ListenAddress: localhost,
			ListenPort:    port,
		},
		GRPC: &fnet.GRPCConfig{ListenPort: getFreePort(t)},
	}

	pt, err := NewPushAPIServer(logger, serverConfig, eh, prometheus.NewRegistry())
	require.NoError(t, err)

	err = pt.Run()
	require.NoError(t, err)

	pt.SetLabels(model.LabelSet{
		"pushserver": "pushserver2",
		"keepme":     "label",
	})
	pt.SetKeepTimestamp(true)

	// Send some logs. The variable is named httpClient so it does not shadow
	// the imported client package, and the URL is built once because it does
	// not change between requests.
	ts := time.Now()
	body := new(bytes.Buffer)
	httpClient := &http.Client{}
	rawURL := fmt.Sprintf("http://%s:%d/api/v1/raw", localhost, port)
	for i := 0; i < 100; i++ {
		body.WriteString("line" + strconv.Itoa(i))

		// Create a new request carrying the tenant header.
		req, err := http.NewRequest("POST", rawURL, body)
		require.NoError(t, err)
		req.Header.Add("Content-Type", "text/json")
		req.Header.Add("X-Scope-OrgID", "tenant1")
		resp, err := httpClient.Do(req)
		require.NoError(t, err)
		// Close the body right away: a defer inside the loop would keep all
		// 100 response bodies open until the test function returns.
		require.NoError(t, resp.Body.Close())
		body.Reset()
	}

	// Wait for them to appear in the test handler.
	countdown := 10000
	for len(eh.Received()) != 100 && countdown > 0 {
		time.Sleep(1 * time.Millisecond)
		countdown--
	}

	// Make sure we didn't time out.
	require.Equal(t, 100, len(eh.Received()))

	// Verify labels. Only the labels set on the server are expected here;
	// the expected set contains no __tenant_id__ label.
	expectedLabels := model.LabelSet{
		"pushserver": "pushserver2",
		"keepme":     "label",
	}
	// Spot check the first value in the result to make sure the labels were applied properly.
	require.Equal(t, expectedLabels, eh.Received()[0].Labels)

	// Timestamp is always set in the handler, so received timestamps must not
	// be earlier than the time at which we started sending logs.
	require.GreaterOrEqual(t, eh.Received()[99].Timestamp.Unix(), ts.Unix())

	pt.Shutdown()
}

func TestReady(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
Expand Down

0 comments on commit 35e256f

Please sign in to comment.