Skip to content

Commit

Permalink
Async improvements (#1007)
Browse files Browse the repository at this point in the history
* Improve async code.

* Improve assets.

* Fix tests.

* More tests.

* Fix mapping.

* Fix header names.

* Fix languages header.
  • Loading branch information
SebastianStehle authored Jul 24, 2023
1 parent 6b7c6fc commit 2e95bf8
Show file tree
Hide file tree
Showing 65 changed files with 1,039 additions and 1,082 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected override async Task<Result> ExecuteJobAsync(ScriptJob job,
AppName = job.Event.AppId.Name,
};

if (job.Event is EnrichedUserEventBase userEvent)
if (job.Event is EnrichedUserEventBase)
{
vars.User = AllPrinicpal();
}
Expand Down
108 changes: 41 additions & 67 deletions backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
// ==========================================================================

using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks.Dataflow;
using MongoDB.Bson;
using MongoDB.Driver;
using Squidex.Infrastructure;
Expand All @@ -28,104 +27,79 @@ public AddAppIdToEventStream(IMongoDatabase database)
public async Task UpdateAsync(
CancellationToken ct)
{
const int SizeOfBatch = 1000;
const int SizeOfQueue = 20;

// Do not resolve in constructor, because most of the time it is not executed anyway.
var collectionV1 = database.GetCollection<BsonDocument>("Events");
var collectionV2 = database.GetCollection<BsonDocument>("Events2");

var batchBlock = new BatchBlock<BsonDocument>(SizeOfBatch, new GroupingDataflowBlockOptions
// Run batch first, because it is cheaper as it has less items.
var batchedCommits = collectionV1.Find(FindAll).ToAsyncEnumerable(ct).Batch(500, ct).Buffered(2, ct);

var options = new ParallelOptions
{
BoundedCapacity = SizeOfQueue * SizeOfBatch
});
CancellationToken = ct,
// The tasks are mostly executed on database level, therefore we increase parallelism.
MaxDegreeOfParallelism = Environment.ProcessorCount * 2,
};

var actionBlock = new ActionBlock<BsonDocument[]>(async batch =>
await Parallel.ForEachAsync(batchedCommits, ct, async (batch, ct) =>
{
try
var writes = new List<WriteModel<BsonDocument>>();
foreach (var document in batch)
{
var writes = new List<WriteModel<BsonDocument>>();
var eventStream = document["EventStream"].AsString;
foreach (var document in batch)
if (TryGetAppId(document, out var appId))
{
var eventStream = document["EventStream"].AsString;
if (TryGetAppId(document, out var appId))
if (!eventStream.StartsWith("app-", StringComparison.OrdinalIgnoreCase))
{
if (!eventStream.StartsWith("app-", StringComparison.OrdinalIgnoreCase))
{
var indexOfType = eventStream.IndexOf('-', StringComparison.Ordinal);
var indexOfId = indexOfType + 1;
var indexOfType = eventStream.IndexOf('-', StringComparison.Ordinal);
var indexOfId = indexOfType + 1;
var indexOfOldId = eventStream.LastIndexOf("--", StringComparison.OrdinalIgnoreCase);
var indexOfOldId = eventStream.LastIndexOf("--", StringComparison.OrdinalIgnoreCase);
if (indexOfOldId > 0)
{
indexOfId = indexOfOldId + 2;
}
var domainType = eventStream[..indexOfType];
var domainId = eventStream[indexOfId..];
if (indexOfOldId > 0)
{
indexOfId = indexOfOldId + 2;
}
var newDomainId = DomainId.Combine(DomainId.Create(appId), DomainId.Create(domainId)).ToString();
var newStreamName = $"{domainType}-{newDomainId}";
var domainType = eventStream[..indexOfType];
var domainId = eventStream[indexOfId..];
document["EventStream"] = newStreamName;
var newDomainId = DomainId.Combine(DomainId.Create(appId), DomainId.Create(domainId)).ToString();
var newStreamName = $"{domainType}-{newDomainId}";
foreach (var @event in document["Events"].AsBsonArray)
{
var metadata = @event["Metadata"].AsBsonDocument;
metadata["AggregateId"] = newDomainId;
}
}
document["EventStream"] = newStreamName;
foreach (var @event in document["Events"].AsBsonArray)
{
var metadata = @event["Metadata"].AsBsonDocument;
metadata.Remove("AppId");
metadata["AggregateId"] = newDomainId;
}
}
var filter = Builders<BsonDocument>.Filter.Eq("_id", document["_id"].AsString);
writes.Add(new ReplaceOneModel<BsonDocument>(filter, document)
foreach (var @event in document["Events"].AsBsonArray)
{
IsUpsert = true
});
var metadata = @event["Metadata"].AsBsonDocument;
metadata.Remove("AppId");
}
}
if (writes.Count > 0)
var filter = Builders<BsonDocument>.Filter.Eq("_id", document["_id"].AsString);
writes.Add(new ReplaceOneModel<BsonDocument>(filter, document)
{
await collectionV2.BulkWriteAsync(writes, BulkUnordered, ct);
}
IsUpsert = true
});
}
catch (OperationCanceledException ex)
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount * 2,
MaxMessagesPerTask = 10,
BoundedCapacity = SizeOfQueue
});

