diff --git a/backend/extensions/Squidex.Extensions/Actions/Script/ScriptActionHandler.cs b/backend/extensions/Squidex.Extensions/Actions/Script/ScriptActionHandler.cs index 98b4a7c3c9..02056a3f2a 100644 --- a/backend/extensions/Squidex.Extensions/Actions/Script/ScriptActionHandler.cs +++ b/backend/extensions/Squidex.Extensions/Actions/Script/ScriptActionHandler.cs @@ -44,7 +44,7 @@ protected override async Task ExecuteJobAsync(ScriptJob job, AppName = job.Event.AppId.Name, }; - if (job.Event is EnrichedUserEventBase userEvent) + if (job.Event is EnrichedUserEventBase) { vars.User = AllPrinicpal(); } diff --git a/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs b/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs index a9ed573175..b69662a93e 100644 --- a/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs +++ b/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs @@ -6,7 +6,6 @@ // ========================================================================== using System.Diagnostics.CodeAnalysis; -using System.Threading.Tasks.Dataflow; using MongoDB.Bson; using MongoDB.Driver; using Squidex.Infrastructure; @@ -28,104 +27,79 @@ public AddAppIdToEventStream(IMongoDatabase database) public async Task UpdateAsync( CancellationToken ct) { - const int SizeOfBatch = 1000; - const int SizeOfQueue = 20; - // Do not resolve in constructor, because most of the time it is not executed anyway. var collectionV1 = database.GetCollection("Events"); var collectionV2 = database.GetCollection("Events2"); - var batchBlock = new BatchBlock(SizeOfBatch, new GroupingDataflowBlockOptions + // Run batch first, because it is cheaper as it has less items. + var batchedCommits = collectionV1.Find(FindAll).ToAsyncEnumerable(ct).Batch(500, ct).Buffered(2, ct); + + var options = new ParallelOptions { - BoundedCapacity = SizeOfQueue * SizeOfBatch - }); + CancellationToken = ct, + // The tasks are mostly executed on database level, therefore we increase parallelism. + MaxDegreeOfParallelism = Environment.ProcessorCount * 2, + }; - var actionBlock = new ActionBlock(async batch => + await Parallel.ForEachAsync(batchedCommits, ct, async (batch, ct) => { - try + var writes = new List>(); + + foreach (var document in batch) { - var writes = new List>(); + var eventStream = document["EventStream"].AsString; - foreach (var document in batch) + if (TryGetAppId(document, out var appId)) { - var eventStream = document["EventStream"].AsString; - - if (TryGetAppId(document, out var appId)) + if (!eventStream.StartsWith("app-", StringComparison.OrdinalIgnoreCase)) { - if (!eventStream.StartsWith("app-", StringComparison.OrdinalIgnoreCase)) - { - var indexOfType = eventStream.IndexOf('-', StringComparison.Ordinal); - var indexOfId = indexOfType + 1; + var indexOfType = eventStream.IndexOf('-', StringComparison.Ordinal); + var indexOfId = indexOfType + 1; - var indexOfOldId = eventStream.LastIndexOf("--", StringComparison.OrdinalIgnoreCase); + var indexOfOldId = eventStream.LastIndexOf("--", StringComparison.OrdinalIgnoreCase); - if (indexOfOldId > 0) - { - indexOfId = indexOfOldId + 2; - } - - var domainType = eventStream[..indexOfType]; - var domainId = eventStream[indexOfId..]; + if (indexOfOldId > 0) + { + indexOfId = indexOfOldId + 2; + } - var newDomainId = DomainId.Combine(DomainId.Create(appId), DomainId.Create(domainId)).ToString(); - var newStreamName = $"{domainType}-{newDomainId}"; + var domainType = eventStream[..indexOfType]; + var domainId = eventStream[indexOfId..]; - document["EventStream"] = newStreamName; + var newDomainId = DomainId.Combine(DomainId.Create(appId), DomainId.Create(domainId)).ToString(); + var newStreamName = $"{domainType}-{newDomainId}"; - foreach (var @event in document["Events"].AsBsonArray) - { - var metadata = @event["Metadata"].AsBsonDocument; - - metadata["AggregateId"] = newDomainId; - } - } + document["EventStream"] = newStreamName; foreach (var @event in document["Events"].AsBsonArray) { var metadata = @event["Metadata"].AsBsonDocument; - metadata.Remove("AppId"); + metadata["AggregateId"] = newDomainId; } } - var filter = Builders.Filter.Eq("_id", document["_id"].AsString); - - writes.Add(new ReplaceOneModel(filter, document) + foreach (var @event in document["Events"].AsBsonArray) { - IsUpsert = true - }); + var metadata = @event["Metadata"].AsBsonDocument; + + metadata.Remove("AppId"); + } } - if (writes.Count > 0) + var filter = Builders.Filter.Eq("_id", document["_id"].AsString); + + writes.Add(new ReplaceOneModel(filter, document) { - await collectionV2.BulkWriteAsync(writes, BulkUnordered, ct); - } + IsUpsert = true + }); } - catch (OperationCanceledException ex) - { - // Dataflow swallows operation cancelled exception. - throw new AggregateException(ex); - } - }, new ExecutionDataflowBlockOptions - { - MaxDegreeOfParallelism = Environment.ProcessorCount * 2, - MaxMessagesPerTask = 10, - BoundedCapacity = SizeOfQueue - }); - - batchBlock.BidirectionalLinkTo(actionBlock); - await foreach (var commit in collectionV1.Find(FindAll).ToAsyncEnumerable(ct: ct)) - { - if (!await batchBlock.SendAsync(commit, ct)) + if (writes.Count > 0) { - break; + await collectionV2.BulkWriteAsync(writes, BulkUnordered, ct); } - } - - batchBlock.Complete(); - - await actionBlock.Completion; + }); } private static bool TryGetAppId(BsonDocument document, [MaybeNullWhen(false)] out string appId) diff --git a/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs b/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs index 108636d2c4..35d1aaa30c 100644 --- a/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs +++ b/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs @@ -5,7 +5,6 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System.Threading.Tasks.Dataflow; using MongoDB.Bson; using MongoDB.Driver; using Squidex.Infrastructure; @@ -72,9 +71,6 @@ public async Task UpdateAsync( private static async Task RebuildAsync(IMongoDatabase database, Action? extraAction, string collectionNameV1, CancellationToken ct) { - const int SizeOfBatch = 1000; - const int SizeOfQueue = 10; - string collectionNameV2; collectionNameV2 = $"{collectionNameV1}2"; @@ -91,80 +87,46 @@ private static async Task RebuildAsync(IMongoDatabase database, Action(SizeOfBatch, new GroupingDataflowBlockOptions - { - BoundedCapacity = SizeOfQueue * SizeOfBatch - }); + // Run batch first, because it is cheaper as it has less items. + var batches = collectionV1.Find(FindAll).ToAsyncEnumerable(ct).Batch(500, ct).Buffered(2, ct); - var writeOptions = new BulkWriteOptions + await Parallel.ForEachAsync(batches, ct, async (batch, ct) => { - IsOrdered = false - }; + var writes = new List>(); - var actionBlock = new ActionBlock(async batch => - { - try + foreach (var document in batch) { - var writes = new List>(); - - foreach (var document in batch) - { - var appId = document["_ai"].AsString; + var appId = document["_ai"].AsString; - var documentIdOld = document["_id"].AsString; + var documentIdOld = document["_id"].AsString; - if (documentIdOld.Contains("--", StringComparison.OrdinalIgnoreCase)) - { - var index = documentIdOld.LastIndexOf("--", StringComparison.OrdinalIgnoreCase); + if (documentIdOld.Contains("--", StringComparison.OrdinalIgnoreCase)) + { + var index = documentIdOld.LastIndexOf("--", StringComparison.OrdinalIgnoreCase); - documentIdOld = documentIdOld[(index + 2)..]; - } + documentIdOld = documentIdOld[(index + 2)..]; + } - var documentIdNew = DomainId.Combine(DomainId.Create(appId), DomainId.Create(documentIdOld)).ToString(); + var documentIdNew = DomainId.Combine(DomainId.Create(appId), DomainId.Create(documentIdOld)).ToString(); - document["id"] = documentIdOld; - document["_id"] = documentIdNew; + document["id"] = documentIdOld; + document["_id"] = documentIdNew; - extraAction?.Invoke(document); + extraAction?.Invoke(document); - var filter = Filter.Eq("_id", documentIdNew); + var filter = Filter.Eq("_id", documentIdNew); - writes.Add(new ReplaceOneModel(filter, document) - { - IsUpsert = true - }); - } - - if (writes.Count > 0) + writes.Add(new ReplaceOneModel(filter, document) { - await collectionV2.BulkWriteAsync(writes, writeOptions, ct); - } + IsUpsert = true + }); } - catch (OperationCanceledException ex) - { - // Dataflow swallows operation cancelled exception. - throw new AggregateException(ex); - } - }, new ExecutionDataflowBlockOptions - { - MaxDegreeOfParallelism = Environment.ProcessorCount * 2, - MaxMessagesPerTask = 1, - BoundedCapacity = SizeOfQueue - }); - - batchBlock.BidirectionalLinkTo(actionBlock); - await foreach (var document in collectionV1.Find(FindAll).ToAsyncEnumerable(ct: ct)) - { - if (!await batchBlock.SendAsync(document, ct)) + if (writes.Count > 0) { - break; + await collectionV2.BulkWriteAsync(writes, BulkUnordered, ct); } - } - - batchBlock.Complete(); - - await actionBlock.Completion; + }); } private static void ConvertParentId(BsonDocument document) diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetHeaders.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetHeaders.cs index f3e36d6fa3..fc631f5273 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetHeaders.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetHeaders.cs @@ -11,12 +11,12 @@ public static class AssetHeaders { public const string NoEnrichment = "X-NoAssetEnrichment"; - public static bool ShouldSkipAssetEnrichment(this Context context) + public static bool NoAssetEnrichment(this Context context) { - return context.Headers.ContainsKey(NoEnrichment); + return context.AsBoolean(NoEnrichment); } - public static ICloneBuilder WithoutAssetEnrichment(this ICloneBuilder builder, bool value = true) + public static ICloneBuilder WithNoAssetEnrichment(this ICloneBuilder builder, bool value = true) { return builder.WithBoolean(NoEnrichment, value); } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetsFluidExtension.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetsFluidExtension.cs index 8084763b6d..4bdbb74f53 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetsFluidExtension.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetsFluidExtension.cs @@ -154,7 +154,7 @@ private static bool TryGetAssetRef(FluidValue input, out AssetRef assetRef) var requestContext = Context.Admin(app).Clone(b => b - .WithoutTotal()); + .WithNoTotal()); var asset = await assetQuery.FindAsync(requestContext, domainId); diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetsJintExtension.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetsJintExtension.cs index b15373f283..3a52bec920 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetsJintExtension.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetsJintExtension.cs @@ -229,7 +229,7 @@ private void GetAssets(ScriptExecutionContext context, DomainId appId, ClaimsPri var requestContext = new Context(user, app).Clone(b => b - .WithoutTotal()); + .WithNoTotal()); var assets = await assetQuery.QueryAsync(requestContext, null, Q.Empty.WithIds(ids), ct); @@ -264,7 +264,7 @@ private void GetAsset(ScriptExecutionContext context, DomainId appId, ClaimsPrin var requestContext = new Context(user, app).Clone(b => b - .WithoutTotal()); + .WithNoTotal()); var assets = await assetQuery.QueryAsync(requestContext, null, Q.Empty.WithIds(ids), ct); diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetCommandMiddleware.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetCommandMiddleware.cs index 5e943bdb9a..d15dc10c29 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetCommandMiddleware.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetCommandMiddleware.cs @@ -128,25 +128,30 @@ protected override async Task EnrichResultAsync(CommandContext context, { var payload = await base.EnrichResultAsync(context, result, ct); - if (payload is IAssetEntity asset) + if (payload is not IAssetEntity asset) { - if (result.IsChanged && context.Command is UploadAssetCommand) + return payload; + } + + if (result.IsChanged && context.Command is UploadAssetCommand) + { + var tempFile = context.ContextId.ToString(); + try { - var tempFile = context.ContextId.ToString(); - try - { - await assetFileStore.CopyAsync(tempFile, asset.AppId.Id, asset.AssetId, asset.FileVersion, null, ct); - } - catch (AssetAlreadyExistsException) when (context.Command is not UpsertAsset) + await assetFileStore.CopyAsync(tempFile, asset.AppId.Id, asset.AssetId, asset.FileVersion, null, ct); + } + catch (AssetAlreadyExistsException) + { + if (context.Command is not UpsertAsset) { throw; } } + } - if (payload is not IEnrichedAssetEntity) - { - payload = await assetEnricher.EnrichAsync(asset, contextProvider.Context, ct); - } + if (payload is not IEnrichedAssetEntity) + { + payload = await assetEnricher.EnrichAsync(asset, contextProvider.Context, ct); } return payload; diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetsBulkUpdateCommandMiddleware.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetsBulkUpdateCommandMiddleware.cs index dd4786cbbd..3615b8fe90 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetsBulkUpdateCommandMiddleware.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetsBulkUpdateCommandMiddleware.cs @@ -5,15 +5,11 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System.Collections.Concurrent; -using System.Threading.Tasks.Dataflow; -using Microsoft.Extensions.Logging; using Squidex.Domain.Apps.Entities.Assets.Commands; using Squidex.Domain.Apps.Entities.Contents; using Squidex.Infrastructure; using Squidex.Infrastructure.Commands; using Squidex.Infrastructure.Reflection; -using Squidex.Infrastructure.Tasks; using Squidex.Shared; #pragma warning disable SA1313 // Parameter names should begin with lower-case letter @@ -24,201 +20,140 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject; public sealed class AssetsBulkUpdateCommandMiddleware : ICommandMiddleware { private readonly IContextProvider contextProvider; - private readonly ILogger log; - - private sealed record BulkTaskCommand(BulkTask Task, DomainId Id, ICommand Command, - CancellationToken CancellationToken); private sealed record BulkTask( - ICommandBus Bus, + BulkUpdateJob BulkJob, + BulkUpdateAssets Bulk, int JobIndex, - BulkUpdateJob CommandJob, - BulkUpdateAssets Command, - ConcurrentBag Results, - CancellationToken Aborted); + AssetCommand? Command) + { + public BulkUpdateResultItem? Result { get; private set; } + + public BulkTask SetResult(Exception? exception = null) + { + var id = Command?.AssetId ?? BulkJob.Id; - public AssetsBulkUpdateCommandMiddleware(IContextProvider contextProvider, ILogger log) + Result = new BulkUpdateResultItem(id, JobIndex, exception); + return this; + } + + public static BulkTask Failed(BulkUpdateJob bulkJob, BulkUpdateAssets bulk, int jobIndex, Exception exception) + { + return new BulkTask(bulkJob, bulk, jobIndex, null).SetResult(exception); + } + } + + public AssetsBulkUpdateCommandMiddleware(IContextProvider contextProvider) { this.contextProvider = contextProvider; - - this.log = log; } public async Task HandleAsync(CommandContext context, NextDelegate next, CancellationToken ct) { - if (context.Command is BulkUpdateAssets bulkUpdates) - { - if (bulkUpdates.Jobs?.Length > 0) - { - var executionOptions = new ExecutionDataflowBlockOptions - { - MaxDegreeOfParallelism = Math.Max(1, Environment.ProcessorCount / 2) - }; - - // Each job can create exactly one command. - var createCommandsBlock = new TransformBlock(task => - { - try - { - return CreateCommand(task); - } - catch (OperationCanceledException ex) - { - // Dataflow swallows operation cancelled exception. - throw new AggregateException(ex); - } - }, executionOptions); - - // Execute the commands in batches - var executeCommandBlock = new ActionBlock(async command => - { - try - { - if (command != null) - { - await ExecuteCommandAsync(command); - } - } - catch (OperationCanceledException ex) - { - // Dataflow swallows operation cancelled exception. - throw new AggregateException(ex); - } - }, executionOptions); - - createCommandsBlock.BidirectionalLinkTo(executeCommandBlock); - - contextProvider.Context.Change(b => b - .WithoutAssetEnrichment() - .WithoutCleanup() - .WithUnpublished(true) - .WithoutTotal()); - - var results = new ConcurrentBag(); - - for (var i = 0; i < bulkUpdates.Jobs.Length; i++) - { - var task = new BulkTask( - context.CommandBus, - i, - bulkUpdates.Jobs[i], - bulkUpdates, - results, - ct); - - if (!await createCommandsBlock.SendAsync(task, ct)) - { - break; - } - } - - createCommandsBlock.Complete(); - - // Wait for all commands to be executed. - await executeCommandBlock.Completion; - - context.Complete(new BulkUpdateResult(results)); - } - else - { - context.Complete(new BulkUpdateResult()); - } - } - else + if (context.Command is not BulkUpdateAssets bulkUpdates) { await next(context, ct); + return; } - } - private async Task ExecuteCommandAsync(BulkTaskCommand bulkCommand) - { - var (task, id, command, ct) = bulkCommand; - try + if (bulkUpdates.Jobs == null || bulkUpdates.Jobs.Length == 0) { - await task.Bus.PublishAsync(command, ct); - - task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex)); + context.Complete(new BulkUpdateResult()); + return; } - catch (Exception ex) + + contextProvider.Context.Change(b => b + .WithNoAssetEnrichment() + .WithNoCleanup() + .WithUnpublished(true) + .WithNoTotal()); + + var tasks = bulkUpdates.Jobs.Select((job, i) => CreateTask(job, bulkUpdates, i)).ToList(); + + // Group the items by id, so that we do not run jobs in parallel on the same entity. + var groupedTasks = tasks.GroupBy(x => x.BulkJob.Id).ToList(); + + await Parallel.ForEachAsync(groupedTasks, ct, async (group, ct) => { - log.LogError(ex, "Failed to execute asset bulk job with index {index} of type {type}.", - task.JobIndex, - task.CommandJob.Type); + foreach (var task in group) + { + await ExecuteCommandAsync(context.CommandBus, task, ct); + } + }); - task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex, ex)); - } + context.Complete(new BulkUpdateResult(tasks.Select(x => x.Result).NotNull())); } - private BulkTaskCommand? CreateCommand(BulkTask task) + private static async Task ExecuteCommandAsync(ICommandBus commandBus, BulkTask task, + CancellationToken ct) { - var id = task.CommandJob.Id; - try + if (task.Result != null || task.Command == null) { - var command = CreateCommandCore(task); - - // Set the asset id here in case we have another way to resolve ids. - command.AssetId = id; + return; + } - return new BulkTaskCommand(task, id, command, task.Aborted); + try + { + await commandBus.PublishAsync(task.Command, ct); + task.SetResult(); } catch (Exception ex) { - log.LogError(ex, "Failed to execute asset bulk job with index {index} of type {type}.", - task.JobIndex, - task.CommandJob.Type); - - task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex, ex)); - return null; + task.SetResult(ex); } } - private AssetCommand CreateCommandCore(BulkTask task) + private BulkTask CreateTask( + BulkUpdateJob bulkJob, + BulkUpdateAssets bulk, + int jobIndex) { - var job = task.CommandJob; - - switch (job.Type) + try { - case BulkUpdateAssetType.Annotate: - { - var command = new AnnotateAsset(); - - EnrichAndCheckPermission(task, command, PermissionIds.AppAssetsUpdate); - return command; - } - - case BulkUpdateAssetType.Move: - { - var command = new MoveAsset(); - - EnrichAndCheckPermission(task, command, PermissionIds.AppAssetsUpdate); - return command; - } + switch (bulkJob.Type) + { + case BulkUpdateAssetType.Annotate: + return CreateTask(bulkJob, bulk, jobIndex, + PermissionIds.AppAssetsUpdate); - case BulkUpdateAssetType.Delete: - { - var command = new DeleteAsset(); + case BulkUpdateAssetType.Move: + return CreateTask(bulkJob, bulk, jobIndex, + PermissionIds.AppAssetsUpdate); - EnrichAndCheckPermission(task, command, PermissionIds.AppAssetsDelete); - return command; - } + case BulkUpdateAssetType.Delete: + return CreateTask(bulkJob, bulk, jobIndex, + PermissionIds.AppAssetsDelete); - default: - ThrowHelper.NotSupportedException(); - return default!; + default: + return BulkTask.Failed(bulkJob, bulk, jobIndex, new NotSupportedException()); + } + } + catch (Exception ex) + { + return BulkTask.Failed(bulkJob, bulk, jobIndex, ex); } } - private void EnrichAndCheckPermission(BulkTask task, T command, string permissionId) where T : AssetCommand + private BulkTask CreateTask( + BulkUpdateJob bulkJob, + BulkUpdateAssets bulk, + int jobIndex, + string permissionId) where T : AssetCommand, new() { - SimpleMapper.Map(task.Command, command); - SimpleMapper.Map(task.CommandJob, command); - if (!contextProvider.Context.Allows(permissionId)) { - throw new DomainForbiddenException("Forbidden"); + return BulkTask.Failed(bulkJob, bulk, jobIndex, new DomainForbiddenException("Forbidden")); } - command.ExpectedVersion = task.Command.ExpectedVersion; + var command = new T(); + + SimpleMapper.Map(bulk, command); + SimpleMapper.Map(bulkJob, command); + + command.ExpectedVersion = bulk.ExpectedVersion; + command.AssetId = bulkJob.Id; + + return new BulkTask(bulkJob, bulk, jobIndex, command); } } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/AssetQueryParser.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/AssetQueryParser.cs index 20e4bbf90d..2e5b2d7f3d 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/AssetQueryParser.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/AssetQueryParser.cs @@ -54,11 +54,11 @@ public virtual async Task ParseAsync(Context context, Q q, q = q.WithQuery(query); - if (context.ShouldSkipTotal()) + if (context.NoTotal()) { q = q.WithoutTotal(); } - else if (context.ShouldSkipSlowTotal()) + else if (context.NoSlowTotal()) { q = q.WithoutSlowTotal(); } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/CalculateTokens.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/CalculateTokens.cs index ae15c4544f..8cee962190 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/CalculateTokens.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/CalculateTokens.cs @@ -25,7 +25,7 @@ public CalculateTokens(IUrlGenerator urlGenerator, IJsonSerializer serializer) public Task EnrichAsync(Context context, IEnumerable assets, CancellationToken ct) { - if (context.ShouldSkipAssetEnrichment()) + if (context.NoAssetEnrichment()) { return Task.CompletedTask; } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/ConvertTags.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/ConvertTags.cs index 243cfc026a..e224fff406 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/ConvertTags.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/ConvertTags.cs @@ -22,7 +22,7 @@ public ConvertTags(ITagService tagService) public async Task EnrichAsync(Context context, IEnumerable assets, CancellationToken ct) { - if (context.ShouldSkipAssetEnrichment()) + if (context.NoAssetEnrichment()) { return; } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/EnrichForCaching.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/EnrichForCaching.cs index 1369868205..394df0c682 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/EnrichForCaching.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/EnrichForCaching.cs @@ -54,6 +54,6 @@ public Task EnrichAsync(Context context, IEnumerable assets, private static bool ShouldEnrich(Context context) { - return !context.ShouldSkipCacheKeys(); + return !context.NoCacheKeys(); } } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/EnrichWithMetadataText.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/EnrichWithMetadataText.cs index 4e63c8c2d9..f9aa09cab6 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/EnrichWithMetadataText.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/EnrichWithMetadataText.cs @@ -22,7 +22,7 @@ public EnrichWithMetadataText(IEnumerable assetMetadataSou public Task EnrichAsync(Context context, IEnumerable assets, CancellationToken ct) { - if (context.ShouldSkipAssetEnrichment()) + if (context.NoAssetEnrichment()) { return Task.CompletedTask; } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/ScriptAsset.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/ScriptAsset.cs index 8ca83d37c3..46b175ce81 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/ScriptAsset.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/Queries/Steps/ScriptAsset.cs @@ -100,7 +100,7 @@ private static bool ShouldEnrich(Context context) { // We need a special permission to disable scripting for security reasons, if the script removes sensible data. var shouldScript = - !context.ShouldSkipScripting() || + !context.NoScripting() || !context.UserPermissions.Allows(PermissionIds.ForApp(PermissionIds.AppNoScripting, context.App.Name)); return !context.IsFrontendClient && shouldScript; diff --git a/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreProcessor.cs b/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreProcessor.cs index a63df4939d..acaa48957d 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreProcessor.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreProcessor.cs @@ -5,7 +5,7 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System.Threading.Tasks.Dataflow; +using System.Runtime.CompilerServices; using Microsoft.Extensions.Logging; using NodaTime; using Squidex.Domain.Apps.Core.Apps; @@ -285,67 +285,55 @@ private async Task DownloadAsync(Run run, private async Task ReadEventsAsync(Run run, CancellationToken ct) { - const int BatchSize = 100; + // Run batch first, because it is cheaper as it has less items. + var events = HandleEventsAsync(run, ct).Batch(100, ct).Buffered(2, ct); var handled = 0; - var writeBlock = new ActionBlock<(string, Envelope)[]>(async batch => + await Parallel.ForEachAsync(events, new ParallelOptions { - try - { - var commits = new List(batch.Length); - - foreach (var (stream, @event) in batch) - { - var offset = run.StreamMapper.GetStreamOffset(stream); - - commits.Add(EventCommit.Create(stream, offset, @event, eventFormatter)); - } + CancellationToken = ct, + // The event store cannot insert events in parallel. + MaxDegreeOfParallelism = 1, + }, + async (batch, ct) => + { + var commits = + batch.Select(item => + EventCommit.Create( + item.Stream, + item.Offset, + item.Event, + eventFormatter)); - await eventStore.AppendUnsafeAsync(commits, ct); + await eventStore.AppendUnsafeAsync(commits, ct); - handled += commits.Count; + // Just in case we use parallel inserts later. + Interlocked.Increment(ref handled); - await LogAsync(run, $"Reading {run.Reader.ReadEvents}/{handled} events and {run.Reader.ReadAttachments} attachments completed.", true); - } - catch (OperationCanceledException ex) - { - // Dataflow swallows operation cancelled exception. - throw new AggregateException(ex); - } - }, new ExecutionDataflowBlockOptions - { - MaxDegreeOfParallelism = 1, - MaxMessagesPerTask = 1, - BoundedCapacity = 2 - }); - - var batchBlock = new BatchBlock<(string, Envelope)>(BatchSize, new GroupingDataflowBlockOptions - { - BoundedCapacity = BatchSize * 2 + await LogAsync(run, $"Reading {run.Reader.ReadEvents}/{handled} events and {run.Reader.ReadAttachments} attachments completed.", true); }); + } - batchBlock.BidirectionalLinkTo(writeBlock); + private async IAsyncEnumerable<(string Stream, long Offset, Envelope Event)> HandleEventsAsync(Run run, + [EnumeratorCancellation] CancellationToken ct) + { + var @events = run.Reader.ReadEventsAsync(eventStreamNames, eventFormatter, ct); - await foreach (var job in run.Reader.ReadEventsAsync(eventStreamNames, eventFormatter, ct)) + await foreach (var (stream, @event) in events.WithCancellation(ct)) { - var newStream = await HandleEventAsync(run, job.Stream, job.Event, ct); + var (newStream, handled) = await HandleEventAsync(run, stream, @event, ct); - if (newStream != null) + if (handled) { - if (!await batchBlock.SendAsync((newStream, job.Event), ct)) - { - break; - } + var offset = run.StreamMapper.GetStreamOffset(newStream); + + yield return (newStream, offset, @event); } } - - batchBlock.Complete(); - - await writeBlock.Completion; } - private async Task HandleEventAsync(Run run, string stream, Envelope @event, + private async Task<(string StreamName, bool Handled)> HandleEventAsync(Run run, string stream, Envelope @event, CancellationToken ct = default) { if (@event.Payload is AppCreated appCreated) @@ -390,11 +378,11 @@ private async Task ReadEventsAsync(Run run, { if (!await handler.RestoreEventAsync(@event, run.Context, ct)) { - return null; + return (newStream, false); } } - return newStream; + return (newStream, true); } private async Task CreateContextAsync(Run run, DomainId previousAppId, diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/ContentExtensions.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/ContentExtensions.cs new file mode 100644 index 0000000000..6e58836aef --- /dev/null +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/ContentExtensions.cs @@ -0,0 +1,18 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using Squidex.Domain.Apps.Core.Contents; + +namespace Squidex.Domain.Apps.Entities.Contents; + +public static class ContentExtensions +{ + public static Status EditingStatus(this IContentEntity content) + { + return content.NewStatus ?? content.Status; + } +} diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/ContentHeaders.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/ContentHeaders.cs index 70ef174222..8d25f778fa 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/ContentHeaders.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/ContentHeaders.cs @@ -8,6 +8,7 @@ using Squidex.Domain.Apps.Core.Contents; using Squidex.Infrastructure; using Squidex.Infrastructure.Caching; +using static OpenIddict.Abstractions.OpenIddictConstants; #pragma warning disable IDE0060 // Remove unused parameter @@ -15,34 +16,32 @@ namespace Squidex.Domain.Apps.Entities.Contents; public static class ContentHeaders { - private static readonly char[] Separators = { ',', ';' }; - - public const string Fields = "X-Fields"; - public const string Flatten = "X-Flatten"; - public const string Languages = "X-Languages"; - public const string NoCleanup = "X-NoCleanup"; - public const string NoEnrichment = "X-NoEnrichment"; - public const string NoResolveLanguages = "X-NoResolveLanguages"; - public const string ResolveFlow = "X-ResolveFlow"; - public const string ResolveUrls = "X-ResolveUrls"; - public const string Unpublished = "X-Unpublished"; + public const string KeyFields = "X-Fields"; + public const string KeyFlatten = "X-Flatten"; + public const string KeyLanguages = "X-Languages"; + public const string KeyNoCleanup = "X-NoCleanup"; + public const string KeyNoEnrichment = "X-NoEnrichment"; + public const string KeyNoResolveLanguages = "X-NoResolveLanguages"; + public const string KeyResolveFlow = "X-ResolveFlow"; + public const string KeyResolveUrls = "X-ResolveUrls"; + public const string KeyUnpublished = "X-Unpublished"; public static void AddCacheHeaders(this Context context, IRequestCache cache) { - cache.AddHeader(Fields); - cache.AddHeader(Flatten); - cache.AddHeader(Languages); - cache.AddHeader(NoCleanup); - cache.AddHeader(NoEnrichment); - cache.AddHeader(NoResolveLanguages); - cache.AddHeader(ResolveFlow); - cache.AddHeader(ResolveUrls); - cache.AddHeader(Unpublished); + cache.AddHeader(KeyFields); + cache.AddHeader(KeyFlatten); + cache.AddHeader(KeyLanguages); + cache.AddHeader(KeyNoCleanup); + cache.AddHeader(KeyNoEnrichment); + cache.AddHeader(KeyNoResolveLanguages); + cache.AddHeader(KeyResolveFlow); + cache.AddHeader(KeyResolveUrls); + cache.AddHeader(KeyUnpublished); } - public static Status EditingStatus(this IContentEntity content) + public static SearchScope Scope(this Context context) { - return content.NewStatus ?? content.Status; + return context.Unpublished() || context.IsFrontendClient ? SearchScope.All : SearchScope.Published; } public static bool IsPublished(this IContentEntity content) @@ -50,113 +49,93 @@ public static bool IsPublished(this IContentEntity content) return content.EditingStatus() == Status.Published; } - public static SearchScope Scope(this Context context) - { - return context.ShouldProvideUnpublished() || context.IsFrontendClient ? SearchScope.All : SearchScope.Published; - } - - public static bool ShouldSkipCleanup(this Context context) + public static bool NoCleanup(this Context context) { - return context.Headers.ContainsKey(NoCleanup); + return context.AsBoolean(KeyNoCleanup); } - public static ICloneBuilder WithoutCleanup(this ICloneBuilder builder, bool value = true) + public static ICloneBuilder WithNoCleanup(this ICloneBuilder builder, bool value = true) { - return builder.WithBoolean(NoCleanup, value); + return builder.WithBoolean(KeyNoCleanup, value); } - public static bool ShouldSkipContentEnrichment(this Context context) + public static bool NoEnrichment(this Context context) { - return context.Headers.ContainsKey(NoEnrichment); + return context.AsBoolean(KeyNoEnrichment); } - public static ICloneBuilder WithoutContentEnrichment(this ICloneBuilder builder, bool value = true) + public static ICloneBuilder WithNoEnrichment(this ICloneBuilder builder, bool value = true) { - return builder.WithBoolean(NoEnrichment, value); + return builder.WithBoolean(KeyNoEnrichment, value); } - public static bool ShouldProvideUnpublished(this Context context) + public static bool Unpublished(this Context context) { - return context.Headers.ContainsKey(Unpublished); + return context.AsBoolean(KeyUnpublished); } public static ICloneBuilder WithUnpublished(this ICloneBuilder builder, bool value = true) { - return builder.WithBoolean(Unpublished, value); + return builder.WithBoolean(KeyUnpublished, value); } - public static bool ShouldFlatten(this Context context) + public static bool Flatten(this Context context) { - return context.Headers.ContainsKey(Flatten); + return context.AsBoolean(KeyFlatten); } public static ICloneBuilder WithFlatten(this ICloneBuilder builder, bool value = true) { - return builder.WithBoolean(Flatten, value); + return builder.WithBoolean(KeyFlatten, value); } - public static bool ShouldResolveFlow(this Context context) + public static bool ResolveFlow(this Context context) { - return context.Headers.ContainsKey(ResolveFlow); + return context.AsBoolean(KeyResolveFlow); } public static ICloneBuilder WithResolveFlow(this ICloneBuilder builder, bool value = true) { - return builder.WithBoolean(ResolveFlow, value); + return builder.WithBoolean(KeyResolveFlow, value); } - public static bool ShouldResolveLanguages(this Context context) + public static bool NoResolveLanguages(this Context context) { - return !context.Headers.ContainsKey(NoResolveLanguages); + return context.AsBoolean(KeyNoResolveLanguages); } - public static ICloneBuilder WithoutResolveLanguages(this ICloneBuilder builder, bool value = true) + public static ICloneBuilder WithNoResolveLanguages(this ICloneBuilder builder, bool value = true) { - return builder.WithBoolean(NoResolveLanguages, value); + return builder.WithBoolean(KeyNoResolveLanguages, value); } - public static IEnumerable AssetUrls(this Context context) + public static IEnumerable ResolveUrls(this Context context) { - if (context.Headers.TryGetValue(ResolveUrls, out var value)) - { - return value.Split(Separators, StringSplitOptions.RemoveEmptyEntries).Select(x => x.Trim()).ToHashSet(); - } - - return Enumerable.Empty(); + return context.AsStrings(KeyResolveUrls); } - public static ICloneBuilder WithAssetUrlsToResolve(this ICloneBuilder builder, IEnumerable? fieldNames) + public static ICloneBuilder WithResolveUrls(this ICloneBuilder builder, IEnumerable? fieldNames) { - return builder.WithStrings(ResolveUrls, fieldNames); + return builder.WithStrings(KeyResolveUrls, fieldNames); } - public static HashSet? FieldsList(this Context context) + public static HashSet? Fields(this Context context) { - if (context.Headers.TryGetValue(Fields, out var value)) - { - return value.Split(Separators, StringSplitOptions.RemoveEmptyEntries).ToHashSet(); - } - - return null; + return context.AsStrings(KeyFields).ToHashSet(); } - public static ICloneBuilder WithFields(this ICloneBuilder builder, IEnumerable fields) + public static ICloneBuilder WithFields(this ICloneBuilder builder, IEnumerable? fields) { - return builder.WithStrings(Fields, fields); + return builder.WithStrings(KeyFields, fields); } - public static HashSet LanguagesList(this Context context) + public static HashSet Languages(this Context context) { - if (context.Headers.TryGetValue(Languages, out var value)) - { - return value.Split(Separators, StringSplitOptions.RemoveEmptyEntries).Select(x => (Language)x).ToHashSet(); - } - - return new HashSet(); + return context.AsStrings(KeyLanguages).Select(Language.GetLanguage).ToHashSet(); } public static ICloneBuilder WithLanguages(this ICloneBuilder builder, IEnumerable languages) { - return builder.WithStrings(Languages, languages); + return builder.WithStrings(KeyLanguages, languages); } } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/ContentsSearchSource.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/ContentsSearchSource.cs index 5bc5869fba..ab03703538 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/ContentsSearchSource.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/ContentsSearchSource.cs @@ -62,6 +62,7 @@ public async Task SearchAsync(string query, Context context, return result; } + // Resolve the app ID once, because we loop over the results. var appId = context.App.NamedId(); var contents = await contentQuery.QueryAsync(context, Q.Empty.WithIds(ids).WithoutTotal(), ct); diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentDomainObject.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentDomainObject.cs index be5df948e0..f1a1aad743 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentDomainObject.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentDomainObject.cs @@ -439,7 +439,11 @@ private async Task DeleteCore(DeleteContent c, ContentOperation operation, private void Create(CreateContent command, Status status) { - Raise(command, new ContentCreated { Status = status }); + var @event = SimpleMapper.Map(command, new ContentCreated()); + + @event.Status = status; + + RaiseEvent(Envelope.Create(@event)); } private void Update(ContentCommand command, ContentData data) diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentsBulkUpdateCommandMiddleware.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentsBulkUpdateCommandMiddleware.cs index a3eee076e7..5ca2938034 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentsBulkUpdateCommandMiddleware.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentsBulkUpdateCommandMiddleware.cs @@ -5,16 +5,11 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System.Collections.Concurrent; -using System.Threading.Tasks.Dataflow; -using Microsoft.Extensions.Logging; -using Squidex.Domain.Apps.Core.Contents; using Squidex.Domain.Apps.Entities.Contents.Commands; using Squidex.Domain.Apps.Entities.Schemas; using Squidex.Infrastructure; using Squidex.Infrastructure.Commands; using Squidex.Infrastructure.Reflection; -using Squidex.Infrastructure.Tasks; using Squidex.Infrastructure.Translations; using Squidex.Shared; @@ -27,299 +22,239 @@ public sealed class ContentsBulkUpdateCommandMiddleware : ICommandMiddleware { private readonly IContentQueryService contentQuery; private readonly IContextProvider contextProvider; - private readonly ILogger log; - - private sealed record BulkTaskCommand(BulkTask Task, DomainId Id, ICommand Command, - CancellationToken CancellationToken); private sealed record BulkTask( - ICommandBus Bus, - NamedId SchemaId, + BulkUpdateJob BulkJob, + BulkUpdateContents Bulk, int JobIndex, - BulkUpdateJob CommandJob, - BulkUpdateContents Command, - ConcurrentBag Results, - CancellationToken Aborted); + ContentCommand? Command) + { + public BulkUpdateResultItem? Result { get; private set; } + + public BulkTask SetResult(Exception? exception = null) + { + var id = Command?.ContentId ?? BulkJob.Id; + + Result = new BulkUpdateResultItem(id, JobIndex, exception); + return this; + } + + public static BulkTask Failed(BulkUpdateJob bulkJob, BulkUpdateContents bulk, int jobIndex, Exception exception) + { + return new BulkTask(bulkJob, bulk, jobIndex, null).SetResult(exception); + } + } public ContentsBulkUpdateCommandMiddleware( IContentQueryService contentQuery, - IContextProvider contextProvider, - ILogger log) + IContextProvider contextProvider) { this.contentQuery = contentQuery; this.contextProvider = contextProvider; - - this.log = log; } public async Task HandleAsync(CommandContext context, NextDelegate next, CancellationToken ct) { - if (context.Command is BulkUpdateContents bulkUpdates) + if (context.Command is not BulkUpdateContents bulkUpdates) { - if (bulkUpdates.Jobs?.Length > 0) - { - var executionOptions = new ExecutionDataflowBlockOptions - { - MaxDegreeOfParallelism = Math.Max(1, Environment.ProcessorCount / 2) - }; - - // Each job can create one or more commands. - var createCommandsBlock = new TransformManyBlock(async task => - { - try - { - return await CreateCommandsAsync(task); - } - catch (OperationCanceledException ex) - { - // Dataflow swallows operation cancelled exception. - throw new AggregateException(ex); - } - }, executionOptions); - - // Execute the commands in batches. - var executeCommandBlock = new ActionBlock(async command => - { - try - { - await ExecuteCommandAsync(command); - } - catch (OperationCanceledException ex) - { - // Dataflow swallows operation cancelled exception. - throw new AggregateException(ex); - } - }, executionOptions); - - createCommandsBlock.BidirectionalLinkTo(executeCommandBlock); - - contextProvider.Context.Change(b => b - .WithoutContentEnrichment() - .WithoutCleanup() - .WithUnpublished(true) - .WithoutTotal()); - - var results = new ConcurrentBag(); - - for (var i = 0; i < bulkUpdates.Jobs.Length; i++) - { - var task = new BulkTask( - context.CommandBus, - bulkUpdates.SchemaId, - i, - bulkUpdates.Jobs[i], - bulkUpdates, - results, - ct); - - if (!await createCommandsBlock.SendAsync(task, ct)) - { - break; - } - } - - createCommandsBlock.Complete(); - - // Wait for all commands to be executed. - await executeCommandBlock.Completion; - - context.Complete(new BulkUpdateResult(results)); - } - else - { - context.Complete(new BulkUpdateResult()); - } + await next(context, ct); + return; } - else + + if (bulkUpdates.Jobs == null || bulkUpdates.Jobs.Length == 0) { - await next(context, ct); + context.Complete(new BulkUpdateResult()); + return; } + + contextProvider.Context.Change(b => b + .WithNoEnrichment() + .WithNoCleanup() + .WithUnpublished(true) + .WithNoTotal()); + + var tasks = await bulkUpdates.Jobs.SelectManyAsync((job, i, ct) => CreateTasksAsync(job, bulkUpdates, i, ct), ct); + + // Group the items by id, so that we do not run jobs in parallel on the same entity. + var groupedTasks = tasks.GroupBy(x => x.Command?.ContentId).ToList(); + + await Parallel.ForEachAsync(groupedTasks, ct, async (group, ct) => + { + foreach (var task in group) + { + await ExecuteCommandAsync(context.CommandBus, task, ct); + } + }); + + context.Complete(new BulkUpdateResult(tasks.Select(x => x.Result).NotNull())); } - private async Task ExecuteCommandAsync(BulkTaskCommand bulkCommand) + private static async Task ExecuteCommandAsync(ICommandBus commandBus, BulkTask task, + CancellationToken ct) { - var (task, id, command, ct) = bulkCommand; + if (task.Result != null || task.Command == null) + { + return; + } try { - await task.Bus.PublishAsync(command, ct); - - task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex)); + await commandBus.PublishAsync(task.Command, ct); + task.SetResult(); } catch (Exception ex) { - log.LogError(ex, "Failed to execute content bulk job with index {index} of type {type}.", - task.JobIndex, - task.CommandJob.Type); - - task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex, ex)); + task.SetResult(ex); } } - private async Task> CreateCommandsAsync(BulkTask task) + private async Task> CreateTasksAsync( + BulkUpdateJob bulkJob, + BulkUpdateContents bulk, + int jobIndex, + CancellationToken ct) { // The task parallel pipeline does not allow async-enumerable. - var commands = new List(); + var tasks = new List(); try { - // Check whether another schema is defined for the current job and override the schema id if necessary. - var overridenSchema = task.CommandJob.Schema; + var schemaId = bulk.SchemaId; - if (!string.IsNullOrWhiteSpace(overridenSchema)) + // Check whether another schema is defined for the current job and override the schema id if necessary. + if (!string.IsNullOrWhiteSpace(bulkJob.Schema)) { - var schema = await contentQuery.GetSchemaOrThrowAsync(contextProvider.Context, overridenSchema, task.Aborted); + var schema = await contentQuery.GetSchemaOrThrowAsync(contextProvider.Context, bulkJob.Schema, ct); - // Task is immutable, so we have to create a copy. - task = task with { SchemaId = schema.NamedId() }; + schemaId = schema.NamedId(); } // The bulk command can be invoke in a schema controller or without a schema controller, therefore the name might be null. - if (task.SchemaId == null || task.SchemaId.Id == default) + if (schemaId == null || schemaId.Id == default) { - throw new DomainObjectNotFoundException("undefined"); + tasks.Add(BulkTask.Failed(bulkJob, bulk, jobIndex, new DomainObjectNotFoundException("undefined"))); + return tasks; } - var resolvedIds = await FindIdAsync(task, task.SchemaId.Name); + var resolvedIds = await FindIdAsync(schemaId, bulkJob, ct); if (resolvedIds.Length == 0) { - throw new DomainObjectNotFoundException("undefined"); + tasks.Add(BulkTask.Failed(bulkJob, bulk, jobIndex, new DomainObjectNotFoundException("undefined"))); + return tasks; } foreach (var id in resolvedIds) { - try - { - var command = CreateCommand(task); - - command.ContentId = id; - commands.Add(new BulkTaskCommand(task, id, command, task.Aborted)); - } - catch (Exception ex) - { - log.LogError(ex, "Failed to create content bulk job with index {index} of type {type}.", - task.JobIndex, - task.CommandJob.Type); - - task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex, ex)); - } + tasks.Add(CreateTask(id, schemaId, bulkJob, bulk, jobIndex)); } } catch (Exception ex) { - task.Results.Add(new BulkUpdateResultItem(task.CommandJob.Id, task.JobIndex, ex)); + tasks.Add(BulkTask.Failed(bulkJob, bulk, jobIndex, ex)); } - return commands; + return tasks; } - private ContentCommand CreateCommand(BulkTask task) + private BulkTask CreateTask( + DomainId id, + NamedId schemaId, + BulkUpdateJob bulkJob, + BulkUpdateContents bulk, + int jobIndex) { - var job = task.CommandJob; - - switch (job.Type) + try { - case BulkUpdateContentType.Create: - { - var command = new CreateContent(); - - EnrichAndCheckPermission(task, command, PermissionIds.AppContentsCreate); - return command; - } - - case BulkUpdateContentType.Update: - { - var command = new UpdateContent(); - - EnrichAndCheckPermission(task, command, PermissionIds.AppContentsUpdateOwn); - return command; - } - - case BulkUpdateContentType.Upsert: - { - var command = new UpsertContent(); - - EnrichAndCheckPermission(task, command, PermissionIds.AppContentsUpsert); - return command; - } - - case BulkUpdateContentType.Patch: - { - var command = new PatchContent(); - - EnrichAndCheckPermission(task, command, PermissionIds.AppContentsUpdateOwn); - return command; - } + switch (bulkJob.Type) + { + case BulkUpdateContentType.Create: + return CreateTask(id, schemaId, bulkJob, bulk, jobIndex, + PermissionIds.AppContentsCreate); - case BulkUpdateContentType.Validate: - { - var command = new ValidateContent(); + case BulkUpdateContentType.Update: + return CreateTask(id, schemaId, bulkJob, bulk, jobIndex, + PermissionIds.AppContentsUpdateOwn); - EnrichAndCheckPermission(task, command, PermissionIds.AppContentsReadOwn); - return command; - } + case BulkUpdateContentType.Upsert: + return CreateTask(id, schemaId, bulkJob, bulk, jobIndex, + PermissionIds.AppContentsUpsert); - case BulkUpdateContentType.ChangeStatus: - { - var command = new ChangeContentStatus { Status = job.Status ?? Status.Draft }; + case BulkUpdateContentType.Patch: + return CreateTask(id, schemaId, bulkJob, bulk, jobIndex, + PermissionIds.AppContentsUpdateOwn); - EnrichAndCheckPermission(task, command, PermissionIds.AppContentsChangeStatusOwn); - return command; - } + case BulkUpdateContentType.Validate: + return CreateTask(id, schemaId, bulkJob, bulk, jobIndex, + PermissionIds.AppContentsReadOwn); - case BulkUpdateContentType.Delete: - { - var command = new DeleteContent(); + case BulkUpdateContentType.ChangeStatus: + return CreateTask(id, schemaId, bulkJob, bulk, jobIndex, + PermissionIds.AppContentsChangeStatusOwn); - EnrichAndCheckPermission(task, command, PermissionIds.AppContentsDeleteOwn); - return command; - } + case BulkUpdateContentType.Delete: + return CreateTask(id, schemaId, bulkJob, bulk, jobIndex, + PermissionIds.AppContentsDeleteOwn); - default: - ThrowHelper.NotSupportedException(); - return default!; + default: + return BulkTask.Failed(bulkJob, bulk, jobIndex, new NotSupportedException()); + } + } + catch (Exception ex) + { + return BulkTask.Failed(bulkJob, bulk, jobIndex, ex); } } - private void EnrichAndCheckPermission(BulkTask task, T command, string permissionId) where T : ContentCommand + private BulkTask CreateTask( + DomainId id, + NamedId schemaId, + BulkUpdateJob bulkJob, + BulkUpdateContents bulk, + int jobIndex, + string permissionId) where T : ContentCommand, new() { - SimpleMapper.Map(task.Command, command); - SimpleMapper.Map(task.CommandJob, command); - - if (!contextProvider.Context.Allows(permissionId, command.SchemaId.Name)) + if (!contextProvider.Context.Allows(permissionId, schemaId.Name)) { - throw new DomainForbiddenException("Forbidden"); + return BulkTask.Failed(bulkJob, bulk, jobIndex, new DomainForbiddenException("Forbidden")); } - command.SchemaId = task.SchemaId; - command.ExpectedVersion = task.Command.ExpectedVersion; + var command = new T(); + + SimpleMapper.Map(bulk, command); + SimpleMapper.Map(bulkJob, command); + + command.ContentId = id; + command.SchemaId = schemaId; + + return new BulkTask(bulkJob, bulk, jobIndex, command); } - private async Task FindIdAsync(BulkTask task, string schema) + private async Task FindIdAsync(NamedId schemaId, BulkUpdateJob bulkJob, + CancellationToken ct) { - var id = task.CommandJob.Id; + var id = bulkJob.Id; if (id != null) { return new[] { id.Value }; } - if (task.CommandJob.Query != null) + if (bulkJob.Query != null) { - task.CommandJob.Query.Take = task.CommandJob.ExpectedCount; + bulkJob.Query.Take = bulkJob.ExpectedCount; - var existingQuery = Q.Empty.WithJsonQuery(task.CommandJob.Query); - var existingResult = await contentQuery.QueryAsync(contextProvider.Context, schema, existingQuery, task.Aborted); + var existingQuery = Q.Empty.WithJsonQuery(bulkJob.Query); + var existingResult = await contentQuery.QueryAsync(contextProvider.Context, schemaId.Id.ToString(), existingQuery, ct); - if (existingResult.Total > task.CommandJob.ExpectedCount) + if (existingResult.Total > bulkJob.ExpectedCount) { throw new DomainException(T.Get("contents.bulkInsertQueryNotUnique")); } // Upsert means that we either update the content if we find it or that we create a new one. // Therefore we create a new ID if we cannot find the ID for the query. - if (existingResult.Count == 0 && task.CommandJob.Type == BulkUpdateContentType.Upsert) + if (existingResult.Count == 0 && bulkJob.Type == BulkUpdateContentType.Upsert) { return new[] { DomainId.NewGuid() }; } @@ -327,7 +262,7 @@ private async Task FindIdAsync(BulkTask task, string schema) return existingResult.Select(x => x.Id).ToArray(); } - if (task.CommandJob.Type is BulkUpdateContentType.Create or BulkUpdateContentType.Upsert) + if (bulkJob.Type is BulkUpdateContentType.Create or BulkUpdateContentType.Upsert) { return new[] { DomainId.NewGuid() }; } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/GraphQL/GraphQLExecutionContext.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/GraphQL/GraphQLExecutionContext.cs index 293e221530..faf3609b24 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/GraphQL/GraphQLExecutionContext.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/GraphQL/GraphQLExecutionContext.cs @@ -17,10 +17,13 @@ namespace Squidex.Domain.Apps.Entities.Contents.GraphQL; public sealed class GraphQLExecutionContext : QueryExecutionContext { + private const int MaxBatchSize = 5000; + private const int MinBatchSize = 1; private static readonly EmptyDataLoaderResult EmptyAssets = new EmptyDataLoaderResult(); private static readonly EmptyDataLoaderResult EmptyContents = new EmptyDataLoaderResult(); private readonly IDataLoaderContextAccessor dataLoaders; private readonly GraphQLOptions options; + private readonly int batchSize; public override Context Context { get; } @@ -38,11 +41,22 @@ public GraphQLExecutionContext( this.dataLoaders = dataLoaders; Context = context.Clone(b => b - .WithoutCleanup() - .WithoutContentEnrichment() - .WithoutAssetEnrichment()); + .WithNoCleanup() + .WithNoEnrichment() + .WithNoAssetEnrichment()); this.options = options.Value; + + batchSize = Context.BatchSize(); + + if (batchSize == 0) + { + batchSize = options.Value.DataLoaderBatchSize; + } + else + { + batchSize = Math.Max(MinBatchSize, Math.Min(MaxBatchSize, batchSize)); + } } public async ValueTask FindUserAsync(RefToken refToken, @@ -121,7 +135,7 @@ private IDataLoader, IEnrichedAssetEntity> GetAssetsLoader var result = await QueryAssetsByIdsAsync(batch, ct); return result.ToDictionary(x => x.Id); - }, maxBatchSize: options.DataLoaderBatchSize); + }, maxBatchSize: batchSize); } private IDataLoader, IEnrichedContentEntity> GetContentsLoader() @@ -132,7 +146,7 @@ private IDataLoader, IEnrichedContentEntity> GetContentsLo var result = await QueryContentsByIdsAsync(batch, null, ct); return result.ToDictionary(x => x.Id); - }, maxBatchSize: options.DataLoaderBatchSize); + }, maxBatchSize: batchSize); } private IDataLoader<(DomainId Id, HashSet Fields), IEnrichedContentEntity> GetContentsLoaderWithFields() @@ -145,7 +159,7 @@ private IDataLoader, IEnrichedContentEntity> GetContentsLo var result = await QueryContentsByIdsAsync(batch.Select(x => x.Id), fields, ct); return result.ToDictionary(x => (x.Id, fields)); - }, maxBatchSize: options.DataLoaderBatchSize); + }, maxBatchSize: batchSize); } private IDataLoader GetUserLoader() diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/GraphQL/Types/Contents/FieldVisitor.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/GraphQL/Types/Contents/FieldVisitor.cs index 42aea39af9..f55452a78d 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/GraphQL/Types/Contents/FieldVisitor.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/GraphQL/Types/Contents/FieldVisitor.cs @@ -322,24 +322,4 @@ private static IFieldResolver CreateValueResolver(ValueResolver valueResol return null; }); } - - private static IFieldResolver CreateAsyncValueResolver(AsyncValueResolver valueResolver) - { - return Resolvers.Async, object?>(async (source, fieldContext, context) => - { - var key = fieldContext.FieldDefinition.SourceName(); - - if (source.TryGetValue(key, out var value)) - { - if (value == JsonValue.Null) - { - return null; - } - - return await valueResolver(value, fieldContext, context); - } - - return null; - }); - } } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/ContentQueryParser.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/ContentQueryParser.cs index cf70a1af58..cd81c92519 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/ContentQueryParser.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/ContentQueryParser.cs @@ -60,13 +60,13 @@ public virtual async Task ParseAsync(Context context, Q q, ISchemaEntity? sch WithPaging(query, q); q = q.WithQuery(query); - q = q.WithFields(context.FieldsList()); + q = q.WithFields(context.Fields()); - if (context.ShouldSkipTotal()) + if (context.NoTotal()) { q = q.WithoutTotal(); } - else if (context.ShouldSkipSlowTotal()) + else if (context.NoSlowTotal()) { q = q.WithoutSlowTotal(); } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/ContentQueryService.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/ContentQueryService.cs index b39713a435..1793f2a739 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/ContentQueryService.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/ContentQueryService.cs @@ -52,9 +52,9 @@ public async IAsyncEnumerable StreamAsync(Context contex // Skip all expensive operations when we call the enricher. context = context.Clone(b => b - .WithoutScripting() - .WithoutCacheKeys() - .WithoutContentEnrichment()); + .WithNoScripting() + .WithNoCacheKeys() + .WithNoEnrichment()); // We run this query without a timeout because it is meant for long running background operations. var contents = contentRepository.StreamAll(context.App.Id, HashSet.Of(schema.Id), context.Scope(), ct); diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/ConvertData.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/ConvertData.cs index 08fe6cc768..7327fcafe8 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/ConvertData.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/ConvertData.cs @@ -60,7 +60,7 @@ public async Task EnrichAsync(Context context, IEnumerable conten private async Task CleanReferencesAsync(Context context, IEnumerable contents, ProvideSchema schemas, CancellationToken ct) { - if (context.ShouldSkipCleanup()) + if (context.NoCleanup()) { return null; } @@ -136,14 +136,14 @@ private ContentConverter GenerateConverter(Context context, ResolvedComponents c converter.Add( new ResolveLanguages( context.App.Languages, - context.LanguagesList().ToArray()) + context.Languages().ToArray()) { - ResolveFallback = !context.IsFrontendClient && context.ShouldResolveLanguages() + ResolveFallback = !context.IsFrontendClient && !context.NoResolveLanguages() }); if (!context.IsFrontendClient) { - var assetUrls = context.AssetUrls().ToList(); + var assetUrls = context.ResolveUrls().ToList(); if (assetUrls.Count > 0) { diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/EnrichForCaching.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/EnrichForCaching.cs index 22c084d62a..0ffc5c6878 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/EnrichForCaching.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/EnrichForCaching.cs @@ -61,6 +61,6 @@ public async Task EnrichAsync(Context context, IEnumerable conten private static bool ShouldEnrich(Context context) { - return !context.ShouldSkipCacheKeys(); + return !context.NoCacheKeys(); } } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/EnrichWithWorkflows.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/EnrichWithWorkflows.cs index 1403e1d47b..a189cbb46e 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/EnrichWithWorkflows.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/EnrichWithWorkflows.cs @@ -105,6 +105,6 @@ private async Task GetColorAsync(IContentEntity content, Status status, private static bool ShouldEnrichWithStatuses(Context context) { - return context.IsFrontendClient || context.ShouldResolveFlow(); + return context.IsFrontendClient || context.ResolveFlow(); } } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/ResolveAssets.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/ResolveAssets.cs index 6b3842e331..a170c066a3 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/ResolveAssets.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/ResolveAssets.cs @@ -131,8 +131,8 @@ private async Task> GetAssetsAsync(Conte } var queryContext = context.Clone(b => b - .WithoutAssetEnrichment(true) - .WithoutTotal()); + .WithNoAssetEnrichment(true) + .WithNoTotal()); var assets = await assetQuery.QueryAsync(queryContext, null, Q.Empty.WithIds(ids), ct); @@ -149,6 +149,6 @@ private static void AddAssetIds(HashSet ids, ISchemaEntity schema, Res private static bool ShouldEnrich(Context context) { - return context.IsFrontendClient && !context.ShouldSkipContentEnrichment(); + return context.IsFrontendClient && !context.NoEnrichment(); } } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/ResolveReferences.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/ResolveReferences.cs index 3830004d8e..de7dfa2f8d 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/ResolveReferences.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/ResolveReferences.cs @@ -159,9 +159,11 @@ private async Task> GetReferencesAsync return EmptyContents; } + // Ensure that we reset the fields to not use the field selection from the parent query. var queryContext = context.Clone(b => b - .WithoutContentEnrichment(true) - .WithoutTotal()); + .WithFields(null) + .WithNoEnrichment(true) + .WithNoTotal()); var references = await ContentQuery.QueryAsync(queryContext, Q.Empty.WithIds(ids).WithoutTotal(), ct); @@ -170,6 +172,6 @@ private async Task> GetReferencesAsync private static bool ShouldEnrich(Context context) { - return context.IsFrontendClient && !context.ShouldSkipContentEnrichment(); + return context.IsFrontendClient && !context.NoEnrichment(); } } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/ScriptContent.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/ScriptContent.cs index b38efbd802..2bf4440ec5 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/ScriptContent.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/Queries/Steps/ScriptContent.cs @@ -97,7 +97,7 @@ private static bool ShouldEnrich(Context context) { // We need a special permission to disable scripting for security reasons, if the script removes sensible data. var shouldScript = - !context.ShouldSkipScripting() || + !context.NoScripting() || !context.UserPermissions.Allows(PermissionIds.ForApp(PermissionIds.AppNoScripting, context.App.Name)); return !context.IsFrontendClient && shouldScript; diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/ReferencesFluidExtension.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/ReferencesFluidExtension.cs index a18f5b4589..fcd5d3b6d3 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/ReferencesFluidExtension.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/ReferencesFluidExtension.cs @@ -94,9 +94,10 @@ private void AddReferenceFilter(TemplateOptions options) var requestContext = Context.Admin(app).Clone(b => b - .WithoutContentEnrichment() + .WithFields(null) + .WithNoEnrichment() .WithUnpublished() - .WithoutTotal()); + .WithNoTotal()); var contents = await contentQuery.QueryAsync(requestContext, Q.Empty.WithIds(domainId)); diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/ReferencesJintExtension.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/ReferencesJintExtension.cs index 4c65ab1fae..48cad87438 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/ReferencesJintExtension.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/ReferencesJintExtension.cs @@ -83,9 +83,10 @@ private void GetReferences(ScriptExecutionContext context, DomainId appId, Claim var requestContext = new Context(user, app).Clone(b => b - .WithoutContentEnrichment() + .WithFields(null) + .WithNoEnrichment() .WithUnpublished() - .WithoutTotal()); + .WithNoTotal()); var contents = await contentQuery.QueryAsync(requestContext, Q.Empty.WithIds(ids), ct); @@ -122,9 +123,10 @@ private void GetReference(ScriptExecutionContext context, DomainId appId, Claims var requestContext = new Context(user, app).Clone(b => b - .WithoutContentEnrichment() + .WithFields(null) + .WithNoEnrichment() .WithUnpublished() - .WithoutTotal()); + .WithNoTotal()); var contents = await contentQuery.QueryAsync(requestContext, Q.Empty.WithIds(ids), ct); diff --git a/backend/src/Squidex.Domain.Apps.Entities/ContextHeaders.cs b/backend/src/Squidex.Domain.Apps.Entities/ContextHeaders.cs index d08cf124ab..2dde6dffb3 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/ContextHeaders.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/ContextHeaders.cs @@ -5,53 +5,82 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== +using System.Globalization; + namespace Squidex.Domain.Apps.Entities; public static class ContextHeaders { - public const string NoCacheKeys = "X-NoCacheKeys"; - public const string NoScripting = "X-NoScripting"; - public const string NoSlowTotal = "X-NoSlowTotal"; - public const string NoTotal = "X-NoTotal"; + private static readonly char[] Separators = { ',', ';' }; + + public const string KeyBatchSize = "X-BatchSize"; + public const string KeyNoCacheKeys = "X-NoCacheKeys"; + public const string KeyNoScripting = "X-NoScripting"; + public const string KeyNoSlowTotal = "X-NoSlowTotal"; + public const string KeyNoTotal = "X-NoTotal"; + + public static int BatchSize(this Context context) + { + return context.AsNumber(KeyBatchSize); + } - public static bool ShouldSkipCacheKeys(this Context context) + public static ICloneBuilder WithBatchSize(this ICloneBuilder builder, int value) { - return context.Headers.ContainsKey(NoCacheKeys); + return builder.WithNumber(KeyBatchSize, value); } - public static ICloneBuilder WithoutCacheKeys(this ICloneBuilder builder, bool value = true) + public static bool NoCacheKeys(this Context context) { - return builder.WithBoolean(NoCacheKeys, value); + return context.AsBoolean(KeyNoCacheKeys); } - public static bool ShouldSkipScripting(this Context context) + public static ICloneBuilder WithNoCacheKeys(this ICloneBuilder builder, bool value = true) { - return context.Headers.ContainsKey(NoScripting); + return builder.WithBoolean(KeyNoCacheKeys, value); } - public static ICloneBuilder WithoutScripting(this ICloneBuilder builder, bool value = true) + public static bool NoScripting(this Context context) { - return builder.WithBoolean(NoScripting, value); + return context.AsBoolean(KeyNoScripting); } - public static bool ShouldSkipTotal(this Context context) + public static ICloneBuilder WithNoScripting(this ICloneBuilder builder, bool value = true) { - return context.Headers.ContainsKey(NoTotal); + return builder.WithBoolean(KeyNoScripting, value); } - public static ICloneBuilder WithoutTotal(this ICloneBuilder builder, bool value = true) + public static bool NoTotal(this Context context) { - return builder.WithBoolean(NoTotal, value); + return context.AsBoolean(KeyNoTotal); } - public static bool ShouldSkipSlowTotal(this Context context) + public static ICloneBuilder WithNoTotal(this ICloneBuilder builder, bool value = true) { - return context.Headers.ContainsKey(NoSlowTotal); + return builder.WithBoolean(KeyNoTotal, value); } - public static ICloneBuilder WithoutSlowTotal(this ICloneBuilder builder, bool value = true) + public static bool NoSlowTotal(this Context context) { - return builder.WithBoolean(NoSlowTotal, value); + return context.AsBoolean(KeyNoSlowTotal); + } + + public static ICloneBuilder WithNoSlowTotal(this ICloneBuilder builder, bool value = true) + { + return builder.WithBoolean(KeyNoSlowTotal, value); + } + + public static ICloneBuilder WithNumber(this ICloneBuilder builder, string key, int value) + { + if (value != 0) + { + builder.SetHeader(key, value.ToString(CultureInfo.InvariantCulture)); + } + else + { + builder.Remove(key); + } + + return builder; } public static ICloneBuilder WithBoolean(this ICloneBuilder builder, string key, bool value) @@ -81,4 +110,29 @@ public static ICloneBuilder WithStrings(this ICloneBuilder builder, string key, return builder; } + + public static bool AsBoolean(this Context context, string key) + { + return context.Headers.ContainsKey(key); + } + + public static int AsNumber(this Context context, string key) + { + if (context.Headers.TryGetValue(key, out var value) && int.TryParse(value, CultureInfo.InvariantCulture, out var result)) + { + return result; + } + + return 0; + } + + public static IEnumerable AsStrings(this Context context, string key) + { + if (context.Headers.TryGetValue(key, out var value)) + { + return value.Split(Separators, StringSplitOptions.RemoveEmptyEntries).Select(x => x.Trim()).Distinct(); + } + + return Enumerable.Empty(); + } } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Rules/Queries/RuleEnricher.cs b/backend/src/Squidex.Domain.Apps.Entities/Rules/Queries/RuleEnricher.cs index b59b47a35b..8d046d8daf 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Rules/Queries/RuleEnricher.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Rules/Queries/RuleEnricher.cs @@ -50,7 +50,7 @@ public async Task> EnrichAsync(IEnumerable x.AppId.Id)) { diff --git a/backend/src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuerWorker.cs b/backend/src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuerWorker.cs index 6b689a416f..3a81881d34 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuerWorker.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuerWorker.cs @@ -6,7 +6,6 @@ // ========================================================================== using System.Collections.Concurrent; -using System.Threading.Tasks.Dataflow; using Microsoft.Extensions.Logging; using NodaTime; using Squidex.Domain.Apps.Core.HandleRules; @@ -22,7 +21,7 @@ namespace Squidex.Domain.Apps.Entities.Rules; public sealed class RuleDequeuerWorker : IBackgroundProcess { private readonly ConcurrentDictionary executing = new ConcurrentDictionary(); - private readonly ITargetBlock requestBlock; + private readonly PartitionedScheduler requestScheduler; private readonly IRuleEventRepository ruleEventRepository; private readonly IRuleService ruleService; private readonly IRuleUsageTracker ruleUsageTracker; @@ -42,9 +41,7 @@ public RuleDequeuerWorker( this.ruleUsageTracker = ruleUsageTracker; this.log = log; - requestBlock = - new PartitionedActionBlock(HandleAsync, x => x.Job.ExecutionPartition, - new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 32, BoundedCapacity = 32 }); + requestScheduler = new PartitionedScheduler(HandleAsync, 32, 2); } public Task StartAsync( @@ -60,9 +57,7 @@ public async Task StopAsync( { await (timer?.StopAsync() ?? Task.CompletedTask); - requestBlock.Complete(); - - await requestBlock.Completion; + await requestScheduler.CompleteAsync(); } public async Task QueryAsync( @@ -72,7 +67,10 @@ public async Task QueryAsync( { var now = Clock.GetCurrentInstant(); - await ruleEventRepository.QueryPendingAsync(now, requestBlock.SendAsync, ct); + await ruleEventRepository.QueryPendingAsync(now, async @event => + { + await requestScheduler.ScheduleAsync(@event.Job.ExecutionPartition, @event, ct); + }, ct); } catch (Exception ex) { @@ -80,7 +78,8 @@ public async Task QueryAsync( } } - public async Task HandleAsync(IRuleEventEntity @event) + public async Task HandleAsync(IRuleEventEntity @event, + CancellationToken ct) { if (!executing.TryAdd(@event.Id, false)) { @@ -91,7 +90,7 @@ public async Task HandleAsync(IRuleEventEntity @event) { var job = @event.Job; - var (response, elapsed) = await ruleService.InvokeAsync(job.ActionName, job.ActionData); + var (response, elapsed) = await ruleService.InvokeAsync(job.ActionName, job.ActionData, ct); var jobDelay = ComputeJobDelay(response.Status, @event, job); var jobResult = ComputeJobResult(response.Status, jobDelay); @@ -108,11 +107,11 @@ public async Task HandleAsync(IRuleEventEntity @event) JobResult = jobResult }; - await ruleEventRepository.UpdateAsync(@event.Job, update); + await ruleEventRepository.UpdateAsync(@event.Job, update, default); if (response.Status == RuleResult.Failed) { - await ruleUsageTracker.TrackAsync(job.AppId, job.RuleId, now.ToDateOnly(), 0, 0, 1); + await ruleUsageTracker.TrackAsync(job.AppId, job.RuleId, now.ToDateOnly(), 0, 0, 1, default); log.LogWarning(response.Exception, "Failed to execute rule event with rule id {ruleId}/{description}.", @event.Job.RuleId, @@ -120,7 +119,7 @@ public async Task HandleAsync(IRuleEventEntity @event) } else { - await ruleUsageTracker.TrackAsync(job.AppId, job.RuleId, now.ToDateOnly(), 0, 1, 0); + await ruleUsageTracker.TrackAsync(job.AppId, job.RuleId, now.ToDateOnly(), 0, 1, 0, default); } } catch (Exception ex) diff --git a/backend/src/Squidex.Domain.Apps.Entities/Tags/TagService.cs b/backend/src/Squidex.Domain.Apps.Entities/Tags/TagService.cs index 6f7b2adef1..9b5c9e7a78 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Tags/TagService.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Tags/TagService.cs @@ -5,7 +5,6 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System.Threading.Tasks.Dataflow; using Squidex.Domain.Apps.Core.Tags; using Squidex.Infrastructure; using Squidex.Infrastructure.States; @@ -312,48 +311,15 @@ private async Task> GetStateAsync(DomainId id, string group, public async Task ClearAsync( CancellationToken ct = default) { - var writerBlock = new ActionBlock[]>(async batch => - { - try - { - var isChanged = !batch.All(x => !x.Value.Clear()); - - if (isChanged) - { - var jobs = batch.Select(x => new SnapshotWriteJob(x.Key, x.Value, x.Version)); + // Run batch first, because it is cheaper as it has less items. + var batches = persistenceFactory.Snapshots.ReadAllAsync(ct).Batch(500, ct).Buffered(2, ct: ct); - await persistenceFactory.Snapshots.WriteManyAsync(jobs, ct); - } - } - catch (OperationCanceledException ex) - { - // Dataflow swallows operation cancelled exception. - throw new AggregateException(ex); - } - }, - new ExecutionDataflowBlockOptions + await Parallel.ForEachAsync(batches, ct, async (batch, ct) => { - BoundedCapacity = 2, - MaxDegreeOfParallelism = 1, - MaxMessagesPerTask = 1, - }); + // Convert to list for the tests, actually not needed. + var jobs = batch.Where(x => x.Value.Clear()).Select(x => new SnapshotWriteJob(x.Key, x.Value, x.Version)).ToList(); - // Create batches of 500 items to clear the tag count for better performance. - var batchBlock = new BatchBlock>(500, new GroupingDataflowBlockOptions - { - BoundedCapacity = 500 + await persistenceFactory.Snapshots.WriteManyAsync(jobs, ct); }); - - batchBlock.BidirectionalLinkTo(writerBlock); - - await foreach (var state in persistenceFactory.Snapshots.ReadAllAsync(ct)) - { - // Uses back-propagation to not query additional items from the database, when queue is full. - await batchBlock.SendAsync(state, ct); - } - - batchBlock.Complete(); - - await writerBlock.Completion; } } diff --git a/backend/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj b/backend/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj index b12f9044e9..47516ece4d 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj +++ b/backend/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj @@ -22,7 +22,6 @@ - diff --git a/backend/src/Squidex.Infrastructure/CollectionExtensions.cs b/backend/src/Squidex.Infrastructure/CollectionExtensions.cs index 7f4f70b1f8..47221d2ab8 100644 --- a/backend/src/Squidex.Infrastructure/CollectionExtensions.cs +++ b/backend/src/Squidex.Infrastructure/CollectionExtensions.cs @@ -5,6 +5,7 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== +using System.Collections.Concurrent; using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; @@ -91,31 +92,48 @@ public static bool SetEquals(this IReadOnlyCollection source, IReadOnlyCol return source.Count == other.Count && source.Intersect(other).Count() == other.Count; } - public static IEnumerable> Batch(this IEnumerable source, int size) + public static IEnumerable> Batch(this IEnumerable source, int size) { - TSource[]? bucket = null; - - var bucketIndex = 0; + List? bucket = null; foreach (var item in source) { - bucket ??= new TSource[size]; - bucket[bucketIndex++] = item; + bucket ??= new List(size); + bucket.Add(item); - if (bucketIndex != size) + if (bucket.Count == size) { - continue; + yield return bucket; + bucket = null; } + } + if (bucket?.Count > 0) + { yield return bucket; + } + } + + public static async IAsyncEnumerable> Batch(this IAsyncEnumerable source, int size, + [EnumeratorCancellation] CancellationToken ct = default) + { + List? bucket = null; + + await foreach (var item in source.WithCancellation(ct)) + { + bucket ??= new List(size); + bucket.Add(item); - bucket = null; - bucketIndex = 0; + if (bucket.Count == size) + { + yield return bucket; + bucket = null; + } } - if (bucket != null && bucketIndex > 0) + if (bucket?.Count > 0) { - yield return bucket.Take(bucketIndex); + yield return bucket; } } @@ -471,4 +489,26 @@ static async IAsyncEnumerable Core(IAsyncEnumerable source, Fu } } } + + public static async Task> SelectManyAsync(this IEnumerable source, Func>> selector, + CancellationToken ct = default) + { + var result = new ConcurrentBag(); + + var sourceWithIndex = source.Select((x, i) => (Item: x, Index: i)); + + await Parallel.ForEachAsync(sourceWithIndex, + ct, + async (item, ct) => + { + var createdItems = await selector(item.Item, item.Index, ct); + + foreach (var created in createdItems) + { + result.Add(created); + } + }); + + return result; + } } diff --git a/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs b/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs index bf9a40c6d5..efb001f674 100644 --- a/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs +++ b/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs @@ -5,8 +5,6 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System.Threading.Tasks.Dataflow; -using Google.Api; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Squidex.Caching; @@ -93,75 +91,40 @@ private async Task InsertManyAsync(IAsyncEnumerable source, { var store = serviceProvider.GetRequiredService>(); - var parallelism = Environment.ProcessorCount; - var handledIds = new HashSet(); var handlerErrors = 0; using (localCache.StartContext()) { - var workerBlock = new ActionBlock(async ids => + // Run batch first, because it is cheaper as it has less items. + var batches = source.Where(handledIds.Add).Batch(batchSize, ct).Buffered(2, ct); + + await Parallel.ForEachAsync(batches, ct, async (batch, ct) => { - try + await using (var context = store.WithBatchContext(typeof(T))) { - await using (var context = store.WithBatchContext(typeof(T))) + await context.LoadAsync(batch); + + foreach (var id in batch) { - await context.LoadAsync(ids); + try + { + var domainObject = domainObjectFactory.Create(id, context); - foreach (var id in ids) + await domainObject.RebuildStateAsync(ct); + } + catch (DomainObjectNotFoundException) { - try - { - var domainObject = domainObjectFactory.Create(id, context); - - await domainObject.RebuildStateAsync(ct); - } - catch (DomainObjectNotFoundException) - { - return; - } - catch (Exception ex) - { - log.LogWarning(ex, "Found corrupt domain object of type {type} with ID {id}.", typeof(T), id); - Interlocked.Increment(ref handlerErrors); - } + return; + } + catch (Exception ex) + { + log.LogWarning(ex, "Found corrupt domain object of type {type} with ID {id}.", typeof(T), id); + Interlocked.Increment(ref handlerErrors); } } } - catch (OperationCanceledException ex) - { - // Dataflow swallows operation cancelled exception. - throw new AggregateException(ex); - } - }, - new ExecutionDataflowBlockOptions - { - MaxDegreeOfParallelism = parallelism, - MaxMessagesPerTask = 10, - BoundedCapacity = parallelism - }); - - var batchBlock = new BatchBlock(batchSize, new GroupingDataflowBlockOptions - { - BoundedCapacity = batchSize }); - - batchBlock.BidirectionalLinkTo(workerBlock); - - await foreach (var id in source.WithCancellation(ct)) - { - if (handledIds.Add(id)) - { - if (!await batchBlock.SendAsync(id, ct)) - { - break; - } - } - } - - batchBlock.Complete(); - - await workerBlock.Completion; } var errorRate = (double)handlerErrors / handledIds.Count; diff --git a/backend/src/Squidex.Infrastructure/Reflection/SimpleMapper.cs b/backend/src/Squidex.Infrastructure/Reflection/SimpleMapper.cs index 02f7a9bafe..9abf6e053a 100644 --- a/backend/src/Squidex.Infrastructure/Reflection/SimpleMapper.cs +++ b/backend/src/Squidex.Infrastructure/Reflection/SimpleMapper.cs @@ -15,6 +15,13 @@ namespace Squidex.Infrastructure.Reflection; public static class SimpleMapper { + private readonly record struct MappingContext + { + required public CultureInfo Culture { get; init; } + + required public bool NullableAsOptional { get; init; } + } + private sealed class StringConversionPropertyMapper : PropertyMapper { public StringConversionPropertyMapper( @@ -24,7 +31,7 @@ public StringConversionPropertyMapper( { } - public override void MapProperty(object source, object target, CultureInfo culture) + public override void MapProperty(object source, object target, ref MappingContext context) { var value = GetValue(source); @@ -32,6 +39,39 @@ public override void MapProperty(object source, object target, CultureInfo cultu } } + private sealed class NullablePropertyMapper : PropertyMapper + { + private readonly object? defaultValue; + + public NullablePropertyMapper( + PropertyAccessor sourceAccessor, + PropertyAccessor targetAccessor, + object? defaultValue) + : base(sourceAccessor, targetAccessor) + { + this.defaultValue = defaultValue; + } + + public override void MapProperty(object source, object target, ref MappingContext context) + { + var value = GetValue(source); + + if (value == null) + { + if (context.NullableAsOptional) + { + return; + } + else + { + value = defaultValue; + } + } + + SetValue(target, value); + } + } + private sealed class ConversionPropertyMapper : PropertyMapper { private readonly Type targetType; @@ -45,7 +85,7 @@ public ConversionPropertyMapper( this.targetType = targetType; } - public override void MapProperty(object source, object target, CultureInfo culture) + public override void MapProperty(object source, object target, ref MappingContext context) { var value = GetValue(source); @@ -56,7 +96,7 @@ public override void MapProperty(object source, object target, CultureInfo cultu try { - var converted = Convert.ChangeType(value, targetType, culture); + var converted = Convert.ChangeType(value, targetType, context.Culture); SetValue(target, converted); } @@ -80,7 +120,7 @@ public TypeConverterPropertyMapper( this.converter = converter; } - public override void MapProperty(object source, object target, CultureInfo culture) + public override void MapProperty(object source, object target, ref MappingContext context) { var value = GetValue(source); @@ -91,7 +131,7 @@ public override void MapProperty(object source, object target, CultureInfo cultu try { - var converted = converter.ConvertFrom(null, culture, value); + var converted = converter.ConvertFrom(null, context.Culture, value); SetValue(target, converted); } @@ -113,7 +153,7 @@ public PropertyMapper(PropertyAccessor sourceAccessor, PropertyAccessor targetAc this.targetAccessor = targetAccessor; } - public virtual void MapProperty(object source, object target, CultureInfo culture) + public virtual void MapProperty(object source, object target, ref MappingContext context) { var value = GetValue(source); @@ -171,6 +211,13 @@ static ClassMapper() new PropertyAccessor(sourceProperty), new PropertyAccessor(targetProperty))); } + else if (IsNullableOf(sourceType, targetType)) + { + Mappers.Add(new NullablePropertyMapper( + new PropertyAccessor(sourceProperty), + new PropertyAccessor(targetProperty), + Activator.CreateInstance(targetType))); + } else { var converter = TypeDescriptor.GetConverter(targetType); @@ -191,15 +238,22 @@ static ClassMapper() } } } + + static bool IsNullableOf(Type type, Type wrappedType) + { + return type.IsGenericType && + type.GetGenericTypeDefinition() == typeof(Nullable<>) && + type.GenericTypeArguments[0] == wrappedType; + } } - public static TTarget MapClass(TSource source, TTarget destination, CultureInfo culture) + public static TTarget MapClass(TSource source, TTarget destination, ref MappingContext context) { for (var i = 0; i < Mappers.Count; i++) { var mapper = Mappers[i]; - mapper.MapProperty(source, destination, culture); + mapper.MapProperty(source, destination, ref context); } return destination; @@ -210,10 +264,10 @@ public static TTarget Map(TSource source, TTarget target) where TSource : class where TTarget : class { - return Map(source, target, CultureInfo.CurrentCulture); + return Map(source, target, CultureInfo.CurrentCulture, true); } - public static TTarget Map(TSource source, TTarget target, CultureInfo culture) + public static TTarget Map(TSource source, TTarget target, CultureInfo culture, bool nullableAsOptional) where TSource : class where TTarget : class { @@ -221,6 +275,12 @@ public static TTarget Map(TSource source, TTarget target, Cult Guard.NotNull(culture); Guard.NotNull(target); - return ClassMapper.MapClass(source, target, culture); + var context = new MappingContext + { + Culture = culture, + NullableAsOptional = nullableAsOptional + }; + + return ClassMapper.MapClass(source, target, ref context); } } diff --git a/backend/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj b/backend/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj index 22df15365f..07098f4f97 100644 --- a/backend/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj +++ b/backend/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj @@ -37,7 +37,6 @@ - diff --git a/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs b/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs deleted file mode 100644 index 1f70759450..0000000000 --- a/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs +++ /dev/null @@ -1,116 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschraenkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -using System.Threading.Tasks.Dataflow; - -namespace Squidex.Infrastructure.Tasks; - -public sealed class PartitionedActionBlock : ITargetBlock -{ - private readonly ITargetBlock distributor; - private readonly ActionBlock[] workers; - - public Task Completion - { - get => Task.WhenAll(workers.Select(x => x.Completion)); - } - - public PartitionedActionBlock(Func action, Func partitioner) - : this(action, partitioner, new ExecutionDataflowBlockOptions()) - { - } - - public PartitionedActionBlock(Func action, Func partitioner, ExecutionDataflowBlockOptions dataflowBlockOptions) - { - Guard.NotNull(action); - Guard.NotNull(partitioner); - Guard.NotNull(dataflowBlockOptions); - Guard.GreaterThan(dataflowBlockOptions.MaxDegreeOfParallelism, 1, nameof(dataflowBlockOptions.MaxDegreeOfParallelism)); - - workers = new ActionBlock[dataflowBlockOptions.MaxDegreeOfParallelism]; - - for (var i = 0; i < dataflowBlockOptions.MaxDegreeOfParallelism; i++) - { - workers[i] = new ActionBlock(action, new ExecutionDataflowBlockOptions - { - BoundedCapacity = dataflowBlockOptions.BoundedCapacity, - CancellationToken = dataflowBlockOptions.CancellationToken, - MaxDegreeOfParallelism = 1, - MaxMessagesPerTask = 1, - TaskScheduler = dataflowBlockOptions.TaskScheduler - }); - } - - distributor = new ActionBlock(async input => - { - try - { - var partition = Math.Abs(partitioner(input)) % workers.Length; - - await workers[partition].SendAsync(input); - } - catch (OperationCanceledException ex) - { - // Dataflow swallows operation cancelled exception. - throw new AggregateException(ex); - } - }, new ExecutionDataflowBlockOptions - { - BoundedCapacity = 1, - MaxDegreeOfParallelism = 1, - MaxMessagesPerTask = 1 - }); - - LinkCompletion(); - } - - public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock? source, bool consumeToAccept) - { - return distributor.OfferMessage(messageHeader, messageValue, source, consumeToAccept); - } - - public void Complete() - { - distributor.Complete(); - } - - public void Fault(Exception exception) - { - distributor.Fault(exception); - } - -#pragma warning disable RECS0165 // Asynchronous methods should return a Task instead of void - private async void LinkCompletion() -#pragma warning restore RECS0165 // Asynchronous methods should return a Task instead of void - { - try - { - await distributor.Completion.ConfigureAwait(false); - } -#pragma warning disable RECS0022 // A catch clause that catches System.Exception and has an empty body - catch -#pragma warning restore RECS0022 // A catch clause that catches System.Exception and has an empty body - { - // we do not want to change the stacktrace of the exception. - } - - if (distributor.Completion.IsFaulted && distributor.Completion.Exception != null) - { - foreach (var worker in workers) - { - ((IDataflowBlock)worker).Fault(distributor.Completion.Exception); - } - } - else - { - foreach (var worker in workers) - { - worker.Complete(); - } - } - } -} diff --git a/backend/src/Squidex.Infrastructure/Tasks/PartitionedScheduler.cs b/backend/src/Squidex.Infrastructure/Tasks/PartitionedScheduler.cs new file mode 100644 index 0000000000..1730728b4d --- /dev/null +++ b/backend/src/Squidex.Infrastructure/Tasks/PartitionedScheduler.cs @@ -0,0 +1,109 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System.Threading.Channels; + +namespace Squidex.Infrastructure.Tasks; + +public sealed class PartitionedScheduler : IAsyncDisposable +{ + private readonly Consumer[] consumers; + private Exception? exception; + + private sealed class Consumer + { + private readonly Channel channel; + private readonly Task worker; + + public Consumer(Func action, int bufferSize, + CancellationToken ct) + { + channel = Channel.CreateBounded(new BoundedChannelOptions(bufferSize) + { + SingleReader = true, + SingleWriter = false + }); + + worker = Task.Run(async () => + { + await foreach (var item in channel.Reader.ReadAllAsync(ct)) + { + await action(item, ct); + } + }, ct); + } + + public ValueTask ScheduleAsync(T item, + CancellationToken ct) + { + return channel.Writer.WriteAsync(item, ct); + } + + public Task CompleteAsync() + { + channel.Writer.TryComplete(); + + return worker; + } + } + + public PartitionedScheduler(Func action, + int maxWorkers, + int maxBuffer, + CancellationToken ct = default) + { + consumers = new Consumer[maxWorkers]; + + for (var i = 0; i < maxWorkers; i++) + { + consumers[i] = new Consumer(action, maxBuffer, ct); + } + } + + public async ValueTask ScheduleAsync(object key, T item, + CancellationToken ct = default) + { + if (exception != null) + { + throw exception; + } + + var consumerIndex = Math.Abs((key?.GetHashCode() ?? 0) % consumers.Length); + var consumerInstance = consumers[consumerIndex]; + + try + { + await consumerInstance.ScheduleAsync(item, ct); + } + catch (Exception ex) + { + exception = ex; + } + } + + public async Task CompleteAsync() + { + foreach (var consumer in consumers) + { +#pragma warning disable RECS0022 // A catch clause that catches System.Exception and has an empty body + try + { + await consumer.CompleteAsync(); + } + catch + { + // Ensure we can complete all workers. + } +#pragma warning restore RECS0022 // A catch clause that catches System.Exception and has an empty body + } + } + + public async ValueTask DisposeAsync() + { + await CompleteAsync(); + } +} diff --git a/backend/src/Squidex.Infrastructure/Tasks/TaskExtensions.cs b/backend/src/Squidex.Infrastructure/Tasks/TaskExtensions.cs index 32e9c4603f..dbbb346734 100644 --- a/backend/src/Squidex.Infrastructure/Tasks/TaskExtensions.cs +++ b/backend/src/Squidex.Infrastructure/Tasks/TaskExtensions.cs @@ -5,7 +5,8 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System.Threading.Tasks.Dataflow; +using System.Runtime.CompilerServices; +using System.Threading.Channels; namespace Squidex.Infrastructure.Tasks; @@ -55,28 +56,60 @@ public static async Task WithCancellation(this Task task, } } -#pragma warning disable RECS0165 // Asynchronous methods should return a Task instead of void - public static async void BidirectionalLinkTo(this ISourceBlock source, ITargetBlock target) -#pragma warning restore RECS0165 // Asynchronous methods should return a Task instead of void + public static async IAsyncEnumerable Buffered(this IAsyncEnumerable source, int capacity, + [EnumeratorCancellation] CancellationToken ct = default) { - source.LinkTo(target, new DataflowLinkOptions + var bufferChannel = Channel.CreateBounded(new BoundedChannelOptions(capacity) { - PropagateCompletion = true + SingleWriter = true, + SingleReader = true, }); + using var bufferCompletion = new CancellationTokenSource(); + + var producer = Task.Run(async () => + { + try + { + await foreach (var item in source.WithCancellation(bufferCompletion.Token).ConfigureAwait(false)) + { + await bufferChannel.Writer.WriteAsync(item, bufferCompletion.Token).ConfigureAwait(false); + } + } + catch (ChannelClosedException) + { + // Ignore + } + catch (OperationCanceledException) + { + // Ignore + } + finally + { + bufferChannel.Writer.TryComplete(); + } + }, bufferCompletion.Token); + try { - await target.Completion.ConfigureAwait(false); + await foreach (T item in bufferChannel.Reader.ReadAllAsync(ct).ConfigureAwait(false)) + { + yield return item; + + ct.ThrowIfCancellationRequested(); + } + + await producer.ConfigureAwait(false); // Propagate possible source error } - catch + finally { - // We do not want to change the stacktrace of the exception. - return; - } + if (!producer.IsCompleted) + { + bufferCompletion.Cancel(); + bufferChannel.Writer.TryComplete(); - if (target.Completion.IsFaulted && target.Completion.Exception != null) - { - source.Fault(target.Completion.Exception.Flatten()); + await Task.WhenAny(producer).ConfigureAwait(false); + } } } } diff --git a/backend/src/Squidex/Areas/Api/Config/OpenApi/AcceptHeader.cs b/backend/src/Squidex/Areas/Api/Config/OpenApi/AcceptHeader.cs index f36f3dc936..279d4da56b 100644 --- a/backend/src/Squidex/Areas/Api/Config/OpenApi/AcceptHeader.cs +++ b/backend/src/Squidex/Areas/Api/Config/OpenApi/AcceptHeader.cs @@ -22,7 +22,7 @@ public class AcceptHeader public sealed class UnpublishedAttribute : BaseAttribute { public UnpublishedAttribute() - : base(ContentHeaders.Unpublished, "Return unpublished content items.", JsonObjectType.Boolean) + : base(ContentHeaders.KeyUnpublished, "Return unpublished content items.", JsonObjectType.Boolean) { } } @@ -30,7 +30,7 @@ public UnpublishedAttribute() public sealed class FieldsAttribute : BaseAttribute { public FieldsAttribute() - : base(ContentHeaders.Fields, "The list of content fields (comma-separated).", JsonObjectType.String) + : base(ContentHeaders.KeyFields, "The list of content fields (comma-separated).", JsonObjectType.String) { } } @@ -38,7 +38,7 @@ public FieldsAttribute() public sealed class FlattenAttribute : BaseAttribute { public FlattenAttribute() - : base(ContentHeaders.Flatten, "Provide the data as flat object.", JsonObjectType.Boolean) + : base(ContentHeaders.KeyFlatten, "Provide the data as flat object.", JsonObjectType.Boolean) { } } @@ -46,7 +46,7 @@ public FlattenAttribute() public sealed class LanguagesAttribute : BaseAttribute { public LanguagesAttribute() - : base(ContentHeaders.Languages, "The list of languages to resolve (comma-separated).") + : base(ContentHeaders.KeyLanguages, "The list of languages to resolve (comma-separated).") { } } @@ -54,7 +54,7 @@ public LanguagesAttribute() public sealed class NoTotalAttribute : BaseAttribute { public NoTotalAttribute() - : base(ContextHeaders.NoTotal, "Do not return the total amount.", JsonObjectType.Boolean) + : base(ContextHeaders.KeyNoTotal, "Do not return the total amount.", JsonObjectType.Boolean) { } } @@ -62,7 +62,7 @@ public NoTotalAttribute() public sealed class NoSlowTotalAttribute : BaseAttribute { public NoSlowTotalAttribute() - : base(ContextHeaders.NoSlowTotal, "Do not return the total amount, if it would be slow.", JsonObjectType.Boolean) + : base(ContextHeaders.KeyNoSlowTotal, "Do not return the total amount, if it would be slow.", JsonObjectType.Boolean) { } } diff --git a/backend/src/Squidex/Areas/Api/Controllers/Assets/AssetContentController.cs b/backend/src/Squidex/Areas/Api/Controllers/Assets/AssetContentController.cs index 9ea321287d..7475a3393c 100644 --- a/backend/src/Squidex/Areas/Api/Controllers/Assets/AssetContentController.cs +++ b/backend/src/Squidex/Areas/Api/Controllers/Assets/AssetContentController.cs @@ -62,7 +62,7 @@ public AssetContentController( [AllowAnonymous] public async Task GetAssetContentBySlug(string app, string idOrSlug, AssetContentQueryDto request, string? more = null) { - var requestContext = Context.Clone(b => b.WithoutAssetEnrichment()); + var requestContext = Context.Clone(b => b.WithNoAssetEnrichment()); var asset = await assetQuery.FindAsync(requestContext, DomainId.Create(idOrSlug), ct: HttpContext.RequestAborted); @@ -90,7 +90,7 @@ public async Task GetAssetContentBySlug(string app, string idOrSl [Obsolete("Use overload with app name")] public async Task GetAssetContent(DomainId id, AssetContentQueryDto request) { - var requestContext = Context.Clone(b => b.WithoutAssetEnrichment()); + var requestContext = Context.Clone(b => b.WithNoAssetEnrichment()); var asset = await assetQuery.FindGlobalAsync(requestContext, id, HttpContext.RequestAborted); diff --git a/backend/src/Squidex/Areas/Api/Controllers/Contents/Models/ContentDto.cs b/backend/src/Squidex/Areas/Api/Controllers/Contents/Models/ContentDto.cs index a56a1ec48e..62efd52a9f 100644 --- a/backend/src/Squidex/Areas/Api/Controllers/Contents/Models/ContentDto.cs +++ b/backend/src/Squidex/Areas/Api/Controllers/Contents/Models/ContentDto.cs @@ -121,7 +121,7 @@ public static ContentDto FromDomain(IEnrichedContentEntity content, Resources re SchemaName = content.SchemaId.Name }); - if (resources.Context.ShouldFlatten()) + if (resources.Context.Flatten()) { response.Data = content.Data.ToFlatten(); } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/DomainObject/AssetsBulkUpdateCommandMiddlewareTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/DomainObject/AssetsBulkUpdateCommandMiddlewareTests.cs index 7780597190..80b68c879f 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/DomainObject/AssetsBulkUpdateCommandMiddlewareTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/DomainObject/AssetsBulkUpdateCommandMiddlewareTests.cs @@ -22,9 +22,7 @@ public class AssetsBulkUpdateCommandMiddlewareTests : GivenContext public AssetsBulkUpdateCommandMiddlewareTests() { - var log = A.Fake>(); - - sut = new AssetsBulkUpdateCommandMiddleware(contextProvider, log); + sut = new AssetsBulkUpdateCommandMiddleware(contextProvider); } [Fact] @@ -61,7 +59,7 @@ public async Task Should_annotate_asset() Assert.Single(actual); Assert.Single(actual, x => x.JobIndex == 0 && x.Id == id && x.Exception == null); - A.CallTo(() => commandBus.PublishAsync(A.That.Matches(x => x.AssetId == id && x.FileName == "file"), CancellationToken)) + A.CallTo(() => commandBus.PublishAsync(A.That.Matches(x => x.AssetId == id && x.FileName == "file"), A._)) .MustHaveHappened(); } @@ -97,7 +95,7 @@ public async Task Should_move_asset() Assert.Single(actual); Assert.Single(actual, x => x.JobIndex == 0 && x.Id == id && x.Exception == null); - A.CallTo(() => commandBus.PublishAsync(A.That.Matches(x => x.AssetId == id), CancellationToken)) + A.CallTo(() => commandBus.PublishAsync(A.That.Matches(x => x.AssetId == id), A._)) .MustHaveHappened(); } @@ -134,7 +132,7 @@ public async Task Should_delete_asset() Assert.Single(actual, x => x.JobIndex == 0 && x.Id == id && x.Exception == null); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.AssetId == id), CancellationToken)) + A.That.Matches(x => x.AssetId == id), A._)) .MustHaveHappened(); } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/AssetQueryParserTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/AssetQueryParserTests.cs index bdb727ed52..c7a85d93c6 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/AssetQueryParserTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/AssetQueryParserTests.cs @@ -29,7 +29,7 @@ public AssetQueryParserTests() [Fact] public async Task Should_skip_total_if_set_in_context() { - var q = await sut.ParseAsync(ApiContext.Clone(b => b.WithoutTotal()), Q.Empty, CancellationToken); + var q = await sut.ParseAsync(ApiContext.Clone(b => b.WithNoTotal()), Q.Empty, CancellationToken); Assert.True(q.NoTotal); } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/CalculateTokensTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/CalculateTokensTests.cs index 3f795ba65e..e248228d20 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/CalculateTokensTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/CalculateTokensTests.cs @@ -28,7 +28,7 @@ public async Task Should_not_enrich_asset_edit_tokens_if_disabled() { var asset = CreateAsset(); - await sut.EnrichAsync(ApiContext.Clone(b => b.WithoutAssetEnrichment()), Enumerable.Repeat(asset, 1), default); + await sut.EnrichAsync(ApiContext.Clone(b => b.WithNoAssetEnrichment()), Enumerable.Repeat(asset, 1), default); Assert.Null(asset.EditToken); } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/ConvertTagsTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/ConvertTagsTests.cs index 4842ea7ae2..3dccc73163 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/ConvertTagsTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/ConvertTagsTests.cs @@ -38,7 +38,7 @@ public async Task Should_not_enrich_asset_with_tag_names_if_disabled() { var asset = new AssetEntity(); - await sut.EnrichAsync(ApiContext.Clone(b => b.WithoutAssetEnrichment()), Enumerable.Repeat(asset, 1), CancellationToken); + await sut.EnrichAsync(ApiContext.Clone(b => b.WithNoAssetEnrichment()), Enumerable.Repeat(asset, 1), CancellationToken); Assert.Null(asset.TagNames); } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/EnrichForCachingTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/EnrichForCachingTests.cs index 66ada93d2c..982e75a396 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/EnrichForCachingTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/EnrichForCachingTests.cs @@ -63,7 +63,7 @@ public async Task Should_add_app_version_as_dependency() [Fact] public async Task Should_not_add_cache_headers_if_disabled() { - await sut.EnrichAsync(ApiContext.Clone(b => b.WithoutCacheKeys()), CancellationToken); + await sut.EnrichAsync(ApiContext.Clone(b => b.WithNoCacheKeys()), CancellationToken); A.CallTo(() => requestCache.AddHeader(A._)) .MustNotHaveHappened(); @@ -74,7 +74,7 @@ public async Task Should_not_add_cache_headers_for_assets_if_disabled() { var asset = CreateAsset(); - await sut.EnrichAsync(ApiContext.Clone(b => b.WithoutCacheKeys()), Enumerable.Repeat(asset, 1), CancellationToken); + await sut.EnrichAsync(ApiContext.Clone(b => b.WithNoCacheKeys()), Enumerable.Repeat(asset, 1), CancellationToken); A.CallTo(() => requestCache.AddHeader(A._)) .MustNotHaveHappened(); diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/EnrichWithMetadataTextTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/EnrichWithMetadataTextTests.cs index fa99707b50..2bf6c36bcc 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/EnrichWithMetadataTextTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/EnrichWithMetadataTextTests.cs @@ -32,7 +32,7 @@ public async Task Should_not_enrich_if_disabled() { var asset = new AssetEntity(); - await sut.EnrichAsync(ApiContext.Clone(b => b.WithoutAssetEnrichment()), Enumerable.Repeat(asset, 1), CancellationToken); + await sut.EnrichAsync(ApiContext.Clone(b => b.WithNoAssetEnrichment()), Enumerable.Repeat(asset, 1), CancellationToken); A.CallTo(() => assetMetadataSource1.Format(A._)) .MustNotHaveHappened(); diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/ScriptAssetTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/ScriptAssetTests.cs index 6be7d593af..3d15cb303c 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/ScriptAssetTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/Queries/ScriptAssetTests.cs @@ -137,7 +137,7 @@ private static ScriptOptions ScriptOptions() private Context ContextWithNoScript() { var contextPermission = PermissionIds.ForApp(PermissionIds.AppNoScripting, App.Name).Id; - var contextInstance = CreateContext(false, contextPermission).Clone(b => b.WithoutScripting()); + var contextInstance = CreateContext(false, contextPermission).Clone(b => b.WithNoScripting()); return contextInstance; } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/DomainObject/ContentsBulkUpdateCommandMiddlewareTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/DomainObject/ContentsBulkUpdateCommandMiddlewareTests.cs index c9c3a78a07..d88f613fe6 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/DomainObject/ContentsBulkUpdateCommandMiddlewareTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/DomainObject/ContentsBulkUpdateCommandMiddlewareTests.cs @@ -30,9 +30,7 @@ public class ContentsBulkUpdateCommandMiddlewareTests : GivenContext public ContentsBulkUpdateCommandMiddlewareTests() { - var log = A.Fake>(); - - sut = new ContentsBulkUpdateCommandMiddleware(contentQuery, contextProvider, log); + sut = new ContentsBulkUpdateCommandMiddleware(contentQuery, contextProvider); } [Fact] @@ -82,10 +80,11 @@ public async Task Should_throw_exception_if_query_resolves_multiple_contents() A.CallTo(() => contentQuery.QueryAsync( A.That.Matches(x => - x.ShouldSkipCleanup() && - x.ShouldSkipContentEnrichment() && - x.ShouldSkipTotal()), - schemaId.Name, A.That.Matches(x => x.JsonQuery == query), CancellationToken)) + x.NoCleanup() && + x.NoEnrichment() && + x.NoTotal()), + schemaId.Id.ToString(), + A.That.Matches(x => x.JsonQuery == query), A._)) .Returns(ResultList.CreateFrom(2, CreateContent(id), CreateContent(id))); var command = BulkCommand(BulkUpdateContentType.ChangeStatus, new BulkUpdateJob { Query = query }); @@ -108,10 +107,12 @@ public async Task Should_upsert_content_with_resolved_id() A.CallTo(() => contentQuery.QueryAsync( A.That.Matches(x => - x.ShouldSkipCleanup() && - x.ShouldSkipContentEnrichment() && - x.ShouldSkipTotal()), - schemaId.Name, A.That.Matches(x => x.JsonQuery == query), CancellationToken)) + x.NoCleanup() && + x.NoEnrichment() && + x.NoTotal()), + schemaId.Id.ToString(), + A.That.Matches(x => x.JsonQuery == query), + A._)) .Returns(ResultList.CreateFrom(1, CreateContent(id))); var command = BulkCommand(BulkUpdateContentType.Upsert, new BulkUpdateJob { Query = query, Data = data }); @@ -122,7 +123,7 @@ public async Task Should_upsert_content_with_resolved_id() Assert.Single(actual, x => x.JobIndex == 0 && x.Id == id && x.Exception == null); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.Data == data && x.ContentId == id), CancellationToken)) + A.That.Matches(x => x.Data == data && x.ContentId == id), A._)) .MustHaveHappenedOnceExactly(); } @@ -138,10 +139,12 @@ public async Task Should_upsert_content_with_resolved_ids() A.CallTo(() => contentQuery.QueryAsync( A.That.Matches(x => - x.ShouldSkipCleanup() && - x.ShouldSkipContentEnrichment() && - x.ShouldSkipTotal()), - schemaId.Name, A.That.Matches(x => x.JsonQuery == query), CancellationToken)) + x.NoCleanup() && + x.NoEnrichment() && + x.NoTotal()), + schemaId.Id.ToString(), + A.That.Matches(x => x.JsonQuery == query), + A._)) .Returns(ResultList.CreateFrom(2, CreateContent(id1), CreateContent(id2))); @@ -157,11 +160,11 @@ public async Task Should_upsert_content_with_resolved_ids() Assert.Single(actual, x => x.JobIndex == 0 && x.Id == id2 && x.Exception == null); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.Data == data && x.ContentId == id1), CancellationToken)) + A.That.Matches(x => x.Data == data && x.ContentId == id1), A._)) .MustHaveHappenedOnceExactly(); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.Data == data && x.ContentId == id2), CancellationToken)) + A.That.Matches(x => x.Data == data && x.ContentId == id2), A._)) .MustHaveHappenedOnceExactly(); } @@ -180,7 +183,7 @@ public async Task Should_upsert_content_with_random_id_if_no_query_and_id_define Assert.Single(actual, x => x.JobIndex == 0 && x.Id != default && x.Exception == null); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.Data == data && x.ContentId != default), CancellationToken)) + A.That.Matches(x => x.Data == data && x.ContentId != default), A._)) .MustHaveHappenedOnceExactly(); } @@ -199,7 +202,7 @@ public async Task Should_upsert_content_with_random_id_if_query_returns_no_actua Assert.Single(actual, x => x.JobIndex == 0 && x.Id != default && x.Exception == null); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.Data == data && x.ContentId != default), CancellationToken)) + A.That.Matches(x => x.Data == data && x.ContentId != default), A._)) .MustHaveHappenedOnceExactly(); } @@ -218,7 +221,7 @@ public async Task Should_upsert_content_if_id_defined() Assert.Single(actual, x => x.JobIndex == 0 && x.Id != default && x.Exception == null); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.Data == data && x.ContentId == id), CancellationToken)) + A.That.Matches(x => x.Data == data && x.ContentId == id), A._)) .MustHaveHappenedOnceExactly(); } @@ -237,7 +240,7 @@ public async Task Should_upsert_content_with_custom_id() Assert.Single(actual, x => x.JobIndex == 0 && x.Id != default && x.Exception == null); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.Data == data && x.ContentId == id), CancellationToken)) + A.That.Matches(x => x.Data == data && x.ContentId == id), A._)) .MustHaveHappenedOnceExactly(); } @@ -256,7 +259,7 @@ public async Task Should_create_content() Assert.Single(actual, x => x.JobIndex == 0 && x.Id == id && x.Exception == null); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.ContentId == id && x.Data == data), CancellationToken)) + A.That.Matches(x => x.ContentId == id && x.Data == data), A._)) .MustHaveHappened(); } @@ -293,7 +296,7 @@ public async Task Should_update_content() Assert.Single(actual, x => x.JobIndex == 0 && x.Id == id && x.Exception == null); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.ContentId == id && x.Data == data), CancellationToken)) + A.That.Matches(x => x.ContentId == id && x.Data == data), A._)) .MustHaveHappened(); } @@ -330,7 +333,7 @@ public async Task Should_patch_content() Assert.Single(actual, x => x.JobIndex == 0 && x.Id == id && x.Exception == null); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.ContentId == id && x.Data == data), CancellationToken)) + A.That.Matches(x => x.ContentId == id && x.Data == data), A._)) .MustHaveHappened(); } @@ -367,7 +370,7 @@ public async Task Should_change_content_status() Assert.Single(actual, x => x.JobIndex == 0 && x.Id == id && x.Exception == null); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.ContentId == id && x.DueTime == null), CancellationToken)) + A.That.Matches(x => x.ContentId == id && x.DueTime == null), A._)) .MustHaveHappened(); } @@ -386,7 +389,7 @@ public async Task Should_change_content_status_with_due_time() Assert.Single(actual, x => x.JobIndex == 0 && x.Id == id && x.Exception == null); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.ContentId == id && x.DueTime == time), CancellationToken)) + A.That.Matches(x => x.ContentId == id && x.DueTime == time), A._)) .MustHaveHappened(); } @@ -423,7 +426,7 @@ public async Task Should_validate_content() Assert.Single(actual, x => x.JobIndex == 0 && x.Id == id && x.Exception == null); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.ContentId == id), CancellationToken)) + A.That.Matches(x => x.ContentId == id), A._)) .MustHaveHappened(); } @@ -460,7 +463,7 @@ public async Task Should_delete_content() Assert.Single(actual, x => x.JobIndex == 0 && x.Id == id && x.Exception == null); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.ContentId == id), CancellationToken)) + A.That.Matches(x => x.ContentId == id), A._)) .MustHaveHappened(); } @@ -487,7 +490,7 @@ public async Task Should_override_schema_name() { SetupContext(PermissionIds.AppContentsDeleteOwn); - A.CallTo(() => contentQuery.GetSchemaOrThrowAsync(A._, schemaCustomId.Name, CancellationToken)) + A.CallTo(() => contentQuery.GetSchemaOrThrowAsync(A._, schemaCustomId.Name, A._)) .Returns(Mocks.Schema(AppId, schemaCustomId)); var (id, _, _) = CreateTestData(false); @@ -500,7 +503,7 @@ public async Task Should_override_schema_name() Assert.Single(actual, x => x.JobIndex == 0 && x.Id == id && x.Exception == null); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.SchemaId == schemaCustomId), CancellationToken)) + A.That.Matches(x => x.SchemaId == schemaCustomId), A._)) .MustHaveHappened(); } @@ -522,7 +525,7 @@ public async Task Should_throw_exception_if_schema_name_not_defined() Assert.Single(actual, x => x.JobIndex == 0 && x.Id == id && x.Exception is DomainObjectNotFoundException); A.CallTo(() => commandBus.PublishAsync( - A.That.Matches(x => x.SchemaId == schemaCustomId), CancellationToken)) + A.That.Matches(x => x.SchemaId == schemaCustomId), A._)) .MustNotHaveHappened(); } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/GraphQL/GraphQLTestBase.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/GraphQL/GraphQLTestBase.cs index 48d34bd2ca..6de067fb48 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/GraphQL/GraphQLTestBase.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/GraphQL/GraphQLTestBase.cs @@ -201,8 +201,8 @@ protected Context MatchsAssetContext() { return A.That.Matches(x => x.App == TestApp.Default && - x.ShouldSkipCleanup() && - x.ShouldSkipAssetEnrichment() && + x.NoCleanup() && + x.NoAssetEnrichment() && x.UserPrincipal == requestContext.UserPrincipal); } @@ -210,8 +210,8 @@ protected Context MatchsContentContext() { return A.That.Matches(x => x.App == TestApp.Default && - x.ShouldSkipCleanup() && - x.ShouldSkipContentEnrichment() && + x.NoCleanup() && + x.NoEnrichment() && x.UserPrincipal == requestContext.UserPrincipal); } } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/ContentQueryParserTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/ContentQueryParserTests.cs index 42ef1cf2d2..2cd11559f0 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/ContentQueryParserTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/ContentQueryParserTests.cs @@ -44,7 +44,7 @@ public ContentQueryParserTests() [Fact] public async Task Should_skip_total_if_set_in_context() { - var q = await sut.ParseAsync(ApiContext.Clone(b => b.WithoutTotal()), Q.Empty, ct: CancellationToken); + var q = await sut.ParseAsync(ApiContext.Clone(b => b.WithNoTotal()), Q.Empty, ct: CancellationToken); Assert.True(q.NoTotal); } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/EnrichForCachingTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/EnrichForCachingTests.cs index d614404c4a..3a7a6b3458 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/EnrichForCachingTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/EnrichForCachingTests.cs @@ -67,7 +67,7 @@ public async Task Should_add_app_version_and_schema_as_dependency() [Fact] public async Task Should_not_add_cache_headers_if_disabled() { - await sut.EnrichAsync(ApiContext.Clone(b => b.WithoutCacheKeys()), CancellationToken); + await sut.EnrichAsync(ApiContext.Clone(b => b.WithNoCacheKeys()), CancellationToken); A.CallTo(() => requestCache.AddHeader(A._)) .MustNotHaveHappened(); @@ -78,7 +78,7 @@ public async Task Should_not_add_cache_headers_for_contents_if_disabled() { var content = CreateContent(); - await sut.EnrichAsync(ApiContext.Clone(b => b.WithoutCacheKeys()), Enumerable.Repeat(content, 1), SchemaProvider(), CancellationToken); + await sut.EnrichAsync(ApiContext.Clone(b => b.WithNoCacheKeys()), Enumerable.Repeat(content, 1), SchemaProvider(), CancellationToken); A.CallTo(() => requestCache.AddHeader(A._)) .MustNotHaveHappened(); diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/ResolveAssetsTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/ResolveAssetsTests.cs index 719098723c..0139fdc659 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/ResolveAssetsTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/ResolveAssetsTests.cs @@ -82,7 +82,7 @@ public async Task Should_add_assets_id_and_versions_as_dependency() }; A.CallTo(() => assetQuery.QueryAsync( - A.That.Matches(x => x.ShouldSkipAssetEnrichment() && x.ShouldSkipTotal()), null, A.That.HasIds(doc1.Id, doc2.Id), CancellationToken)) + A.That.Matches(x => x.NoAssetEnrichment() && x.NoTotal()), null, A.That.HasIds(doc1.Id, doc2.Id), CancellationToken)) .Returns(ResultList.CreateFrom(4, doc1, doc2)); await sut.EnrichAsync(FrontendContext, contents, schemaProvider, CancellationToken); @@ -114,7 +114,7 @@ public async Task Should_enrich_with_asset_urls() }; A.CallTo(() => assetQuery.QueryAsync( - A.That.Matches(x => x.ShouldSkipAssetEnrichment() && x.ShouldSkipTotal()), null, A.That.HasIds(doc1.Id, doc2.Id, img1.Id, img2.Id), CancellationToken)) + A.That.Matches(x => x.NoAssetEnrichment() && x.NoTotal()), null, A.That.HasIds(doc1.Id, doc2.Id, img1.Id, img2.Id), CancellationToken)) .Returns(ResultList.CreateFrom(4, img1, img2, doc1, doc2)); await sut.EnrichAsync(FrontendContext, contents, schemaProvider, CancellationToken); @@ -164,7 +164,7 @@ public async Task Should_not_enrich_references_if_disabled() CreateContent(new[] { DomainId.NewGuid() }, Array.Empty()) }; - await sut.EnrichAsync(FrontendContext.Clone(b => b.WithoutContentEnrichment(true)), contents, schemaProvider, CancellationToken); + await sut.EnrichAsync(FrontendContext.Clone(b => b.WithNoEnrichment(true)), contents, schemaProvider, CancellationToken); Assert.Null(contents[0].ReferenceData); @@ -204,7 +204,7 @@ public async Task Should_only_query_first_assets() Assert.NotNull(contents[0].ReferenceData); A.CallTo(() => assetQuery.QueryAsync( - A.That.Matches(x => x.ShouldSkipAssetEnrichment() && x.ShouldSkipTotal()), null, A.That.HasIds(id1), A._)) + A.That.Matches(x => x.NoAssetEnrichment() && x.NoTotal()), null, A.That.HasIds(id1), A._)) .MustHaveHappened(); } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/ResolveReferencesTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/ResolveReferencesTests.cs index b235ec3ad2..99d529364a 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/ResolveReferencesTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/ResolveReferencesTests.cs @@ -95,7 +95,7 @@ public async Task Should_add_referenced_id_and_as_dependency() }; A.CallTo(() => contentQuery.QueryAsync( - A.That.Matches(x => x.ShouldSkipContentEnrichment() && x.ShouldSkipTotal()), A.That.HasIds(ref1_1.Id, ref1_2.Id, ref2_1.Id, ref2_2.Id), A._)) + A.That.Matches(x => x.NoEnrichment() && x.NoTotal()), A.That.HasIds(ref1_1.Id, ref1_2.Id, ref2_1.Id, ref2_2.Id), A._)) .Returns(ResultList.CreateFrom(4, ref1_1, ref1_2, ref2_1, ref2_2)); await sut.EnrichAsync(FrontendContext, contents, schemaProvider, default); @@ -134,7 +134,7 @@ public async Task Should_enrich_with_reference_data() }; A.CallTo(() => contentQuery.QueryAsync( - A.That.Matches(x => x.ShouldSkipContentEnrichment() && x.ShouldSkipTotal()), A.That.HasIds(ref1_1.Id, ref1_2.Id, ref2_1.Id, ref2_2.Id), CancellationToken)) + A.That.Matches(x => x.NoEnrichment() && x.NoTotal()), A.That.HasIds(ref1_1.Id, ref1_2.Id, ref2_1.Id, ref2_2.Id), CancellationToken)) .Returns(ResultList.CreateFrom(4, ref1_1, ref1_2, ref2_1, ref2_2)); await sut.EnrichAsync(FrontendContext, contents, schemaProvider, CancellationToken); @@ -187,7 +187,7 @@ public async Task Should_not_enrich_if_content_has_more_items() }; A.CallTo(() => contentQuery.QueryAsync( - A.That.Matches(x => x.ShouldSkipContentEnrichment() && x.ShouldSkipTotal()), A.That.HasIds(ref1_1.Id, ref1_2.Id, ref2_1.Id, ref2_2.Id), CancellationToken)) + A.That.Matches(x => x.NoEnrichment() && x.NoTotal()), A.That.HasIds(ref1_1.Id, ref1_2.Id, ref2_1.Id, ref2_2.Id), CancellationToken)) .Returns(ResultList.CreateFrom(4, ref1_1, ref1_2, ref2_1, ref2_2)); await sut.EnrichAsync(FrontendContext, contents, schemaProvider, CancellationToken); @@ -249,7 +249,7 @@ public async Task Should_not_enrich_references_if_disabled() CreateContent(new[] { DomainId.NewGuid() }, Array.Empty()) }; - await sut.EnrichAsync(FrontendContext.Clone(b => b.WithoutContentEnrichment(true)), contents, schemaProvider, CancellationToken); + await sut.EnrichAsync(FrontendContext.Clone(b => b.WithNoEnrichment(true)), contents, schemaProvider, CancellationToken); Assert.Null(contents[0].ReferenceData); diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/ScriptContentTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/ScriptContentTests.cs index 12477f7e2b..4eac378534 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/ScriptContentTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Queries/ScriptContentTests.cs @@ -181,7 +181,7 @@ private static ScriptOptions ScriptOptions() private Context ContextWithNoScript() { var contextPermission = PermissionIds.ForApp(PermissionIds.AppNoScripting, App.Name).Id; - var contextInstance = CreateContext(false, contextPermission).Clone(b => b.WithoutScripting()); + var contextInstance = CreateContext(false, contextPermission).Clone(b => b.WithNoScripting()); return contextInstance; } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleDequeuerWorkerTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleDequeuerWorkerTests.cs index 9a664748ae..4ce85534c6 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleDequeuerWorkerTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleDequeuerWorkerTests.cs @@ -63,7 +63,7 @@ public async Task Should_ignore_rule_service_exceptions_and_log() A.CallTo(() => ruleService.InvokeAsync(A._, A._, default)) .Throws(new InvalidOperationException()); - await sut.HandleAsync(@event); + await sut.HandleAsync(@event, default); A.CallTo(log).Where(x => x.Method.Name == "Log") .MustHaveHappened(); @@ -86,8 +86,8 @@ public async Task Should_not_execute_if_already_running() }); await Task.WhenAll( - sut.HandleAsync(event1), - sut.HandleAsync(event2)); + sut.HandleAsync(event1, default), + sut.HandleAsync(event2, default)); A.CallTo(() => ruleService.InvokeAsync(A._, A._, default)) .MustHaveHappenedOnceExactly(); @@ -115,7 +115,7 @@ public async Task Should_set_next_attempt_based_on_num_calls(int calls, int minu var now = clock.GetCurrentInstant(); - await sut.HandleAsync(@event); + await sut.HandleAsync(@event, default); if (actual == RuleResult.Failed) { diff --git a/backend/tests/Squidex.Infrastructure.Tests/CollectionExtensionsTests.cs b/backend/tests/Squidex.Infrastructure.Tests/CollectionExtensionsTests.cs index 6a46fc5017..c7eb256e22 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/CollectionExtensionsTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/CollectionExtensionsTests.cs @@ -332,4 +332,36 @@ public void Foreach_should_call_action_foreach_item_with_index() Assert.Equal(source, targetItems); } + + [Fact] + public void Should_batch() + { + var source = new[] { 1, 2, 3, 4, 5 }; + + var actual = source.Batch(2).ToArray(); + + actual.Should().BeEquivalentTo( + new[] + { + new List { 1, 2 }, + new List { 3, 4 }, + new List { 5 }, + }); + } + + [Fact] + public async Task Should_batch_async() + { + var source = new[] { 1, 2, 3, 4, 5 }; + + var actual = await source.ToAsyncEnumerable().Batch(2).ToArrayAsync(); + + actual.Should().BeEquivalentTo( + new[] + { + new List { 1, 2 }, + new List { 3, 4 }, + new List { 5 }, + }); + } } diff --git a/backend/tests/Squidex.Infrastructure.Tests/Reflection/SimpleMapperTests.cs b/backend/tests/Squidex.Infrastructure.Tests/Reflection/SimpleMapperTests.cs index 15e00d853c..5ad4024996 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/Reflection/SimpleMapperTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/Reflection/SimpleMapperTests.cs @@ -11,6 +11,10 @@ namespace Squidex.Infrastructure.Reflection; public class SimpleMapperTests { +#pragma warning disable SA1313 // Parameter names should begin with lower-case letter + public record struct ValueType(int Value); +#pragma warning restore SA1313 // Parameter names should begin with lower-case letter + public class Class1Base { public T1 P1 { get; set; } @@ -91,7 +95,6 @@ public void Should_map_to_convertible_type() { var obj1 = new Class1 { - P1 = 6, P2 = 8 }; var obj2 = SimpleMapper.Map(obj1, new Class2()); @@ -101,11 +104,10 @@ public void Should_map_to_convertible_type() } [Fact] - public void Should_map_from_nullable() + public void Should_map_from_convertible_nullable() { var obj1 = new Class1 { - P1 = 6, P2 = 8 }; var obj2 = SimpleMapper.Map(obj1, new Class2()); @@ -115,11 +117,23 @@ public void Should_map_from_nullable() } [Fact] - public void Should_map_to_nullable() + public void Should_map_from_nullable() + { + var obj1 = new Class1 + { + P2 = new ValueType(8) + }; + var obj2 = SimpleMapper.Map(obj1, new Class2()); + + Assert.Equal(new ValueType(8), obj2.P2); + Assert.Equal(new ValueType(0), obj2.P3); + } + + [Fact] + public void Should_map_to_convertible_nullable() { var obj1 = new Class1 { - P1 = 6, P2 = 8 }; var obj2 = SimpleMapper.Map(obj1, new Class2()); @@ -128,12 +142,24 @@ public void Should_map_to_nullable() Assert.Null(obj2.P3); } + [Fact] + public void Should_map_to_nullable() + { + var obj1 = new Class1 + { + P2 = new ValueType(8) + }; + var obj2 = SimpleMapper.Map(obj1, new Class2()); + + Assert.Equal(new ValueType(8), obj2.P2); + Assert.Null(obj2.P3); + } + [Fact] public void Should_map_if_convertible_is_null() { var obj1 = new Class1 { - P1 = null, P2 = null }; var obj2 = SimpleMapper.Map(obj1, new Class1()); @@ -147,7 +173,6 @@ public void Should_convert_to_string() { var obj1 = new Class1 { - P1 = RefToken.User("1"), P2 = RefToken.User("2") }; var obj2 = SimpleMapper.Map(obj1, new Class2()); @@ -161,7 +186,6 @@ public void Should_return_default_if_conversion_failed() { var obj1 = new Class1 { - P1 = long.MaxValue, P2 = long.MaxValue }; var obj2 = SimpleMapper.Map(obj1, new Class2()); @@ -171,19 +195,22 @@ public void Should_return_default_if_conversion_failed() } [Fact] - public void Should_ignore_write_only() + public void Should_ignore_read_only() { - var obj1 = new Writeonly(); - var obj2 = SimpleMapper.Map(obj1, new Class1()); + var obj1 = new Class1 + { + P1 = 10 + }; + var obj2 = SimpleMapper.Map(obj1, new Readonly()); Assert.Equal(0, obj2.P1); } [Fact] - public void Should_ignore_read_only() + public void Should_ignore_write_only() { - var obj1 = new Class1 { P1 = 10 }; - var obj2 = SimpleMapper.Map(obj1, new Readonly()); + var obj1 = new Writeonly(); + var obj2 = SimpleMapper.Map(obj1, new Class1()); Assert.Equal(0, obj2.P1); } diff --git a/backend/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs b/backend/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs index 1e01c432a4..3038e22efa 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs @@ -5,8 +5,6 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System.Threading.Tasks.Dataflow; - namespace Squidex.Infrastructure.Tasks; public class PartitionedActionBlockTests @@ -23,31 +21,24 @@ public async Task Should_propagate_in_order() lists[i] = new List(); } - var block = new PartitionedActionBlock<(int P, int V)>(x => + var scheduler = new PartitionedScheduler<(int Partition, int Value)>((item, ct) => { Random.Shared.Next(10); - lists[x.P].Add(x.V); + lists[item.Partition].Add(item.Value); return Task.CompletedTask; - }, x => x.P, new ExecutionDataflowBlockOptions - { - MaxDegreeOfParallelism = 100, - MaxMessagesPerTask = 1, - BoundedCapacity = 100 - }); + }, 32, 10000); - for (var i = 0; i < Partitions; i++) + for (var partition = 0; partition < Partitions; partition++) { - for (var j = 0; j < 10; j++) + for (var value = 0; value < 10; value++) { - await block.SendAsync((i, j)); + await scheduler.ScheduleAsync(partition, (partition, value)); } } - block.Complete(); - - await block.Completion; + await scheduler.CompleteAsync(); foreach (var list in lists) {