diff --git a/go.mod b/go.mod index de60748de..ae47a466a 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,9 @@ require ( github.com/prometheus/common v0.15.0 github.com/prometheus/prometheus v1.8.1-0.20200513230854-c784807932c2 github.com/sirupsen/logrus v1.6.0 + github.com/stretchr/testify v1.6.1 go.uber.org/atomic v1.7.0 + golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 gopkg.in/yaml.v2 v2.4.0 k8s.io/klog v1.0.0 ) diff --git a/pkg/proxystorage/proxy.go b/pkg/proxystorage/proxy.go index d8afe351a..801925038 100644 --- a/pkg/proxystorage/proxy.go +++ b/pkg/proxystorage/proxy.go @@ -15,8 +15,9 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/sirupsen/logrus" - "github.com/prometheus/prometheus/storage/remote" + "github.com/jacksontj/promxy/pkg/remote" + "github.com/jacksontj/promxy/pkg/logging" "github.com/jacksontj/promxy/pkg/promhttputil" proxyconfig "github.com/jacksontj/promxy/pkg/config" @@ -120,20 +121,20 @@ func (p *ProxyStorage) ApplyConfig(c *proxyconfig.Config) error { } newState.remoteStorage = oldState.remoteStorage } else { - panic("WHAT") - /* - // TODO: configure path? - remote := remote.NewStorage(logging.NewLogger(logrus.WithField("component", "remote_write").Logger), func() (int64, error) { return 0, nil }, 1*time.Second) - if err := remote.ApplyConfig(&c.PromConfig); err != nil { - return err - } - newState.remoteStorage = remote - newState.appenderCloser = remote.Close - */ + remote := remote.NewStorage(logging.NewLogger(logrus.WithField("component", "remote_write").Logger), func() (int64, error) { return 0, nil }, 1*time.Second) + if err := remote.ApplyConfig(&c.PromConfig); err != nil { + return err + } + newState.remoteStorage = remote + newState.appenderCloser = remote.Close } // Whether old or new, update the appender - newState.appender = newState.remoteStorage.Appender(context.TODO()) + var err error + newState.appender, err = newState.remoteStorage.Appender() + if err != nil { + return err + } } else { newState.appender = &appenderStub{} diff --git a/pkg/remote/README.md b/pkg/remote/README.md new file mode 100644 index 000000000..17f84a600 --- /dev/null +++ b/pkg/remote/README.md @@ -0,0 +1 @@ +To remove once https://github.com/prometheus/prometheus/issues/5523 is sorted out diff --git a/pkg/remote/client.go b/pkg/remote/client.go new file mode 100644 index 000000000..2880bf8bd --- /dev/null +++ b/pkg/remote/client.go @@ -0,0 +1,181 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "bufio" + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/common/model" + "github.com/prometheus/common/version" + + config_util "github.com/prometheus/common/config" + "github.com/prometheus/prometheus/prompb" +) + +const maxErrMsgLen = 256 + +var userAgent = fmt.Sprintf("Prometheus/%s", version.Version) + +// Client allows reading and writing from/to a remote HTTP endpoint. 
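+// Writes are sent as snappy-compressed protobuf WriteRequests via Store and
+// reads as snappy-compressed ReadRequests via Read; both are issued as HTTP
+// POSTs through the configured http.Client, with the client timeout applied
+// via a context.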
+type Client struct { + index int // Used to differentiate clients in metrics. + url *config_util.URL + client *http.Client + timeout time.Duration +} + +// ClientConfig configures a Client. +type ClientConfig struct { + URL *config_util.URL + Timeout model.Duration + HTTPClientConfig config_util.HTTPClientConfig +} + +// NewClient creates a new Client. +func NewClient(index int, conf *ClientConfig) (*Client, error) { + httpClient, err := config_util.NewClientFromConfig(conf.HTTPClientConfig, "remote_storage", false, false) + if err != nil { + return nil, err + } + + return &Client{ + index: index, + url: conf.URL, + client: httpClient, + timeout: time.Duration(conf.Timeout), + }, nil +} + +type recoverableError struct { + error +} + +// Store sends a batch of samples to the HTTP endpoint. +func (c *Client) Store(ctx context.Context, req *prompb.WriteRequest) error { + data, err := proto.Marshal(req) + if err != nil { + return err + } + + compressed := snappy.Encode(nil, data) + httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed)) + if err != nil { + // Errors from NewRequest are from unparseable URLs, so are not + // recoverable. + return err + } + httpReq.Header.Add("Content-Encoding", "snappy") + httpReq.Header.Set("Content-Type", "application/x-protobuf") + httpReq.Header.Set("User-Agent", userAgent) + httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + httpReq = httpReq.WithContext(ctx) + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + httpResp, err := c.client.Do(httpReq.WithContext(ctx)) + if err != nil { + // Errors from client.Do are from (for example) network errors, so are + // recoverable. + return recoverableError{err} + } + defer httpResp.Body.Close() + + if httpResp.StatusCode/100 != 2 { + scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen)) + line := "" + if scanner.Scan() { + line = scanner.Text() + } + err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) + } + if httpResp.StatusCode/100 == 5 { + return recoverableError{err} + } + return err +} + +// Name identifies the client. +func (c Client) Name() string { + return fmt.Sprintf("%d:%s", c.index, c.url) +} + +// Read reads from a remote endpoint. +func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) { + req := &prompb.ReadRequest{ + // TODO: Support batching multiple queries into one read request, + // as the protobuf interface allows for it. 
+ Queries: []*prompb.Query{ + query, + }, + } + data, err := proto.Marshal(req) + if err != nil { + return nil, fmt.Errorf("unable to marshal read request: %v", err) + } + + compressed := snappy.Encode(nil, data) + httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed)) + if err != nil { + return nil, fmt.Errorf("unable to create request: %v", err) + } + httpReq.Header.Add("Content-Encoding", "snappy") + httpReq.Header.Add("Accept-Encoding", "snappy") + httpReq.Header.Set("Content-Type", "application/x-protobuf") + httpReq.Header.Set("User-Agent", userAgent) + httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") + + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + + httpResp, err := c.client.Do(httpReq.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("error sending request: %v", err) + } + defer httpResp.Body.Close() + if httpResp.StatusCode/100 != 2 { + return nil, fmt.Errorf("server returned HTTP status %s", httpResp.Status) + } + + compressed, err = ioutil.ReadAll(httpResp.Body) + if err != nil { + return nil, fmt.Errorf("error reading response: %v", err) + } + + uncompressed, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, fmt.Errorf("error reading response: %v", err) + } + + var resp prompb.ReadResponse + err = proto.Unmarshal(uncompressed, &resp) + if err != nil { + return nil, fmt.Errorf("unable to unmarshal response body: %v", err) + } + + if len(resp.Results) != len(req.Queries) { + return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results)) + } + + return resp.Results[0], nil +} diff --git a/pkg/remote/client_test.go b/pkg/remote/client_test.go new file mode 100644 index 000000000..73ec875a5 --- /dev/null +++ b/pkg/remote/client_test.go @@ -0,0 +1,84 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package remote + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "reflect" + "strings" + "testing" + "time" + + config_util "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/prompb" +) + +var longErrMessage = strings.Repeat("error message", maxErrMsgLen) + +func TestStoreHTTPErrorHandling(t *testing.T) { + tests := []struct { + code int + err error + }{ + { + code: 200, + err: nil, + }, + { + code: 300, + err: fmt.Errorf("server returned HTTP status 300 Multiple Choices: " + longErrMessage[:maxErrMsgLen]), + }, + { + code: 404, + err: fmt.Errorf("server returned HTTP status 404 Not Found: " + longErrMessage[:maxErrMsgLen]), + }, + { + code: 500, + err: recoverableError{fmt.Errorf("server returned HTTP status 500 Internal Server Error: " + longErrMessage[:maxErrMsgLen])}, + }, + } + + for i, test := range tests { + server := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, longErrMessage, test.code) + }), + ) + + serverURL, err := url.Parse(server.URL) + if err != nil { + t.Fatal(err) + } + + c, err := NewClient(0, &ClientConfig{ + URL: &config_util.URL{URL: serverURL}, + Timeout: model.Duration(time.Second), + }) + if err != nil { + t.Fatal(err) + } + + err = c.Store(context.Background(), &prompb.WriteRequest{}) + if !reflect.DeepEqual(err, test.err) { + t.Errorf("%d. Unexpected error; want %v, got %v", i, test.err, err) + } + + server.Close() + } +} diff --git a/pkg/remote/codec.go b/pkg/remote/codec.go new file mode 100644 index 000000000..2613b9e82 --- /dev/null +++ b/pkg/remote/codec.go @@ -0,0 +1,427 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "fmt" + "io" + "io/ioutil" + "net/http" + "sort" + + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" +) + +// decodeReadLimit is the maximum size of a read request body in bytes. +const decodeReadLimit = 32 * 1024 * 1024 + +type HTTPError struct { + msg string + status int +} + +func (e HTTPError) Error() string { + return e.msg +} + +func (e HTTPError) Status() int { + return e.status +} + +// DecodeReadRequest reads a remote.Request from a http.Request. +func DecodeReadRequest(r *http.Request) (*prompb.ReadRequest, error) { + compressed, err := ioutil.ReadAll(io.LimitReader(r.Body, decodeReadLimit)) + if err != nil { + return nil, err + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, err + } + + var req prompb.ReadRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + return nil, err + } + + return &req, nil +} + +// EncodeReadResponse writes a remote.Response to a http.ResponseWriter. 
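+// The response is marshalled to protobuf, snappy-compressed, and written out
+// with matching Content-Type and Content-Encoding headers.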
+func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error { + data, err := proto.Marshal(resp) + if err != nil { + return err + } + + w.Header().Set("Content-Type", "application/x-protobuf") + w.Header().Set("Content-Encoding", "snappy") + + compressed := snappy.Encode(nil, data) + _, err = w.Write(compressed) + return err +} + +// ToWriteRequest converts an array of samples into a WriteRequest proto. +func ToWriteRequest(samples []*model.Sample) *prompb.WriteRequest { + req := &prompb.WriteRequest{ + Timeseries: make([]prompb.TimeSeries, 0, len(samples)), + } + + for _, s := range samples { + ts := prompb.TimeSeries{ + Labels: MetricToLabelProtos(s.Metric), + Samples: []prompb.Sample{ + { + Value: float64(s.Value), + Timestamp: int64(s.Timestamp), + }, + }, + } + req.Timeseries = append(req.Timeseries, ts) + } + + return req +} + +// ToQuery builds a Query proto. +func ToQuery(from, to int64, matchers []*labels.Matcher, p *storage.SelectHints) (*prompb.Query, error) { + ms, err := toLabelMatchers(matchers) + if err != nil { + return nil, err + } + + var rp *prompb.ReadHints + if p != nil { + rp = &prompb.ReadHints{ + StepMs: p.Step, + Func: p.Func, + StartMs: p.Start, + EndMs: p.End, + } + } + + return &prompb.Query{ + StartTimestampMs: from, + EndTimestampMs: to, + Matchers: ms, + Hints: rp, + }, nil +} + +// FromQuery unpacks a Query proto. +func FromQuery(req *prompb.Query) (int64, int64, []*labels.Matcher, *storage.SelectHints, error) { + matchers, err := fromLabelMatchers(req.Matchers) + if err != nil { + return 0, 0, nil, nil, err + } + var SelectHints *storage.SelectHints + if req.Hints != nil { + SelectHints = &storage.SelectHints{ + Start: req.Hints.StartMs, + End: req.Hints.EndMs, + Step: req.Hints.StepMs, + Func: req.Hints.Func, + } + } + + return req.StartTimestampMs, req.EndTimestampMs, matchers, SelectHints, nil +} + +// ToQueryResult builds a QueryResult proto. +func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, error) { + numSamples := 0 + resp := &prompb.QueryResult{} + for ss.Next() { + series := ss.At() + iter := series.Iterator() + samples := []prompb.Sample{} + + for iter.Next() { + numSamples++ + if sampleLimit > 0 && numSamples > sampleLimit { + return nil, HTTPError{ + msg: fmt.Sprintf("exceeded sample limit (%d)", sampleLimit), + status: http.StatusBadRequest, + } + } + ts, val := iter.At() + samples = append(samples, prompb.Sample{ + Timestamp: ts, + Value: val, + }) + } + if err := iter.Err(); err != nil { + return nil, err + } + + resp.Timeseries = append(resp.Timeseries, &prompb.TimeSeries{ + Labels: labelsToLabelsProto(series.Labels()), + Samples: samples, + }) + } + if err := ss.Err(); err != nil { + return nil, err + } + return resp, nil +} + +// FromQueryResult unpacks a QueryResult proto. 
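+// Each returned series has its label names, label values and metric name
+// validated; an invalid series turns the whole result into an errSeriesSet.
+// When sortSeries is set, the series are sorted by their label sets before
+// being returned.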
+func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet { + series := make([]storage.Series, 0, len(res.Timeseries)) + for _, ts := range res.Timeseries { + labels := labelProtosToLabels(ts.Labels) + if err := validateLabelsAndMetricName(labels); err != nil { + return errSeriesSet{err: err} + } + + series = append(series, &concreteSeries{ + labels: labels, + samples: ts.Samples, + }) + } + if sortSeries { + sort.Sort(byLabel(series)) + } + return &concreteSeriesSet{ + series: series, + } +} + +type byLabel []storage.Series + +func (a byLabel) Len() int { return len(a) } +func (a byLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 } + +// errSeriesSet implements storage.SeriesSet, just returning an error. +type errSeriesSet struct { + err error +} + +func (errSeriesSet) Next() bool { + return false +} + +func (errSeriesSet) At() storage.Series { + return nil +} + +func (e errSeriesSet) Err() error { + return e.err +} + +func (e errSeriesSet) Warnings() storage.Warnings { return nil } + +// concreteSeriesSet implements storage.SeriesSet. +type concreteSeriesSet struct { + cur int + series []storage.Series +} + +func (c *concreteSeriesSet) Next() bool { + c.cur++ + return c.cur-1 < len(c.series) +} + +func (c *concreteSeriesSet) At() storage.Series { + return c.series[c.cur-1] +} + +func (c *concreteSeriesSet) Err() error { + return nil +} + +func (c *concreteSeriesSet) Warnings() storage.Warnings { return nil } + +// concreteSeries implements storage.Series. +type concreteSeries struct { + labels labels.Labels + samples []prompb.Sample +} + +func (c *concreteSeries) Labels() labels.Labels { + return labels.New(c.labels...) +} + +func (c *concreteSeries) Iterator() chunkenc.Iterator { + return newConcreteSeriersIterator(c) +} + +// concreteSeriesIterator implements storage.SeriesIterator. +type concreteSeriesIterator struct { + cur int + series *concreteSeries +} + +func newConcreteSeriersIterator(series *concreteSeries) chunkenc.Iterator { + return &concreteSeriesIterator{ + cur: -1, + series: series, + } +} + +// Seek implements storage.SeriesIterator. +func (c *concreteSeriesIterator) Seek(t int64) bool { + c.cur = sort.Search(len(c.series.samples), func(n int) bool { + return c.series.samples[n].Timestamp >= t + }) + return c.cur < len(c.series.samples) +} + +// At implements storage.SeriesIterator. +func (c *concreteSeriesIterator) At() (t int64, v float64) { + s := c.series.samples[c.cur] + return s.Timestamp, s.Value +} + +// Next implements storage.SeriesIterator. +func (c *concreteSeriesIterator) Next() bool { + c.cur++ + return c.cur < len(c.series.samples) +} + +// Err implements storage.SeriesIterator. +func (c *concreteSeriesIterator) Err() error { + return nil +} + +// validateLabelsAndMetricName validates the label names/values and metric names returned from remote read. 
+func validateLabelsAndMetricName(ls labels.Labels) error { + for _, l := range ls { + if l.Name == labels.MetricName && !model.IsValidMetricName(model.LabelValue(l.Value)) { + return fmt.Errorf("invalid metric name: %v", l.Value) + } + if !model.LabelName(l.Name).IsValid() { + return fmt.Errorf("invalid label name: %v", l.Name) + } + if !model.LabelValue(l.Value).IsValid() { + return fmt.Errorf("invalid label value: %v", l.Value) + } + } + return nil +} + +func toLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error) { + pbMatchers := make([]*prompb.LabelMatcher, 0, len(matchers)) + for _, m := range matchers { + var mType prompb.LabelMatcher_Type + switch m.Type { + case labels.MatchEqual: + mType = prompb.LabelMatcher_EQ + case labels.MatchNotEqual: + mType = prompb.LabelMatcher_NEQ + case labels.MatchRegexp: + mType = prompb.LabelMatcher_RE + case labels.MatchNotRegexp: + mType = prompb.LabelMatcher_NRE + default: + return nil, fmt.Errorf("invalid matcher type") + } + pbMatchers = append(pbMatchers, &prompb.LabelMatcher{ + Type: mType, + Name: m.Name, + Value: m.Value, + }) + } + return pbMatchers, nil +} + +func fromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error) { + result := make([]*labels.Matcher, 0, len(matchers)) + for _, matcher := range matchers { + var mtype labels.MatchType + switch matcher.Type { + case prompb.LabelMatcher_EQ: + mtype = labels.MatchEqual + case prompb.LabelMatcher_NEQ: + mtype = labels.MatchNotEqual + case prompb.LabelMatcher_RE: + mtype = labels.MatchRegexp + case prompb.LabelMatcher_NRE: + mtype = labels.MatchNotRegexp + default: + return nil, fmt.Errorf("invalid matcher type") + } + matcher, err := labels.NewMatcher(mtype, matcher.Name, matcher.Value) + if err != nil { + return nil, err + } + result = append(result, matcher) + } + return result, nil +} + +// MetricToLabelProtos builds a []*prompb.Label from a model.Metric +func MetricToLabelProtos(metric model.Metric) []prompb.Label { + labels := make([]prompb.Label, 0, len(metric)) + for k, v := range metric { + labels = append(labels, prompb.Label{ + Name: string(k), + Value: string(v), + }) + } + sort.Slice(labels, func(i int, j int) bool { + return labels[i].Name < labels[j].Name + }) + return labels +} + +// LabelProtosToMetric unpack a []*prompb.Label to a model.Metric +func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric { + metric := make(model.Metric, len(labelPairs)) + for _, l := range labelPairs { + metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) + } + return metric +} + +func labelProtosToLabels(labelPairs []prompb.Label) labels.Labels { + result := make(labels.Labels, 0, len(labelPairs)) + for _, l := range labelPairs { + result = append(result, labels.Label{ + Name: l.Name, + Value: l.Value, + }) + } + sort.Sort(result) + return result +} + +func labelsToLabelsProto(labels labels.Labels) []prompb.Label { + result := make([]prompb.Label, 0, len(labels)) + for _, l := range labels { + result = append(result, prompb.Label{ + Name: l.Name, + Value: l.Value, + }) + } + return result +} + +func labelsToMetric(ls labels.Labels) model.Metric { + metric := make(model.Metric, len(ls)) + for _, l := range ls { + metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) + } + return metric +} diff --git a/pkg/remote/codec_test.go b/pkg/remote/codec_test.go new file mode 100644 index 000000000..b875065a2 --- /dev/null +++ b/pkg/remote/codec_test.go @@ -0,0 +1,147 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the 
Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" +) + +func TestValidateLabelsAndMetricName(t *testing.T) { + tests := []struct { + input labels.Labels + expectedErr string + shouldPass bool + }{ + { + input: labels.FromStrings( + "__name__", "name", + "labelName", "labelValue", + ), + expectedErr: "", + shouldPass: true, + }, + { + input: labels.FromStrings( + "__name__", "name", + "_labelName", "labelValue", + ), + expectedErr: "", + shouldPass: true, + }, + { + input: labels.FromStrings( + "__name__", "name", + "@labelName", "labelValue", + ), + expectedErr: "Invalid label name: @labelName", + shouldPass: false, + }, + { + input: labels.FromStrings( + "__name__", "name", + "123labelName", "labelValue", + ), + expectedErr: "Invalid label name: 123labelName", + shouldPass: false, + }, + { + input: labels.FromStrings( + "__name__", "name", + "", "labelValue", + ), + expectedErr: "Invalid label name: ", + shouldPass: false, + }, + { + input: labels.FromStrings( + "__name__", "name", + "labelName", string([]byte{0xff}), + ), + expectedErr: "Invalid label value: " + string([]byte{0xff}), + shouldPass: false, + }, + { + input: labels.FromStrings( + "__name__", "@invalid_name", + ), + expectedErr: "Invalid metric name: @invalid_name", + shouldPass: false, + }, + } + + for _, test := range tests { + err := validateLabelsAndMetricName(test.input) + if test.shouldPass != (err == nil) { + if test.shouldPass { + t.Fatalf("Test should pass, got unexpected error: %v", err) + } else { + t.Fatalf("Test should fail, unexpected error, got: %v, expected: %v", err, test.expectedErr) + } + } + } +} + +func TestConcreteSeriesSet(t *testing.T) { + series1 := &concreteSeries{ + labels: labels.FromStrings("foo", "bar"), + samples: []prompb.Sample{{Value: 1, Timestamp: 2}}, + } + series2 := &concreteSeries{ + labels: labels.FromStrings("foo", "baz"), + samples: []prompb.Sample{{Value: 3, Timestamp: 4}}, + } + c := &concreteSeriesSet{ + series: []storage.Series{series1, series2}, + } + if !c.Next() { + t.Fatalf("Expected Next() to be true.") + } + if c.At() != series1 { + t.Fatalf("Unexpected series returned.") + } + if !c.Next() { + t.Fatalf("Expected Next() to be true.") + } + if c.At() != series2 { + t.Fatalf("Unexpected series returned.") + } + if c.Next() { + t.Fatalf("Expected Next() to be false.") + } +} + +func TestConcreteSeriesClonesLabels(t *testing.T) { + lbls := labels.Labels{ + labels.Label{Name: "a", Value: "b"}, + labels.Label{Name: "c", Value: "d"}, + } + cs := concreteSeries{ + labels: labels.New(lbls...), + } + + gotLabels := cs.Labels() + require.Equal(t, lbls, gotLabels) + + gotLabels[0].Value = "foo" + gotLabels[1].Value = "bar" + + gotLabels = cs.Labels() + require.Equal(t, lbls, gotLabels) +} diff --git a/pkg/remote/ewma.go b/pkg/remote/ewma.go new file mode 100644 index 000000000..82b6dd101 
--- /dev/null +++ b/pkg/remote/ewma.go @@ -0,0 +1,68 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "sync" + "sync/atomic" + "time" +) + +// ewmaRate tracks an exponentially weighted moving average of a per-second rate. +type ewmaRate struct { + newEvents int64 + alpha float64 + interval time.Duration + lastRate float64 + init bool + mutex sync.Mutex +} + +// newEWMARate always allocates a new ewmaRate, as this guarantees the atomically +// accessed int64 will be aligned on ARM. See prometheus#2666. +func newEWMARate(alpha float64, interval time.Duration) *ewmaRate { + return &ewmaRate{ + alpha: alpha, + interval: interval, + } +} + +// rate returns the per-second rate. +func (r *ewmaRate) rate() float64 { + r.mutex.Lock() + defer r.mutex.Unlock() + return r.lastRate +} + +// tick assumes to be called every r.interval. +func (r *ewmaRate) tick() { + newEvents := atomic.LoadInt64(&r.newEvents) + atomic.AddInt64(&r.newEvents, -newEvents) + instantRate := float64(newEvents) / r.interval.Seconds() + + r.mutex.Lock() + defer r.mutex.Unlock() + + if r.init { + r.lastRate += r.alpha * (instantRate - r.lastRate) + } else { + r.init = true + r.lastRate = instantRate + } +} + +// inc counts one event. +func (r *ewmaRate) incr(incr int64) { + atomic.AddInt64(&r.newEvents, incr) +} diff --git a/pkg/remote/queue_manager.go b/pkg/remote/queue_manager.go new file mode 100644 index 000000000..2e5542255 --- /dev/null +++ b/pkg/remote/queue_manager.go @@ -0,0 +1,563 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "context" + "math" + "sync" + "sync/atomic" + "time" + + "golang.org/x/time/rate" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/relabel" + "github.com/prometheus/prometheus/prompb" +) + +// String constants for instrumentation. +const ( + namespace = "promxy" + subsystem = "remote_storage" + queue = "queue" + + // We track samples in/out and how long pushes take using an Exponentially + // Weighted Moving Average. + ewmaWeight = 0.2 + shardUpdateDuration = 10 * time.Second + + // Allow 30% too many shards before scaling down. 
+ shardToleranceFraction = 0.3 + + // Limit to 1 log event every 10s + logRateLimit = 0.1 + logBurst = 10 +) + +var ( + // DefaultQueueConfig is the default remote queue configuration. + DefaultQueueConfig = config.QueueConfig{ + // With a maximum of 1000 shards, assuming an average of 100ms remote write + // time and 100 samples per batch, we will be able to push 1M samples/s. + MaxShards: 1000, + MinShards: 1, + MaxSamplesPerSend: 100, + + // Each shard will have a max of 10 samples pending in it's channel, plus the pending + // samples that have been enqueued. Theoretically we should only ever have about 110 samples + // per shard pending. At 1000 shards that's 110k. + Capacity: 10000, + BatchSendDeadline: model.Duration(5 * time.Second), + + // Max number of times to retry a batch on recoverable errors. + MinBackoff: model.Duration(30 * time.Millisecond), + MaxBackoff: model.Duration(100 * time.Millisecond), + } + + succeededSamplesTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "succeeded_samples_total", + Help: "Total number of samples successfully sent to remote storage.", + }, + []string{queue}, + ) + failedSamplesTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "failed_samples_total", + Help: "Total number of samples which failed on send to remote storage.", + }, + []string{queue}, + ) + droppedSamplesTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "dropped_samples_total", + Help: "Total number of samples which were dropped due to the queue being full.", + }, + []string{queue}, + ) + sentBatchDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sent_batch_duration_seconds", + Help: "Duration of sample batch send calls to the remote storage.", + Buckets: prometheus.DefBuckets, + }, + []string{queue}, + ) + queueLength = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "queue_length", + Help: "The number of processed samples queued to be sent to the remote storage.", + }, + []string{queue}, + ) + shardCapacity = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "shard_capacity", + Help: "The capacity of each shard of the queue used for parallel sending to the remote storage.", + }, + []string{queue}, + ) + numShards = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "shards", + Help: "The number of shards used for parallel sending to the remote storage.", + }, + []string{queue}, + ) +) + +func init() { + prometheus.MustRegister(succeededSamplesTotal) + prometheus.MustRegister(failedSamplesTotal) + prometheus.MustRegister(droppedSamplesTotal) + prometheus.MustRegister(sentBatchDuration) + prometheus.MustRegister(queueLength) + prometheus.MustRegister(shardCapacity) + prometheus.MustRegister(numShards) +} + +// StorageClient defines an interface for sending a batch of samples to an +// external timeseries database. +type StorageClient interface { + // Store stores the given samples in the remote storage. + Store(context.Context, *prompb.WriteRequest) error + // Name identifies the remote storage implementation. + Name() string +} + +// QueueManager manages a queue of samples to be sent to the Storage +// indicated by the provided StorageClient. 
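+// Samples are fanned out across a set of shards (one goroutine and channel
+// each), and the shard count is periodically recalculated from EWMA-based
+// in/out rates so sending can keep up with the incoming sample rate.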
+type QueueManager struct { + logger log.Logger + + flushDeadline time.Duration + cfg config.QueueConfig + externalLabels labels.Labels + relabelConfigs []*relabel.Config + client StorageClient + queueName string + logLimiter *rate.Limiter + + shardsMtx sync.RWMutex + shards *shards + numShards int + reshardChan chan int + quit chan struct{} + wg sync.WaitGroup + + samplesIn, samplesOut, samplesOutDuration *ewmaRate + integralAccumulator float64 +} + +// NewQueueManager builds a new QueueManager. +func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager { + if logger == nil { + logger = log.NewNopLogger() + } else { + logger = log.With(logger, "queue", client.Name()) + } + t := &QueueManager{ + logger: logger, + flushDeadline: flushDeadline, + cfg: cfg, + externalLabels: externalLabels, + relabelConfigs: relabelConfigs, + client: client, + queueName: client.Name(), + + logLimiter: rate.NewLimiter(logRateLimit, logBurst), + numShards: cfg.MinShards, + reshardChan: make(chan int), + quit: make(chan struct{}), + + samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), + samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration), + samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration), + } + t.shards = t.newShards(t.numShards) + numShards.WithLabelValues(t.queueName).Set(float64(t.numShards)) + shardCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.Capacity)) + + // Initialize counter labels to zero. + sentBatchDuration.WithLabelValues(t.queueName) + succeededSamplesTotal.WithLabelValues(t.queueName) + failedSamplesTotal.WithLabelValues(t.queueName) + droppedSamplesTotal.WithLabelValues(t.queueName) + + return t +} + +// Append queues a sample to be sent to the remote storage. It drops the +// sample on the floor if the queue is full. +// Always returns nil. +func (t *QueueManager) Append(s *model.Sample) error { + snew := *s + snew.Metric = s.Metric.Clone() + + b := labels.NewBuilder(t.externalLabels) + for k, v := range s.Metric { + if !t.externalLabels.Has(string(k)) { + b.Set(string(k), string(v)) + } + } + + ls := relabel.Process(b.Labels(), t.relabelConfigs...) + + snew.Metric = make(model.Metric, len(ls)) + for _, label := range ls { + snew.Metric[model.LabelName(label.Name)] = model.LabelValue(label.Value) + } + + if snew.Metric == nil { + return nil + } + + t.shardsMtx.RLock() + enqueued := t.shards.enqueue(&snew) + t.shardsMtx.RUnlock() + + if enqueued { + queueLength.WithLabelValues(t.queueName).Inc() + } else { + droppedSamplesTotal.WithLabelValues(t.queueName).Inc() + if t.logLimiter.Allow() { + level.Warn(t.logger).Log("msg", "Remote storage queue full, discarding sample. Multiple subsequent messages of this kind may be suppressed.") + } + } + return nil +} + +// NeedsThrottling implements storage.SampleAppender. It will always return +// false as a remote storage drops samples on the floor if backlogging instead +// of asking for throttling. +func (*QueueManager) NeedsThrottling() bool { + return false +} + +// Start the queue manager sending samples to the remote storage. +// Does not block. +func (t *QueueManager) Start() { + t.wg.Add(2) + go t.updateShardsLoop() + go t.reshardLoop() + + t.shardsMtx.Lock() + defer t.shardsMtx.Unlock() + t.shards.start() +} + +// Stop stops sending samples to the remote storage and waits for pending +// sends to complete. 
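+// The wait for in-flight samples is bounded by the configured flushDeadline;
+// once it expires the shards are shut down uncleanly via their context.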
+func (t *QueueManager) Stop() { + level.Info(t.logger).Log("msg", "Stopping remote storage...") + close(t.quit) + t.wg.Wait() + + t.shardsMtx.Lock() + defer t.shardsMtx.Unlock() + t.shards.stop(t.flushDeadline) + + level.Info(t.logger).Log("msg", "Remote storage stopped.") +} + +func (t *QueueManager) updateShardsLoop() { + defer t.wg.Done() + + ticker := time.NewTicker(shardUpdateDuration) + defer ticker.Stop() + for { + select { + case <-ticker.C: + t.calculateDesiredShards() + case <-t.quit: + return + } + } +} + +func (t *QueueManager) calculateDesiredShards() { + t.samplesIn.tick() + t.samplesOut.tick() + t.samplesOutDuration.tick() + + // We use the number of incoming samples as a prediction of how much work we + // will need to do next iteration. We add to this any pending samples + // (received - send) so we can catch up with any backlog. We use the average + // outgoing batch latency to work out how many shards we need. + var ( + samplesIn = t.samplesIn.rate() + samplesOut = t.samplesOut.rate() + samplesPending = samplesIn - samplesOut + samplesOutDuration = t.samplesOutDuration.rate() + ) + + // We use an integral accumulator, like in a PID, to help dampen oscillation. + t.integralAccumulator = t.integralAccumulator + (samplesPending * 0.1) + + if samplesOut <= 0 { + return + } + + var ( + timePerSample = samplesOutDuration / samplesOut + desiredShards = (timePerSample * (samplesIn + samplesPending + t.integralAccumulator)) / float64(time.Second) + ) + level.Debug(t.logger).Log("msg", "QueueManager.caclulateDesiredShards", + "samplesIn", samplesIn, "samplesOut", samplesOut, + "samplesPending", samplesPending, "desiredShards", desiredShards) + + // Changes in the number of shards must be greater than shardToleranceFraction. + var ( + lowerBound = float64(t.numShards) * (1. - shardToleranceFraction) + upperBound = float64(t.numShards) * (1. + shardToleranceFraction) + ) + level.Debug(t.logger).Log("msg", "QueueManager.updateShardsLoop", + "lowerBound", lowerBound, "desiredShards", desiredShards, "upperBound", upperBound) + if lowerBound <= desiredShards && desiredShards <= upperBound { + return + } + + numShards := int(math.Ceil(desiredShards)) + if numShards > t.cfg.MaxShards { + numShards = t.cfg.MaxShards + } else if numShards < t.cfg.MinShards { + numShards = t.cfg.MinShards + } + if numShards == t.numShards { + return + } + + // Resharding can take some time, and we want this loop + // to stay close to shardUpdateDuration. + select { + case t.reshardChan <- numShards: + level.Info(t.logger).Log("msg", "Remote storage resharding", "from", t.numShards, "to", numShards) + t.numShards = numShards + default: + level.Info(t.logger).Log("msg", "Currently resharding, skipping.") + } +} + +func (t *QueueManager) reshardLoop() { + defer t.wg.Done() + + for { + select { + case numShards := <-t.reshardChan: + t.reshard(numShards) + case <-t.quit: + return + } + } +} + +func (t *QueueManager) reshard(n int) { + numShards.WithLabelValues(t.queueName).Set(float64(n)) + + t.shardsMtx.Lock() + newShards := t.newShards(n) + oldShards := t.shards + t.shards = newShards + t.shardsMtx.Unlock() + + oldShards.stop(t.flushDeadline) + + // We start the newShards after we have stopped (the therefore completely + // flushed) the oldShards, to guarantee we only every deliver samples in + // order. 
+ newShards.start() +} + +type shards struct { + qm *QueueManager + queues []chan *model.Sample + done chan struct{} + running int32 + ctx context.Context + cancel context.CancelFunc +} + +func (t *QueueManager) newShards(numShards int) *shards { + queues := make([]chan *model.Sample, numShards) + for i := 0; i < numShards; i++ { + queues[i] = make(chan *model.Sample, t.cfg.Capacity) + } + ctx, cancel := context.WithCancel(context.Background()) + s := &shards{ + qm: t, + queues: queues, + done: make(chan struct{}), + running: int32(numShards), + ctx: ctx, + cancel: cancel, + } + return s +} + +func (s *shards) start() { + for i := 0; i < len(s.queues); i++ { + go s.runShard(i) + } +} + +func (s *shards) stop(deadline time.Duration) { + // Attempt a clean shutdown. + for _, shard := range s.queues { + close(shard) + } + select { + case <-s.done: + return + case <-time.After(deadline): + level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown") + } + + // Force an unclean shutdown. + s.cancel() + <-s.done +} + +func (s *shards) enqueue(sample *model.Sample) bool { + s.qm.samplesIn.incr(1) + + fp := sample.Metric.FastFingerprint() + shard := uint64(fp) % uint64(len(s.queues)) + + select { + case s.queues[shard] <- sample: + return true + default: + return false + } +} + +func (s *shards) runShard(i int) { + defer func() { + if atomic.AddInt32(&s.running, -1) == 0 { + close(s.done) + } + }() + + queue := s.queues[i] + + // Send batches of at most MaxSamplesPerSend samples to the remote storage. + // If we have fewer samples than that, flush them out after a deadline + // anyways. + pendingSamples := model.Samples{} + + timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline)) + stop := func() { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + } + defer stop() + + for { + select { + case <-s.ctx.Done(): + return + + case sample, ok := <-queue: + if !ok { + if len(pendingSamples) > 0 { + level.Debug(s.qm.logger).Log("msg", "Flushing samples to remote storage...", "count", len(pendingSamples)) + s.sendSamples(pendingSamples) + level.Debug(s.qm.logger).Log("msg", "Done flushing.") + } + return + } + + queueLength.WithLabelValues(s.qm.queueName).Dec() + pendingSamples = append(pendingSamples, sample) + + if len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend { + s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend]) + pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:] + + stop() + timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) + } + + case <-timer.C: + if len(pendingSamples) > 0 { + s.sendSamples(pendingSamples) + pendingSamples = pendingSamples[:0] + } + timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) + } + } +} + +func (s *shards) sendSamples(samples model.Samples) { + begin := time.Now() + s.sendSamplesWithBackoff(samples) + + // These counters are used to calculate the dynamic sharding, and as such + // should be maintained irrespective of success or failure. + s.qm.samplesOut.incr(int64(len(samples))) + s.qm.samplesOutDuration.incr(int64(time.Since(begin))) +} + +// sendSamples to the remote storage with backoff for recoverable errors. 
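+// With the DefaultQueueConfig values (MinBackoff 30ms, MaxBackoff 100ms) the
+// sleep between retries of a recoverable error is 30ms, 60ms, 100ms, 100ms, ...;
+// a non-recoverable error breaks out immediately and the batch is counted as
+// failed.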
+func (s *shards) sendSamplesWithBackoff(samples model.Samples) { + backoff := s.qm.cfg.MinBackoff + req := ToWriteRequest(samples) + + for { // Upstream now retries indefinitely + begin := time.Now() + err := s.qm.client.Store(s.ctx, req) + + sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds()) + if err == nil { + succeededSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) + return + } + + level.Warn(s.qm.logger).Log("msg", "Error sending samples to remote storage", "count", len(samples), "err", err) + if _, ok := err.(recoverableError); !ok { + break + } + time.Sleep(time.Duration(backoff)) + backoff = backoff * 2 + if backoff > s.qm.cfg.MaxBackoff { + backoff = s.qm.cfg.MaxBackoff + } + } + + failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) +} diff --git a/pkg/remote/queue_manager_test.go b/pkg/remote/queue_manager_test.go new file mode 100644 index 000000000..b335dcf81 --- /dev/null +++ b/pkg/remote/queue_manager_test.go @@ -0,0 +1,329 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "context" + "fmt" + "reflect" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/prompb" +) + +const defaultFlushDeadline = 1 * time.Minute + +type TestStorageClient struct { + receivedSamples map[string][]prompb.Sample + expectedSamples map[string][]prompb.Sample + wg sync.WaitGroup + mtx sync.Mutex +} + +func NewTestStorageClient() *TestStorageClient { + return &TestStorageClient{ + receivedSamples: map[string][]prompb.Sample{}, + expectedSamples: map[string][]prompb.Sample{}, + } +} + +func (c *TestStorageClient) expectSamples(ss model.Samples) { + c.mtx.Lock() + defer c.mtx.Unlock() + + c.expectedSamples = map[string][]prompb.Sample{} + c.receivedSamples = map[string][]prompb.Sample{} + + for _, s := range ss { + ts := labelProtosToLabels(MetricToLabelProtos(s.Metric)).String() + c.expectedSamples[ts] = append(c.expectedSamples[ts], prompb.Sample{ + Timestamp: int64(s.Timestamp), + Value: float64(s.Value), + }) + } + c.wg.Add(len(ss)) +} + +func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) { + c.wg.Wait() + + c.mtx.Lock() + defer c.mtx.Unlock() + for ts, expectedSamples := range c.expectedSamples { + if !reflect.DeepEqual(expectedSamples, c.receivedSamples[ts]) { + t.Fatalf("%s: Expected %v, got %v", ts, expectedSamples, c.receivedSamples[ts]) + } + } +} + +func (c *TestStorageClient) Store(_ context.Context, req *prompb.WriteRequest) error { + c.mtx.Lock() + defer c.mtx.Unlock() + count := 0 + for _, ts := range req.Timeseries { + labels := labelProtosToLabels(ts.Labels).String() + for _, sample := range ts.Samples { + count++ + c.receivedSamples[labels] = append(c.receivedSamples[labels], sample) + } + } + c.wg.Add(-count) + return nil +} + +func (c *TestStorageClient) Name() string { + return "teststorageclient" +} + +func TestSampleDelivery(t 
*testing.T) { + // Let's create an even number of send batches so we don't run into the + // batch timeout case. + n := DefaultQueueConfig.Capacity * 2 + + samples := make(model.Samples, 0, n) + for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i)) + samples = append(samples, &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: name, + }, + Value: model.SampleValue(i), + }) + } + + c := NewTestStorageClient() + c.expectSamples(samples[:len(samples)/2]) + + cfg := DefaultQueueConfig + cfg.MaxShards = 1 + m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline) + + // These should be received by the client. + for _, s := range samples[:len(samples)/2] { + m.Append(s) + } + // These will be dropped because the queue is full. + for _, s := range samples[len(samples)/2:] { + m.Append(s) + } + m.Start() + defer m.Stop() + + c.waitForExpectedSamples(t) +} + +func TestSampleDeliveryTimeout(t *testing.T) { + // Let's send one less sample than batch size, and wait the timeout duration + n := DefaultQueueConfig.Capacity - 1 + + samples := make(model.Samples, 0, n) + for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i)) + samples = append(samples, &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: name, + }, + Value: model.SampleValue(i), + }) + } + + c := NewTestStorageClient() + + cfg := DefaultQueueConfig + cfg.MaxShards = 1 + cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) + m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline) + m.Start() + defer m.Stop() + + // Send the samples twice, waiting for the samples in the meantime. + c.expectSamples(samples) + for _, s := range samples { + m.Append(s) + } + c.waitForExpectedSamples(t) + + c.expectSamples(samples) + for _, s := range samples { + m.Append(s) + } + c.waitForExpectedSamples(t) +} + +func TestSampleDeliveryOrder(t *testing.T) { + ts := 10 + n := DefaultQueueConfig.MaxSamplesPerSend * ts + + samples := make(model.Samples, 0, n) + for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i%ts)) + samples = append(samples, &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: name, + }, + Value: model.SampleValue(i), + Timestamp: model.Time(i), + }) + } + + c := NewTestStorageClient() + c.expectSamples(samples) + m := NewQueueManager(nil, DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) + + // These should be received by the client. + for _, s := range samples { + m.Append(s) + } + m.Start() + defer m.Stop() + + c.waitForExpectedSamples(t) +} + +// TestBlockingStorageClient is a queue_manager StorageClient which will block +// on any calls to Store(), until the `block` channel is closed, at which point +// the `numCalls` property will contain a count of how many times Store() was +// called. 
+type TestBlockingStorageClient struct { + numCalls uint64 + block chan bool +} + +func NewTestBlockedStorageClient() *TestBlockingStorageClient { + return &TestBlockingStorageClient{ + block: make(chan bool), + numCalls: 0, + } +} + +func (c *TestBlockingStorageClient) Store(ctx context.Context, _ *prompb.WriteRequest) error { + atomic.AddUint64(&c.numCalls, 1) + select { + case <-c.block: + case <-ctx.Done(): + } + return nil +} + +func (c *TestBlockingStorageClient) NumCalls() uint64 { + return atomic.LoadUint64(&c.numCalls) +} + +func (c *TestBlockingStorageClient) unlock() { + close(c.block) +} + +func (c *TestBlockingStorageClient) Name() string { + return "testblockingstorageclient" +} + +func (t *QueueManager) queueLen() int { + t.shardsMtx.Lock() + defer t.shardsMtx.Unlock() + queueLength := 0 + for _, shard := range t.shards.queues { + queueLength += len(shard) + } + return queueLength +} + +func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { + // Our goal is to fully empty the queue: + // `MaxSamplesPerSend*Shards` samples should be consumed by the + // per-shard goroutines, and then another `MaxSamplesPerSend` + // should be left on the queue. + n := DefaultQueueConfig.MaxSamplesPerSend * 2 + + samples := make(model.Samples, 0, n) + for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i)) + samples = append(samples, &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: name, + }, + Value: model.SampleValue(i), + }) + } + + c := NewTestBlockedStorageClient() + cfg := DefaultQueueConfig + cfg.MaxShards = 1 + cfg.Capacity = n + m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline) + + m.Start() + + defer func() { + c.unlock() + m.Stop() + }() + + for _, s := range samples { + m.Append(s) + } + + // Wait until the runShard() loops drain the queue. If things went right, it + // should then immediately block in sendSamples(), but, in case of error, + // it would spawn too many goroutines, and thus we'd see more calls to + // client.Store() + // + // The timed wait is maybe non-ideal, but, in order to verify that we're + // not spawning too many concurrent goroutines, we have to wait on the + // Run() loop to consume a specific number of elements from the + // queue... and it doesn't signal that in any obvious way, except by + // draining the queue. We cap the waiting at 1 second -- that should give + // plenty of time, and keeps the failure fairly quick if we're not draining + // the queue properly. 
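+	// 100 iterations of 10ms keeps the total wait at the 1 second cap
+	// described above, while still exiting early once the queue drains.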
+ for i := 0; i < 100 && m.queueLen() > 0; i++ { + time.Sleep(10 * time.Millisecond) + } + + if m.queueLen() != DefaultQueueConfig.MaxSamplesPerSend { + t.Fatalf("Failed to drain QueueManager queue, %d elements left", + m.queueLen(), + ) + } + + numCalls := c.NumCalls() + if numCalls != uint64(1) { + t.Errorf("Saw %d concurrent sends, expected 1", numCalls) + } +} + +func TestShutdown(t *testing.T) { + deadline := 10 * time.Second + c := NewTestBlockedStorageClient() + m := NewQueueManager(nil, DefaultQueueConfig, nil, nil, c, deadline) + for i := 0; i < DefaultQueueConfig.MaxSamplesPerSend; i++ { + m.Append(&model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: model.LabelValue(fmt.Sprintf("test_metric_%d", i)), + }, + Value: model.SampleValue(i), + Timestamp: model.Time(i), + }) + } + m.Start() + + start := time.Now() + m.Stop() + duration := time.Since(start) + if duration > deadline+(deadline/10) { + t.Errorf("Took too long to shutdown: %s > %s", duration, deadline) + } +} diff --git a/pkg/remote/read.go b/pkg/remote/read.go new file mode 100644 index 000000000..7f137196d --- /dev/null +++ b/pkg/remote/read.go @@ -0,0 +1,257 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "context" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" +) + +var remoteReadQueries = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "remote_read_queries", + Help: "The number of in-flight remote read queries.", + }, + []string{"client"}, +) + +func init() { + prometheus.MustRegister(remoteReadQueries) +} + +// QueryableClient returns a storage.Queryable which queries the given +// Client to select series sets. +func QueryableClient(c *Client) storage.Queryable { + remoteReadQueries.WithLabelValues(c.Name()) + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return &querier{ + ctx: ctx, + mint: mint, + maxt: maxt, + client: c, + }, nil + }) +} + +// querier is an adapter to make a Client usable as a storage.Querier. +type querier struct { + ctx context.Context + mint, maxt int64 + client *Client +} + +// Select implements storage.Querier and uses the given matchers to read series +// sets from the Client. 
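+// The matchers and hints are converted into a prompb.Query via ToQuery, the
+// in-flight gauge for this client is incremented for the duration of the
+// call, and the result is decoded with FromQueryResult.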
+func (q *querier) Select(sortSeries bool, p *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + query, err := ToQuery(q.mint, q.maxt, matchers, p) + if err != nil { + return storage.ErrSeriesSet(errors.Wrap(err, "toQuery")) + } + + remoteReadGauge := remoteReadQueries.WithLabelValues(q.client.Name()) + remoteReadGauge.Inc() + defer remoteReadGauge.Dec() + + res, err := q.client.Read(q.ctx, query) + if err != nil { + return storage.ErrSeriesSet(errors.Wrap(err, "Read")) + } + + return FromQueryResult(sortSeries, res) +} + +// LabelValues implements storage.Querier and is a noop. +func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) { + // TODO implement? + return nil, nil, nil +} + +// LabelNames implements storage.Querier and is a noop. +func (q *querier) LabelNames() ([]string, storage.Warnings, error) { + // TODO implement? + return nil, nil, nil +} + +// Close implements storage.Querier and is a noop. +func (q *querier) Close() error { + return nil +} + +// ExternalLabelsHandler returns a storage.Queryable which creates a +// externalLabelsQuerier. +func ExternalLabelsHandler(next storage.Queryable, externalLabels model.LabelSet) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + q, err := next.Querier(ctx, mint, maxt) + if err != nil { + return nil, err + } + return &externalLabelsQuerier{Querier: q, externalLabels: externalLabels}, nil + }) +} + +// externalLabelsQuerier is a querier which ensures that Select() results match +// the configured external labels. +type externalLabelsQuerier struct { + storage.Querier + + externalLabels model.LabelSet +} + +// Select adds equality matchers for all external labels to the list of matchers +// before calling the wrapped storage.Queryable. The added external labels are +// removed from the returned series sets. +func (q externalLabelsQuerier) Select(sortSeries bool, p *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + m, added := q.addExternalLabels(matchers) + s := q.Querier.Select(sortSeries, p, m...) + return newSeriesSetFilter(s, added) +} + +// PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier +// if requested timeframe can be answered completely by the local TSDB, and +// reduces maxt if the timeframe can be partially answered by TSDB. +func PreferLocalStorageFilter(next storage.Queryable, cb startTimeCallback) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + localStartTime, err := cb() + if err != nil { + return nil, err + } + cmaxt := maxt + // Avoid queries whose timerange is later than the first timestamp in local DB. + if mint > localStartTime { + return storage.NoopQuerier(), nil + } + // Query only samples older than the first timestamp in local DB. + if maxt > localStartTime { + cmaxt = localStartTime + } + return next.Querier(ctx, mint, cmaxt) + }) +} + +// RequiredMatchersFilter returns a storage.Queryable which creates a +// requiredMatchersQuerier. 
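+// The wrapped querier is only consulted when every required matcher is also
+// present as an equality matcher in the Select call; otherwise a
+// NoopSeriesSet is returned.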
+func RequiredMatchersFilter(next storage.Queryable, required []*labels.Matcher) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + q, err := next.Querier(ctx, mint, maxt) + if err != nil { + return nil, err + } + return &requiredMatchersQuerier{Querier: q, requiredMatchers: required}, nil + }) +} + +// requiredMatchersQuerier wraps a storage.Querier and requires Select() calls +// to match the given labelSet. +type requiredMatchersQuerier struct { + storage.Querier + + requiredMatchers []*labels.Matcher +} + +// Select returns a NoopSeriesSet if the given matchers don't match the label +// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier. +func (q requiredMatchersQuerier) Select(sortSeries bool, p *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + ms := q.requiredMatchers + for _, m := range matchers { + for i, r := range ms { + if m.Type == labels.MatchEqual && m.Name == r.Name && m.Value == r.Value { + ms = append(ms[:i], ms[i+1:]...) + break + } + } + if len(ms) == 0 { + break + } + } + if len(ms) > 0 { + return storage.NoopSeriesSet() + } + return q.Querier.Select(sortSeries, p, matchers...) +} + +// addExternalLabels adds matchers for each external label. External labels +// that already have a corresponding user-supplied matcher are skipped, as we +// assume that the user explicitly wants to select a different value for them. +// We return the new set of matchers, along with a map of labels for which +// matchers were added, so that these can later be removed from the result +// time series again. +func (q externalLabelsQuerier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) { + el := make(model.LabelSet, len(q.externalLabels)) + for k, v := range q.externalLabels { + el[k] = v + } + for _, m := range ms { + delete(el, model.LabelName(m.Name)) + } + for k, v := range el { + m, err := labels.NewMatcher(labels.MatchEqual, string(k), string(v)) + if err != nil { + panic(err) + } + ms = append(ms, m) + } + return ms, el +} + +func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.SeriesSet { + return &seriesSetFilter{ + SeriesSet: ss, + toFilter: toFilter, + } +} + +type seriesSetFilter struct { + storage.SeriesSet + toFilter model.LabelSet + querier storage.Querier +} + +func (ssf *seriesSetFilter) GetQuerier() storage.Querier { + return ssf.querier +} + +func (ssf *seriesSetFilter) SetQuerier(querier storage.Querier) { + ssf.querier = querier +} + +func (ssf seriesSetFilter) At() storage.Series { + return seriesFilter{ + Series: ssf.SeriesSet.At(), + toFilter: ssf.toFilter, + } +} + +type seriesFilter struct { + storage.Series + toFilter model.LabelSet +} + +func (sf seriesFilter) Labels() labels.Labels { + labels := sf.Series.Labels() + for i := 0; i < len(labels); { + if _, ok := sf.toFilter[model.LabelName(labels[i].Name)]; ok { + labels = labels[:i+copy(labels[i:], labels[i+1:])] + continue + } + i++ + } + return labels +} diff --git a/pkg/remote/read_test.go b/pkg/remote/read_test.go new file mode 100644 index 000000000..e2462aa9b --- /dev/null +++ b/pkg/remote/read_test.go @@ -0,0 +1,334 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "context" + "reflect" + "sort" + "testing" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" +) + +func mustNewLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher { + m, err := labels.NewMatcher(mt, name, val) + if err != nil { + panic(err) + } + return m +} + +func TestExternalLabelsQuerierSelect(t *testing.T) { + matchers := []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + } + q := &externalLabelsQuerier{ + Querier: mockQuerier{}, + externalLabels: model.LabelSet{"region": "europe"}, + } + want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels) + have := q.Select(false, nil, matchers...) + if err := have.Err(); err != nil { + t.Error(err) + } + if !reflect.DeepEqual(want, have) { + t.Errorf("expected series set %+v, got %+v", want, have) + } +} + +func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) { + tests := []struct { + el model.LabelSet + inMatchers []*labels.Matcher + outMatchers []*labels.Matcher + added model.LabelSet + }{ + { + el: model.LabelSet{}, + inMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + }, + outMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + }, + added: model.LabelSet{}, + }, + { + el: model.LabelSet{"region": "europe", "dc": "berlin-01"}, + inMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + }, + outMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + mustNewLabelMatcher(labels.MatchEqual, "region", "europe"), + mustNewLabelMatcher(labels.MatchEqual, "dc", "berlin-01"), + }, + added: model.LabelSet{"region": "europe", "dc": "berlin-01"}, + }, + { + el: model.LabelSet{"region": "europe", "dc": "berlin-01"}, + inMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + mustNewLabelMatcher(labels.MatchEqual, "dc", "munich-02"), + }, + outMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + mustNewLabelMatcher(labels.MatchEqual, "region", "europe"), + mustNewLabelMatcher(labels.MatchEqual, "dc", "munich-02"), + }, + added: model.LabelSet{"region": "europe"}, + }, + } + + for i, test := range tests { + q := &externalLabelsQuerier{Querier: mockQuerier{}, externalLabels: test.el} + matchers, added := q.addExternalLabels(test.inMatchers) + + sort.Slice(test.outMatchers, func(i, j int) bool { return test.outMatchers[i].Name < test.outMatchers[j].Name }) + sort.Slice(matchers, func(i, j int) bool { return matchers[i].Name < matchers[j].Name }) + + if !reflect.DeepEqual(matchers, test.outMatchers) { + t.Fatalf("%d. unexpected matchers; want %v, got %v", i, test.outMatchers, matchers) + } + if !reflect.DeepEqual(added, test.added) { + t.Fatalf("%d. 
unexpected added labels; want %v, got %v", i, test.added, added) + } + } +} + +func TestSeriesSetFilter(t *testing.T) { + tests := []struct { + in *prompb.QueryResult + toRemove model.LabelSet + + expected *prompb.QueryResult + }{ + { + toRemove: model.LabelSet{"foo": "bar"}, + in: &prompb.QueryResult{ + Timeseries: []*prompb.TimeSeries{ + {Labels: labelsToLabelsProto(labels.FromStrings("foo", "bar", "a", "b")), Samples: []prompb.Sample{}}, + }, + }, + expected: &prompb.QueryResult{ + Timeseries: []*prompb.TimeSeries{ + {Labels: labelsToLabelsProto(labels.FromStrings("a", "b")), Samples: []prompb.Sample{}}, + }, + }, + }, + } + + for i, tc := range tests { + filtered := newSeriesSetFilter(FromQueryResult(false, tc.in), tc.toRemove) + have, err := ToQueryResult(filtered, 1e6) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(have, tc.expected) { + t.Fatalf("%d. unexpected labels; want %v, got %v", i, tc.expected, have) + } + } +} + +type mockQuerier struct { + ctx context.Context + mint, maxt int64 + + storage.Querier +} + +type mockSeriesSet struct { + storage.SeriesSet +} + +func (m mockSeriesSet) Err() error { + if m.SeriesSet != nil { + return m.SeriesSet.Err() + } + return nil +} + +func (mockQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { + return mockSeriesSet{} +} + +func TestPreferLocalStorageFilter(t *testing.T) { + ctx := context.Background() + + tests := []struct { + localStartTime int64 + mint int64 + maxt int64 + querier storage.Querier + }{ + { + localStartTime: int64(100), + mint: int64(0), + maxt: int64(50), + querier: mockQuerier{ctx: ctx, mint: 0, maxt: 50}, + }, + { + localStartTime: int64(20), + mint: int64(0), + maxt: int64(50), + querier: mockQuerier{ctx: ctx, mint: 0, maxt: 20}, + }, + { + localStartTime: int64(20), + mint: int64(30), + maxt: int64(50), + querier: storage.NoopQuerier(), + }, + } + + for i, test := range tests { + f := PreferLocalStorageFilter( + storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return mockQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil + }), + func() (int64, error) { return test.localStartTime, nil }, + ) + + q, err := f.Querier(ctx, test.mint, test.maxt) + if err != nil { + t.Fatal(err) + } + + if test.querier != q { + t.Errorf("%d. 
expected quierer %+v, got %+v", i, test.querier, q) + } + } +} + +func TestRequiredMatchersFilter(t *testing.T) { + ctx := context.Background() + + f := RequiredMatchersFilter( + storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return mockQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil + }), + []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "special", "label")}, + ) + + want := &requiredMatchersQuerier{ + Querier: mockQuerier{ctx: ctx, mint: 0, maxt: 50}, + requiredMatchers: []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "special", "label")}, + } + have, err := f.Querier(ctx, 0, 50) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(want, have) { + t.Errorf("expected quierer %+v, got %+v", want, have) + } +} + +func TestRequiredLabelsQuerierSelect(t *testing.T) { + tests := []struct { + requiredMatchers []*labels.Matcher + matchers []*labels.Matcher + seriesSet storage.SeriesSet + }{ + { + requiredMatchers: []*labels.Matcher{}, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + seriesSet: mockSeriesSet{}, + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + seriesSet: mockSeriesSet{}, + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchRegexp, "special", "label"), + }, + seriesSet: storage.NoopSeriesSet(), + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "different"), + }, + seriesSet: storage.NoopSeriesSet(), + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"), + }, + seriesSet: mockSeriesSet{}, + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + mustNewLabelMatcher(labels.MatchEqual, "foo", "baz"), + }, + seriesSet: storage.NoopSeriesSet(), + }, + { + requiredMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"), + }, + matchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "special", "label"), + mustNewLabelMatcher(labels.MatchEqual, "foo", "bar"), + }, + seriesSet: mockSeriesSet{}, + }, + } + + for i, test := range tests { + q := &requiredMatchersQuerier{ + Querier: mockQuerier{}, + requiredMatchers: test.requiredMatchers, + } + + have := q.Select(false, nil, test.matchers...) + if err := have.Err(); err != nil { + t.Error(err) + } + if want := test.seriesSet; want != have { + t.Errorf("%d. expected series set %+v, got %+v", i, want, have) + } + if want, have := test.requiredMatchers, q.requiredMatchers; !reflect.DeepEqual(want, have) { + t.Errorf("%d. 
requiredMatchersQuerier.Select() has modified the matchers", i) + } + } +} diff --git a/pkg/remote/storage.go b/pkg/remote/storage.go new file mode 100644 index 000000000..911c1139e --- /dev/null +++ b/pkg/remote/storage.go @@ -0,0 +1,174 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "context" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" +) + +// startTimeCallback is a callback func that return the oldest timestamp stored in a storage. +type startTimeCallback func() (int64, error) + +// Storage represents all the remote read and write endpoints. It implements +// storage.Storage. +type Storage struct { + logger log.Logger + mtx sync.RWMutex + + // For writes + queues []*QueueManager + + // For reads + queryables []storage.Queryable + localStartTimeCallback startTimeCallback + flushDeadline time.Duration +} + +// NewStorage returns a remote.Storage. +func NewStorage(l log.Logger, stCallback startTimeCallback, flushDeadline time.Duration) *Storage { + if l == nil { + l = log.NewNopLogger() + } + return &Storage{ + logger: l, + localStartTimeCallback: stCallback, + flushDeadline: flushDeadline, + } +} + +// ApplyConfig updates the state as the new config requires. +func (s *Storage) ApplyConfig(conf *config.Config) error { + s.mtx.Lock() + defer s.mtx.Unlock() + + // Update write queues + + newQueues := []*QueueManager{} + // TODO: we should only stop & recreate queues which have changes, + // as this can be quite disruptive. 
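+	// One Client/QueueManager pair is built per remote_write config below; the
+	// previously running queues are stopped and replaced wholesale afterwards.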
+ for i, rwConf := range conf.RemoteWriteConfigs { + c, err := NewClient(i, &ClientConfig{ + URL: rwConf.URL, + Timeout: rwConf.RemoteTimeout, + HTTPClientConfig: rwConf.HTTPClientConfig, + }) + if err != nil { + return err + } + newQueues = append(newQueues, NewQueueManager( + s.logger, + rwConf.QueueConfig, + conf.GlobalConfig.ExternalLabels, + rwConf.WriteRelabelConfigs, + c, + s.flushDeadline, + )) + } + + for _, q := range s.queues { + q.Stop() + } + + s.queues = newQueues + for _, q := range s.queues { + q.Start() + } + + // Update read clients + + s.queryables = make([]storage.Queryable, 0, len(conf.RemoteReadConfigs)) + for i, rrConf := range conf.RemoteReadConfigs { + c, err := NewClient(i, &ClientConfig{ + URL: rrConf.URL, + Timeout: rrConf.RemoteTimeout, + HTTPClientConfig: rrConf.HTTPClientConfig, + }) + if err != nil { + return err + } + + q := QueryableClient(c) + + ls := make(model.LabelSet, len(conf.GlobalConfig.ExternalLabels)) + for _, label := range conf.GlobalConfig.ExternalLabels { + ls[model.LabelName(label.Name)] = model.LabelValue(label.Value) + } + + q = ExternalLabelsHandler(q, ls) + if len(rrConf.RequiredMatchers) > 0 { + q = RequiredMatchersFilter(q, labelsToEqualityMatchers(rrConf.RequiredMatchers)) + } + if !rrConf.ReadRecent { + q = PreferLocalStorageFilter(q, s.localStartTimeCallback) + } + s.queryables = append(s.queryables, q) + } + + return nil +} + +// StartTime implements the Storage interface. +func (s *Storage) StartTime() (int64, error) { + return int64(model.Latest), nil +} + +// Querier returns a storage.MergeQuerier combining the remote client queriers +// of each configured remote read endpoint. +func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + s.mtx.Lock() + queryables := s.queryables + s.mtx.Unlock() + + queriers := make([]storage.Querier, 0, len(queryables)) + for _, queryable := range queryables { + q, err := queryable.Querier(ctx, mint, maxt) + if err != nil { + return nil, err + } + queriers = append(queriers, q) + } + return storage.NewMergeQuerier(queriers, nil, storage.ChainedSeriesMerge), nil +} + +// Close the background processing of the storage queues. +func (s *Storage) Close() error { + s.mtx.Lock() + defer s.mtx.Unlock() + + for _, q := range s.queues { + q.Stop() + } + + return nil +} + +func labelsToEqualityMatchers(ls model.LabelSet) []*labels.Matcher { + ms := make([]*labels.Matcher, 0, len(ls)) + for k, v := range ls { + ms = append(ms, &labels.Matcher{ + Type: labels.MatchEqual, + Name: string(k), + Value: string(v), + }) + } + return ms +} diff --git a/pkg/remote/write.go b/pkg/remote/write.go new file mode 100644 index 000000000..7aa4eebf8 --- /dev/null +++ b/pkg/remote/write.go @@ -0,0 +1,56 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package remote + +import ( + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" +) + +// Appender implements scrape.Appendable. +func (s *Storage) Appender() (storage.Appender, error) { + return s, nil +} + +// Add implements storage.Appender. +func (s *Storage) Add(l labels.Labels, t int64, v float64) (uint64, error) { + s.mtx.RLock() + defer s.mtx.RUnlock() + for _, q := range s.queues { + if err := q.Append(&model.Sample{ + Metric: labelsToMetric(l), + Timestamp: model.Time(t), + Value: model.SampleValue(v), + }); err != nil { + panic(err) // QueueManager.Append() should always return nil as per doc string. + } + } + return 0, nil +} + +// AddFast implements storage.Appender. +func (s *Storage) AddFast(_ uint64, t int64, v float64) error { + return storage.ErrNotFound +} + +// Commit implements storage.Appender. +func (*Storage) Commit() error { + return nil +} + +// Rollback implements storage.Appender. +func (*Storage) Rollback() error { + return nil +}
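Reviewer note (not part of the patch): a minimal sketch of how the read-path decorators added in pkg/remote/read.go compose around a base Queryable, mirroring what Storage.ApplyConfig does for each remote_read config. The base Queryable, external label set, and required matcher below are made-up stand-ins for illustration only.

package main

import (
	"context"
	"log"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"

	"github.com/jacksontj/promxy/pkg/remote"
)

func main() {
	// Stand-in for remote.QueryableClient(client); any storage.Queryable works here.
	base := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
		return storage.NoopQuerier(), nil
	})

	// Add the external labels to every select and strip them from returned series.
	var q storage.Queryable = remote.ExternalLabelsHandler(base, model.LabelSet{"region": "europe"})

	// Only forward selects that already carry the required equality matcher.
	required, err := labels.NewMatcher(labels.MatchEqual, "special", "label")
	if err != nil {
		log.Fatal(err)
	}
	q = remote.RequiredMatchersFilter(q, []*labels.Matcher{required})

	// Skip (or truncate) remote reads that local storage could answer itself.
	q = remote.PreferLocalStorageFilter(q, func() (int64, error) { return 0, nil })

	querier, err := q.Querier(context.Background(), 0, 1000)
	if err != nil {
		log.Fatal(err)
	}
	defer querier.Close()
	_ = querier.Select(false, nil, required)
}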