batchBlock.BidirectionalLinkTo(actionBlock);
await foreach (var commit in collectionV1.Find(FindAll).ToAsyncEnumerable(ct: ct))
{
if (!await batchBlock.SendAsync(commit, ct))
if (writes.Count > 0)
{
break;
await collectionV2.BulkWriteAsync(writes, BulkUnordered, ct);
}
}

batchBlock.Complete();

await actionBlock.Completion;
});
}

private static bool TryGetAppId(BsonDocument document, [MaybeNullWhen(false)] out string appId)
Expand Down
84 changes: 23 additions & 61 deletions backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================

using System.Threading.Tasks.Dataflow;
using MongoDB.Bson;
using MongoDB.Driver;
using Squidex.Infrastructure;
Expand Down Expand Up @@ -72,9 +71,6 @@ public async Task UpdateAsync(
private static async Task RebuildAsync(IMongoDatabase database, Action<BsonDocument>? extraAction, string collectionNameV1,
CancellationToken ct)
{
const int SizeOfBatch = 1000;
const int SizeOfQueue = 10;

string collectionNameV2;

collectionNameV2 = $"{collectionNameV1}2";
Expand All @@ -91,80 +87,46 @@ private static async Task RebuildAsync(IMongoDatabase database, Action<BsonDocum

await collectionV2.DeleteManyAsync(FindAll, ct);

var batchBlock = new BatchBlock<BsonDocument>(SizeOfBatch, new GroupingDataflowBlockOptions
{
BoundedCapacity = SizeOfQueue * SizeOfBatch
});
// Run batch first, because it is cheaper as it has less items.
var batches = collectionV1.Find(FindAll).ToAsyncEnumerable(ct).Batch(500, ct).Buffered(2, ct);

var writeOptions = new BulkWriteOptions
await Parallel.ForEachAsync(batches, ct, async (batch, ct) =>
{
IsOrdered = false
};
var writes = new List<WriteModel<BsonDocument>>();
var actionBlock = new ActionBlock<BsonDocument[]>(async batch =>
{
try
foreach (var document in batch)
{
var writes = new List<WriteModel<BsonDocument>>();
foreach (var document in batch)
{
var appId = document["_ai"].AsString;
var appId = document["_ai"].AsString;
var documentIdOld = document["_id"].AsString;
var documentIdOld = document["_id"].AsString;
if (documentIdOld.Contains("--", StringComparison.OrdinalIgnoreCase))
{
var index = documentIdOld.LastIndexOf("--", StringComparison.OrdinalIgnoreCase);
if (documentIdOld.Contains("--", StringComparison.OrdinalIgnoreCase))
{
var index = documentIdOld.LastIndexOf("--", StringComparison.OrdinalIgnoreCase);
documentIdOld = documentIdOld[(index + 2)..];
}
documentIdOld = documentIdOld[(index + 2)..];
}
var documentIdNew = DomainId.Combine(DomainId.Create(appId), DomainId.Create(documentIdOld)).ToString();
var documentIdNew = DomainId.Combine(DomainId.Create(appId), DomainId.Create(documentIdOld)).ToString();
document["id"] = documentIdOld;
document["_id"] = documentIdNew;
document["id"] = documentIdOld;
document["_id"] = documentIdNew;
extraAction?.Invoke(document);
extraAction?.Invoke(document);
var filter = Filter.Eq("_id", documentIdNew);
var filter = Filter.Eq("_id", documentIdNew);
writes.Add(new ReplaceOneModel<BsonDocument>(filter, document)
{
IsUpsert = true
});
}
if (writes.Count > 0)
writes.Add(new ReplaceOneModel<BsonDocument>(filter, document)
{
await collectionV2.BulkWriteAsync(writes, writeOptions, ct);
}
IsUpsert = true
});
}
catch (OperationCanceledException ex)
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount * 2,
MaxMessagesPerTask = 1,
BoundedCapacity = SizeOfQueue
});

batchBlock.BidirectionalLinkTo(actionBlock);
await foreach (var document in collectionV1.Find(FindAll).ToAsyncEnumerable(ct: ct))
{
if (!await batchBlock.SendAsync(document, ct))
if (writes.Count > 0)
{
break;
await collectionV2.BulkWriteAsync(writes, BulkUnordered, ct);
}
}

batchBlock.Complete();

await actionBlock.Completion;
});
}

