From f6241a1d8c8ac5a7da38fb9c6071fded8bdf4c1e Mon Sep 17 00:00:00 2001 From: mattdurham Date: Thu, 6 Jul 2023 15:06:11 -0400 Subject: [PATCH] Fix for loki.source.api urls (#4352) * Fix for #4317 * Add rerouted routes * Update CHANGELOG.md Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> * Update docs/sources/flow/reference/components/loki.source.api.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --------- Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- CHANGELOG.md | 3 + .../api/internal/lokipush/push_api_server.go | 16 ++- .../internal/lokipush/push_api_server_test.go | 116 +++++++++++++++--- .../reference/components/loki.source.api.md | 9 +- 4 files changed, 121 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f6a2e76722e3..906a7723168e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -117,6 +117,9 @@ Main (unreleased) - Fix bug where `stage.timestamp` in `loki.process` wasn't able to correctly parse timezones. This issue only impacts the dedicated `grafana-agent-flow` binary. (@rfratto) + +- Fix bug where JSON requests to `loki.source.api` would not be handled correctly. This adds `/loki/api/v1/raw` and `/loki/api/v1/push` endpoints to `loki.source.api` and maps the `/api/v1/push` and `/api/v1/raw` to + the `/loki` prefixed endpoints. (@mattdurham) ### Other changes diff --git a/component/loki/source/api/internal/lokipush/push_api_server.go b/component/loki/source/api/internal/lokipush/push_api_server.go index b8f16a015978..c82e8fa3f18c 100644 --- a/component/loki/source/api/internal/lokipush/push_api_server.go +++ b/component/loki/source/api/internal/lokipush/push_api_server.go @@ -63,9 +63,21 @@ func (s *PushAPIServer) Run() error { level.Info(s.logger).Log("msg", "starting push API server") err := s.server.MountAndRun(func(router *mux.Router) { - router.Path("/api/v1/push").Methods("POST").Handler(http.HandlerFunc(s.handleLoki)) - router.Path("/api/v1/raw").Methods("POST").Handler(http.HandlerFunc(s.handlePlaintext)) + // This redirecting is so we can avoid breaking changes where we originally implemented it with + // the loki prefix. + router.Path("/api/v1/push").Methods("POST").Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + r.URL.Path = "/loki/api/v1/push" + r.RequestURI = "/loki/api/v1/push" + s.handleLoki(w, r) + })) + router.Path("/api/v1/raw").Methods("POST").Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + r.URL.Path = "/loki/api/v1/raw" + r.RequestURI = "/loki/api/v1/raw" + s.handlePlaintext(w, r) + })) router.Path("/ready").Methods("GET").Handler(http.HandlerFunc(s.ready)) + router.Path("/loki/api/v1/push").Methods("POST").Handler(http.HandlerFunc(s.handleLoki)) + router.Path("/loki/api/v1/raw").Methods("POST").Handler(http.HandlerFunc(s.handlePlaintext)) }) return err } diff --git a/component/loki/source/api/internal/lokipush/push_api_server_test.go b/component/loki/source/api/internal/lokipush/push_api_server_test.go index c0180d9a3d17..055e62858313 100644 --- a/component/loki/source/api/internal/lokipush/push_api_server_test.go +++ b/component/loki/source/api/internal/lokipush/push_api_server_test.go @@ -35,27 +35,82 @@ const localhost = "127.0.0.1" func TestLokiPushTarget(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) + pt, port, eh := createPushServer(t, logger) - //Create PushAPIServerOld - eh := fake.NewClient(func() {}) - defer eh.Stop() - - // Get a randomly available port by open and closing a TCP socket - port := getFreePort(t) + pt.SetLabels(model.LabelSet{ + "pushserver": "pushserver1", + "dropme": "label", + }) + pt.SetKeepTimestamp(true) - serverConfig := &fnet.ServerConfig{ - HTTP: &fnet.HTTPConfig{ - ListenAddress: localhost, - ListenPort: port, - }, - GRPC: &fnet.GRPCConfig{ListenPort: getFreePort(t)}, - } + relabelRule := frelabel.Config{} + relabelStr := ` +action = "labeldrop" +regex = "dropme" +` + err := river.Unmarshal([]byte(relabelStr), &relabelRule) + require.NoError(t, err) + pt.SetRelabelRules(frelabel.Rules{&relabelRule}) - pt, err := NewPushAPIServer(logger, serverConfig, eh, prometheus.NewRegistry()) + // Build a client to send logs + serverURL := flagext.URLValue{} + err = serverURL.Set("http://" + localhost + ":" + strconv.Itoa(port) + "/api/v1/push") require.NoError(t, err) - err = pt.Run() + ccfg := client.Config{ + URL: serverURL, + Timeout: 1 * time.Second, + BatchWait: 1 * time.Second, + BatchSize: 100 * 1024, + } + m := client.NewMetrics(prometheus.DefaultRegisterer, nil) + pc, err := client.New(m, ccfg, nil, 0, logger) require.NoError(t, err) + defer pc.Stop() + + // Send some logs + labels := model.LabelSet{ + "stream": "stream1", + "__anotherdroplabel": "dropme", + } + for i := 0; i < 100; i++ { + pc.Chan() <- loki.Entry{ + Labels: labels, + Entry: logproto.Entry{ + Timestamp: time.Unix(int64(i), 0), + Line: "line" + strconv.Itoa(i), + }, + } + } + + // Wait for them to appear in the test handler + countdown := 10000 + for len(eh.Received()) != 100 && countdown > 0 { + time.Sleep(1 * time.Millisecond) + countdown-- + } + + // Make sure we didn't timeout + require.Equal(t, 100, len(eh.Received())) + + // Verify labels + expectedLabels := model.LabelSet{ + "pushserver": "pushserver1", + "stream": "stream1", + } + // Spot check the first value in the result to make sure relabel rules were applied properly + require.Equal(t, expectedLabels, eh.Received()[0].Labels) + + // With keep timestamp enabled, verify timestamp + require.Equal(t, time.Unix(99, 0).Unix(), eh.Received()[99].Timestamp.Unix()) + + pt.Shutdown() +} + +func TestLokiPushTargetForRedirect(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + pt, port, eh := createPushServer(t, logger) pt.SetLabels(model.LabelSet{ "pushserver": "pushserver1", @@ -68,13 +123,13 @@ func TestLokiPushTarget(t *testing.T) { action = "labeldrop" regex = "dropme" ` - err = river.Unmarshal([]byte(relabelStr), &relabelRule) + err := river.Unmarshal([]byte(relabelStr), &relabelRule) require.NoError(t, err) pt.SetRelabelRules(frelabel.Rules{&relabelRule}) // Build a client to send logs serverURL := flagext.URLValue{} - err = serverURL.Set("http://" + localhost + ":" + strconv.Itoa(port) + "/api/v1/push") + err = serverURL.Set("http://" + localhost + ":" + strconv.Itoa(port) + "/loki/api/v1/push") require.NoError(t, err) ccfg := client.Config{ @@ -130,7 +185,6 @@ regex = "dropme" func TestPlaintextPushTarget(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) - //Create PushAPIServerOld eh := fake.NewClient(func() {}) defer eh.Stop() @@ -265,3 +319,29 @@ func getFreePort(t *testing.T) int { require.NoError(t, err) return port } + +func createPushServer(t *testing.T, logger log.Logger) (*PushAPIServer, int, *fake.Client) { + //Create PushAPIServerOld + eh := fake.NewClient(func() {}) + t.Cleanup(func() { + eh.Stop() + }) + + // Get a randomly available port by open and closing a TCP socket + port := getFreePort(t) + + serverConfig := &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: localhost, + ListenPort: port, + }, + GRPC: &fnet.GRPCConfig{ListenPort: getFreePort(t)}, + } + + pt, err := NewPushAPIServer(logger, serverConfig, eh, prometheus.NewRegistry()) + require.NoError(t, err) + + err = pt.Run() + require.NoError(t, err) + return pt, port, eh +} diff --git a/docs/sources/flow/reference/components/loki.source.api.md b/docs/sources/flow/reference/components/loki.source.api.md index 6636c407e71e..add3d62f3516 100644 --- a/docs/sources/flow/reference/components/loki.source.api.md +++ b/docs/sources/flow/reference/components/loki.source.api.md @@ -25,9 +25,12 @@ loki.source.api "LABEL" { The component will start HTTP server on the configured port and address with the following endpoints: -- `/api/v1/push` - accepting `POST` requests compatible with [Loki push API][loki-push-api], for example, from another Grafana Agent's [`loki.write`][loki.write] component. -- `/api/v1/raw` - accepting `POST` requests with newline-delimited log lines in body. This can be used to send NDJSON or plaintext logs. This is compatible with promtail's push API endpoint - see [promtail's documentation][promtail-push-api] for more information. NOTE: when this endpoint is used, the incoming timestamps cannot be used and the `use_incoming_timestamp = true` setting will be ignored. -- `/ready` - accepting `GET` requests - can be used to confirm the server is reachable and healthy. +- `/loki/api/v1/push` - accepting `POST` requests compatible with [Loki push API][loki-push-api], for example, from another Grafana Agent's [`loki.write`][loki.write] component. +- `/loki/api/v1/raw` - accepting `POST` requests with newline-delimited log lines in body. This can be used to send NDJSON or plaintext logs. This is compatible with promtail's push API endpoint - see [promtail's documentation][promtail-push-api] for more information. NOTE: when this endpoint is used, the incoming timestamps cannot be used and the `use_incoming_timestamp = true` setting will be ignored. +- `/loki/ready` - accepting `GET` requests - can be used to confirm the server is reachable and healthy. +- `/api/v1/push` - internally reroutes to `/loki/api/v1/push` +- `/api/v1/raw` - internally reroutes to `/loki/api/v1/raw` + [promtail-push-api]: https://grafana.com/docs/loki/latest/clients/promtail/configuration/#loki_push_api