
Consistent hashing: consistently retry another host #119

Merged
merged 4 commits on Dec 15, 2023

Changes from 1 commit
45 changes: 45 additions & 0 deletions pkg/consistent/consistent.go
@@ -0,0 +1,45 @@
// Package consistent implements consistent hashing for cache nodes.
package consistent

import (
"fmt"
"slices"

"github.com/dgryski/go-jump"
"github.com/mitchellh/hashstructure/v2"
)

// cacheKey wraps the caller's key together with the attempt number, so
// that each retry derives a fresh, deterministic hash from the same key.
type cacheKey struct {
Key any
Attempt int
}

// HashBucket returns a bucket in [0,buckets). To implement a retry, pass
// previousBuckets: buckets that must be avoided in the output. Note that
// HashBucket sorts the previousBuckets slice in place.
func HashBucket(key any, buckets int, previousBuckets ...int) (int, error) {
Member commented:

This is really nice. I like the derived/wrapped key a lot.

if len(previousBuckets) >= buckets {
return -1, fmt.Errorf("No more buckets left: %d buckets available but %d already attempted", buckets, previousBuckets)
}
// we set IgnoreZeroValue so that we can add fields to the hash key
// later without breaking things.
// note that it's not safe to share a HashOptions so we create a fresh one each time.
hashopts := &hashstructure.HashOptions{IgnoreZeroValue: true}
hash, err := hashstructure.Hash(cacheKey{Key: key, Attempt: len(previousBuckets)}, hashstructure.FormatV2, hashopts)
if err != nil {
return -1, fmt.Errorf("error calculating hash of key: %w", err)
}

// jump is an implementation of Google's Jump Consistent Hash.
//
// See http://arxiv.org/abs/1406.2294 for details.
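//
// We hash into a range shrunk by the number of previous attempts; the
// loop below then shifts the result up past each previously-used bucket
// in ascending order, mapping it back into [0,buckets) without ever
// colliding with an entry in previousBuckets.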
bucket := int(jump.Hash(hash, buckets-len(previousBuckets)))
slices.Sort(previousBuckets)
for _, prev := range previousBuckets {
if bucket >= prev {
bucket++
}
}
return bucket, nil
}
Member commented on lines +21 to +45:

This is so nice to have it extracted!
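
For illustration: a minimal sketch (not code from this PR) of how a caller might drive HashBucket across retries, threading previously returned buckets back in so no host is tried twice. The pickHost helper and its healthy callback are hypothetical.

	package example

	import (
		"fmt"

		"github.com/replicate/pget/pkg/consistent"
	)

	// pickHost walks hosts in consistent-hash order for key, trying each
	// host at most once, until the healthy callback accepts one.
	func pickHost(key string, hosts []string, healthy func(string) bool) (string, error) {
		previous := []int{}
		for len(previous) < len(hosts) {
			bucket, err := consistent.HashBucket(key, len(hosts), previous...)
			if err != nil {
				return "", err
			}
			if healthy(hosts[bucket]) {
				return hosts[bucket], nil
			}
			previous = append(previous, bucket)
		}
		return "", fmt.Errorf("all %d hosts attempted for key %q", len(hosts), key)
	}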

73 changes: 73 additions & 0 deletions pkg/consistent/consistent_test.go
@@ -0,0 +1,73 @@
package consistent_test

import (
"slices"
"testing"

"github.com/replicate/pget/pkg/consistent"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestHashingDoesNotChangeWhenZeroValueFieldsAreAdded(t *testing.T) {
a, err := consistent.HashBucket(struct{}{}, 1024)
require.NoError(t, err)
b, err := consistent.HashBucket(struct{ I int }{}, 1024)
require.NoError(t, err)

assert.Equal(t, a, b)
}

func TestRetriesScatterBuckets(t *testing.T) {
// This test is tricky! We want an example of hash keys which map to the
// same bucket, but after one retry map to different buckets.
//
// These two keys happen to have this property for 10 buckets:
strA := "abcdefg"
strB := "1234567"
a, err := consistent.HashBucket(strA, 10)
require.NoError(t, err)
b, err := consistent.HashBucket(strB, 10)
require.NoError(t, err)

// strA and strB map to the same bucket
require.Equal(t, a, b)

aRetry, err := consistent.HashBucket(strA, 10, a)
require.NoError(t, err)
bRetry, err := consistent.HashBucket(strB, 10, b)
require.NoError(t, err)

// but after retry they map to different buckets
assert.NotEqual(t, aRetry, bRetry)
}

func FuzzRetriesMustNotRepeatIndices(f *testing.F) {
f.Add("test.replicate.delivery", 5)
f.Add("test.replicate.delivery", 0)
f.Fuzz(func(t *testing.T, key string, excessBuckets int) {
if excessBuckets < 0 {
t.Skip("invalid value")
}
attempts := 20
buckets := attempts + excessBuckets
if buckets < 0 {
t.Skip("integer overflow")
}
previous := []int{}
for i := 0; i < attempts; i++ {
next, err := consistent.HashBucket(key, buckets, previous...)
require.NoError(t, err)

// we must be in range
assert.Less(t, next, buckets)
assert.GreaterOrEqual(t, next, 0)

// we shouldn't repeat any previous value
assert.NotContains(t, previous, next)

previous = append(previous, next)
slices.Sort(previous)
}
})
}
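
Assuming a Go 1.18+ toolchain, the fuzz target can also be run beyond its seed corpus with go test -fuzz=FuzzRetriesMustNotRepeatIndices -fuzztime=30s ./pkg/consistent (the 30s bound is an arbitrary choice).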
69 changes: 6 additions & 63 deletions pkg/download/consistent_hashing.go
@@ -9,12 +9,11 @@
"net/url"
"strconv"

jump "github.com/dgryski/go-jump"
"github.com/mitchellh/hashstructure/v2"
"golang.org/x/sync/errgroup"

"github.com/replicate/pget/pkg/client"
"github.com/replicate/pget/pkg/config"
"github.com/replicate/pget/pkg/consistent"
"github.com/replicate/pget/pkg/logging"
)

@@ -32,7 +31,6 @@
type CacheKey struct {
URL *url.URL `hash:"string"`
Slice int64
Retry bool
}

func GetConsistentHashingMode(opts Options) (*ConsistentHashingMode, error) {
@@ -269,7 +267,7 @@
if err != nil {
return nil, fmt.Errorf("failed to download %s: %w", req.URL.String(), err)
}
err = m.retryConsistentHash(req, start, end, cachePodIndex)
cachePodIndex, err = m.consistentHashIfNeeded(req, start, end, cachePodIndex)

Check failure on line 270 in pkg/download/consistent_hashing.go (GitHub Actions / Lint):
ineffectual assignment to cachePodIndex (ineffassign)
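(The assignment is flagged because the cachePodIndex returned here is never read again on this path; for it to matter on a further retry, it would have to be passed back as one of the previousPodIndexes in a subsequent call.)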
if err != nil {
// return origErr so that we can use our regular fallback strategy
return nil, origErr
@@ -280,7 +278,7 @@
resp, err = m.Client.Do(req)
if err != nil {
// return origErr so that we can use our regular fallback strategy
return nil, err
return nil, origErr
}
} else {
return nil, fmt.Errorf("error executing request for %s: %w", req.URL.String(), err)
@@ -293,7 +291,7 @@
return resp, nil
}

func (m *ConsistentHashingMode) consistentHashIfNeeded(req *http.Request, start int64, end int64) (cachePodIndex int, err error) {
func (m *ConsistentHashingMode) consistentHashIfNeeded(req *http.Request, start int64, end int64, previousPodIndexes ...int) (cachePodIndex int, err error) {
logger := logging.GetLogger()
for _, host := range m.DomainsToCache {
if host == req.URL.Host {
@@ -303,23 +301,11 @@
slice := start / m.SliceSize

key := CacheKey{URL: req.URL, Slice: slice}
// we set IgnoreZeroValue so that we can add fields to the hash key
// later without breaking things.
// note that it's not safe to share a HashOptions so we create a fresh one each time.
hashopts := &hashstructure.HashOptions{IgnoreZeroValue: true}
var hash uint64
hash, err = hashstructure.Hash(key, hashstructure.FormatV2, hashopts)

cachePodIndex, err = consistent.HashBucket(key, len(m.CacheHosts), previousPodIndexes...)
if err != nil {
err = fmt.Errorf("error calculating hash of key: %w", err)
return
}

logger.Debug().Uint64("hash_sum", hash).Int("len_cache_hosts", len(m.CacheHosts)).Msg("consistent hashing")

// jump is an implementation of Google's Jump Consistent Hash.
//
// See http://arxiv.org/abs/1406.2294 for details.
cachePodIndex = int(jump.Hash(hash, len(m.CacheHosts)))
cacheHost := m.CacheHosts[cachePodIndex]
logger.Debug().Str("cache_key", fmt.Sprintf("%+v", key)).Int64("start", start).Int64("end", end).Int64("slice_size", m.SliceSize).Int("bucket", cachePodIndex).Msg("consistent hashing")
if cacheHost != "" {
@@ -331,46 +317,3 @@
}
return
}

func (m *ConsistentHashingMode) retryConsistentHash(req *http.Request, start int64, end int64, originalIndex int) error {
if len(m.CacheHosts) == 1 {
return fmt.Errorf("Can't retry with only one cache host")
}
logger := logging.GetLogger()
for _, host := range m.DomainsToCache {
if host == req.URL.Host {
if start/m.SliceSize != end/m.SliceSize {
return fmt.Errorf("can't make a range request across a slice boundary: %d-%d straddles a slice boundary (slice size is %d)", start, end, m.SliceSize)
}
slice := start / m.SliceSize

key := CacheKey{URL: req.URL, Slice: slice, Retry: true}
// we set IgnoreZeroValue so that we can add fields to the hash key
// later without breaking things.
// note that it's not safe to share a HashOptions so we create a fresh one each time.
hashopts := &hashstructure.HashOptions{IgnoreZeroValue: true}
hash, err := hashstructure.Hash(key, hashstructure.FormatV2, hashopts)
if err != nil {
return fmt.Errorf("error calculating hash of key: %w", err)
}

logger.Debug().Uint64("hash_sum", hash).Int("len_cache_hosts", len(m.CacheHosts)).Msg("consistent hashing")

// jump is an implementation of Google's Jump Consistent Hash.
//
// See http://arxiv.org/abs/1406.2294 for details.

// we advance around the ring by somewhere between 1 and n-1
displacement := int(jump.Hash(hash, len(m.CacheHosts)-1)) + 1
cachePodIndex := (originalIndex + displacement) % len(m.CacheHosts)
cacheHost := m.CacheHosts[cachePodIndex]
logger.Debug().Str("cache_key", fmt.Sprintf("%+v", key)).Int64("start", start).Int64("end", end).Int64("slice_size", m.SliceSize).Int("bucket", cachePodIndex).Int("original_bucket", originalIndex).Msg("consistent hashing retry")
if cacheHost != "" {
req.URL.Scheme = "http"
req.URL.Host = cacheHost
}
return nil
}
}
return nil
}
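
For contrast: the deleted retryConsistentHash advanced around the ring by a displacement in [1, n-1] from the original index, so it could only ever avoid the single original host and had no memory of earlier retries. The new consistent.HashBucket signature takes every previously attempted bucket instead, which is what lets repeated retries keep landing on fresh hosts.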