Skip to content

Commit

Permalink
change back to fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: Neil South <[email protected]>
  • Loading branch information
neildsouth committed Jul 25, 2024
1 parent f55ef05 commit 883af94
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,14 @@ internal sealed partial class PayloadAssembler : IPayloadAssembler, IDisposable,
private readonly Task _intializedTask;
private readonly BlockingCollection<Payload> _workItems;
private readonly System.Timers.Timer _timer;
private readonly IPayloadRepository _repository;
//private readonly IPayloadRepository _repository;

public PayloadAssembler(
ILogger<PayloadAssembler> logger,
IServiceScopeFactory serviceScopeFactory)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
var scope = _serviceScopeFactory.CreateScope();
_repository = scope.ServiceProvider.GetRequiredService<IPayloadRepository>();

_workItems = [];
_tokenSource = new CancellationTokenSource();
Expand All @@ -85,7 +83,9 @@ public PayloadAssembler(
private async Task RemovePendingPayloads()
{
_logger.RemovingPendingPayloads();
var removed = await _repository.RemovePendingPayloadsAsync().ConfigureAwait(false);
var scope = _serviceScopeFactory.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<IPayloadRepository>();
var removed = await repository.RemovePendingPayloadsAsync().ConfigureAwait(false);

_logger.TotalNumberOfPayloadsRemoved(removed);
}
Expand Down Expand Up @@ -200,7 +200,9 @@ private async Task QueueBucketForNotification(string key, Payload payload)
{
payload.State = Payload.PayloadState.Move;
var scope = _serviceScopeFactory.CreateScope();
await _repository.UpdateAsync(payload).ConfigureAwait(false);

var repository = scope.ServiceProvider.GetRequiredService<IPayloadRepository>();
await repository.UpdateAsync(payload).ConfigureAwait(false);
_logger.PayloadSaved(payload.PayloadId);
_workItems.Add(payload);
_logger.BucketReady(key, payload.Count);
Expand All @@ -227,7 +229,9 @@ private async Task<Payload> CreateOrGetPayload(string key, string correlationId,
private async Task<Payload> PayloadFactory(string key, string correlationId, string? workflowInstanceId, string? taskId, Messaging.Events.DataOrigin dataOrigin, uint timeout, CancellationToken cancellationToken)
{
var newPayload = new Payload(key, correlationId, workflowInstanceId, taskId, dataOrigin, timeout, null);
await _repository.AddAsync(newPayload, cancellationToken).ConfigureAwait(false);
var scope = _serviceScopeFactory.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<IPayloadRepository>();
await repository.AddAsync(newPayload, cancellationToken).ConfigureAwait(false);
_logger.BucketCreated(key, timeout);
return newPayload;
}
Expand Down

0 comments on commit 883af94

Please sign in to comment.