Skip to content

Commit

Permalink
Fix for loki.source.api urls (#4352)
Browse files Browse the repository at this point in the history
* Fix for #4317

* Add rerouted routes

* Update CHANGELOG.md

Co-authored-by: Piotr <[email protected]>

* Update docs/sources/flow/reference/components/loki.source.api.md

Co-authored-by: Clayton Cornell <[email protected]>

---------

Co-authored-by: Piotr <[email protected]>
Co-authored-by: Clayton Cornell <[email protected]>
  • Loading branch information
3 people authored Jul 6, 2023
1 parent 4d18315 commit f6241a1
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 23 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ Main (unreleased)
- Fix bug where `stage.timestamp` in `loki.process` wasn't able to correctly
parse timezones. This issue only impacts the dedicated `grafana-agent-flow`
binary. (@rfratto)

- Fix bug where JSON requests to `loki.source.api` would not be handled correctly. This adds `/loki/api/v1/raw` and `/loki/api/v1/push` endpoints to `loki.source.api` and maps the `/api/v1/push` and `/api/v1/raw` to
the `/loki` prefixed endpoints. (@mattdurham)

### Other changes

Expand Down
16 changes: 14 additions & 2 deletions component/loki/source/api/internal/lokipush/push_api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,21 @@ func (s *PushAPIServer) Run() error {
level.Info(s.logger).Log("msg", "starting push API server")

err := s.server.MountAndRun(func(router *mux.Router) {
router.Path("/api/v1/push").Methods("POST").Handler(http.HandlerFunc(s.handleLoki))
router.Path("/api/v1/raw").Methods("POST").Handler(http.HandlerFunc(s.handlePlaintext))
// This redirecting is so we can avoid breaking changes where we originally implemented it with
// the loki prefix.
router.Path("/api/v1/push").Methods("POST").Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = "/loki/api/v1/push"
r.RequestURI = "/loki/api/v1/push"
s.handleLoki(w, r)
}))
router.Path("/api/v1/raw").Methods("POST").Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = "/loki/api/v1/raw"
r.RequestURI = "/loki/api/v1/raw"
s.handlePlaintext(w, r)
}))
router.Path("/ready").Methods("GET").Handler(http.HandlerFunc(s.ready))
router.Path("/loki/api/v1/push").Methods("POST").Handler(http.HandlerFunc(s.handleLoki))
router.Path("/loki/api/v1/raw").Methods("POST").Handler(http.HandlerFunc(s.handlePlaintext))
})
return err
}
Expand Down
116 changes: 98 additions & 18 deletions component/loki/source/api/internal/lokipush/push_api_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,82 @@ const localhost = "127.0.0.1"
func TestLokiPushTarget(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
pt, port, eh := createPushServer(t, logger)

//Create PushAPIServerOld
eh := fake.NewClient(func() {})
defer eh.Stop()

// Get a randomly available port by open and closing a TCP socket
port := getFreePort(t)
pt.SetLabels(model.LabelSet{
"pushserver": "pushserver1",
"dropme": "label",
})
pt.SetKeepTimestamp(true)

serverConfig := &fnet.ServerConfig{
HTTP: &fnet.HTTPConfig{
ListenAddress: localhost,
ListenPort: port,
},
GRPC: &fnet.GRPCConfig{ListenPort: getFreePort(t)},
}
relabelRule := frelabel.Config{}
relabelStr := `
action = "labeldrop"
regex = "dropme"
`
err := river.Unmarshal([]byte(relabelStr), &relabelRule)
require.NoError(t, err)
pt.SetRelabelRules(frelabel.Rules{&relabelRule})

pt, err := NewPushAPIServer(logger, serverConfig, eh, prometheus.NewRegistry())
// Build a client to send logs
serverURL := flagext.URLValue{}
err = serverURL.Set("http://" + localhost + ":" + strconv.Itoa(port) + "/api/v1/push")
require.NoError(t, err)

err = pt.Run()
ccfg := client.Config{
URL: serverURL,
Timeout: 1 * time.Second,
BatchWait: 1 * time.Second,
BatchSize: 100 * 1024,
}
m := client.NewMetrics(prometheus.DefaultRegisterer, nil)
pc, err := client.New(m, ccfg, nil, 0, logger)
require.NoError(t, err)
defer pc.Stop()

// Send some logs
labels := model.LabelSet{
"stream": "stream1",
"__anotherdroplabel": "dropme",
}
for i := 0; i < 100; i++ {
pc.Chan() <- loki.Entry{
Labels: labels,
Entry: logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: "line" + strconv.Itoa(i),
},
}
}

