fix: mutex deadlocks in PeerClient (#223)
* fix race issues in PeerClient

* undo changes in comments

* more fixes

* final fixes

* do not create 1 connection every time

* add comment

* remove comment

* fix lint

* add ctx back to shutdown

* move conn.Close out of goroutine

* fix lint

* oops

* make it blocking

* put cap of 1 second to NewPeerClient connect

* undo put cap of 1 second to NewPeerClient connect

* fix another leak

* on peer shutdown, clear all errors

* add time.Sleep to end of TestHealthCheck

* use testutil.UntilPass

* check health of every instance

* use cluster.GetAllPeers
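
Taken together, the messages above describe a caller-facing change: NewPeerClient now establishes its single gRPC connection up front (blocking) and returns an error, and PeerClient.Shutdown keeps its context parameter. Below is a minimal caller-side sketch of that pattern; the helper names are illustrative only, and the v2 module path and the guber/cluster aliases are assumed from the test diffs further down.

package example

import (
	"context"
	"log"

	guber "github.com/mailgun/gubernator/v2"
	"github.com/mailgun/gubernator/v2/cluster"
)

// newPeer is a hypothetical helper: NewPeerClient now dials eagerly and
// returns an error instead of deferring the connection, so callers must
// handle the failure (see the benchmark_test.go and gubernator.go hunks below).
func newPeer() (*guber.PeerClient, error) {
	return guber.NewPeerClient(guber.PeerConfig{
		Info: cluster.GetRandomPeer(cluster.DataCenterNone),
	})
}

// closePeer is a hypothetical helper: Shutdown still takes a context
// ("add ctx back to shutdown") and closes the shared connection.
func closePeer(client *guber.PeerClient) {
	if err := client.Shutdown(context.Background()); err != nil {
		log.Printf("error shutting down peer client: %s", err)
	}
}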
miparnisari authored Feb 28, 2024
1 parent f740f2b commit 63fd9ff
Showing 8 changed files with 134 additions and 203 deletions.
5 changes: 4 additions & 1 deletion benchmark_test.go
@@ -33,10 +33,13 @@ func BenchmarkServer(b *testing.B) {
require.NoError(b, err, "Error in conf.SetDefaults")

b.Run("GetPeerRateLimit() with no batching", func(b *testing.B) {
client := guber.NewPeerClient(guber.PeerConfig{
client, err := guber.NewPeerClient(guber.PeerConfig{
Info: cluster.GetRandomPeer(cluster.DataCenterNone),
Behavior: conf.Behaviors,
})
if err != nil {
b.Errorf("Error building client: %s", err)
}

b.ResetTimer()

14 changes: 12 additions & 2 deletions functional_test.go
@@ -36,7 +36,6 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
json "google.golang.org/protobuf/encoding/protojson"
@@ -1618,6 +1617,16 @@ func TestHealthCheck(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*15)
defer cancel()
require.NoError(t, cluster.Restart(ctx))

// wait for every peer instance to come back online
for _, peer := range cluster.GetPeers() {
peerClient, err := guber.DialV1Server(peer.GRPCAddress, nil)
require.NoError(t, err)
testutil.UntilPass(t, 10, clock.Millisecond*300, func(t testutil.TestingT) {
healthResp, err = peerClient.HealthCheck(context.Background(), &guber.HealthCheckReq{})
assert.Equal(t, "healthy", healthResp.GetStatus())
})
}
}

func TestLeakyBucketDivBug(t *testing.T) {
@@ -1723,9 +1732,10 @@ func TestGRPCGateway(t *testing.T) {

func TestGetPeerRateLimits(t *testing.T) {
ctx := context.Background()
peerClient := guber.NewPeerClient(guber.PeerConfig{
peerClient, err := guber.NewPeerClient(guber.PeerConfig{
Info: cluster.GetRandomPeer(cluster.DataCenterNone),
})
require.NoError(t, err)

t.Run("Stable rate check request order", func(t *testing.T) {
// Ensure response order matches rate check request order.
11 changes: 8 additions & 3 deletions global.go
@@ -20,6 +20,7 @@ import (
"context"

"github.com/mailgun/holster/v4/syncutil"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)

@@ -31,7 +32,7 @@ type globalManager struct {
wg syncutil.WaitGroup
conf BehaviorConfig
log FieldLogger
instance *V1Instance // todo circular import? V1Instance also holds a reference to globalManager
instance *V1Instance // TODO circular import? V1Instance also holds a reference to globalManager
metricGlobalSendDuration prometheus.Summary
metricBroadcastDuration prometheus.Summary
metricBroadcastCounter *prometheus.CounterVec
@@ -249,8 +250,8 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
cancel()

if err != nil {
// Skip peers that are not in a ready state
if !IsNotReady(err) {
// Only log if it's an unknown error
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress)
}
}
@@ -260,6 +261,10 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
fan.Wait()
}

// Close stops all goroutines and shuts down all the peers.
func (gm *globalManager) Close() {
gm.wg.Stop()
for _, peer := range gm.instance.GetPeerList() {
_ = peer.Shutdown(context.Background())
}
}
1 change: 1 addition & 0 deletions go.mod
@@ -24,6 +24,7 @@ require (
go.opentelemetry.io/otel/trace v1.21.0
go.uber.org/goleak v1.3.0
golang.org/x/net v0.18.0
golang.org/x/sync v0.3.0
golang.org/x/time v0.3.0
google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b
google.golang.org/grpc v1.59.0
1 change: 1 addition & 0 deletions go.sum


23 changes: 17 additions & 6 deletions gubernator.go
@@ -343,7 +343,7 @@ func (s *V1Instance) asyncRequest(ctx context.Context, req *AsyncReq) {
// Make an RPC call to the peer that owns this rate limit
r, err := req.Peer.GetPeerRateLimit(ctx, req.Req)
if err != nil {
if IsNotReady(err) {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
attempts++
metricBatchSendRetries.WithLabelValues(req.Req.Name).Inc()
req.Peer, err = s.GetPeer(ctx, req.Key)
@@ -528,7 +528,7 @@ func (s *V1Instance) HealthCheck(ctx context.Context, r *HealthCheckReq) (health
localPeers := s.conf.LocalPicker.Peers()
for _, peer := range localPeers {
for _, errMsg := range peer.GetLastErr() {
err := fmt.Errorf("Error returned from local peer.GetLastErr: %s", errMsg)
err := fmt.Errorf("error returned from local peer.GetLastErr: %s", errMsg)
span.RecordError(err)
errs = append(errs, err.Error())
}
@@ -538,7 +538,7 @@ func (s *V1Instance) HealthCheck(ctx context.Context, r *HealthCheckReq) (health
regionPeers := s.conf.RegionPicker.Peers()
for _, peer := range regionPeers {
for _, errMsg := range peer.GetLastErr() {
err := fmt.Errorf("Error returned from region peer.GetLastErr: %s", errMsg)
err := fmt.Errorf("error returned from region peer.GetLastErr: %s", errMsg)
span.RecordError(err)
errs = append(errs, err.Error())
}
@@ -586,7 +586,8 @@ func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq) (_
return resp, nil
}

// SetPeers is called by the implementor to indicate the pool of peers has changed
// SetPeers replaces the peers and shuts down all the previous peers.
// TODO this should return an error if we failed to connect to any of the new peers
func (s *V1Instance) SetPeers(peerInfo []PeerInfo) {
localPicker := s.conf.LocalPicker.New()
regionPicker := s.conf.RegionPicker.New()
@@ -597,27 +598,37 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) {
peer := s.conf.RegionPicker.GetByPeerInfo(info)
// If we don't have an existing PeerClient create a new one
if peer == nil {
peer = NewPeerClient(PeerConfig{
var err error
peer, err = NewPeerClient(PeerConfig{
TraceGRPC: s.conf.PeerTraceGRPC,
Behavior: s.conf.Behaviors,
TLS: s.conf.PeerTLS,
Log: s.log,
Info: info,
})
if err != nil {
s.log.Errorf("error connecting to peer %s: %s", info.GRPCAddress, err)
return
}
}
regionPicker.Add(peer)
continue
}
// If we don't have an existing PeerClient create a new one
peer := s.conf.LocalPicker.GetByPeerInfo(info)
if peer == nil {
peer = NewPeerClient(PeerConfig{
var err error
peer, err = NewPeerClient(PeerConfig{
TraceGRPC: s.conf.PeerTraceGRPC,
Behavior: s.conf.Behaviors,
TLS: s.conf.PeerTLS,
Log: s.log,
Info: info,
})
if err != nil {
s.log.Errorf("error connecting to peer %s: %s", info.GRPCAddress, err)
return
}
}
localPicker.Add(peer)
}
