diff --git a/CHANGELOG.md b/CHANGELOG.md index 378dbcb..127b1c3 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## tip +* FEATURE: make retry attempt for datasource requests if returned error is a temporary network error. See [this issue](https://github.com/VictoriaMetrics/victoriametrics-datasource/issues/193) + ## [v0.8.5](https://github.com/VictoriaMetrics/victoriametrics-datasource/releases/tag/v0.8.5) * BUGFIX: restore support for Grafana versions below `10.0.0`. See [this issue](https://github.com/VictoriaMetrics/victoriametrics-datasource/issues/159). diff --git a/pkg/plugin/datasource.go b/pkg/plugin/datasource.go index cdcb1e6..f707350 100644 --- a/pkg/plugin/datasource.go +++ b/pkg/plugin/datasource.go @@ -3,8 +3,11 @@ package plugin import ( "context" "encoding/json" + "errors" "fmt" + "io" "net/http" + "strings" "sync" "time" @@ -114,8 +117,23 @@ func (d *Datasource) query(ctx context.Context, query backend.DataQuery) backend } resp, err := d.httpClient.Do(req) if err != nil { - err = fmt.Errorf("failed to make http request: %w", err) - return newResponseError(err, backend.StatusBadRequest) + if !isTrivialError(err) { + // Return unexpected error to the caller. + return newResponseError(err, backend.StatusBadRequest) + } + + // Something in the middle between client and datasource might be closing + // the connection. So we do a one more attempt in hope request will succeed. + req, err = http.NewRequestWithContext(ctx, settings.HTTPMethod, reqURL, nil) + if err != nil { + err = fmt.Errorf("failed to create new request with context: %w", err) + return newResponseError(err, backend.StatusBadRequest) + } + resp, err = d.httpClient.Do(req) + if err != nil { + err = fmt.Errorf("failed to make http request: %w", err) + return newResponseError(err, backend.StatusBadRequest) + } } defer func() { if err := resp.Body.Close(); err != nil { @@ -136,7 +154,7 @@ func (d *Datasource) query(ctx context.Context, query backend.DataQuery) backend frames, err := r.getDataFrames() if err != nil { - err = fmt.Errorf("failed to prepare data from reponse: %w", err) + err = fmt.Errorf("failed to prepare data from response: %w", err) return newResponseError(err, backend.StatusInternal) } for i := range frames { @@ -183,3 +201,16 @@ func newResponseError(err error, httpStatus backend.Status) backend.DataResponse log.DefaultLogger.Error(err.Error()) return backend.DataResponse{Status: httpStatus, Error: err} } + +// isTrivialError returns true if the err is temporary and can be retried. +func isTrivialError(err error) bool { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + return true + } + // Suppress trivial network errors, which could occur at remote side. + s := err.Error() + if strings.Contains(s, "broken pipe") || strings.Contains(s, "reset by peer") { + return true + } + return false +} diff --git a/pkg/plugin/datasource_test.go b/pkg/plugin/datasource_test.go new file mode 100644 index 0000000..4826c8d --- /dev/null +++ b/pkg/plugin/datasource_test.go @@ -0,0 +1,369 @@ +package plugin + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +func TestDatasourceQueryRequest(t *testing.T) { + mux := http.NewServeMux() + mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) { + t.Fatalf("should not be called") + }) + c := -1 + mux.HandleFunc("/api/v1/query", func(w http.ResponseWriter, r *http.Request) { + c++ + if r.Method != http.MethodPost { + t.Fatalf("expected POST method got %s", r.Method) + } + + switch c { + case 0: + w.WriteHeader(500) + case 1: + _, err := w.Write([]byte("[]")) + if err != nil { + t.Fatalf("error write reposne: %s", err) + } + case 2: + _, err := w.Write([]byte(`{"status":"error", "errorType":"type:", "error":"some error msg"}`)) + if err != nil { + t.Fatalf("error write reposne: %s", err) + } + case 3: + _, err := w.Write([]byte(`{"status":"unknown"}`)) + if err != nil { + t.Fatalf("error write reposne: %s", err) + } + case 4: + _, err := w.Write([]byte(`{"status":"success","data":{"resultType":"matrix"}}`)) + if err != nil { + t.Fatalf("error write reposne: %s", err) + } + case 5: + _, err := w.Write([]byte(`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"ingress_nginx_request_qps","status":"100"},"values":[[1670324477.542,"1"]]}, {"metric":{"__name__":"ingress_nginx_request_qps","status":"500"},"values":[[1670324477.542,"2"]]}, {"metric":{"__name__":"ingress_nginx_request_qps","status":"200"},"values":[[1670324477.542,"3"]]}]}}`)) + if err != nil { + t.Fatalf("error write reposne: %s", err) + } + case 6: + _, err := w.Write([]byte(`{"status":"success","data":{"resultType":"scalar","result":[1583786142, "1"]}}`)) + if err != nil { + t.Fatalf("error write reposne: %s", err) + } + } + }) + + srv := httptest.NewServer(mux) + defer srv.Close() + + ctx := context.Background() + settings := backend.DataSourceInstanceSettings{ + URL: srv.URL, + JSONData: []byte(`{"httpMethod":"POST","customQueryParameters":""}`), + } + + instance, err := NewDatasource(ctx, settings) + if err != nil { + t.Fatalf("unexpected %s", err) + } + + datasource := instance.(*Datasource) + + expErr := func(ctx context.Context, err string) { + rsp, gotErr := datasource.QueryData(ctx, &backend.QueryDataRequest{ + Queries: []backend.DataQuery{ + { + RefID: "A", + QueryType: instantQueryPath, + JSON: []byte(`{ + "refId": "A", + "instant": true, + "range": false, + "interval": "10s", + "intervalMs": 10000, + "timeInterval": "", + "expr": "sum(vm_http_request_total)", + "legendFormat": "__auto" +}`), + }, + }, + }) + response := rsp.Responses["A"] + + if response.Error == nil { + t.Fatalf("expected %v got nil", err) + } + + if !strings.Contains(response.Error.Error(), err) { + t.Fatalf("expected err %q; got %q", err, gotErr) + } + } + + expErr(ctx, "got unexpected response status code: 500") // 0 + expErr(ctx, "failed to decode body response: json: cannot unmarshal array into Go value of type plugin.Response") // 1 + expErr(ctx, "failed to prepare data from response: unknown result type \"\"") // 2 + expErr(ctx, "failed to prepare data from response: unknown result type \"\"") // 3 + expErr(ctx, "failed to prepare data from response: unmarshal err unexpected end of JSON input") // 4 + + // 5 + queryJSON := []byte(`{ + "refId": "A", + "instant": true, + "range": false, + "interval": "10s", + "intervalMs": 10000, + "timeInterval": "", + "expr": "sum(ingress_nginx_request_qps)", + "legendFormat": "__auto" +}`) + var q Query + if err := json.Unmarshal(queryJSON, &q); err != nil { + t.Fatalf("error parse query %s", err) + } + rsp, gotErr := datasource.QueryData(ctx, &backend.QueryDataRequest{Queries: []backend.DataQuery{ + { + RefID: "A", + QueryType: rangeQueryPath, + JSON: queryJSON, + }, + }, + }) + if gotErr != nil { + t.Fatalf("unexpected %s", gotErr) + } + + response := rsp.Responses["A"] + if len(response.Frames) != 3 { + t.Fatalf("expected 2 metrics got %d in %+v", len(response.Frames), response.Frames) + } + + expected := []*data.Frame{ + data.NewFrame("sum(ingress_nginx_request_qps)", + data.NewField(data.TimeSeriesTimeFieldName, nil, []time.Time{time.Unix(1670324477, 0)}), + data.NewField(data.TimeSeriesValueFieldName, data.Labels{"__name__": "ingress_nginx_request_qps", "status": "100"}, []float64{1}), + ), + data.NewFrame("sum(ingress_nginx_request_qps)", + data.NewField(data.TimeSeriesTimeFieldName, nil, []time.Time{time.Unix(1670324477, 0)}), + data.NewField(data.TimeSeriesValueFieldName, data.Labels{"__name__": "ingress_nginx_request_qps", "status": "500"}, []float64{2}), + ), + data.NewFrame("sum(ingress_nginx_request_qps)", + data.NewField(data.TimeSeriesTimeFieldName, nil, []time.Time{time.Unix(1670324477, 0)}), + data.NewField(data.TimeSeriesValueFieldName, data.Labels{"__name__": "ingress_nginx_request_qps", "status": "200"}, []float64{3}), + ), + } + + for j := range expected { + q.addMetadataToMultiFrame(expected[j]) + } + for i := range response.Frames { + q.addMetadataToMultiFrame(response.Frames[i]) + } + + for i, frame := range response.Frames { + d, err := frame.MarshalJSON() + if err != nil { + t.Fatalf("error marshal response frames %s", err) + } + exd, err := expected[i].MarshalJSON() + if err != nil { + t.Fatalf("error marshal expected frames %s", err) + } + + if !bytes.Equal(d, exd) { + t.Fatalf("unexpected metric %s want %s", d, exd) + } + } + + // 6 + queryJSON = []byte(`{ + "refId": "A", + "instant": true, + "range": false, + "interval": "10s", + "intervalMs": 10000, + "timeInterval": "", + "expr": "sum(ingress_nginx_request_qps)", + "legendFormat": "__auto" +}`) + + if err := json.Unmarshal(queryJSON, &q); err != nil { + t.Fatalf("error parse query %s", err) + } + rsp, gotErr = datasource.QueryData(ctx, &backend.QueryDataRequest{Queries: []backend.DataQuery{ + { + RefID: "A", + QueryType: instantQueryPath, + JSON: queryJSON, + }, + }, + }) + if gotErr != nil { + t.Fatalf("unexpected %s", gotErr) + } + + expected = []*data.Frame{ + data.NewFrame("", + data.NewField(data.TimeSeriesTimeFieldName, nil, []time.Time{time.Unix(1583786142, 0)}), + data.NewField(data.TimeSeriesValueFieldName, nil, []float64{1}), + ), + } + + response = rsp.Responses["A"] + + for j := range expected { + q.addMetadataToMultiFrame(expected[j]) + } + for i := range response.Frames { + q.addMetadataToMultiFrame(response.Frames[i]) + } + + for i, frame := range response.Frames { + d, err := frame.MarshalJSON() + if err != nil { + t.Fatalf("error marshal response frames %s", err) + } + exd, err := expected[i].MarshalJSON() + if err != nil { + t.Fatalf("error marshal expected frames %s", err) + } + + if !bytes.Equal(d, exd) { + t.Fatalf("unexpected metric %s want %s", d, exd) + } + } +} + +func TestDatasourceQueryRequestWithRetry(t *testing.T) { + mux := http.NewServeMux() + mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) { + t.Fatalf("should not be called") + }) + c := -1 + mux.HandleFunc("/api/v1/query", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Fatalf("expected POST method got %s", r.Method) + } + c++ + switch c { + case 0: + _, err := w.Write([]byte(`{"status":"success","data":{"resultType":"scalar","result":[1583786142, "1"]}}`)) + if err != nil { + t.Fatalf("error write reposne: %s", err) + } + case 1: + conn, _, _ := w.(http.Hijacker).Hijack() + _ = conn.Close() + case 2: + _, err := w.Write([]byte(`{"status":"success","data":{"resultType":"scalar","result":[1583786142, "2"]}}`)) + if err != nil { + t.Fatalf("error write reposne: %s", err) + } + case 3: + conn, _, _ := w.(http.Hijacker).Hijack() + _ = conn.Close() + case 4: + conn, _, _ := w.(http.Hijacker).Hijack() + _ = conn.Close() + } + }) + + srv := httptest.NewServer(mux) + defer srv.Close() + + ctx := context.Background() + settings := backend.DataSourceInstanceSettings{ + URL: srv.URL, + JSONData: []byte(`{"httpMethod":"POST","customQueryParameters":""}`), + } + + instance, err := NewDatasource(ctx, settings) + if err != nil { + t.Fatalf("unexpected %s", err) + } + + datasource := instance.(*Datasource) + + expErr := func(err string) { + rsp, gotErr := datasource.QueryData(ctx, &backend.QueryDataRequest{ + Queries: []backend.DataQuery{ + { + RefID: "A", + QueryType: instantQueryPath, + JSON: []byte(`{ + "refId": "A", + "instant": true, + "range": false, + "interval": "10s", + "intervalMs": 10000, + "timeInterval": "", + "expr": "sum(vm_http_request_total)", + "legendFormat": "__auto" +}`), + }, + }, + }) + + response := rsp.Responses["A"] + + if response.Error == nil { + t.Fatalf("expected %v got nil", err) + } + + if !strings.Contains(response.Error.Error(), err) { + t.Fatalf("expected err %q; got %q", err, gotErr) + } + } + + expValue := func(v float64) { + rsp, gotErr := datasource.QueryData(ctx, &backend.QueryDataRequest{ + Queries: []backend.DataQuery{ + { + RefID: "A", + QueryType: instantQueryPath, + JSON: []byte(`{ + "refId": "A", + "instant": true, + "range": false, + "interval": "10s", + "intervalMs": 10000, + "timeInterval": "", + "expr": "sum(vm_http_request_total)", + "legendFormat": "__auto" +}`), + }, + }, + }) + + response := rsp.Responses["A"] + if gotErr != nil { + t.Fatalf("unexpected %s", gotErr) + } + if response.Error != nil { + t.Fatalf("unexpected error: %s", response.Error.Error()) + } + if len(response.Frames) != 1 { + t.Fatalf("expected 1 frame got %d", len(response.Frames)) + } + for _, frame := range response.Frames { + if len(frame.Fields) != 2 { + t.Fatalf("expected 2 fields got %d", len(frame.Fields)) + } + if frame.Fields[1].At(0) != v { + t.Fatalf("unexpected value %v", frame.Fields[1].At(0)) + } + } + } + + expValue(1) // 0 + expValue(2) // 1 - fail, 2 - retry + expErr("EOF") // 3, 4 - retries +} diff --git a/pkg/plugin/response.go b/pkg/plugin/response.go index c08b77f..31a20c6 100644 --- a/pkg/plugin/response.go +++ b/pkg/plugin/response.go @@ -118,19 +118,19 @@ func (r *Response) getDataFrames() (data.Frames, error) { case vector: var pi promInstant if err := json.Unmarshal(r.Data.Result, &pi.Result); err != nil { - return nil, fmt.Errorf("umarshal err %s; \n %#v", err, string(r.Data.Result)) + return nil, fmt.Errorf("unmarshal err %s; \n %#v", err, string(r.Data.Result)) } return pi.dataframes() case matrix: var pr promRange if err := json.Unmarshal(r.Data.Result, &pr.Result); err != nil { - return nil, fmt.Errorf("umarshal err %s; \n %#v", err, string(r.Data.Result)) + return nil, fmt.Errorf("unmarshal err %s; \n %#v", err, string(r.Data.Result)) } return pr.dataframes() case scalar: var ps promScalar if err := json.Unmarshal(r.Data.Result, &ps); err != nil { - return nil, fmt.Errorf("umarshal err %s; \n %#v", err, string(r.Data.Result)) + return nil, fmt.Errorf("unmarshal err %s; \n %#v", err, string(r.Data.Result)) } return ps.dataframes() default: