Skip to content

Commit

Permalink
refactor: switch to leaky bucket instead of token bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
anna-is-cute committed Oct 29, 2023
1 parent 9d70fc1 commit 989f3a0
Showing 1 changed file with 30 additions and 35 deletions.
65 changes: 30 additions & 35 deletions Util/GloballyThrottledStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ internal static long MaxBytesPerSecond {
}

private static readonly SemaphoreSlim Mutex = new(1, 1);
private static long _freqTokens;

/// <summary>
/// The fill level of the leaky bucket. Number of bytes read multiplied by
/// <see cref="Stopwatch.Frequency"/>.
/// </summary>
private static long _bucket;

private static long _lastRead = Stopwatch.GetTimestamp();

private Stream Inner { get; }
Expand Down Expand Up @@ -51,63 +57,52 @@ public override void Flush() {
this.Inner.Flush();
}

private static long AddTokens(long mbps) {
private static long Leak(long mbps) {
var now = Stopwatch.GetTimestamp();
var then = Interlocked.Exchange(ref _lastRead, now);
var tokensToAdd = (now - then) * mbps;
var leakAmt = (now - then) * mbps;

long freqTokens;
long bucket;
Mutex.Wait();
try {
var untilFull = mbps * Stopwatch.Frequency - _freqTokens;
if (untilFull > 0 && tokensToAdd > 0) {
_freqTokens += Math.Min(untilFull, tokensToAdd);
if (_bucket > 0) {
_bucket = Math.Max(0, _bucket - leakAmt);
}

freqTokens = _freqTokens;
bucket = mbps * Stopwatch.Frequency - _bucket;
} finally {
Mutex.Release();
}

return freqTokens;

// var now = Stopwatch.GetTimestamp();
// var then = Interlocked.Exchange(ref _lastRead, now);
// var tokensToAdd = (now - then) * _maxBytesPerSecond;
//
// var curTokens = Interlocked.CompareExchange(ref _freqTokens, 0, 0);
// var untilFull = _maxBytesPerSecond * Stopwatch.Frequency - curTokens;
// var freqTokens = untilFull > 0 && tokensToAdd > 0
// ? Interlocked.Add(ref _freqTokens, Math.Min(untilFull, tokensToAdd))
// : curTokens;
//
// return freqTokens;
return bucket;
}

public override int Read(byte[] buffer, int offset, int count) {
long mbps;
Mutex.Wait();
try {
mbps = _maxBytesPerSecond;
} finally {
Mutex.Release();
}
var mbps = _maxBytesPerSecond;

int amt;
if (mbps == 0) {
amt = count;
} else {
var freqTokens = AddTokens(mbps);
var bytes = (int) (freqTokens / Stopwatch.Frequency);
// available capacity in the bucket * freq
var bucket = Leak(mbps);
// number of bytes the bucket has space for
var bytes = (int) (bucket / Stopwatch.Frequency);

// let's not do a million tiny reads
var exp = (int) Math.Truncate(Math.Log2(mbps));
// wait until between 1 and 65536 bytes are available, depending on
// the buffer size and the speed limit
var exp = (int) Math.Truncate(Math.Log2(Math.Min(count, mbps)));
var lessThan = Math.Pow(2, Math.Clamp(exp, 0, 16));

while (bytes < lessThan) {
Thread.Sleep(TimeSpan.FromMilliseconds(5));
freqTokens = AddTokens(mbps);
bytes = (int) (freqTokens / Stopwatch.Frequency);
bucket = Leak(mbps);
bytes = (int) (bucket / Stopwatch.Frequency);
}

// read how many bytes are available or the buffer size, whichever
// is smaller
amt = Math.Min(bytes, count);
}

Expand All @@ -116,7 +111,7 @@ public override int Read(byte[] buffer, int offset, int count) {
if (mbps != 0) {
Mutex.Wait();
try {
_freqTokens -= read * Stopwatch.Frequency;
_bucket += read * Stopwatch.Frequency;
} finally {
Mutex.Release();
}
Expand All @@ -141,4 +136,4 @@ public override void SetLength(long value) {
public override void Write(byte[] buffer, int offset, int count) {
this.Inner.Write(buffer, offset, count);
}
}
}

0 comments on commit 989f3a0

Please sign in to comment.