Skip to content

Commit

Permalink
refactor: switch to Parallel instead of spawning tasks
Browse files (browse the repository at this point in the history)
  • Loading branch information
anna-is-cute committed Aug 23, 2024
1 parent 93d069b commit 8a82fe0
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 134 deletions.
235 changes: 123 additions & 112 deletions DownloadTask.cs
Original file line number | Diff line number | Diff line change
Expand Up @@ -398,30 +398,35 @@ private async Task DownloadFiles(IDownloadTask_GetVersion info) {
throw new DirectoryNotFoundException($"Directory '{filesPath}' could not be found after waiting");
}

var tasks = info.Batched
var task = info.Batched
? this.DownloadBatchedFiles(info.NeededFiles, info.Batches, filesPath)
: this.DownloadNormalFiles(info.NeededFiles, filesPath);
await Task.WhenAll(tasks);
await task;
}

private IEnumerable<Task> DownloadNormalFiles(IDownloadTask_GetVersion_NeededFiles neededFiles, string filesPath) {
private Task DownloadNormalFiles(IDownloadTask_GetVersion_NeededFiles neededFiles, string filesPath) {
using var span = this.Transaction?.StartChild(nameof(this.DownloadNormalFiles));

this.State = State.DownloadingFiles;
this.SetStateData(0, (uint) neededFiles.Files.Files.Count);

return neededFiles.Files.Files
.Select(pair => Task.Run(async () => {
return Parallel.ForEachAsync(
neededFiles.Files.Files,
new ParallelOptions {
CancellationToken = this.CancellationToken.Token,
},
async (pair, token) => {
var (hash, files) = pair;
var outputPaths = GetOutputPaths(files);
using (await SemaphoreGuard.WaitAsync(Plugin.DownloadSemaphore, this.CancellationToken.Token)) {
using (await SemaphoreGuard.WaitAsync(Plugin.DownloadSemaphore, token)) {
await this.DownloadFile(new Uri(neededFiles.BaseUri), filesPath, outputPaths, hash);
}
}));
}
);
}

private IEnumerable<Task> DownloadBatchedFiles(IDownloadTask_GetVersion_NeededFiles neededFiles, BatchList batches, string filesPath) {
private Task DownloadBatchedFiles(IDownloadTask_GetVersion_NeededFiles neededFiles, BatchList batches, string filesPath) {
using var span = this.Transaction?.StartChild(nameof(this.DownloadBatchedFiles));

var neededHashes = neededFiles.Files.Files.Keys.ToList();
Expand All @@ -446,134 +451,140 @@ private IEnumerable<Task> DownloadBatchedFiles(IDownloadTask_GetVersion_NeededFi
this.State = State.DownloadingFiles;
this.SetStateData(0, (uint) neededFiles.Files.Files.Count);

return clonedBatches.Select(pair => Task.Run(async () => {
var (batch, batchedFiles) = pair;
return Parallel.ForEachAsync(
clonedBatches,
new ParallelOptions {
CancellationToken = this.CancellationToken.Token,
},
async (pair, token) => {
var (batch, batchedFiles) = pair;
// find which files from this batch we already have a hash for
var toDuplicate = new HashSet<string>();
foreach (var hash in batchedFiles.Keys) {
if (!this.ExistingHashPaths.TryGetValue(hash, out var path)) {
continue;
// find which files from this batch we already have a hash for
var toDuplicate = new HashSet<string>();
foreach (var hash in batchedFiles.Keys) {
if (!this.ExistingHashPaths.TryGetValue(hash, out var path)) {
continue;
}
toDuplicate.Add(path);
}
toDuplicate.Add(path);
}
// sort files in batch by offset, removing already-downloaded files
var listOfFiles = batchedFiles
.Select(pair => (Hash: pair.Key, Info: pair.Value))
.Where(pair => !this.ExistingHashPaths.ContainsKey(pair.Hash))
.OrderBy(pair => pair.Info.Offset).ToList();
if (listOfFiles.Count > 0) {
// calculate ranges
var ranges = new List<(ulong, ulong)>();
var begin = 0ul;
var end = 0ul;
var chunk = new List<string>();
var chunks = new List<List<string>>();
foreach (var (hash, info) in listOfFiles) {
if (begin == 0 && end == 0) {
// first item, so set begin and end
begin = info.Offset;
end = info.Offset + info.SizeCompressed;
// add the hash to this chunk
chunk.Add(hash);
// sort files in batch by offset, removing already-downloaded files
var listOfFiles = batchedFiles
.Select(pair => (Hash: pair.Key, Info: pair.Value))
.Where(pair => !this.ExistingHashPaths.ContainsKey(pair.Hash))
.OrderBy(pair => pair.Info.Offset).ToList();
if (listOfFiles.Count > 0) {
// calculate ranges
var ranges = new List<(ulong, ulong)>();
var begin = 0ul;
var end = 0ul;
var chunk = new List<string>();
var chunks = new List<List<string>>();
foreach (var (hash, info) in listOfFiles) {
if (begin == 0 && end == 0) {
// first item, so set begin and end
continue;
}
if (info.Offset == end) {
// there's no gap, so extend the end of this range
end += info.SizeCompressed;
// add the hash to this chunk
chunk.Add(hash);
continue;
}
// there is a gap
// add this chunk to the list of chunks
chunks.Add(chunk);
// make a new chunk
chunk = [];
// add the range to the list of ranges
ranges.Add((begin, end));
// start a new range after the gap
begin = info.Offset;
end = info.Offset + info.SizeCompressed;
// add the hash to this chunk
chunk.Add(hash);
continue;
}
if (info.Offset == end) {
// there's no gap, so extend the end of this range
end += info.SizeCompressed;
// add the hash to this chunk
// add the hash to the new chunk
chunk.Add(hash);
continue;
}
// there is a gap
// add this chunk to the list of chunks
chunks.Add(chunk);
// make a new chunk
chunk = [];
// add the range to the list of ranges
ranges.Add((begin, end));
if (end != 0) {
// add the last range if necessary
ranges.Add((begin, end));
// start a new range after the gap
begin = info.Offset;
end = info.Offset + info.SizeCompressed;
// add the hash to the new chunk
chunk.Add(hash);
}
if (chunk.Count > 0) {
chunks.Add(chunk);
}
}
if (end != 0) {
// add the last range if necessary
ranges.Add((begin, end));
// check if we're just downloading the whole file - cf cache
// won't kick in for range requests
var totalBatchSize = batchedFiles.Values
.Select(file => file.SizeCompressed)
// no Sum function for ulong
.Aggregate((total, size) => total + size);
RangeHeaderValue? rangeHeader;
if (ranges is [{ Item1: 0, Item2: var rangeEnd }] && rangeEnd == totalBatchSize) {
rangeHeader = null;
} else {
// construct the header
rangeHeader = new RangeHeaderValue();
foreach (var (from, to) in ranges) {
rangeHeader.Ranges.Add(new RangeItemHeaderValue((long) from, (long) to));
}
}
if (chunk.Count > 0) {
chunks.Add(chunk);
// construct the uri
var baseUri = new Uri(new Uri(neededFiles.BaseUri), "../batches/");
var uri = new Uri(baseUri, batch);
using (await SemaphoreGuard.WaitAsync(Plugin.DownloadSemaphore, token)) {
var counter = new StateCounter();
await Plugin.Resilience.ExecuteAsync(
async _ => {
// if we're retrying, remove the files that this task added
this.StateData -= counter.Added;
counter.Added = 0;
await this.DownloadBatchedFile(neededFiles, filesPath, uri, rangeHeader, chunks, batchedFiles, counter);
},
token
);
}
}
// check if we're just downloading the whole file - cf cache
// won't kick in for range requests
var totalBatchSize = batchedFiles.Values
.Select(file => file.SizeCompressed)
// no Sum function for ulong
.Aggregate((total, size) => total + size);
foreach (var path in toDuplicate) {
var joined = Path.Join(filesPath, path);
RangeHeaderValue? rangeHeader;
if (ranges is [{ Item1: 0, Item2: var rangeEnd }] && rangeEnd == totalBatchSize) {
rangeHeader = null;
} else {
// construct the header
rangeHeader = new RangeHeaderValue();
foreach (var (from, to) in ranges) {
rangeHeader.Ranges.Add(new RangeItemHeaderValue((long) from, (long) to));
if (!File.Exists(joined)) {
Plugin.Log.Warning($"{joined} was supposed to be duplicated but no longer exists");
continue;
}
}
// construct the uri
var baseUri = new Uri(new Uri(neededFiles.BaseUri), "../batches/");
var uri = new Uri(baseUri, batch);
using (await SemaphoreGuard.WaitAsync(Plugin.DownloadSemaphore, this.CancellationToken.Token)) {
var counter = new StateCounter();
await Plugin.Resilience.ExecuteAsync(
async _ => {
// if we're retrying, remove the files that this task added
this.StateData -= counter.Added;
counter.Added = 0;
await this.DownloadBatchedFile(neededFiles, filesPath, uri, rangeHeader, chunks, batchedFiles, counter);
},
this.CancellationToken.Token
);
}
}
if (!this.ExistingPathHashes.TryGetValue(path, out var hash)) {
throw new Exception("missing hash for file to duplicate");
}
foreach (var path in toDuplicate) {
var joined = Path.Join(filesPath, path);
var gamePaths = neededFiles.Files.Files[hash];
var outputPaths = GetOutputPaths(gamePaths);
if (!File.Exists(joined)) {
Plugin.Log.Warning($"{joined} was supposed to be duplicated but no longer exists");
continue;
}
await DuplicateFile(filesPath, outputPaths, joined);
if (!this.ExistingPathHashes.TryGetValue(path, out var hash)) {
throw new Exception("missing hash for file to duplicate");
this.StateData += 1;
}
var gamePaths = neededFiles.Files.Files[hash];
var outputPaths = GetOutputPaths(gamePaths);
await DuplicateFile(filesPath, outputPaths, joined);
this.StateData += 1;
}
}));
);
}

private class StateCounter {
Expand Down
35 changes: 18 additions & 17 deletions PackageState.cs
Original file line number | Diff line number | Diff line change
Expand Up @@ -167,25 +167,26 @@ internal async Task UpdatePackages() {
Interlocked.Exchange(ref this.CurrentDirectory, 0);
Interlocked.Exchange(ref this.DirectoriesToScan, dirs.Count);

var tasks = dirs.Select(dir => Task.Run(async () => {
Interlocked.Increment(ref this.CurrentDirectory);
if (dir.StartsWith("hs-")) {
try {
await this.LoadPackage(dir, penumbraPath);
} catch (Exception ex) {
ErrorHelper.Handle(ex, "Could not load package");
}
} else {
try {
await this.LoadExternalPackage(dir, penumbraPath);
} catch (Exception ex) {
ErrorHelper.Handle(ex, "Could not load external package");
await Parallel.ForEachAsync(
dirs,
async (dir, _) => {
Interlocked.Increment(ref this.CurrentDirectory);
if (dir.StartsWith("hs-")) {
try {
await this.LoadPackage(dir, penumbraPath);
} catch (Exception ex) {
ErrorHelper.Handle(ex, "Could not load package");
}
} else {
try {
await this.LoadExternalPackage(dir, penumbraPath);
} catch (Exception ex) {
ErrorHelper.Handle(ex, "Could not load external package");
}
}
}
}));

await Task.WhenAll(tasks);
);

Interlocked.Exchange(ref this.DirectoriesToScan, -1);

Expand Down
10 changes: 5 additions & 5 deletions Ui/InstallerWindow.cs
Original file line number | Diff line number | Diff line change
Expand Up @@ -54,8 +54,9 @@ internal InstallerWindow(Plugin plugin, Guid packageId, Guid variantId, Guid ver
var images = this.Info.InstallerImages;
this._imagesDownloading = images.Images.Images.Count;
var tasks = images.Images.Images
.Select(entry => Task.Run(async () => {
await Parallel.ForEachAsync(
images.Images.Images,
async (entry, _) => {
var hash = entry.Key;
var paths = entry.Value;
Expand Down Expand Up @@ -85,9 +86,8 @@ internal InstallerWindow(Plugin plugin, Guid packageId, Guid variantId, Guid ver
} finally {
this._imagesDownloading -= 1;
}
}));
await Task.WhenAll(tasks);
}
);
});
}

Expand Down

0 comments on commit 8a82fe0

Please sign in to comment.