Skip to content

Commit

Permalink
Fixed TestHighContentionFromStore test
Browse files Browse the repository at this point in the history
  • Loading branch information
thrawn01 committed May 20, 2024
1 parent e7937b8 commit 6abafb9
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 34 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ tidy:

.PHONY: validate
validate: tidy lint test bench
echo $$?
@echo
@echo "\033[32mEVERYTHING PASSED!\033[0m"

.PHONY: test
test: ## Run unit tests and measure code coverage
Expand Down
34 changes: 23 additions & 11 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ package gubernator_test

import (
"context"
"fmt"
"runtime"
"testing"

guber "github.com/gubernator-io/gubernator/v3"
"github.com/gubernator-io/gubernator/v3/cluster"
"github.com/mailgun/holster/v4/clock"
"github.com/mailgun/holster/v4/syncutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -57,9 +58,9 @@ func BenchmarkServer(b *testing.B) {
require.NoError(b, err, "Error in conf.SetDefaults")
createdAt := epochMillis(clock.Now())
d := cluster.GetRandomDaemon(cluster.DataCenterNone)
client := d.MustClient().(guber.PeerClient)

b.Run("GetPeerRateLimit", func(b *testing.B) {
client := d.MustClient().(guber.PeerClient)
b.ResetTimer()

for n := 0; n < b.N; n++ {
Expand Down Expand Up @@ -146,15 +147,27 @@ func BenchmarkServer(b *testing.B) {
})

b.Run("Thundering herd", func(b *testing.B) {
client := cluster.GetRandomDaemon(cluster.DataCenterNone).MustClient()
require.NoError(b, err, "Error in guber.DialV1Server")
var clients []guber.Client

// Create a client for each CPU on the system. This should allow us to simulate the
// maximum contention possible for this system.
for i := 0; i < runtime.NumCPU(); i++ {
client, err := guber.NewClient(guber.WithNoTLS(d.Listener.Addr().String()))
require.NoError(b, err)
clients = append(clients, client)
}
b.ResetTimer()
fan := syncutil.NewFanOut(100)
mask := len(clients) - 1

for n := 0; n < b.N; n++ {
fan.Run(func(o interface{}) error {
var idx int
b.RunParallel(func(pb *testing.PB) {
client := clients[idx&mask]
idx++

for pb.Next() {
var resp guber.CheckRateLimitsResponse
err := client.CheckRateLimits(ctx, &guber.CheckRateLimitsRequest{
err = client.CheckRateLimits(ctx, &guber.CheckRateLimitsRequest{
Requests: []*guber.RateLimitRequest{
{
Name: b.Name(),
Expand All @@ -166,10 +179,9 @@ func BenchmarkServer(b *testing.B) {
},
}, &resp)
if err != nil {
b.Errorf("Error in client.GetRateLimits: %s", err)
fmt.Printf("%s\n", err.Error())
}
return nil
}, nil)
}
}
})
})
}
26 changes: 13 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ func NewPeerClient(opts ClientOptions) PeerClient {
func (c *client) CheckRateLimits(ctx context.Context, req *CheckRateLimitsRequest, resp *CheckRateLimitsResponse) error {
payload, err := proto.Marshal(req)
if err != nil {
return duh.NewClientError(fmt.Errorf("while marshaling request payload: %w", err), nil)
return duh.NewClientError("while marshaling request payload: %w", err, nil)
}

r, err := http.NewRequestWithContext(ctx, http.MethodPost,
fmt.Sprintf("%s%s", c.opts.Endpoint, RPCRateLimitCheck), bytes.NewReader(payload))
if err != nil {
return duh.NewClientError(err, nil)
return duh.NewClientError("", err, nil)
}

r.Header.Set("Content-Type", duh.ContentTypeProtoBuf)
Expand All @@ -109,13 +109,13 @@ func (c *client) CheckRateLimits(ctx context.Context, req *CheckRateLimitsReques
func (c *client) HealthCheck(ctx context.Context, resp *HealthCheckResponse) error {
payload, err := proto.Marshal(&HealthCheckRequest{})
if err != nil {
return duh.NewClientError(fmt.Errorf("while marshaling request payload: %w", err), nil)
return duh.NewClientError("while marshaling request payload: %w", err, nil)
}

r, err := http.NewRequestWithContext(ctx, http.MethodPost,
fmt.Sprintf("%s%s", c.opts.Endpoint, RPCHealthCheck), bytes.NewReader(payload))
if err != nil {
return duh.NewClientError(err, nil)
return duh.NewClientError("", err, nil)
}

r.Header.Set("Content-Type", duh.ContentTypeProtoBuf)
Expand All @@ -125,13 +125,13 @@ func (c *client) HealthCheck(ctx context.Context, resp *HealthCheckResponse) err
func (c *client) Forward(ctx context.Context, req *ForwardRequest, resp *ForwardResponse) error {
payload, err := proto.Marshal(req)
if err != nil {
return duh.NewClientError(fmt.Errorf("while marshaling request payload: %w", err), nil)
return duh.NewClientError("while marshaling request payload: %w", err, nil)
}

r, err := http.NewRequestWithContext(ctx, http.MethodPost,
fmt.Sprintf("%s%s", c.opts.Endpoint, RPCPeerForward), bytes.NewReader(payload))
if err != nil {
return duh.NewClientError(err, nil)
return duh.NewClientError("", err, nil)
}

c.prop.Inject(ctx, propagation.HeaderCarrier(r.Header))
Expand All @@ -142,12 +142,12 @@ func (c *client) Forward(ctx context.Context, req *ForwardRequest, resp *Forward
func (c *client) Update(ctx context.Context, req *UpdateRequest) error {
payload, err := proto.Marshal(req)
if err != nil {
return duh.NewClientError(fmt.Errorf("while marshaling request payload: %w", err), nil)
return duh.NewClientError("while marshaling request payload: %w", err, nil)
}
r, err := http.NewRequestWithContext(ctx, http.MethodPost,
fmt.Sprintf("%s%s", c.opts.Endpoint, RPCPeerUpdate), bytes.NewReader(payload))
if err != nil {
return duh.NewClientError(err, nil)
return duh.NewClientError("", err, nil)
}

r.Header.Set("Content-Type", duh.ContentTypeProtoBuf)
Expand Down Expand Up @@ -227,10 +227,10 @@ func RandomPeer(peers []PeerInfo) PeerInfo {
// RandomString returns a random alpha string of 'n' length
func RandomString(n int) string {
const alphanumeric = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
var bytes = make([]byte, n)
_, _ = crand.Read(bytes)
for i, b := range bytes {
bytes[i] = alphanumeric[b%byte(len(alphanumeric))]
var buf = make([]byte, n)
_, _ = crand.Read(buf)
for i, b := range buf {
buf[i] = alphanumeric[b%byte(len(alphanumeric))]
}
return string(bytes)
return string(buf)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22.0

require (
github.com/davecgh/go-spew v1.1.1
github.com/duh-rpc/duh-go v0.0.2-0.20230929155108-5d641b0c008a
github.com/duh-rpc/duh-go v0.1.0
github.com/hashicorp/memberlist v0.5.0
github.com/mailgun/errors v0.1.5
github.com/mailgun/holster/v4 v4.19.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions gubernator.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,12 @@ func (s *Service) CheckRateLimits(ctx context.Context, req *CheckRateLimitsReque
if len(req.Requests) > maxBatchSize {
metricCheckErrorCounter.WithLabelValues("Request too large").Add(1)
return duh.NewServiceError(duh.CodeBadRequest,
fmt.Errorf("CheckRateLimitsRequest.RateLimits list too large; max size is '%d'", maxBatchSize), nil)
fmt.Sprintf("CheckRateLimitsRequest.RateLimits list too large; max size is '%d'", maxBatchSize), nil, nil)
}

if len(req.Requests) == 0 {
return duh.NewServiceError(duh.CodeBadRequest,
errors.New("CheckRateLimitsRequest.RateLimits list is empty; provide at least one rate limit"), nil)
"CheckRateLimitsRequest.RateLimits list is empty; provide at least one rate limit", nil, nil)
}

resp.Responses = make([]*RateLimitResponse, len(req.Requests))
Expand Down Expand Up @@ -471,7 +471,7 @@ func (s *Service) Forward(ctx context.Context, req *ForwardRequest, resp *Forwar
if len(req.Requests) > maxBatchSize {
metricCheckErrorCounter.WithLabelValues("Request too large").Add(1)
return duh.NewServiceError(duh.CodeBadRequest,
fmt.Errorf("'Forward.requests' list too large; max size is '%d'", maxBatchSize), nil)
fmt.Sprintf("'Forward.requests' list too large; max size is '%d'", maxBatchSize), nil, nil)
}

// Invoke each rate limit request.
Expand Down
13 changes: 10 additions & 3 deletions store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package gubernator_test

import (
"context"
"fmt"
"sync"
"testing"

Expand Down Expand Up @@ -110,7 +111,7 @@ func (ms *NoOpStore) Get(ctx context.Context, r *gubernator.RateLimitRequest) (*
func TestHighContentionFromStore(t *testing.T) {
const (
// Increase these number to improve the chance of contention, but at the cost of test speed.
numGoroutines = 500
numGoroutines = 150
numKeys = 100
)
store := &NoOpStore{}
Expand All @@ -132,10 +133,12 @@ func TestHighContentionFromStore(t *testing.T) {
var ready sync.WaitGroup
wg.Add(numGoroutines)
ready.Add(numGoroutines)
client := d.MustClient()

for i := 0; i < numGoroutines; i++ {
go func() {
// Create a client for each concurrent request to avoid contention in the client
client, err := gubernator.NewClient(gubernator.WithNoTLS(d.Listener.Addr().String()))
require.NoError(t, err)
ready.Wait()
for idx := 0; idx < numKeys; idx++ {
var resp gubernator.CheckRateLimitsResponse
Expand All @@ -151,7 +154,11 @@ func TestHighContentionFromStore(t *testing.T) {
},
},
}, &resp)
require.NoError(t, err)
if err != nil {
// NOTE: you may see `connection reset by peer` if the server is overloaded
// and needs to forcibly drop some connections due to out of open file handlers etc...
fmt.Printf("%s\n", err)
}
}
wg.Done()
}()
Expand Down

0 comments on commit 6abafb9

Please sign in to comment.