From 6abafb94c3f47f233d2a9fa77e3e15e2466508a5 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Mon, 20 May 2024 12:08:21 -0500 Subject: [PATCH] Fixed TestHighContentionFromStore test --- Makefile | 3 ++- benchmark_test.go | 34 +++++++++++++++++++++++----------- client.go | 26 +++++++++++++------------- go.mod | 2 +- go.sum | 4 ++-- gubernator.go | 6 +++--- store_test.go | 13 ++++++++++--- 7 files changed, 54 insertions(+), 34 deletions(-) diff --git a/Makefile b/Makefile index b913e39..a2ef134 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,8 @@ tidy: .PHONY: validate validate: tidy lint test bench - echo $$? + @echo + @echo "\033[32mEVERYTHING PASSED!\033[0m" .PHONY: test test: ## Run unit tests and measure code coverage diff --git a/benchmark_test.go b/benchmark_test.go index 6280349..4cac6dc 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -18,12 +18,13 @@ package gubernator_test import ( "context" + "fmt" + "runtime" "testing" guber "github.com/gubernator-io/gubernator/v3" "github.com/gubernator-io/gubernator/v3/cluster" "github.com/mailgun/holster/v4/clock" - "github.com/mailgun/holster/v4/syncutil" "github.com/stretchr/testify/require" ) @@ -57,9 +58,9 @@ func BenchmarkServer(b *testing.B) { require.NoError(b, err, "Error in conf.SetDefaults") createdAt := epochMillis(clock.Now()) d := cluster.GetRandomDaemon(cluster.DataCenterNone) - client := d.MustClient().(guber.PeerClient) b.Run("GetPeerRateLimit", func(b *testing.B) { + client := d.MustClient().(guber.PeerClient) b.ResetTimer() for n := 0; n < b.N; n++ { @@ -146,15 +147,27 @@ func BenchmarkServer(b *testing.B) { }) b.Run("Thundering herd", func(b *testing.B) { - client := cluster.GetRandomDaemon(cluster.DataCenterNone).MustClient() require.NoError(b, err, "Error in guber.DialV1Server") + var clients []guber.Client + + // Create a client for each CPU on the system. This should allow us to simulate the + // maximum contention possible for this system. + for i := 0; i < runtime.NumCPU(); i++ { + client, err := guber.NewClient(guber.WithNoTLS(d.Listener.Addr().String())) + require.NoError(b, err) + clients = append(clients, client) + } b.ResetTimer() - fan := syncutil.NewFanOut(100) + mask := len(clients) - 1 - for n := 0; n < b.N; n++ { - fan.Run(func(o interface{}) error { + var idx int + b.RunParallel(func(pb *testing.PB) { + client := clients[idx&mask] + idx++ + + for pb.Next() { var resp guber.CheckRateLimitsResponse - err := client.CheckRateLimits(ctx, &guber.CheckRateLimitsRequest{ + err = client.CheckRateLimits(ctx, &guber.CheckRateLimitsRequest{ Requests: []*guber.RateLimitRequest{ { Name: b.Name(), @@ -166,10 +179,9 @@ func BenchmarkServer(b *testing.B) { }, }, &resp) if err != nil { - b.Errorf("Error in client.GetRateLimits: %s", err) + fmt.Printf("%s\n", err.Error()) } - return nil - }, nil) - } + } + }) }) } diff --git a/client.go b/client.go index ca58a8d..c62fe86 100644 --- a/client.go +++ b/client.go @@ -93,13 +93,13 @@ func NewPeerClient(opts ClientOptions) PeerClient { func (c *client) CheckRateLimits(ctx context.Context, req *CheckRateLimitsRequest, resp *CheckRateLimitsResponse) error { payload, err := proto.Marshal(req) if err != nil { - return duh.NewClientError(fmt.Errorf("while marshaling request payload: %w", err), nil) + return duh.NewClientError("while marshaling request payload: %w", err, nil) } r, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s%s", c.opts.Endpoint, RPCRateLimitCheck), bytes.NewReader(payload)) if err != nil { - return duh.NewClientError(err, nil) + return duh.NewClientError("", err, nil) } r.Header.Set("Content-Type", duh.ContentTypeProtoBuf) @@ -109,13 +109,13 @@ func (c *client) CheckRateLimits(ctx context.Context, req *CheckRateLimitsReques func (c *client) HealthCheck(ctx context.Context, resp *HealthCheckResponse) error { payload, err := proto.Marshal(&HealthCheckRequest{}) if err != nil { - return duh.NewClientError(fmt.Errorf("while marshaling request payload: %w", err), nil) + return duh.NewClientError("while marshaling request payload: %w", err, nil) } r, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s%s", c.opts.Endpoint, RPCHealthCheck), bytes.NewReader(payload)) if err != nil { - return duh.NewClientError(err, nil) + return duh.NewClientError("", err, nil) } r.Header.Set("Content-Type", duh.ContentTypeProtoBuf) @@ -125,13 +125,13 @@ func (c *client) HealthCheck(ctx context.Context, resp *HealthCheckResponse) err func (c *client) Forward(ctx context.Context, req *ForwardRequest, resp *ForwardResponse) error { payload, err := proto.Marshal(req) if err != nil { - return duh.NewClientError(fmt.Errorf("while marshaling request payload: %w", err), nil) + return duh.NewClientError("while marshaling request payload: %w", err, nil) } r, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s%s", c.opts.Endpoint, RPCPeerForward), bytes.NewReader(payload)) if err != nil { - return duh.NewClientError(err, nil) + return duh.NewClientError("", err, nil) } c.prop.Inject(ctx, propagation.HeaderCarrier(r.Header)) @@ -142,12 +142,12 @@ func (c *client) Forward(ctx context.Context, req *ForwardRequest, resp *Forward func (c *client) Update(ctx context.Context, req *UpdateRequest) error { payload, err := proto.Marshal(req) if err != nil { - return duh.NewClientError(fmt.Errorf("while marshaling request payload: %w", err), nil) + return duh.NewClientError("while marshaling request payload: %w", err, nil) } r, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s%s", c.opts.Endpoint, RPCPeerUpdate), bytes.NewReader(payload)) if err != nil { - return duh.NewClientError(err, nil) + return duh.NewClientError("", err, nil) } r.Header.Set("Content-Type", duh.ContentTypeProtoBuf) @@ -227,10 +227,10 @@ func RandomPeer(peers []PeerInfo) PeerInfo { // RandomString returns a random alpha string of 'n' length func RandomString(n int) string { const alphanumeric = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" - var bytes = make([]byte, n) - _, _ = crand.Read(bytes) - for i, b := range bytes { - bytes[i] = alphanumeric[b%byte(len(alphanumeric))] + var buf = make([]byte, n) + _, _ = crand.Read(buf) + for i, b := range buf { + buf[i] = alphanumeric[b%byte(len(alphanumeric))] } - return string(bytes) + return string(buf) } diff --git a/go.mod b/go.mod index 3af05ad..c179407 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.22.0 require ( github.com/davecgh/go-spew v1.1.1 - github.com/duh-rpc/duh-go v0.0.2-0.20230929155108-5d641b0c008a + github.com/duh-rpc/duh-go v0.1.0 github.com/hashicorp/memberlist v0.5.0 github.com/mailgun/errors v0.1.5 github.com/mailgun/holster/v4 v4.19.0 diff --git a/go.sum b/go.sum index 1106c21..4f94803 100644 --- a/go.sum +++ b/go.sum @@ -106,8 +106,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dolthub/maphash v0.1.0 h1:bsQ7JsF4FkkWyrP3oCnFJgrCUAFbFf3kOl4L/QxPDyQ= github.com/dolthub/maphash v0.1.0/go.mod h1:gkg4Ch4CdCDu5h6PMriVLawB7koZ+5ijb9puGMV50a4= -github.com/duh-rpc/duh-go v0.0.2-0.20230929155108-5d641b0c008a h1:v/NQEfHHOY/huFECKxKZnEkY5jVD8Yix8TPa0FjgKbg= -github.com/duh-rpc/duh-go v0.0.2-0.20230929155108-5d641b0c008a/go.mod h1:OoCoGsZkeED84v8TAE86m2NM5ZfNLNlqUUm7tYO+h+k= +github.com/duh-rpc/duh-go v0.1.0 h1:Ym7XvNhl1CD6dgy+YWiPfhkOQGNzFsBsIc5uvYdF08c= +github.com/duh-rpc/duh-go v0.1.0/go.mod h1:OoCoGsZkeED84v8TAE86m2NM5ZfNLNlqUUm7tYO+h+k= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= diff --git a/gubernator.go b/gubernator.go index 1b24418..f26fa2b 100644 --- a/gubernator.go +++ b/gubernator.go @@ -176,12 +176,12 @@ func (s *Service) CheckRateLimits(ctx context.Context, req *CheckRateLimitsReque if len(req.Requests) > maxBatchSize { metricCheckErrorCounter.WithLabelValues("Request too large").Add(1) return duh.NewServiceError(duh.CodeBadRequest, - fmt.Errorf("CheckRateLimitsRequest.RateLimits list too large; max size is '%d'", maxBatchSize), nil) + fmt.Sprintf("CheckRateLimitsRequest.RateLimits list too large; max size is '%d'", maxBatchSize), nil, nil) } if len(req.Requests) == 0 { return duh.NewServiceError(duh.CodeBadRequest, - errors.New("CheckRateLimitsRequest.RateLimits list is empty; provide at least one rate limit"), nil) + "CheckRateLimitsRequest.RateLimits list is empty; provide at least one rate limit", nil, nil) } resp.Responses = make([]*RateLimitResponse, len(req.Requests)) @@ -471,7 +471,7 @@ func (s *Service) Forward(ctx context.Context, req *ForwardRequest, resp *Forwar if len(req.Requests) > maxBatchSize { metricCheckErrorCounter.WithLabelValues("Request too large").Add(1) return duh.NewServiceError(duh.CodeBadRequest, - fmt.Errorf("'Forward.requests' list too large; max size is '%d'", maxBatchSize), nil) + fmt.Sprintf("'Forward.requests' list too large; max size is '%d'", maxBatchSize), nil, nil) } // Invoke each rate limit request. diff --git a/store_test.go b/store_test.go index b111cc9..9685f54 100644 --- a/store_test.go +++ b/store_test.go @@ -18,6 +18,7 @@ package gubernator_test import ( "context" + "fmt" "sync" "testing" @@ -110,7 +111,7 @@ func (ms *NoOpStore) Get(ctx context.Context, r *gubernator.RateLimitRequest) (* func TestHighContentionFromStore(t *testing.T) { const ( // Increase these number to improve the chance of contention, but at the cost of test speed. - numGoroutines = 500 + numGoroutines = 150 numKeys = 100 ) store := &NoOpStore{} @@ -132,10 +133,12 @@ func TestHighContentionFromStore(t *testing.T) { var ready sync.WaitGroup wg.Add(numGoroutines) ready.Add(numGoroutines) - client := d.MustClient() for i := 0; i < numGoroutines; i++ { go func() { + // Create a client for each concurrent request to avoid contention in the client + client, err := gubernator.NewClient(gubernator.WithNoTLS(d.Listener.Addr().String())) + require.NoError(t, err) ready.Wait() for idx := 0; idx < numKeys; idx++ { var resp gubernator.CheckRateLimitsResponse @@ -151,7 +154,11 @@ func TestHighContentionFromStore(t *testing.T) { }, }, }, &resp) - require.NoError(t, err) + if err != nil { + // NOTE: you may see `connection reset by peer` if the server is overloaded + // and needs to forcibly drop some connections due to out of open file handlers etc... + fmt.Printf("%s\n", err) + } } wg.Done() }()