Replace policy and action limiters with a checkin limiter #3255
Changes from 6 commits
@@ -33,6 +33,7 @@ import (
 	"github.com/hashicorp/go-version"
 	"github.com/miolini/datacounter"
 	"github.com/rs/zerolog"
+	"golang.org/x/time/rate"

 	"go.elastic.co/apm/module/apmhttp/v2"
 	"go.elastic.co/apm/v2"

@@ -75,6 +76,7 @@ type CheckinT struct {
 	// gwPool is a gzip.Writer pool intended to lower the amount of writers created when responding to checkin requests.
 	// gzip.Writer allocations are expensive (~1.2MB each) and can exhaust an instance's memory if a lot of concurrent responses are sent (this occurs when a mass-action such as an upgrade is detected).
 	// effectiveness of the pool is controlled by rate limiter configured through the limit.action_limit attribute.
+	limit  *rate.Limiter
 	gwPool sync.Pool
 	bulker bulk.Bulk
 }

@@ -90,6 +92,13 @@ func NewCheckinT(
 	tr *action.TokenResolver,
 	bulker bulk.Bulk,
 ) *CheckinT {
+	rt := rate.Every(cfg.Limits.ActionLimit.Interval)
+	if cfg.Limits.ActionLimit.Interval == 0 && cfg.Limits.PolicyThrottle != 0 {
+		rt = rate.Every(cfg.Limits.PolicyThrottle)
+	} else if cfg.Limits.ActionLimit.Interval == 0 && cfg.Limits.PolicyThrottle == 0 {
+		rt = rate.Inf
+	}
+	zerolog.Ctx(context.TODO()).Debug().Any("event_rate", rt).Int("burst", cfg.Limits.ActionLimit.Burst).Msg("checkin response gzip limiter")
 	ct := &CheckinT{
 		verCon: verCon,
 		cfg:    cfg,

@@ -99,6 +108,7 @@ func NewCheckinT(
 		gcp:    gcp,
 		ad:     ad,
 		tr:     tr,
+		limit:  rate.NewLimiter(rt, cfg.Limits.ActionLimit.Burst),
 		gwPool: sync.Pool{
 			New: func() any {
 				zipper, err := gzip.NewWriterLevel(io.Discard, cfg.CompressionLevel)

Review comments on this hunk:

There is no […]

interval and burst are used to configure the rate limiter (interval is the time it takes for 1 token to be added to the rate limit pool, burst is max pool size).
@@ -548,7 +558,17 @@ func (ct *CheckinT) writeResponse(zlog zerolog.Logger, w http.ResponseWriter, r
 	compressionLevel := ct.cfg.CompressionLevel
 	compressThreshold := ct.cfg.CompressionThresh

-	if len(payload) > compressThreshold && compressionLevel != flate.NoCompression && acceptsEncoding(r, kEncodingGzip) {
+	if (len(fromPtr(resp.Actions)) > 0 || len(payload) > compressThreshold) && compressionLevel != flate.NoCompression && acceptsEncoding(r, kEncodingGzip) {
+		// We compress the response if it would be over the threshold (default 1kb) or we are sending an action.
+		// gzip.Writer allocations are expensive, and we can easily hit an OOM issue if we try to write a large number of responses at once.
+		// This is likely to happen on bulk upgrades, or policy changes.
+		// In order to avoid a massive memory allocation we do two things:
+		// 1. re-use gzip.Writer instances across responses (through a pool)
+		// 2. rate-limit access to the writer pool
+		if err := ct.limit.Wait(ctx); err != nil {
+			return fmt.Errorf("checkin response limiter error: %w", err)
+		}
+
 		wrCounter := datacounter.NewWriterCounter(w)

 		zipper, _ := ct.gwPool.Get().(*gzip.Writer)
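
The numbered comments in this hunk describe a general pattern: reuse gzip.Writer instances (noted as ~1.2MB each in the struct comment) through a sync.Pool, and rate-limit access to that pool so a burst of large responses cannot allocate an unbounded number of writers. A condensed sketch of that pattern outside fleet-server (compressResponse, the pool, and the limiter values are illustrative assumptions, not the project's API):

```go
package main

import (
	"bytes"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

// writerPool hands out reusable gzip.Writers so each response does not pay
// for a fresh writer allocation.
var writerPool = sync.Pool{
	New: func() any {
		w, _ := gzip.NewWriterLevel(io.Discard, gzip.BestSpeed)
		return w
	},
}

// limiter throttles how fast writers may be taken from the pool, bounding how
// many writers can be in use at once during a burst of responses.
// The rate and burst here are illustrative, not fleet-server defaults.
var limiter = rate.NewLimiter(rate.Every(5*time.Millisecond), 10)

// compressResponse is a hypothetical helper mirroring the shape of the diff:
// wait for a limiter token, borrow a writer, compress, return the writer.
func compressResponse(ctx context.Context, dst io.Writer, payload []byte) error {
	if err := limiter.Wait(ctx); err != nil {
		return fmt.Errorf("response limiter error: %w", err)
	}
	zipper := writerPool.Get().(*gzip.Writer)
	defer writerPool.Put(zipper)

	zipper.Reset(dst) // rebind the pooled writer to this response
	if _, err := zipper.Write(payload); err != nil {
		return err
	}
	return zipper.Close() // flush and finish the gzip stream
}

func main() {
	var buf bytes.Buffer
	if err := compressResponse(context.Background(), &buf, []byte("hello, checkin")); err != nil {
		panic(err)
	}
	fmt.Printf("compressed payload is %d bytes\n", buf.Len())
}
```

Calling Reset before writing and Close before the writer goes back to the pool is what makes reuse safe; the limiter only bounds how quickly writers can be checked out.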

nit: shouldn't there be a ctx passed instead of context.TODO()?

Ideally yes, but we would need to make sure that the context is tied to the function instead of checkinT's lifecycle. We also use context.TODO in a few other places similar to this.

There's an existing issue to address this: #3087