Skip to content

Commit

Permalink
add storage of workflow name in payload
Browse files Browse the repository at this point in the history
Signed-off-by: Neil South <[email protected]>
  • Loading branch information
neildsouth committed May 23, 2024
1 parent 4eb56e4 commit 5187476
Show file tree
Hide file tree
Showing 13 changed files with 22 additions and 66 deletions.
1 change: 0 additions & 1 deletion src/TaskManager/Plug-ins/Argo/ArgoClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
using System.Globalization;
using System.Text;
using Argo;
using Ardalis.GuardClauses;
using Microsoft.Extensions.Logging;
using Monai.Deploy.WorkflowManager.TaskManager.Argo.Logging;
using System.Net;
Expand Down
1 change: 0 additions & 1 deletion src/TaskManager/Plug-ins/Email/EmailPlugin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@


using System.Net.Mail;
using Ardalis.GuardClauses;
using FellowOakDicom;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand Down
9 changes: 1 addition & 8 deletions src/WorkflowManager/Common/Interfaces/IPayloadService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ Task<IList<PayloadDto>> GetAllAsync(int? skip = null,
/// <param name="payloadId">payload id to delete.</param>
Task<bool> DeletePayloadFromStorageAsync(string payloadId);

/// <summary>
/// Updates a payload
/// </summary>
/// <param name="payload"></param>
/// <returns></returns>
Task<bool> UpdateWorkflowInstanceIdsAsync(string payloadId, IEnumerable<string> workflowInstances);

/// <summary>
/// Gets the expiry date for a payload.
/// </summary>
Expand All @@ -69,6 +62,6 @@ Task<IList<PayloadDto>> GetAllAsync(int? skip = null,
/// <param name="payloadId">payload id to update.</param>
/// <param name="payload">updated payload.</param>
/// <returns>true if the update is successful, false otherwise.</returns>
Task<bool> UpdateAsync(Payload payload);
Task<bool> UpdateAsyncWorkflowIds(Payload payload);
}
}
19 changes: 6 additions & 13 deletions src/WorkflowManager/Common/Services/PayloadService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public async Task<bool> DeletePayloadFromStorageAsync(string payloadId)

// update the payload to in progress before we request deletion from storage
payload.PayloadDeleted = PayloadDeleted.InProgress;
await _payloadRepository.UpdateAsync(payload);
await _payloadRepository.UpdateAsyncWorkflowIds(payload);

// run deletion in alternative thread so the user isn't held up
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Expand All @@ -238,26 +238,19 @@ public async Task<bool> DeletePayloadFromStorageAsync(string payloadId)
}
finally
{
await _payloadRepository.UpdateAsync(payload);
await _payloadRepository.UpdateAsyncWorkflowIds(payload);
}
});
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed

return true;
}

public async Task<bool> UpdateWorkflowInstanceIdsAsync(string payloadId, IEnumerable<string> workflowInstances)
public Task<bool> UpdateAsyncWorkflowIds(Payload payload)
{
if (await _payloadRepository.UpdateAssociatedWorkflowInstancesAsync(payloadId, workflowInstances))
{
_logger.PayloadUpdated(payloadId);
return true;
}
else
{
_logger.PayloadUpdateFailed(payloadId);
return false;
}
ArgumentNullException.ThrowIfNull(payload, nameof(payload));

return _payloadRepository.UpdateAsyncWorkflowIds(payload);
}

public Task<bool> UpdateAsync(Payload payload)
Expand Down
1 change: 0 additions & 1 deletion src/WorkflowManager/Common/Services/WorkflowService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/

using Ardalis.GuardClauses;
using Microsoft.Extensions.Logging;
using Monai.Deploy.WorkflowManager.Common.Miscellaneous.Interfaces;
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
Expand Down
10 changes: 1 addition & 9 deletions src/WorkflowManager/Database/Interfaces/IPayloadRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,7 @@ public interface IPayloadRepository
/// </summary>
/// <param name="payload">The payload to update.</param>
/// <returns>The updated payload.</returns>
Task<bool> UpdateAsync(Payload payload);