// Wait for them to appear in the test handler
countdown := 10000
for len(eh.Received()) != 100 && countdown > 0 {
time.Sleep(1 * time.Millisecond)
countdown--
}

// Make sure we didn't timeout
require.Equal(t, 100, len(eh.Received()))

// Verify labels
expectedLabels := model.LabelSet{
"pushserver": "pushserver1",
"stream": "stream1",
}
// Spot check the first value in the result to make sure relabel rules were applied properly
require.Equal(t, expectedLabels, eh.Received()[0].Labels)

// With keep timestamp enabled, verify timestamp
require.Equal(t, time.Unix(99, 0).Unix(), eh.Received()[99].Timestamp.Unix())

pt.Shutdown()
}

func TestLokiPushTargetForRedirect(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
pt, port, eh := createPushServer(t, logger)

pt.SetLabels(model.LabelSet{
"pushserver": "pushserver1",
Expand All @@ -68,13 +123,13 @@ func TestLokiPushTarget(t *testing.T) {
action = "labeldrop"
regex = "dropme"
`
err = river.Unmarshal([]byte(relabelStr), &relabelRule)
err := river.Unmarshal([]byte(relabelStr), &relabelRule)
require.NoError(t, err)
pt.SetRelabelRules(frelabel.Rules{&relabelRule})

// Build a client to send logs
serverURL := flagext.URLValue{}
err = serverURL.Set("http://" + localhost + ":" + strconv.Itoa(port) + "/api/v1/push")
err = serverURL.Set("http://" + localhost + ":" + strconv.Itoa(port) + "/loki/api/v1/push")
require.NoError(t, err)

ccfg := client.Config{
Expand Down Expand Up @@ -130,7 +185,6 @@ regex = "dropme"
func TestPlaintextPushTarget(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)

//Create PushAPIServerOld
eh := fake.NewClient(func() {})
defer eh.Stop()
Expand Down Expand Up @@ -265,3 +319,29 @@ func getFreePort(t *testing.T) int {
require.NoError(t, err)
return port
}

func createPushServer(t *testing.T, logger log.Logger) (*PushAPIServer, int, *fake.Client) {
//Create PushAPIServerOld
eh := fake.NewClient(func() {})
t.Cleanup(func() {
eh.Stop()
})

// Get a randomly available port by open and closing a TCP socket
port := getFreePort(t)

serverConfig := &fnet.ServerConfig{
HTTP: &fnet.HTTPConfig{
ListenAddress: localhost,
ListenPort: port,
},
GRPC: &fnet.GRPCConfig{ListenPort: getFreePort(t)},
}

pt, err := NewPushAPIServer(logger, serverConfig, eh, prometheus.NewRegistry())
require.NoError(t, err)

err = pt.Run()
require.NoError(t, err)
return pt, port, eh
}
9 changes: 6 additions & 3 deletions docs/sources/flow/reference/components/loki.source.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ loki.source.api "LABEL" {

The component will start HTTP server on the configured port and address with the following endpoints:

- `/api/v1/push` - accepting `POST` requests compatible with [Loki push API][loki-push-api], for example, from another Grafana Agent's [`loki.write`][loki.write] component.
- `/api/v1/raw` - accepting `POST` requests with newline-delimited log lines in body. This can be used to send NDJSON or plaintext logs. This is compatible with promtail's push API endpoint - see [promtail's documentation][promtail-push-api] for more information. NOTE: when this endpoint is used, the incoming timestamps cannot be used and the `use_incoming_timestamp = true` setting will be ignored.
- `/ready` - accepting `GET` requests - can be used to confirm the server is reachable and healthy.
- `/loki/api/v1/push` - accepting `POST` requests compatible with [Loki push API][loki-push-api], for example, from another Grafana Agent's [`loki.write`][loki.write] component.
- `/loki/api/v1/raw` - accepting `POST` requests with newline-delimited log lines in body. This can be used to send NDJSON or plaintext logs. This is compatible with promtail's push API endpoint - see [promtail's documentation][promtail-push-api] for more information. NOTE: when this endpoint is used, the incoming timestamps cannot be used and the `use_incoming_timestamp = true` setting will be ignored.
- `/loki/ready` - accepting `GET` requests - can be used to confirm the server is reachable and healthy.
- `/api/v1/push` - internally reroutes to `/loki/api/v1/push`
- `/api/v1/raw` - internally reroutes to `/loki/api/v1/raw`


[promtail-push-api]: https://grafana.com/docs/loki/latest/clients/promtail/configuration/#loki_push_api

Expand Down

0 comments on commit f6241a1

Please sign in to comment.