From 802fa4674550b1b512654895016ad7ec1a078513 Mon Sep 17 00:00:00 2001 From: Morgan Fainberg Date: Fri, 8 Dec 2023 14:36:19 -0800 Subject: [PATCH] Implement Fallback strategy for consistent-hashing This does the first pass of implementing the fallback strategy for consistent hashing. This is not configurable and statically falls back to "buffer" mode. --- pkg/download/consistent_hashing.go | 17 ++++++++++--- pkg/download/consistent_hashing_test.go | 32 ++++++++++++++++++++++++- 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/pkg/download/consistent_hashing.go b/pkg/download/consistent_hashing.go index 362021d..add7f33 100644 --- a/pkg/download/consistent_hashing.go +++ b/pkg/download/consistent_hashing.go @@ -21,6 +21,8 @@ import ( type ConsistentHashingMode struct { Client *client.HTTPClient Options + // TODO: allow this to be configured and not just "BufferMode" + FallbackStrategy Strategy } func GetConsistentHashingMode(opts Options) (Strategy, error) { @@ -31,9 +33,13 @@ func GetConsistentHashingMode(opts Options) (Strategy, error) { return nil, fmt.Errorf("if you provide a semaphore you must specify MaxConcurrency") } client := client.NewHTTPClient(opts.Client) + fallbackStrategy := GetBufferMode(opts) + fallbackStrategy.Client = client + return &ConsistentHashingMode{ - Client: client, - Options: opts, + Client: client, + Options: opts, + FallbackStrategy: fallbackStrategy, }, nil } @@ -78,8 +84,13 @@ func (m *ConsistentHashingMode) Fetch(ctx context.Context, urlString string) (io break } } + // Use our fallback mode if we're not downloading from a consistent-hashing enabled domain if !shouldContinue { - return nil, -1, fmt.Errorf("ConsistentHashingMode not implemented for domains outside DomainsToCache") + logger.Debug(). + Str("url", urlString). + Str("reason", fmt.Sprintf("consistent hashing not enabled for %s", parsed.Host)). + Msg("fallback strategy") + return m.FallbackStrategy.Fetch(ctx, urlString) } firstChunkResp, err := m.doRequest(ctx, 0, m.minChunkSize()-1, urlString) diff --git a/pkg/download/consistent_hashing_test.go b/pkg/download/consistent_hashing_test.go index 6f01a68..f1e35da 100644 --- a/pkg/download/consistent_hashing_test.go +++ b/pkg/download/consistent_hashing_test.go @@ -30,8 +30,9 @@ var testFSes = []fstest.MapFS{ func makeConsistentHashingMode(opts download.Options) *download.ConsistentHashingMode { client := client.NewHTTPClient(opts.Client) + fallbackMode := download.BufferMode{Options: opts, Client: client} - return &download.ConsistentHashingMode{Client: client, Options: opts} + return &download.ConsistentHashingMode{Client: client, Options: opts, FallbackStrategy: &fallbackMode} } type chTestCase struct { @@ -184,3 +185,32 @@ func TestConsistentHashing(t *testing.T) { assert.Equal(t, tc.expectedOutput, string(bytes)) } } + +func TestConsistentHashingHasFallback(t *testing.T) { + server := httptest.NewServer(http.FileServer(http.FS(testFSes[0]))) + defer server.Close() + + opts := download.Options{ + Client: client.Options{}, + MaxConcurrency: 8, + MinChunkSize: 2, + Semaphore: semaphore.NewWeighted(8), + CacheHosts: []string{}, + DomainsToCache: []string{"fake.replicate.delivery"}, + SliceSize: 3, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + strategy := makeConsistentHashingMode(opts) + + urlString, err := url.JoinPath(server.URL, "hello.txt") + assert.NoError(t, err) + reader, _, err := strategy.Fetch(ctx, urlString) + assert.NoError(t, err) + bytes, err := io.ReadAll(reader) + assert.NoError(t, err) + + assert.Equal(t, "0000000000000000", string(bytes)) +}