/// <summary>
/// Updates a payload in the database.
/// </summary>
/// <param name="payloadId"></param>
/// <param name="workflowInstances"></param>
/// <returns></returns>
Task<bool> UpdateAssociatedWorkflowInstancesAsync(string payloadId, IEnumerable<string> workflowInstances);
Task<bool> UpdateAsyncWorkflowIds(Payload payload);

/// <summary>
/// Gets all the payloads that might need deleted
Expand Down
27 changes: 6 additions & 21 deletions src/WorkflowManager/Database/Repositories/PayloadRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Ardalis.GuardClauses;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
Expand Down Expand Up @@ -123,7 +122,7 @@ public async Task<Payload> GetByIdAsync(string payloadId)
return payload;
}

public async Task<bool> UpdateAsync(Payload payload)
public async Task<bool> UpdateAsyncWorkflowIds(Payload payload)
{
ArgumentNullException.ThrowIfNull(payload, nameof(payload));

Expand All @@ -132,31 +131,17 @@ public async Task<bool> UpdateAsync(Payload payload)
var filter = Builders<Payload>.Filter.Eq(p => p.PayloadId, payload.PayloadId);
await _payloadCollection.ReplaceOneAsync(filter, payload);

return true;
}
catch (Exception ex)
{
_logger.DbUpdatePayloadError(payload.PayloadId, ex);
return false;
}
}

public async Task<bool> UpdateAssociatedWorkflowInstancesAsync(string payloadId, IEnumerable<string> workflowInstances)
{
Guard.Against.NullOrEmpty(workflowInstances, nameof(workflowInstances));
ArgumentNullException.ThrowIfNullOrWhiteSpace(payloadId, nameof(payloadId));

try
{
await _payloadCollection.FindOneAndUpdateAsync(
i => i.Id == payloadId,
Builders<Payload>.Update.Set(p => p.WorkflowInstanceIds, workflowInstances));
i => i.Id == payload.Id,
Builders<Payload>.Update
.Set(p => p.TriggeredWorkflowNames, payload.TriggeredWorkflowNames)
.Set(p => p.WorkflowInstanceIds, payload.WorkflowInstanceIds));

return true;
}
catch (Exception ex)
{
_logger.DbUpdateWorkflowInstanceError(ex);
_logger.DbUpdatePayloadError(payload.PayloadId, ex);
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ public async Task ReceiveWorkflowPayload(MessageReceivedEventArgs message)

if (string.IsNullOrWhiteSpace(string.Join("", payload.TriggeredWorkflowNames)) is false)
{
await PayloadService.UpdateAsync(payload);
await PayloadService.UpdateAsyncWorkflowIds(payload);
}

}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/

