From 1b7fc5dab698d0832d2dad3864a2f31e793988a0 Mon Sep 17 00:00:00 2001 From: Anna Date: Sun, 29 Oct 2023 15:43:51 -0400 Subject: [PATCH] fix: work around needing the inner read in a mutex --- Util/GloballyThrottledStream.cs | 119 +++++++++++++++++++------------- 1 file changed, 71 insertions(+), 48 deletions(-) diff --git a/Util/GloballyThrottledStream.cs b/Util/GloballyThrottledStream.cs index 46c7f67..469be83 100644 --- a/Util/GloballyThrottledStream.cs +++ b/Util/GloballyThrottledStream.cs @@ -57,73 +57,96 @@ public override void Flush() { this.Inner.Flush(); } - private static ulong Leak(ulong mbps) { - var now = (ulong) Stopwatch.GetTimestamp(); - var then = _lastRead; - var leakAmt = checked(now - then) * mbps; - _lastRead = now; - - if (_bucket > 0 && leakAmt > 0) { - _bucket = leakAmt > _bucket - ? 0 - : checked(_bucket - leakAmt); - } + private static bool Leak(ulong mbps, ulong wantedBytes) { + Mutex.Wait(); + try { + var now = (ulong) Stopwatch.GetTimestamp(); + var then = _lastRead; + var leakAmt = checked(now - then) * mbps; + _lastRead = now; + + if (_bucket > 0 && leakAmt > 0) { + _bucket = leakAmt >= _bucket + ? 0 + : checked(_bucket - leakAmt); + } - var mul = mbps * (ulong) Stopwatch.Frequency; - if (_bucket > mul) { - // by changing the speed limit, we have now overfilled the - // bucket. remove excess - _bucket = mul; - } + var max = mbps * (ulong) Stopwatch.Frequency; + if (_bucket > max) { + // by changing the speed limit, we have now overfilled the + // bucket. remove excess + Console.WriteLine("removing excess"); + _bucket = max; + } + + var wantedFreq = wantedBytes * (ulong) Stopwatch.Frequency; + if (_bucket + wantedFreq > max) { + return false; + } - return checked(mul - _bucket); + _bucket = checked(_bucket + wantedFreq); + return true; + } finally { + Mutex.Release(); + } } public override int Read(byte[] buffer, int offset, int count) { - var mbps = MaxBytesPerSecond; - int read; - if (mbps == 0) { - read = this.Inner.Read(buffer, offset, count); - goto End; + if (MaxBytesPerSecond == 0) { + goto Unlimited; } - Mutex.Wait(); - try { - // available capacity in the bucket * freq - var bucket = Leak(mbps); - // number of bytes the bucket has space for - var bytes = (int) (bucket / (ulong) Stopwatch.Frequency); - - // let's not do a million tiny reads - // wait until between 1 and 65536 bytes are available, depending on - // the buffer size and the speed limit - var exp = (int) Math.Truncate(Math.Log2(Math.Min(count, (int) mbps))); - var lessThan = Math.Pow(2, Math.Clamp(exp, 0, 16)); + var (mbps, toRead) = CalculateToRead(); + while (!Leak(mbps, toRead)) { + Thread.Sleep(TimeSpan.FromMilliseconds(50)); + (mbps, toRead) = CalculateToRead(); + // make sure to check if the max bytes per sec was set to 0 + if (mbps == 0) { + goto Unlimited; + } + } - while (bytes < lessThan) { - Thread.Sleep(TimeSpan.FromMilliseconds(5)); - bucket = Leak(mbps); - bytes = (int) (bucket / (ulong) Stopwatch.Frequency); + read = this.Inner.Read(buffer, offset, (int) toRead); + // Leak says that we read this amount, so if we didn't, give back what + // we didn't read + if ((ulong) read < toRead) { + Mutex.Wait(); + try { + var over = checked(toRead - (ulong) read) * (ulong) Stopwatch.Frequency; + _bucket = over >= _bucket + ? 0 + : checked(_bucket - over); + } finally { + Mutex.Release(); } + } - // read how many bytes are available or the buffer size, whichever - // is smaller - var amt = Math.Min(bytes, count); + this.Entries.PushRight(new DownloadTask.Measurement { + Ticks = Stopwatch.GetTimestamp(), + Data = (uint) read, + }); - read = this.Inner.Read(buffer, offset, amt); - _bucket += (ulong) read * (ulong) Stopwatch.Frequency; - } finally { - Mutex.Release(); - } + return read; - End: + Unlimited: + read = this.Inner.Read(buffer, offset, count); this.Entries.PushRight(new DownloadTask.Measurement { Ticks = Stopwatch.GetTimestamp(), Data = (uint) read, }); return read; + + (ulong Mbps, ulong ToRead) CalculateToRead() { + // let's not do a million tiny reads, ask for a decent amount + // wait until between 1 and 65536 bytes are available, depending on + // the buffer size and the speed limit + var mbps = MaxBytesPerSecond; + var exp = (int) Math.Truncate(Math.Log2(Math.Min(count, (int) mbps))); + var wanted = (ulong) Math.Pow(2, Math.Clamp(exp, 0, 16)); + return (mbps, Math.Min(wanted, (ulong) count)); + } } public override long Seek(long offset, SeekOrigin origin) {