private static void ConvertParentId(BsonDocument document)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ public static class AssetHeaders
{
public const string NoEnrichment = "X-NoAssetEnrichment";

public static bool ShouldSkipAssetEnrichment(this Context context)
public static bool NoAssetEnrichment(this Context context)
{
return context.Headers.ContainsKey(NoEnrichment);
return context.AsBoolean(NoEnrichment);
}

public static ICloneBuilder WithoutAssetEnrichment(this ICloneBuilder builder, bool value = true)
public static ICloneBuilder WithNoAssetEnrichment(this ICloneBuilder builder, bool value = true)
{
return builder.WithBoolean(NoEnrichment, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private static bool TryGetAssetRef(FluidValue input, out AssetRef assetRef)

var requestContext =
Context.Admin(app).Clone(b => b
.WithoutTotal());
.WithNoTotal());

var asset = await assetQuery.FindAsync(requestContext, domainId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ private void GetAssets(ScriptExecutionContext context, DomainId appId, ClaimsPri
var requestContext =
new Context(user, app).Clone(b => b
.WithoutTotal());
.WithNoTotal());
var assets = await assetQuery.QueryAsync(requestContext, null, Q.Empty.WithIds(ids), ct);
Expand Down Expand Up @@ -264,7 +264,7 @@ private void GetAsset(ScriptExecutionContext context, DomainId appId, ClaimsPrin
var requestContext =
new Context(user, app).Clone(b => b
.WithoutTotal());
.WithNoTotal());
var assets = await assetQuery.QueryAsync(requestContext, null, Q.Empty.WithIds(ids), ct);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,25 +128,30 @@ protected override async Task<object> EnrichResultAsync(CommandContext context,
{
var payload = await base.EnrichResultAsync(context, result, ct);

if (payload is IAssetEntity asset)
if (payload is not IAssetEntity asset)
{
if (result.IsChanged && context.Command is UploadAssetCommand)
return payload;
}

if (result.IsChanged && context.Command is UploadAssetCommand)
{
var tempFile = context.ContextId.ToString();
try
{
var tempFile = context.ContextId.ToString();
try
{
await assetFileStore.CopyAsync(tempFile, asset.AppId.Id, asset.AssetId, asset.FileVersion, null, ct);
}
catch (AssetAlreadyExistsException) when (context.Command is not UpsertAsset)
await assetFileStore.CopyAsync(tempFile, asset.AppId.Id, asset.AssetId, asset.FileVersion, null, ct);
}
catch (AssetAlreadyExistsException)
{
if (context.Command is not UpsertAsset)
{
throw;
}
}
}

if (payload is not IEnrichedAssetEntity)
{
payload = await assetEnricher.EnrichAsync(asset, contextProvider.Context, ct);
}
if (payload is not IEnrichedAssetEntity)
{
payload = await assetEnricher.EnrichAsync(asset, contextProvider.Context, ct);
}

return payload;
Expand Down
Loading

0 comments on commit 2e95bf8

Please sign in to comment.