diff --git a/pkg/storage/chunk/client/aws/s3_storage_client.go b/pkg/storage/chunk/client/aws/s3_storage_client.go index 12fea874e311..2b5458af6af5 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client.go @@ -21,12 +21,15 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3iface" awscommon "github.com/grafana/dskit/aws" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/instrument" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + amnet "k8s.io/apimachinery/pkg/util/net" + bucket_s3 "github.com/grafana/loki/v3/pkg/storage/bucket/s3" "github.com/grafana/loki/v3/pkg/storage/chunk/client" "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" @@ -532,5 +535,61 @@ func (a *S3ObjectClient) IsObjectNotFoundErr(err error) bool { return false } -// TODO(dannyk): implement for client -func (a *S3ObjectClient) IsRetryableErr(error) bool { return false } +func isTimeoutError(err error) bool { + var netErr net.Error + return errors.As(err, &netErr) && netErr.Timeout() +} + +func isContextErr(err error) bool { + return errors.Is(err, context.DeadlineExceeded) || + errors.Is(err, context.Canceled) +} + +// IsStorageTimeoutErr returns true if error means that object cannot be retrieved right now due to server-side timeouts. +func (a *S3ObjectClient) IsStorageTimeoutErr(err error) bool { + // TODO(dannyk): move these out to be generic + // context errors are all client-side + if isContextErr(err) { + return false + } + + // connection misconfiguration, or writing on a closed connection + // do NOT retry; this is not a server-side issue + if errors.Is(err, net.ErrClosed) || amnet.IsConnectionRefused(err) { + return false + } + + // this is a server-side timeout + if isTimeoutError(err) { + return true + } + + // connection closed (closed before established) or reset (closed after established) + // this is a server-side issue + if errors.Is(err, io.EOF) || amnet.IsConnectionReset(err) { + return true + } + + if rerr, ok := err.(awserr.RequestFailure); ok { + // https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html + return rerr.StatusCode() == http.StatusRequestTimeout || + rerr.StatusCode() == http.StatusGatewayTimeout + } + + return false +} + +// IsStorageThrottledErr returns true if error means that object cannot be retrieved right now due to throttling. +func (a *S3ObjectClient) IsStorageThrottledErr(err error) bool { + if rerr, ok := err.(awserr.RequestFailure); ok { + + // https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html + return rerr.StatusCode() == http.StatusTooManyRequests || + (rerr.StatusCode()/100 == 5) // all 5xx errors are retryable + } + + return false +} +func (a *S3ObjectClient) IsRetryableErr(err error) bool { + return a.IsStorageTimeoutErr(err) || a.IsStorageThrottledErr(err) +} diff --git a/pkg/storage/chunk/client/aws/s3_storage_client_test.go b/pkg/storage/chunk/client/aws/s3_storage_client_test.go index 3a2c1e8dc33c..ba2939ff4688 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client_test.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client_test.go @@ -6,9 +6,11 @@ import ( "errors" "fmt" "io" + "net" "net/http" "net/http/httptest" "strings" + "syscall" "testing" "time" @@ -73,6 +75,108 @@ func TestIsObjectNotFoundErr(t *testing.T) { } } +func TestIsRetryableErr(t *testing.T) { + tests := []struct { + err error + expected bool + name string + }{ + { + name: "IsStorageThrottledErr - Too Many Requests", + err: awserr.NewRequestFailure( + awserr.New("TooManyRequests", "TooManyRequests", nil), 429, "reqId", + ), + expected: true, + }, + { + name: "IsStorageThrottledErr - 500", + err: awserr.NewRequestFailure( + awserr.New("500", "500", nil), 500, "reqId", + ), + expected: true, + }, + { + name: "IsStorageThrottledErr - 5xx", + err: awserr.NewRequestFailure( + awserr.New("501", "501", nil), 501, "reqId", + ), + expected: true, + }, + { + name: "IsStorageTimeoutErr - Request Timeout", + err: awserr.NewRequestFailure( + awserr.New("Request Timeout", "Request Timeout", nil), 408, "reqId", + ), + expected: true, + }, + { + name: "IsStorageTimeoutErr - Gateway Timeout", + err: awserr.NewRequestFailure( + awserr.New("Gateway Timeout", "Gateway Timeout", nil), 504, "reqId", + ), + expected: true, + }, + { + name: "IsStorageTimeoutErr - EOF", + err: io.EOF, + expected: true, + }, + { + name: "IsStorageTimeoutErr - Connection Reset", + err: syscall.ECONNRESET, + expected: true, + }, + { + name: "IsStorageTimeoutErr - Timeout Error", + err: awserr.NewRequestFailure( + awserr.New("RequestCanceled", "request canceled due to timeout", nil), 408, "request-id", + ), + expected: true, + }, + { + name: "IsStorageTimeoutErr - Closed", + err: net.ErrClosed, + expected: false, + }, + { + name: "IsStorageTimeoutErr - Connection Refused", + err: syscall.ECONNREFUSED, + expected: false, + }, + { + name: "IsStorageTimeoutErr - Context Deadline Exceeded", + err: context.DeadlineExceeded, + expected: false, + }, + { + name: "IsStorageTimeoutErr - Context Canceled", + err: context.Canceled, + expected: false, + }, + { + name: "Not a retryable error", + err: syscall.EINVAL, + expected: false, + }, + { + name: "Not found 404", + err: awserr.NewRequestFailure( + awserr.New("404", "404", nil), 404, "reqId", + ), + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := NewS3ObjectClient(S3Config{BucketNames: "mybucket"}, hedging.Config{}) + require.NoError(t, err) + + require.Equal(t, tt.expected, client.IsRetryableErr(tt.err)) + }) + } +} + func TestRequestMiddleware(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, r.Header.Get("echo-me"))