From 989f3a08a8128983437558e8039dbc0a6ae65a63 Mon Sep 17 00:00:00 2001 From: Anna Date: Sun, 29 Oct 2023 09:49:03 -0400 Subject: [PATCH] refactor: switch to leaky bucket instead of token bucket --- Util/GloballyThrottledStream.cs | 65 +++++++++++++++------------------ 1 file changed, 30 insertions(+), 35 deletions(-) diff --git a/Util/GloballyThrottledStream.cs b/Util/GloballyThrottledStream.cs index 60153b2..c49a21b 100644 --- a/Util/GloballyThrottledStream.cs +++ b/Util/GloballyThrottledStream.cs @@ -12,7 +12,13 @@ internal static long MaxBytesPerSecond { } private static readonly SemaphoreSlim Mutex = new(1, 1); - private static long _freqTokens; + + /// + /// The fill level of the leaky bucket. Number of bytes read multiplied by + /// . + /// + private static long _bucket; + private static long _lastRead = Stopwatch.GetTimestamp(); private Stream Inner { get; } @@ -51,63 +57,52 @@ public override void Flush() { this.Inner.Flush(); } - private static long AddTokens(long mbps) { + private static long Leak(long mbps) { var now = Stopwatch.GetTimestamp(); var then = Interlocked.Exchange(ref _lastRead, now); - var tokensToAdd = (now - then) * mbps; + var leakAmt = (now - then) * mbps; - long freqTokens; + long bucket; Mutex.Wait(); try { - var untilFull = mbps * Stopwatch.Frequency - _freqTokens; - if (untilFull > 0 && tokensToAdd > 0) { - _freqTokens += Math.Min(untilFull, tokensToAdd); + if (_bucket > 0) { + _bucket = Math.Max(0, _bucket - leakAmt); } - freqTokens = _freqTokens; + bucket = mbps * Stopwatch.Frequency - _bucket; } finally { Mutex.Release(); } - return freqTokens; - - // var now = Stopwatch.GetTimestamp(); - // var then = Interlocked.Exchange(ref _lastRead, now); - // var tokensToAdd = (now - then) * _maxBytesPerSecond; - // - // var curTokens = Interlocked.CompareExchange(ref _freqTokens, 0, 0); - // var untilFull = _maxBytesPerSecond * Stopwatch.Frequency - curTokens; - // var freqTokens = untilFull > 0 && tokensToAdd > 0 - // ? Interlocked.Add(ref _freqTokens, Math.Min(untilFull, tokensToAdd)) - // : curTokens; - // - // return freqTokens; + return bucket; } public override int Read(byte[] buffer, int offset, int count) { - long mbps; - Mutex.Wait(); - try { - mbps = _maxBytesPerSecond; - } finally { - Mutex.Release(); - } + var mbps = _maxBytesPerSecond; int amt; if (mbps == 0) { amt = count; } else { - var freqTokens = AddTokens(mbps); - var bytes = (int) (freqTokens / Stopwatch.Frequency); + // available capacity in the bucket * freq + var bucket = Leak(mbps); + // number of bytes the bucket has space for + var bytes = (int) (bucket / Stopwatch.Frequency); + // let's not do a million tiny reads - var exp = (int) Math.Truncate(Math.Log2(mbps)); + // wait until between 1 and 65536 bytes are available, depending on + // the buffer size and the speed limit + var exp = (int) Math.Truncate(Math.Log2(Math.Min(count, mbps))); var lessThan = Math.Pow(2, Math.Clamp(exp, 0, 16)); + while (bytes < lessThan) { Thread.Sleep(TimeSpan.FromMilliseconds(5)); - freqTokens = AddTokens(mbps); - bytes = (int) (freqTokens / Stopwatch.Frequency); + bucket = Leak(mbps); + bytes = (int) (bucket / Stopwatch.Frequency); } + // read how many bytes are available or the buffer size, whichever + // is smaller amt = Math.Min(bytes, count); } @@ -116,7 +111,7 @@ public override int Read(byte[] buffer, int offset, int count) { if (mbps != 0) { Mutex.Wait(); try { - _freqTokens -= read * Stopwatch.Frequency; + _bucket += read * Stopwatch.Frequency; } finally { Mutex.Release(); } @@ -141,4 +136,4 @@ public override void SetLength(long value) { public override void Write(byte[] buffer, int offset, int count) { this.Inner.Write(buffer, offset, count); } -} \ No newline at end of file +}