using Ardalis.GuardClauses;
using Microsoft.Extensions.Logging;
using Monai.Deploy.Storage.API;
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,6 @@ public async Task<bool> ProcessPayload(WorkflowRequestEvent message, Payload pay
if (workflowInstances.Count != 0)
{
processed &= await _workflowInstanceRepository.CreateAsync(workflowInstances);

var workflowInstanceIds = workflowInstances.Select(workflowInstance => workflowInstance.Id);
await _payloadService.UpdateWorkflowInstanceIdsAsync(payload.Id, workflowInstanceIds).ConfigureAwait(false);
}

workflowInstances.AddRange(existingInstances.Where(e => e.PayloadId == message.PayloadId.ToString()));
Expand All @@ -180,7 +177,7 @@ public async Task<bool> ProcessPayload(WorkflowRequestEvent message, Payload pay
{
await ProcessFirstWorkflowTask(workflowInstance, message.CorrelationId, payload);
}
payload.WorkflowInstanceIds = workflowInstances.Select(w => w.WorkflowId).ToList();
payload.WorkflowInstanceIds = workflowInstances.Select(w => w.Id).ToList();
payload.TriggeredWorkflowNames = workflowInstances.Select(w => w.WorkflowName).ToList();

return true;
Expand Down Expand Up @@ -1171,7 +1168,7 @@ private static WorkflowInstance MakeInstance(WorkflowRequestEvent message, Workf
{
Id = workflowInstanceId,
WorkflowId = workflow.WorkflowId,
WorkflowName = workflow.Workflow.Name,
WorkflowName = workflow.Workflow?.Name ?? "",
PayloadId = message.PayloadId.ToString(),
StartTime = DateTime.UtcNow,
Status = Status.Created,
Expand Down
4 changes: 2 additions & 2 deletions tests/UnitTests/Common.Tests/Services/PayloadServiceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ public async Task DeletePayloadFromStorageAsync_ReturnsTrue()
var payloadId = Guid.NewGuid().ToString();

_payloadRepository.Setup(p => p.GetByIdAsync(It.IsAny<string>())).ReturnsAsync(() => new Payload());
_payloadRepository.Setup(p => p.UpdateAsync(It.IsAny<Payload>())).ReturnsAsync(() => true);
_payloadRepository.Setup(p => p.UpdateAsyncWorkflowIds(It.IsAny<Payload>())).ReturnsAsync(() => true);
_workflowInstanceRepository.Setup(r => r.GetByPayloadIdsAsync(It.IsAny<List<string>>())).ReturnsAsync(() => new List<WorkflowInstance>());

_storageService.Setup(s => s.RemoveObjectsAsync(It.IsAny<string>(), It.IsAny<IEnumerable<string>>(), default));
Expand Down Expand Up @@ -374,7 +374,7 @@ public async Task DeletePayloadFromStorageAsync_ThrowsMonaiBadRequestExceptionWh
var payloadId = Guid.NewGuid().ToString();

_payloadRepository.Setup(p => p.GetByIdAsync(It.IsAny<string>())).ReturnsAsync(() => new Payload());
_payloadRepository.Setup(p => p.UpdateAsync(It.IsAny<Payload>())).ReturnsAsync(() => true);
_payloadRepository.Setup(p => p.UpdateAsyncWorkflowIds(It.IsAny<Payload>())).ReturnsAsync(() => true);
_workflowInstanceRepository.Setup(r => r.GetByPayloadIdsAsync(It.IsAny<List<string>>())).ReturnsAsync(() => new List<WorkflowInstance>
{
new WorkflowInstance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3779,7 +3779,7 @@ public async Task ProcessTaskUpdate_ValidTaskUpdateEventWith_Same_Status_returns

_messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.ExportHL7, It.IsAny<Message>()), Times.Exactly(0));

_logger.Verify(logger => logger.IsEnabled(LogLevel.Trace),Times.Once);
_logger.Verify(logger => logger.IsEnabled(LogLevel.Trace), Times.Once);

response.Should().BeTrue();
}
Expand Down Expand Up @@ -4154,7 +4154,6 @@ public async Task ProcessPayload_Payload_Should_Include_triggered_workflow_names


Assert.Contains(workflows[0].Workflow!.Name, payload.TriggeredWorkflowNames);
Assert.Contains(workflows[0].WorkflowId, payload.WorkflowInstanceIds);
Assert.True(result);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public async Task GetDailyStatsAsync_ReturnsList()
var objectResult = Assert.IsType<OkObjectResult>(result);

var responseValue = (StatsPagedResponse<IEnumerable<ExecutionStatDayOverview>>)objectResult.Value;
responseValue.Data.First().Date.Should().Be(DateOnly.FromDateTime( _startTime));
responseValue.Data.First().Date.Should().Be(DateOnly.FromDateTime(_startTime));
responseValue.FirstPage.Should().Be("unitTest");
responseValue.LastPage.Should().Be("unitTest");
responseValue.PageNumber.Should().Be(1);
Expand Down

0 comments on commit 5187476

Please sign in to comment.