Skip to content

Commit

Permalink
fix: work around needing the inner read in a mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
anna-is-cute committed Oct 29, 2023
1 parent a840d6b commit 1b7fc5d
Showing 1 changed file with 71 additions and 48 deletions.
119 changes: 71 additions & 48 deletions Util/GloballyThrottledStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,73 +57,96 @@ public override void Flush() {
this.Inner.Flush();
}

private static ulong Leak(ulong mbps) {
var now = (ulong) Stopwatch.GetTimestamp();
var then = _lastRead;
var leakAmt = checked(now - then) * mbps;
_lastRead = now;

if (_bucket > 0 && leakAmt > 0) {
_bucket = leakAmt > _bucket
? 0
: checked(_bucket - leakAmt);
}
private static bool Leak(ulong mbps, ulong wantedBytes) {
Mutex.Wait();
try {
var now = (ulong) Stopwatch.GetTimestamp();
var then = _lastRead;
var leakAmt = checked(now - then) * mbps;
_lastRead = now;

if (_bucket > 0 && leakAmt > 0) {
_bucket = leakAmt >= _bucket
? 0
: checked(_bucket - leakAmt);
}

var mul = mbps * (ulong) Stopwatch.Frequency;
if (_bucket > mul) {
// by changing the speed limit, we have now overfilled the
// bucket. remove excess
_bucket = mul;
}
var max = mbps * (ulong) Stopwatch.Frequency;
if (_bucket > max) {
// by changing the speed limit, we have now overfilled the
// bucket. remove excess
Console.WriteLine("removing excess");
_bucket = max;
}

var wantedFreq = wantedBytes * (ulong) Stopwatch.Frequency;
if (_bucket + wantedFreq > max) {
return false;
}

return checked(mul - _bucket);
_bucket = checked(_bucket + wantedFreq);
return true;
} finally {
Mutex.Release();
}
}

public override int Read(byte[] buffer, int offset, int count) {
var mbps = MaxBytesPerSecond;

int read;
if (mbps == 0) {
read = this.Inner.Read(buffer, offset, count);
goto End;
if (MaxBytesPerSecond == 0) {
goto Unlimited;
}

Mutex.Wait();
try {
// available capacity in the bucket * freq
var bucket = Leak(mbps);
// number of bytes the bucket has space for
var bytes = (int) (bucket / (ulong) Stopwatch.Frequency);

// let's not do a million tiny reads
// wait until between 1 and 65536 bytes are available, depending on
// the buffer size and the speed limit
var exp = (int) Math.Truncate(Math.Log2(Math.Min(count, (int) mbps)));
var lessThan = Math.Pow(2, Math.Clamp(exp, 0, 16));
var (mbps, toRead) = CalculateToRead();
while (!Leak(mbps, toRead)) {
Thread.Sleep(TimeSpan.FromMilliseconds(50));
(mbps, toRead) = CalculateToRead();
// make sure to check if the max bytes per sec was set to 0
if (mbps == 0) {
goto Unlimited;
}
}

while (bytes < lessThan) {
Thread.Sleep(TimeSpan.FromMilliseconds(5));
bucket = Leak(mbps);
bytes = (int) (bucket / (ulong) Stopwatch.Frequency);
read = this.Inner.Read(buffer, offset, (int) toRead);
// Leak says that we read this amount, so if we didn't, give back what
// we didn't read
if ((ulong) read < toRead) {
Mutex.Wait();
try {
var over = checked(toRead - (ulong) read) * (ulong) Stopwatch.Frequency;
_bucket = over >= _bucket
? 0
: checked(_bucket - over);
} finally {
Mutex.Release();
}
}

// read how many bytes are available or the buffer size, whichever
// is smaller
var amt = Math.Min(bytes, count);
this.Entries.PushRight(new DownloadTask.Measurement {
Ticks = Stopwatch.GetTimestamp(),
Data = (uint) read,
});

read = this.Inner.Read(buffer, offset, amt);
_bucket += (ulong) read * (ulong) Stopwatch.Frequency;
} finally {
Mutex.Release();
}
return read;

End:
Unlimited:
read = this.Inner.Read(buffer, offset, count);
this.Entries.PushRight(new DownloadTask.Measurement {
Ticks = Stopwatch.GetTimestamp(),
Data = (uint) read,
});

return read;

(ulong Mbps, ulong ToRead) CalculateToRead() {
// let's not do a million tiny reads, ask for a decent amount
// wait until between 1 and 65536 bytes are available, depending on
// the buffer size and the speed limit
var mbps = MaxBytesPerSecond;
var exp = (int) Math.Truncate(Math.Log2(Math.Min(count, (int) mbps)));
var wanted = (ulong) Math.Pow(2, Math.Clamp(exp, 0, 16));
return (mbps, Math.Min(wanted, (ulong) count));
}
}

public override long Seek(long offset, SeekOrigin origin) {
Expand Down

0 comments on commit 1b7fc5d

Please sign in to comment.