From 1341116bb0a62644aee46c1a67949881fe706016 Mon Sep 17 00:00:00 2001 From: Neil South Date: Thu, 21 Dec 2023 13:49:19 +0000 Subject: [PATCH 1/4] fix artifact recieved for HL7 Signed-off-by: Neil South --- .../Repositories/ArtifactsRepository.cs | 2 +- .../Services/WorkflowExecuterService.cs | 55 +++++++++++++++---- .../Services/WorkflowExecuterServiceTests.cs | 37 ------------- 3 files changed, 45 insertions(+), 49 deletions(-) diff --git a/src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs b/src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs index 956e8b656..b3ecb4378 100644 --- a/src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs +++ b/src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs @@ -183,7 +183,7 @@ public async Task AddOrUpdateItemAsync(string workflowInstanceId, string taskId, } else { - item.Artifacts = item.Artifacts.Concat(existing.Artifacts).ToList(); + item.Artifacts = item.Artifacts.Union(existing.Artifacts).ToList(); var update = Builders.Update.Set(a => a.Artifacts, item.Artifacts); await _artifactReceivedItemsCollection .UpdateOneAsync(a => a.WorkflowInstanceId == workflowInstanceId && a.TaskId == taskId, update) diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index f7fc03b5c..99d73aea5 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -251,7 +251,7 @@ await _artifactsRepository return true; } - private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject task, string taskId) + private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject taskTemplate, string taskId) { var artifactList = message.Artifacts.Select(a => $"{a.Path}").ToList(); var artifactsInStorage = (await _storageService.VerifyObjectsExistAsync(workflowInstance.BucketId, artifactList, default)) ?? new Dictionary(); @@ -263,22 +263,40 @@ private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message var messageArtifactsInStorage = message.Artifacts.Where(m => artifactsInStorage.First(a => a.Value && a.Key == $"{m.Path}").Value).ToList(); + var addedNew = false; var validArtifacts = new Dictionary(); foreach (var artifact in messageArtifactsInStorage) { - var match = task.Artifacts.Output.FirstOrDefault(t => t.Type == artifact.Type); + var match = taskTemplate.Artifacts.Output.FirstOrDefault(t => t.Type == artifact.Type); if (match is not null && validArtifacts.ContainsKey(match!.Name) is false) { validArtifacts.Add(match.Name, $"{artifact.Path}"); + } } var currentTask = workflowInstance.Tasks?.Find(t => t.TaskId == taskId); - currentTask!.OutputArtifacts = validArtifacts; // adding the actual paths here, the parent function does the saving of the changes - _logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts)); - await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts); + foreach (var artifact in validArtifacts) + { + if (currentTask?.OutputArtifacts.ContainsKey(artifact.Key) is false) + { + // adding the actual paths here, the parent function does the saving of the changes + currentTask?.OutputArtifacts.Add(artifact.Key, artifact.Value); + addedNew = true; + } + } + + //if (addedNew) + //{ + // _logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts)); + // await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts); + //} + if (currentTask is not null && addedNew) + { + await _workflowInstanceRepository.UpdateTaskAsync(workflowInstance.Id, taskId, currentTask); + } } private async Task AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, @@ -612,7 +630,12 @@ private async Task ExternalAppRequest(ExternalAppRequestEvent externalAppR return true; } - private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId, List? plugins = null) + private async Task HandleDicomExportAsync( + WorkflowRevision workflow, + WorkflowInstance workflowInstance, + TaskExecution task, + string correlationId, + List? plugins = null) { plugins ??= new List(); var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId); @@ -629,7 +652,12 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched); } - private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId) + private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts( + WorkflowRevision workflow, + WorkflowInstance workflowInstance, + TaskExecution task, + string correlationId, + bool enforceDcmOnly = true) { var exportList = workflow.Workflow?.Tasks?.FirstOrDefault(t => t.Id == task.TaskId)?.ExportDestinations.Select(e => e.Name).ToArray(); if (exportList is null || !exportList.Any()) @@ -637,7 +665,7 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns exportList = null; } - var artifactValues = await GetArtifactValues(workflow, workflowInstance, task, exportList, correlationId); + var artifactValues = await GetArtifactValues(workflow, workflowInstance, task, exportList, correlationId, enforceDcmOnly); if (artifactValues.IsNullOrEmpty()) { @@ -646,7 +674,12 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns return (exportList, artifactValues); } - private async Task GetArtifactValues(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string[]? exportList, string correlationId) + private async Task GetArtifactValues( + WorkflowRevision workflow, WorkflowInstance workflowInstance, + TaskExecution task, + string[]? exportList, + string correlationId, + bool enforceDcmOnly = true) { var artifactValues = GetDicomExports(workflow, task, exportList); @@ -660,7 +693,7 @@ private async Task GetArtifactValues(WorkflowRevision workflow, Workfl artifact, true); - var dcmFiles = objects?.Where(o => o.IsValidDicomFile())?.ToList(); + var dcmFiles = objects?.Where(o => o.IsValidDicomFile() || enforceDcmOnly is false)?.ToList(); if (dcmFiles?.IsNullOrEmpty() is false) { @@ -681,7 +714,7 @@ private async Task GetArtifactValues(WorkflowRevision workflow, Workfl private async Task HandleHl7ExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId) { - var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId); + var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId, false); if (exportList is null || artifactValues is null) { diff --git a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs index c849fb622..58fdc5479 100755 --- a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs +++ b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs @@ -3184,44 +3184,7 @@ public async Task ArtifactReceveid_Valid_ReturnesTrue() Assert.True(result); } - [Fact] - public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_UpdateTaskOutputArtifactsAsync() - { - var artifactPath = "some path here"; - //incoming artifacts - var message = new ArtifactsReceivedEvent - { - WorkflowInstanceId = "123", TaskId = "456", - Artifacts = new List() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = $"{new Guid()}/{artifactPath}" } } - }; - var workflowInstance = new WorkflowInstance - { - WorkflowId = "789", Tasks = new List() - { new TaskExecution() { TaskId = "456" } } - }; - _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))! - .ReturnsAsync(workflowInstance); - //expected artifacts - var templateArtifacts = new OutputArtifact[] - { - new OutputArtifact() { Type = ArtifactType.CT , Name = "CT scan"}, - }; - var taskTemplate = new TaskObject() { Id = "456", Artifacts = new ArtifactMap { Output = templateArtifacts } }; - var workflowTemplate = new WorkflowRevision { Workflow = new Workflow { Tasks = new[] { taskTemplate } } }; - _workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))! - .ReturnsAsync(workflowTemplate); - - _storageService.Setup(s => s.VerifyObjectsExistAsync(It.IsAny(), It.IsAny>(), It.IsAny())) - .ReturnsAsync(new Dictionary { { $"{message.PayloadId}/{artifactPath}", true } }); - //previously received artifacts - _artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id)) - .ReturnsAsync((List?)null); - - var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message); - - _workflowInstanceRepository.Verify(w => w.UpdateTaskOutputArtifactsAsync(It.IsAny(), It.IsAny(), It.IsAny>()), Times.Once()); - } [Fact] public async Task ProcessPayload_WithExportTask_NoExportsFails() { From f9222e6a0c404a2d24581ac8f450993dab6c5a20 Mon Sep 17 00:00:00 2001 From: Neil South Date: Thu, 21 Dec 2023 15:57:24 +0000 Subject: [PATCH 2/4] fix for hl7Export complete Signed-off-by: Neil South --- .../WorkflowExecuter/Services/WorkflowExecuterService.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index 99d73aea5..9dada9849 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -529,9 +529,13 @@ public async Task ProcessExportComplete(ExportCompleteEvent message, strin return false; } - if (string.Compare(task.TaskType, ValidationConstants.ExportTaskType, true) == 0) + switch (task.TaskType) { - return await HandleTaskDestinations(workflowInstance, workflow, task, correlationId); + case ValidationConstants.ExportTaskType: + case ValidationConstants.HL7ExportTask: + return await HandleTaskDestinations(workflowInstance, workflow, task, correlationId); + default: + break; } } From a400871d1a54bc67650c7c6c91175ff3f0b9c860 Mon Sep 17 00:00:00 2001 From: Neil South Date: Thu, 21 Dec 2023 16:29:12 +0000 Subject: [PATCH 3/4] fiup constant and test data Signed-off-by: Neil South --- .../Services/WorkflowExecuterService.cs | 4 ++-- .../TestData/WorkflowInstanceTestData.cs | 3 ++- .../TestData/WorkflowRevisionTestData.cs | 13 +++++++------ 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index 9dada9849..ed77f6c26 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -531,8 +531,8 @@ public async Task ProcessExportComplete(ExportCompleteEvent message, strin switch (task.TaskType) { - case ValidationConstants.ExportTaskType: - case ValidationConstants.HL7ExportTask: + case TaskTypeConstants.DicomExportTask: + case TaskTypeConstants.HL7ExportTask: return await HandleTaskDestinations(workflowInstance, workflow, task, correlationId); default: break; diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs index 3fbe55c9f..3053b7985 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs @@ -15,6 +15,7 @@ */ using Monai.Deploy.Messaging.Events; +using Monai.Deploy.WorkflowManager.Common.Contracts.Constants; using Monai.Deploy.WorkflowManager.Common.Contracts.Models; using Monai.Deploy.WorkflowManager.Common.IntegrationTests.POCO; #pragma warning disable CS8602 // Dereference of a possibly null reference. @@ -209,7 +210,7 @@ public static WorkflowInstance CreateWorkflowInstance(string workflowName) ExecutionId = Guid.NewGuid().ToString(), TaskId = "7d7c8b83-6628-413c-9912-a89314e5e2d5", OutputDirectory = "payloadId/workflows/workflowInstanceId/executionId/", - TaskType = "Export", + TaskType = TaskTypeConstants.DicomExportTask, Status = TaskExecutionStatus.Dispatched } } diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowRevisionTestData.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowRevisionTestData.cs index 7b0749d07..875718423 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowRevisionTestData.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowRevisionTestData.cs @@ -15,6 +15,7 @@ */ using Monai.Deploy.Messaging.Common; +using Monai.Deploy.WorkflowManager.Common.Contracts.Constants; using Monai.Deploy.WorkflowManager.Common.Contracts.Models; using Artifact = Monai.Deploy.WorkflowManager.Common.Contracts.Models.Artifact; // ReSharper disable ArrangeObjectCreationWhenTypeEvident @@ -2174,7 +2175,7 @@ public static class WorkflowRevisionsTestData new TaskObject { Id = "export_task_1", - Type = "Export", + Type = TaskTypeConstants.DicomExportTask, Description = "Export Workflow 1 Task 2", ExportDestinations = new ExportDestination[] { @@ -2235,7 +2236,7 @@ public static class WorkflowRevisionsTestData new TaskObject { Id = "export_task_1", - Type = "Export", + Type = TaskTypeConstants.DicomExportTask, Description = "Export Workflow 1 Task 2", ExportDestinations = new ExportDestination[] { @@ -2296,7 +2297,7 @@ public static class WorkflowRevisionsTestData new TaskObject { Id = "export_task_1", - Type = "Export", + Type = TaskTypeConstants.DicomExportTask, Description = "Export Workflow 1 Task 2", ExportDestinations = new ExportDestination[] { @@ -2358,7 +2359,7 @@ public static class WorkflowRevisionsTestData new TaskObject { Id = "export_task_1", - Type = "Export", + Type = TaskTypeConstants.DicomExportTask, Description = "Export Workflow 1 Task 2", ExportDestinations = new ExportDestination[] { @@ -2375,7 +2376,7 @@ public static class WorkflowRevisionsTestData new TaskObject { Id = "export_task_2", - Type = "Export", + Type = TaskTypeConstants.DicomExportTask, Description = "Export Workflow 1 Task 3", ExportDestinations = new ExportDestination[] { @@ -2437,7 +2438,7 @@ public static class WorkflowRevisionsTestData new TaskObject { Id = "export_task_1", - Type = "Export", + Type = TaskTypeConstants.DicomExportTask, Description = "Export Workflow 1 Task 2", ExportDestinations = new ExportDestination[] { From 708ceb27a156a6f73e012e4445349e3e151837d3 Mon Sep 17 00:00:00 2001 From: Neil South Date: Thu, 21 Dec 2023 16:31:17 +0000 Subject: [PATCH 4/4] remove some unused lines Signed-off-by: Neil South --- .../WorkflowExecuter/Services/WorkflowExecuterService.cs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index ed77f6c26..14d8129f7 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -288,13 +288,9 @@ private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message } } - //if (addedNew) - //{ - // _logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts)); - // await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts); - //} if (currentTask is not null && addedNew) { + _logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts)); await _workflowInstanceRepository.UpdateTaskAsync(workflowInstance.Id, taskId, currentTask); } }