Skip to content

Commit

Permalink
Implement remote storage configuration
Browse files Browse the repository at this point in the history
Unfortunately upstream has decided to effectively break the remote_write
client (only supports sending from a WAL directory). So for now I've
updated the fork of the old remote client. In the future I may replace
this with some other client (not sure how that'll jive with configs
etc.) but for now this seems to work

Fixes #386
  • Loading branch information
jacksontj committed Feb 4, 2021
1 parent 54ca427 commit d08e35c
Show file tree
Hide file tree
Showing 14 changed files with 2,636 additions and 12 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ require (
github.com/prometheus/common v0.15.0
github.com/prometheus/prometheus v1.8.1-0.20200513230854-c784807932c2
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.6.1
go.uber.org/atomic v1.7.0
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
gopkg.in/yaml.v2 v2.4.0
k8s.io/klog v1.0.0
)
Expand Down
25 changes: 13 additions & 12 deletions pkg/proxystorage/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/sirupsen/logrus"

"github.com/prometheus/prometheus/storage/remote"
"github.com/jacksontj/promxy/pkg/remote"

"github.com/jacksontj/promxy/pkg/logging"
"github.com/jacksontj/promxy/pkg/promhttputil"

proxyconfig "github.com/jacksontj/promxy/pkg/config"
Expand Down Expand Up @@ -120,20 +121,20 @@ func (p *ProxyStorage) ApplyConfig(c *proxyconfig.Config) error {
}
newState.remoteStorage = oldState.remoteStorage
} else {
panic("WHAT")
/*
// TODO: configure path?
remote := remote.NewStorage(logging.NewLogger(logrus.WithField("component", "remote_write").Logger), func() (int64, error) { return 0, nil }, 1*time.Second)
if err := remote.ApplyConfig(&c.PromConfig); err != nil {
return err
}
newState.remoteStorage = remote
newState.appenderCloser = remote.Close
*/
remote := remote.NewStorage(logging.NewLogger(logrus.WithField("component", "remote_write").Logger), func() (int64, error) { return 0, nil }, 1*time.Second)
if err := remote.ApplyConfig(&c.PromConfig); err != nil {
return err
}
newState.remoteStorage = remote
newState.appenderCloser = remote.Close
}

// Whether old or new, update the appender
newState.appender = newState.remoteStorage.Appender(context.TODO())
var err error
newState.appender, err = newState.remoteStorage.Appender()
if err != nil {
return err
}

} else {
newState.appender = &appenderStub{}
Expand Down
1 change: 1 addition & 0 deletions pkg/remote/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
To remove once https://github.com/prometheus/prometheus/issues/5523 is sorted out
181 changes: 181 additions & 0 deletions pkg/remote/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/common/model"
"github.com/prometheus/common/version"

config_util "github.com/prometheus/common/config"
"github.com/prometheus/prometheus/prompb"
)

const maxErrMsgLen = 256

var userAgent = fmt.Sprintf("Prometheus/%s", version.Version)

// Client allows reading and writing from/to a remote HTTP endpoint.
type Client struct {
index int // Used to differentiate clients in metrics.
url *config_util.URL
client *http.Client
timeout time.Duration
}

// ClientConfig configures a Client.
type ClientConfig struct {
URL *config_util.URL
Timeout model.Duration
HTTPClientConfig config_util.HTTPClientConfig
}

// NewClient creates a new Client.
func NewClient(index int, conf *ClientConfig) (*Client, error) {
httpClient, err := config_util.NewClientFromConfig(conf.HTTPClientConfig, "remote_storage", false, false)
if err != nil {
return nil, err
}

return &Client{
index: index,
url: conf.URL,
client: httpClient,
timeout: time.Duration(conf.Timeout),
}, nil
}

type recoverableError struct {
error
}

// Store sends a batch of samples to the HTTP endpoint.
func (c *Client) Store(ctx context.Context, req *prompb.WriteRequest) error {
data, err := proto.Marshal(req)
if err != nil {
return err
}

compressed := snappy.Encode(nil, data)
httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed))
if err != nil {
// Errors from NewRequest are from unparseable URLs, so are not
// recoverable.
return err
}
httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("User-Agent", userAgent)
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
httpReq = httpReq.WithContext(ctx)

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

httpResp, err := c.client.Do(httpReq.WithContext(ctx))
if err != nil {
// Errors from client.Do are from (for example) network errors, so are
// recoverable.
return recoverableError{err}
}
defer httpResp.Body.Close()

if httpResp.StatusCode/100 != 2 {
scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen))
line := ""
if scanner.Scan() {
line = scanner.Text()
}
err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
}
if httpResp.StatusCode/100 == 5 {
return recoverableError{err}
}
return err
}

// Name identifies the client.
func (c Client) Name() string {
return fmt.Sprintf("%d:%s", c.index, c.url)
}

// Read reads from a remote endpoint.
func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
req := &prompb.ReadRequest{
// TODO: Support batching multiple queries into one read request,
// as the protobuf interface allows for it.
Queries: []*prompb.Query{
query,
},
}
data, err := proto.Marshal(req)
if err != nil {
return nil, fmt.Errorf("unable to marshal read request: %v", err)
}

compressed := snappy.Encode(nil, data)
httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed))
if err != nil {
return nil, fmt.Errorf("unable to create request: %v", err)
}
httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Add("Accept-Encoding", "snappy")
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("User-Agent", userAgent)
httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")

ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

httpResp, err := c.client.Do(httpReq.WithContext(ctx))
if err != nil {
return nil, fmt.Errorf("error sending request: %v", err)
}
defer httpResp.Body.Close()
if httpResp.StatusCode/100 != 2 {
return nil, fmt.Errorf("server returned HTTP status %s", httpResp.Status)
}

compressed, err = ioutil.ReadAll(httpResp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response: %v", err)
}

uncompressed, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, fmt.Errorf("error reading response: %v", err)
}

var resp prompb.ReadResponse
err = proto.Unmarshal(uncompressed, &resp)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal response body: %v", err)
}

if len(resp.Results) != len(req.Queries) {
return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results))
}

return resp.Results[0], nil
}
84 changes: 84 additions & 0 deletions pkg/remote/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"strings"
"testing"
"time"

config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
)

var longErrMessage = strings.Repeat("error message", maxErrMsgLen)

func TestStoreHTTPErrorHandling(t *testing.T) {
tests := []struct {
code int
err error
}{
{
code: 200,
err: nil,
},
{
code: 300,
err: fmt.Errorf("server returned HTTP status 300 Multiple Choices: " + longErrMessage[:maxErrMsgLen]),
},
{
code: 404,
err: fmt.Errorf("server returned HTTP status 404 Not Found: " + longErrMessage[:maxErrMsgLen]),
},
{
code: 500,
err: recoverableError{fmt.Errorf("server returned HTTP status 500 Internal Server Error: " + longErrMessage[:maxErrMsgLen])},
},
}

for i, test := range tests {
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, longErrMessage, test.code)
}),
)

serverURL, err := url.Parse(server.URL)
if err != nil {
t.Fatal(err)
}

c, err := NewClient(0, &ClientConfig{
URL: &config_util.URL{URL: serverURL},
Timeout: model.Duration(time.Second),
})
if err != nil {
t.Fatal(err)
}

err = c.Store(context.Background(), &prompb.WriteRequest{})
if !reflect.DeepEqual(err, test.err) {
t.Errorf("%d. Unexpected error; want %v, got %v", i, test.err, err)
}

server.Close()
}
}
Loading

0 comments on commit d08e35c

Please sign in to comment.