Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
perf: Fanout for async updates in global (#198)
Browse files Browse the repository at this point in the history
* perf: Fanout for async updates in global

* fix: data race

* use fanout

* make it configurable

* rename config
  • Loading branch information
miparnisari authored Oct 30, 2023
1 parent 71238ba commit 3ae0fdf
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 19 deletions.
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type BehaviorConfig struct {
GlobalBatchLimit int
// ForceGlobal forces global mode on all rate limit checks.
ForceGlobal bool

// Number of concurrent requests that will be made to peers. Defaults to 100
GlobalPeerRequestsConcurrency int
}

// Config for a gubernator instance
Expand Down Expand Up @@ -126,6 +129,8 @@ func (c *Config) SetDefaults() error {
setter.SetDefault(&c.Behaviors.GlobalBatchLimit, maxBatchSize)
setter.SetDefault(&c.Behaviors.GlobalSyncWait, time.Millisecond*100)

setter.SetDefault(&c.Behaviors.GlobalPeerRequestsConcurrency, 100)

setter.SetDefault(&c.LocalPicker, NewReplicatedConsistentHash(nil, defaultReplicas))
setter.SetDefault(&c.RegionPicker, NewRegionPicker(nil))

Expand Down
48 changes: 29 additions & 19 deletions global.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,23 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
}
}

fan := syncutil.NewFanOut(gm.conf.GlobalPeerRequestsConcurrency)
// Send the rate limit requests to their respective owning peers.
for _, p := range peerRequests {
ctx, cancel := context.WithTimeout(context.Background(), gm.conf.GlobalTimeout)
_, err := p.client.GetPeerRateLimits(ctx, &p.req)
cancel()

if err != nil {
gm.log.WithError(err).
Errorf("error sending global hits to '%s'", p.client.Info().GRPCAddress)
continue
}
fan.Run(func(in interface{}) error {
p := in.(*pair)
ctx, cancel := context.WithTimeout(context.Background(), gm.conf.GlobalTimeout)
_, err := p.client.GetPeerRateLimits(ctx, &p.req)
cancel()

if err != nil {
gm.log.WithError(err).
Errorf("error sending global hits to '%s'", p.client.Info().GRPCAddress)
}
return nil
}, p)
}
fan.Wait()
}

// runBroadcasts collects status changes for global rate limits and broadcasts the changes to each peer in the cluster.
Expand Down Expand Up @@ -232,24 +237,29 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
})
}

fan := syncutil.NewFanOut(gm.conf.GlobalPeerRequestsConcurrency)
for _, peer := range gm.instance.GetPeerList() {
// Exclude ourselves from the update
if peer.Info().IsOwner {
continue
}

ctx, cancel := context.WithTimeout(ctx, gm.conf.GlobalTimeout)
_, err := peer.UpdatePeerGlobals(ctx, &req)
cancel()

if err != nil {
// Skip peers that are not in a ready state
if !IsNotReady(err) {
gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress)
fan.Run(func(in interface{}) error {
peer := in.(*PeerClient)
ctx, cancel := context.WithTimeout(ctx, gm.conf.GlobalTimeout)
_, err := peer.UpdatePeerGlobals(ctx, &req)
cancel()

if err != nil {
// Skip peers that are not in a ready state
if !IsNotReady(err) {
gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress)
}
}
continue
}
return nil
}, peer)
}
fan.Wait()
}

func (gm *globalManager) Close() {
Expand Down

0 comments on commit 3ae0fdf

Please sign in to comment.