Skip to content

Commit

Permalink
Implement Fallback strategy for consistent-hashing
Browse files Browse the repository at this point in the history
This does the first pass of implementing the fallback strategy for
consistent hashing. This is not configurable and statically falls
back to "buffer" mode.
  • Loading branch information
tempusfrangit committed Dec 8, 2023
1 parent bb048d6 commit 802fa46
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 4 deletions.
17 changes: 14 additions & 3 deletions pkg/download/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
type ConsistentHashingMode struct {
Client *client.HTTPClient
Options
// TODO: allow this to be configured and not just "BufferMode"
FallbackStrategy Strategy
}

func GetConsistentHashingMode(opts Options) (Strategy, error) {
Expand All @@ -31,9 +33,13 @@ func GetConsistentHashingMode(opts Options) (Strategy, error) {
return nil, fmt.Errorf("if you provide a semaphore you must specify MaxConcurrency")
}
client := client.NewHTTPClient(opts.Client)
fallbackStrategy := GetBufferMode(opts)
fallbackStrategy.Client = client

return &ConsistentHashingMode{
Client: client,
Options: opts,
Client: client,
Options: opts,
FallbackStrategy: fallbackStrategy,
}, nil
}

Expand Down Expand Up @@ -78,8 +84,13 @@ func (m *ConsistentHashingMode) Fetch(ctx context.Context, urlString string) (io
break
}
}
// Use our fallback mode if we're not downloading from a consistent-hashing enabled domain
if !shouldContinue {
return nil, -1, fmt.Errorf("ConsistentHashingMode not implemented for domains outside DomainsToCache")
logger.Debug().
Str("url", urlString).
Str("reason", fmt.Sprintf("consistent hashing not enabled for %s", parsed.Host)).
Msg("fallback strategy")
return m.FallbackStrategy.Fetch(ctx, urlString)
}

firstChunkResp, err := m.doRequest(ctx, 0, m.minChunkSize()-1, urlString)
Expand Down
32 changes: 31 additions & 1 deletion pkg/download/consistent_hashing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ var testFSes = []fstest.MapFS{

func makeConsistentHashingMode(opts download.Options) *download.ConsistentHashingMode {
client := client.NewHTTPClient(opts.Client)
fallbackMode := download.BufferMode{Options: opts, Client: client}

return &download.ConsistentHashingMode{Client: client, Options: opts}
return &download.ConsistentHashingMode{Client: client, Options: opts, FallbackStrategy: &fallbackMode}
}

type chTestCase struct {
Expand Down Expand Up @@ -184,3 +185,32 @@ func TestConsistentHashing(t *testing.T) {
assert.Equal(t, tc.expectedOutput, string(bytes))
}
}

func TestConsistentHashingHasFallback(t *testing.T) {
server := httptest.NewServer(http.FileServer(http.FS(testFSes[0])))
defer server.Close()

opts := download.Options{
Client: client.Options{},
MaxConcurrency: 8,
MinChunkSize: 2,
Semaphore: semaphore.NewWeighted(8),
CacheHosts: []string{},
DomainsToCache: []string{"fake.replicate.delivery"},
SliceSize: 3,
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

strategy := makeConsistentHashingMode(opts)

urlString, err := url.JoinPath(server.URL, "hello.txt")
assert.NoError(t, err)
reader, _, err := strategy.Fetch(ctx, urlString)
assert.NoError(t, err)
bytes, err := io.ReadAll(reader)
assert.NoError(t, err)

assert.Equal(t, "0000000000000000", string(bytes))
}

0 comments on commit 802fa46

Please sign in to comment.