From 95f8dd7f36ee15c2b7edf304a78c21102f729223 Mon Sep 17 00:00:00 2001 From: Neil South Date: Fri, 24 Nov 2023 13:20:31 +0000 Subject: [PATCH 1/8] adding hl7 export Signed-off-by: Neil South --- .../MessageBrokerConfigurationKeys.cs | 14 +++ ...orkflowManager.Common.Configuration.csproj | 5 +- src/Common/Configuration/packages.lock.json | 21 ++-- .../Miscellaneous/ValidationConstants.cs | 8 +- src/Common/Miscellaneous/packages.lock.json | 22 ++-- src/Monai.Deploy.WorkflowManager.sln | 12 -- ...loy.WorkflowManager.TaskManager.API.csproj | 5 +- src/TaskManager/API/packages.lock.json | 12 ++ src/TaskManager/Database/packages.lock.json | 13 ++- .../AideClinicalReview/packages.lock.json | 24 ++-- .../Plug-ins/Argo/packages.lock.json | 24 ++-- ....Deploy.WorkflowManager.TaskManager.csproj | 8 +- src/TaskManager/TaskManager/appsettings.json | 1 + .../TaskManager/packages.lock.json | 33 +++--- .../Contracts/Constants/TaskTypeConstants.cs | 4 +- ...ai.Deploy.WorkflowManager.Contracts.csproj | 5 +- .../Database/packages.lock.json | 22 ++-- .../Logging/packages.lock.json | 22 ++-- .../PayloadListener/packages.lock.json | 25 ++-- .../Services/packages.lock.json | 24 ++-- .../Storage/packages.lock.json | 22 ++-- ...oy.WorkloadManager.WorkflowExecuter.csproj | 5 + .../Services/WorkflowExecuterService.cs | 107 ++++++++++++++---- .../WorkflowExecuter/packages.lock.json | 25 ++-- .../Monai.Deploy.WorkflowManager.csproj | 2 +- .../Validators/WorkflowValidator.cs | 20 ++++ .../WorkflowManager/packages.lock.json | 44 +++---- ...anager.TaskManager.IntegrationTests.csproj | 4 +- ...r.WorkflowExecutor.IntegrationTests.csproj | 8 +- ...y.WorkflowManager.TaskManager.Tests.csproj | 1 + .../WorkflowManager.Tests/packages.lock.json | 45 ++++---- 31 files changed, 368 insertions(+), 219 deletions(-) mode change 100755 => 100644 src/TaskManager/API/packages.lock.json mode change 100755 => 100644 src/TaskManager/Database/packages.lock.json mode change 100755 => 100644 src/WorkflowManager/Database/packages.lock.json mode change 100755 => 100644 src/WorkflowManager/Logging/packages.lock.json mode change 100755 => 100644 src/WorkflowManager/Storage/packages.lock.json mode change 100755 => 100644 src/WorkflowManager/WorkflowManager/packages.lock.json mode change 100755 => 100644 tests/UnitTests/WorkflowManager.Tests/packages.lock.json diff --git a/src/Common/Configuration/MessageBrokerConfigurationKeys.cs b/src/Common/Configuration/MessageBrokerConfigurationKeys.cs index 886497f27..f5742f471 100644 --- a/src/Common/Configuration/MessageBrokerConfigurationKeys.cs +++ b/src/Common/Configuration/MessageBrokerConfigurationKeys.cs @@ -91,5 +91,19 @@ public class MessageBrokerConfigurationKeys [ConfigurationKeyName("artifactrecieved")] public string ArtifactRecieved { get; set; } = "md.workflow.artifactrecieved"; + + /// + /// Gets or sets the topic for publishing export requests. + /// Defaults to `md_export_request`. + /// + [ConfigurationKeyName("externalAppRequest")] + public string ExternalAppRequest { get; set; } = "md.externalapp.request"; + + /// + /// Gets or sets the topic for publishing workflow requests. + /// Defaults to `md.export.request`. + /// + [ConfigurationKeyName("exportHl7")] + public string ExportHL7 { get; set; } = "md.export.hl7"; } } diff --git a/src/Common/Configuration/Monai.Deploy.WorkflowManager.Common.Configuration.csproj b/src/Common/Configuration/Monai.Deploy.WorkflowManager.Common.Configuration.csproj index 7106c58c9..c0067ebc9 100644 --- a/src/Common/Configuration/Monai.Deploy.WorkflowManager.Common.Configuration.csproj +++ b/src/Common/Configuration/Monai.Deploy.WorkflowManager.Common.Configuration.csproj @@ -31,6 +31,7 @@ + @@ -44,10 +45,6 @@ - - - - true true diff --git a/src/Common/Configuration/packages.lock.json b/src/Common/Configuration/packages.lock.json index 77e5c324d..2e09a73d8 100644 --- a/src/Common/Configuration/packages.lock.json +++ b/src/Common/Configuration/packages.lock.json @@ -2,6 +2,18 @@ "version": 1, "dependencies": { "net6.0": { + "Monai.Deploy.Messaging": { + "type": "Direct", + "requested": "[1.0.5, )", + "resolved": "1.0.5", + "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", + "dependencies": { + "Ardalis.GuardClauses": "4.1.1", + "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", + "Newtonsoft.Json": "13.0.3", + "System.IO.Abstractions": "17.2.3" + } + }, "Monai.Deploy.Storage": { "type": "Direct", "requested": "[0.2.18, )", @@ -124,15 +136,6 @@ "type": "Transitive", "resolved": "6.0.0", "contentHash": "/iUeP3tq1S0XdNNoMz5C9twLSrM/TH+qElHkXWaPvuNOt+99G75NrV0OS2EqHx5wMN7popYjpc8oTjC1y16DLg==" - }, - "monai.deploy.messaging": { - "type": "Project", - "dependencies": { - "Ardalis.GuardClauses": "[4.1.1, )", - "Microsoft.Extensions.Diagnostics.HealthChecks": "[6.0.21, )", - "Newtonsoft.Json": "[13.0.3, )", - "System.IO.Abstractions": "[17.2.3, )" - } } } } diff --git a/src/Common/Miscellaneous/ValidationConstants.cs b/src/Common/Miscellaneous/ValidationConstants.cs index 44e323d13..eb6fabc04 100644 --- a/src/Common/Miscellaneous/ValidationConstants.cs +++ b/src/Common/Miscellaneous/ValidationConstants.cs @@ -118,6 +118,11 @@ public enum NotificationValues /// public const string ExternalAppTaskType = "remote_app_execution"; + /// + /// Key for Hl7 export task type. + /// + public const string HL7ExportTask = "export_hl7"; + /// /// Key for the export task type. /// @@ -141,7 +146,8 @@ public enum NotificationValues ExportTaskType, DockerTaskType, Email, - ExternalAppTaskType + ExternalAppTaskType, + HL7ExportTask }; } } diff --git a/src/Common/Miscellaneous/packages.lock.json b/src/Common/Miscellaneous/packages.lock.json index 7237cb1fa..034617ee8 100644 --- a/src/Common/Miscellaneous/packages.lock.json +++ b/src/Common/Miscellaneous/packages.lock.json @@ -144,6 +144,17 @@ "System.Runtime.CompilerServices.Unsafe": "6.0.0" } }, + "Monai.Deploy.Messaging": { + "type": "Transitive", + "resolved": "1.0.5", + "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", + "dependencies": { + "Ardalis.GuardClauses": "4.1.1", + "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", + "Newtonsoft.Json": "13.0.3", + "System.IO.Abstractions": "17.2.3" + } + }, "Monai.Deploy.Storage": { "type": "Transitive", "resolved": "0.2.18", @@ -222,19 +233,10 @@ "resolved": "6.0.0", "contentHash": "TY8/9+tI0mNaUMgntOxxaq2ndTkdXqLSxvPmas7XEqOlv9lQtB7wLjYGd756lOaO7Dvb5r/WXhluM+0Xe87v5Q==" }, - "monai.deploy.messaging": { - "type": "Project", - "dependencies": { - "Ardalis.GuardClauses": "[4.1.1, )", - "Microsoft.Extensions.Diagnostics.HealthChecks": "[6.0.21, )", - "Newtonsoft.Json": "[13.0.3, )", - "System.IO.Abstractions": "[17.2.3, )" - } - }, "monai.deploy.workflowmanager.common.configuration": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Monai.Deploy.Storage": "[0.2.18, )" } } diff --git a/src/Monai.Deploy.WorkflowManager.sln b/src/Monai.Deploy.WorkflowManager.sln index c64642007..fbc226aa9 100644 --- a/src/Monai.Deploy.WorkflowManager.sln +++ b/src/Monai.Deploy.WorkflowManager.sln @@ -90,10 +90,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Monai.Deploy.WorkflowManage EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Monai.Deploy.WorkflowManager.Services", "WorkflowManager\Services\Monai.Deploy.WorkflowManager.Services.csproj", "{76A9FF94-862D-43E8-A0A7-562DD1DDDB3B}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Monai.Deploy.Messaging", "..\..\monai-deploy-messaging\src\Messaging\Monai.Deploy.Messaging.csproj", "{03923799-9ACE-4527-8DDA-EB18978EFE96}" -EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Monai.Deploy.Messaging.RabbitMQ", "..\..\monai-deploy-messaging\src\Plugins\RabbitMQ\Monai.Deploy.Messaging.RabbitMQ.csproj", "{B32E6BBD-12F0-4723-BD56-CF89CCF95AA7}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -256,14 +252,6 @@ Global {76A9FF94-862D-43E8-A0A7-562DD1DDDB3B}.Debug|Any CPU.Build.0 = Debug|Any CPU {76A9FF94-862D-43E8-A0A7-562DD1DDDB3B}.Release|Any CPU.ActiveCfg = Release|Any CPU {76A9FF94-862D-43E8-A0A7-562DD1DDDB3B}.Release|Any CPU.Build.0 = Release|Any CPU - {03923799-9ACE-4527-8DDA-EB18978EFE96}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {03923799-9ACE-4527-8DDA-EB18978EFE96}.Debug|Any CPU.Build.0 = Debug|Any CPU - {03923799-9ACE-4527-8DDA-EB18978EFE96}.Release|Any CPU.ActiveCfg = Release|Any CPU - {03923799-9ACE-4527-8DDA-EB18978EFE96}.Release|Any CPU.Build.0 = Release|Any CPU - {B32E6BBD-12F0-4723-BD56-CF89CCF95AA7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {B32E6BBD-12F0-4723-BD56-CF89CCF95AA7}.Debug|Any CPU.Build.0 = Debug|Any CPU - {B32E6BBD-12F0-4723-BD56-CF89CCF95AA7}.Release|Any CPU.ActiveCfg = Release|Any CPU - {B32E6BBD-12F0-4723-BD56-CF89CCF95AA7}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/TaskManager/API/Monai.Deploy.WorkflowManager.TaskManager.API.csproj b/src/TaskManager/API/Monai.Deploy.WorkflowManager.TaskManager.API.csproj index d66166cae..00762e71f 100644 --- a/src/TaskManager/API/Monai.Deploy.WorkflowManager.TaskManager.API.csproj +++ b/src/TaskManager/API/Monai.Deploy.WorkflowManager.TaskManager.API.csproj @@ -39,11 +39,10 @@ + - - - + diff --git a/src/TaskManager/API/packages.lock.json b/src/TaskManager/API/packages.lock.json old mode 100755 new mode 100644 index 0031bc500..40473d9ab --- a/src/TaskManager/API/packages.lock.json +++ b/src/TaskManager/API/packages.lock.json @@ -2,6 +2,18 @@ "version": 1, "dependencies": { "net6.0": { + "Monai.Deploy.Messaging": { + "type": "Direct", + "requested": "[1.0.5, )", + "resolved": "1.0.5", + "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", + "dependencies": { + "Ardalis.GuardClauses": "4.1.1", + "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", + "Newtonsoft.Json": "13.0.3", + "System.IO.Abstractions": "17.2.3" + } + }, "Mongo.Migration": { "type": "Direct", "requested": "[3.1.4, )", diff --git a/src/TaskManager/Database/packages.lock.json b/src/TaskManager/Database/packages.lock.json old mode 100755 new mode 100644 index 146ad8458..adc174dc0 --- a/src/TaskManager/Database/packages.lock.json +++ b/src/TaskManager/Database/packages.lock.json @@ -245,6 +245,17 @@ "System.Security.Principal.Windows": "5.0.0" } }, + "Monai.Deploy.Messaging": { + "type": "Transitive", + "resolved": "1.0.5", + "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", + "dependencies": { + "Ardalis.GuardClauses": "4.1.1", + "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", + "Newtonsoft.Json": "13.0.3", + "System.IO.Abstractions": "17.2.3" + } + }, "Mongo.Migration": { "type": "Transitive", "resolved": "3.1.4", @@ -682,7 +693,7 @@ "monai.deploy.workflowmanager.taskmanager.api": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Mongo.Migration": "[3.1.4, )", "MongoDB.Bson": "[2.21.0, )" } diff --git a/src/TaskManager/Plug-ins/AideClinicalReview/packages.lock.json b/src/TaskManager/Plug-ins/AideClinicalReview/packages.lock.json index e316db0c3..03525f8d6 100644 --- a/src/TaskManager/Plug-ins/AideClinicalReview/packages.lock.json +++ b/src/TaskManager/Plug-ins/AideClinicalReview/packages.lock.json @@ -256,6 +256,17 @@ "resolved": "1.1.0", "contentHash": "aOZA3BWfz9RXjpzt0sRJJMjAscAUm3Hoa4UWAfceV9UTYxgwZ1lZt5nO2myFf+/jetYQo4uTP7zS8sJY67BBxg==" }, + "Monai.Deploy.Messaging": { + "type": "Transitive", + "resolved": "1.0.5", + "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", + "dependencies": { + "Ardalis.GuardClauses": "4.1.1", + "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", + "Newtonsoft.Json": "13.0.3", + "System.IO.Abstractions": "17.2.3" + } + }, "Monai.Deploy.Storage": { "type": "Transitive", "resolved": "0.2.18", @@ -719,19 +730,10 @@ "resolved": "4.5.0", "contentHash": "okurQJO6NRE/apDIP23ajJ0hpiNmJ+f0BwOlB/cSqTLQlw5upkf+5+96+iG2Jw40G1fCVCyPz/FhIABUjMR+RQ==" }, - "monai.deploy.messaging": { - "type": "Project", - "dependencies": { - "Ardalis.GuardClauses": "[4.1.1, )", - "Microsoft.Extensions.Diagnostics.HealthChecks": "[6.0.21, )", - "Newtonsoft.Json": "[13.0.3, )", - "System.IO.Abstractions": "[17.2.3, )" - } - }, "monai.deploy.workflowmanager.common.configuration": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Monai.Deploy.Storage": "[0.2.18, )" } }, @@ -745,7 +747,7 @@ "monai.deploy.workflowmanager.taskmanager.api": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Mongo.Migration": "[3.1.4, )", "MongoDB.Bson": "[2.21.0, )" } diff --git a/src/TaskManager/Plug-ins/Argo/packages.lock.json b/src/TaskManager/Plug-ins/Argo/packages.lock.json index b6dfa2240..97b297ed0 100644 --- a/src/TaskManager/Plug-ins/Argo/packages.lock.json +++ b/src/TaskManager/Plug-ins/Argo/packages.lock.json @@ -366,6 +366,17 @@ "resolved": "1.1.0", "contentHash": "aOZA3BWfz9RXjpzt0sRJJMjAscAUm3Hoa4UWAfceV9UTYxgwZ1lZt5nO2myFf+/jetYQo4uTP7zS8sJY67BBxg==" }, + "Monai.Deploy.Messaging": { + "type": "Transitive", + "resolved": "1.0.5", + "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", + "dependencies": { + "Ardalis.GuardClauses": "4.1.1", + "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", + "Newtonsoft.Json": "13.0.3", + "System.IO.Abstractions": "17.2.3" + } + }, "Monai.Deploy.Storage": { "type": "Transitive", "resolved": "0.2.18", @@ -857,19 +868,10 @@ "resolved": "13.3.1", "contentHash": "Q2dqDsb0xAlr092grgHk8/vTXI2snIiYM5ND3IXkgJDFIdPnqDYwYnlk+gwzSeRByDLhiSzTog8uT7xFwH68Zg==" }, - "monai.deploy.messaging": { - "type": "Project", - "dependencies": { - "Ardalis.GuardClauses": "[4.1.1, )", - "Microsoft.Extensions.Diagnostics.HealthChecks": "[6.0.21, )", - "Newtonsoft.Json": "[13.0.3, )", - "System.IO.Abstractions": "[17.2.3, )" - } - }, "monai.deploy.workflowmanager.common.configuration": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Monai.Deploy.Storage": "[0.2.18, )" } }, @@ -883,7 +885,7 @@ "monai.deploy.workflowmanager.taskmanager.api": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Mongo.Migration": "[3.1.4, )", "MongoDB.Bson": "[2.21.0, )" } diff --git a/src/TaskManager/TaskManager/Monai.Deploy.WorkflowManager.TaskManager.csproj b/src/TaskManager/TaskManager/Monai.Deploy.WorkflowManager.TaskManager.csproj index aacf775d5..921cd5060 100644 --- a/src/TaskManager/TaskManager/Monai.Deploy.WorkflowManager.TaskManager.csproj +++ b/src/TaskManager/TaskManager/Monai.Deploy.WorkflowManager.TaskManager.csproj @@ -58,6 +58,8 @@ + + true @@ -76,7 +78,6 @@ - @@ -84,10 +85,7 @@ - - - true - + diff --git a/src/TaskManager/TaskManager/appsettings.json b/src/TaskManager/TaskManager/appsettings.json index 9353ca939..f5b7589d3 100755 --- a/src/TaskManager/TaskManager/appsettings.json +++ b/src/TaskManager/TaskManager/appsettings.json @@ -77,6 +77,7 @@ "aideClinicalReviewCancelation": "aide.clinical_review.cancellation", "notificationEmailRequest": "aide.notification_email.request", "notificationEmailCancelation": "aide.notification_email.cancellation", + "exportHl7": "md.export.hl7" }, "dicomAgents": { "dicomWebAgentName": "monaidicomweb", diff --git a/src/TaskManager/TaskManager/packages.lock.json b/src/TaskManager/TaskManager/packages.lock.json index a99adaa9b..f48e9de8d 100644 --- a/src/TaskManager/TaskManager/packages.lock.json +++ b/src/TaskManager/TaskManager/packages.lock.json @@ -23,13 +23,25 @@ "Newtonsoft.Json.Bson": "1.0.2" } }, + "Monai.Deploy.Messaging": { + "type": "Direct", + "requested": "[1.0.5, )", + "resolved": "1.0.5", + "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", + "dependencies": { + "Ardalis.GuardClauses": "4.1.1", + "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", + "Newtonsoft.Json": "13.0.3", + "System.IO.Abstractions": "17.2.3" + } + }, "Monai.Deploy.Messaging.RabbitMQ": { "type": "Direct", - "requested": "[1.0.4, )", - "resolved": "1.0.4", - "contentHash": "2llZ4XbE91Km2Q+JEKSSeTyhZLWRq3lN5xQ6+Klqow3V8SXBAlOQQ+b5//BEm6x0QdoycFberMOVAsZYYM0j7g==", + "requested": "[1.0.5, )", + "resolved": "1.0.5", + "contentHash": "L+BWU5Xq1ARjFRcpnefDJGuG52Zw4Iz3qql1tn8lYfqoC4B37fAUVz6k7Ar7v1OUwPo/JR8q4OP2IIMpqpKRRA==", "dependencies": { - "Monai.Deploy.Messaging": "1.0.4", + "Monai.Deploy.Messaging": "1.0.5", "Polly": "7.2.4", "RabbitMQ.Client": "6.5.0" } @@ -1124,19 +1136,10 @@ "resolved": "0.6.2", "contentHash": "jPao/LdUNLUz8rn3H1D8W7wQbZsRZM0iayvWI4xGejJg3XJHT56gcmYdgmCGPdJF1UEBqUjucCRrFB+4HbJsbw==" }, - "monai.deploy.messaging": { - "type": "Project", - "dependencies": { - "Ardalis.GuardClauses": "[4.1.1, )", - "Microsoft.Extensions.Diagnostics.HealthChecks": "[6.0.21, )", - "Newtonsoft.Json": "[13.0.3, )", - "System.IO.Abstractions": "[17.2.3, )" - } - }, "monai.deploy.workflowmanager.common.configuration": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Monai.Deploy.Storage": "[0.2.18, )" } }, @@ -1158,7 +1161,7 @@ "monai.deploy.workflowmanager.taskmanager.api": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Mongo.Migration": "[3.1.4, )", "MongoDB.Bson": "[2.21.0, )" } diff --git a/src/WorkflowManager/Contracts/Constants/TaskTypeConstants.cs b/src/WorkflowManager/Contracts/Constants/TaskTypeConstants.cs index 32632a8a3..91595ad88 100644 --- a/src/WorkflowManager/Contracts/Constants/TaskTypeConstants.cs +++ b/src/WorkflowManager/Contracts/Constants/TaskTypeConstants.cs @@ -20,8 +20,10 @@ public static class TaskTypeConstants { public const string RouterTask = "router"; - public const string ExportTask = "export"; + public const string DicomExportTask = "export"; public const string ExternalAppTask = "remote_app_execution"; + + public const string HL7ExportTask = "export_hl7"; } } diff --git a/src/WorkflowManager/Contracts/Monai.Deploy.WorkflowManager.Contracts.csproj b/src/WorkflowManager/Contracts/Monai.Deploy.WorkflowManager.Contracts.csproj index 2f41d39e0..eed15e760 100644 --- a/src/WorkflowManager/Contracts/Monai.Deploy.WorkflowManager.Contracts.csproj +++ b/src/WorkflowManager/Contracts/Monai.Deploy.WorkflowManager.Contracts.csproj @@ -38,11 +38,8 @@ + - - - - diff --git a/src/WorkflowManager/Database/packages.lock.json b/src/WorkflowManager/Database/packages.lock.json old mode 100755 new mode 100644 index ddd22e3af..27d229487 --- a/src/WorkflowManager/Database/packages.lock.json +++ b/src/WorkflowManager/Database/packages.lock.json @@ -267,6 +267,17 @@ "System.Security.Principal.Windows": "5.0.0" } }, + "Monai.Deploy.Messaging": { + "type": "Transitive", + "resolved": "1.0.5", + "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", + "dependencies": { + "Ardalis.GuardClauses": "4.1.1", + "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", + "Newtonsoft.Json": "13.0.3", + "System.IO.Abstractions": "17.2.3" + } + }, "MongoDB.Bson": { "type": "Transitive", "resolved": "2.21.0", @@ -671,19 +682,10 @@ "resolved": "0.6.2", "contentHash": "jPao/LdUNLUz8rn3H1D8W7wQbZsRZM0iayvWI4xGejJg3XJHT56gcmYdgmCGPdJF1UEBqUjucCRrFB+4HbJsbw==" }, - "monai.deploy.messaging": { - "type": "Project", - "dependencies": { - "Ardalis.GuardClauses": "[4.1.1, )", - "Microsoft.Extensions.Diagnostics.HealthChecks": "[6.0.21, )", - "Newtonsoft.Json": "[13.0.3, )", - "System.IO.Abstractions": "[17.2.3, )" - } - }, "monai.deploy.workflowmanager.contracts": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Mongo.Migration": "[3.1.4, )", "MongoDB.Bson": "[2.21.0, )" } diff --git a/src/WorkflowManager/Logging/packages.lock.json b/src/WorkflowManager/Logging/packages.lock.json old mode 100755 new mode 100644 index 4ca609e56..9deeb19b2 --- a/src/WorkflowManager/Logging/packages.lock.json +++ b/src/WorkflowManager/Logging/packages.lock.json @@ -208,6 +208,17 @@ "resolved": "1.1.0", "contentHash": "aOZA3BWfz9RXjpzt0sRJJMjAscAUm3Hoa4UWAfceV9UTYxgwZ1lZt5nO2myFf+/jetYQo4uTP7zS8sJY67BBxg==" }, + "Monai.Deploy.Messaging": { + "type": "Transitive", + "resolved": "1.0.5", + "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", + "dependencies": { + "Ardalis.GuardClauses": "4.1.1", + "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", + "Newtonsoft.Json": "13.0.3", + "System.IO.Abstractions": "17.2.3" + } + }, "Mongo.Migration": { "type": "Transitive", "resolved": "3.1.4", @@ -627,19 +638,10 @@ "resolved": "4.5.0", "contentHash": "okurQJO6NRE/apDIP23ajJ0hpiNmJ+f0BwOlB/cSqTLQlw5upkf+5+96+iG2Jw40G1fCVCyPz/FhIABUjMR+RQ==" }, - "monai.deploy.messaging": { - "type": "Project", - "dependencies": { - "Ardalis.GuardClauses": "[4.1.1, )", - "Microsoft.Extensions.Diagnostics.HealthChecks": "[6.0.21, )", - "Newtonsoft.Json": "[13.0.3, )", - "System.IO.Abstractions": "[17.2.3, )" - } - }, "monai.deploy.workflowmanager.contracts": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Mongo.Migration": "[3.1.4, )", "MongoDB.Bson": "[2.21.0, )" } diff --git a/src/WorkflowManager/PayloadListener/packages.lock.json b/src/WorkflowManager/PayloadListener/packages.lock.json index decd50eaa..c24a10ae6 100644 --- a/src/WorkflowManager/PayloadListener/packages.lock.json +++ b/src/WorkflowManager/PayloadListener/packages.lock.json @@ -268,6 +268,17 @@ "System.Security.Principal.Windows": "5.0.0" } }, + "Monai.Deploy.Messaging": { + "type": "Transitive", + "resolved": "1.0.5", + "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", + "dependencies": { + "Ardalis.GuardClauses": "4.1.1", + "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", + "Newtonsoft.Json": "13.0.3", + "System.IO.Abstractions": "17.2.3" + } + }, "Monai.Deploy.Storage": { "type": "Transitive", "resolved": "0.2.18", @@ -757,15 +768,6 @@ "resolved": "0.6.2", "contentHash": "jPao/LdUNLUz8rn3H1D8W7wQbZsRZM0iayvWI4xGejJg3XJHT56gcmYdgmCGPdJF1UEBqUjucCRrFB+4HbJsbw==" }, - "monai.deploy.messaging": { - "type": "Project", - "dependencies": { - "Ardalis.GuardClauses": "[4.1.1, )", - "Microsoft.Extensions.Diagnostics.HealthChecks": "[6.0.21, )", - "Newtonsoft.Json": "[13.0.3, )", - "System.IO.Abstractions": "[17.2.3, )" - } - }, "monai.deploy.workflowmanager.common": { "type": "Project", "dependencies": { @@ -777,7 +779,7 @@ "monai.deploy.workflowmanager.common.configuration": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Monai.Deploy.Storage": "[0.2.18, )" } }, @@ -800,7 +802,7 @@ "monai.deploy.workflowmanager.contracts": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Mongo.Migration": "[3.1.4, )", "MongoDB.Bson": "[2.21.0, )" } @@ -831,6 +833,7 @@ "monai.deploy.workloadmanager.workflowexecuter": { "type": "Project", "dependencies": { + "Monai.Deploy.Messaging": "[1.0.5, )", "Monai.Deploy.WorkflowManager.Common": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Common.Configuration": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Common.Miscellaneous": "[1.0.0, )", diff --git a/src/WorkflowManager/Services/packages.lock.json b/src/WorkflowManager/Services/packages.lock.json index 76c0e129e..2d8a251f6 100644 --- a/src/WorkflowManager/Services/packages.lock.json +++ b/src/WorkflowManager/Services/packages.lock.json @@ -247,6 +247,17 @@ "System.Security.Principal.Windows": "5.0.0" } }, + "Monai.Deploy.Messaging": { + "type": "Transitive", + "resolved": "1.0.5", + "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", + "dependencies": { + "Ardalis.GuardClauses": "4.1.1", + "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", + "Newtonsoft.Json": "13.0.3", + "System.IO.Abstractions": "17.2.3" + } + }, "Monai.Deploy.Storage": { "type": "Transitive", "resolved": "0.2.18", @@ -711,15 +722,6 @@ "resolved": "0.6.2", "contentHash": "jPao/LdUNLUz8rn3H1D8W7wQbZsRZM0iayvWI4xGejJg3XJHT56gcmYdgmCGPdJF1UEBqUjucCRrFB+4HbJsbw==" }, - "monai.deploy.messaging": { - "type": "Project", - "dependencies": { - "Ardalis.GuardClauses": "[4.1.1, )", - "Microsoft.Extensions.Diagnostics.HealthChecks": "[6.0.21, )", - "Newtonsoft.Json": "[13.0.3, )", - "System.IO.Abstractions": "[17.2.3, )" - } - }, "monai.deploy.workflowmanager.common": { "type": "Project", "dependencies": { @@ -731,14 +733,14 @@ "monai.deploy.workflowmanager.common.configuration": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Monai.Deploy.Storage": "[0.2.18, )" } }, "monai.deploy.workflowmanager.contracts": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Mongo.Migration": "[3.1.4, )", "MongoDB.Bson": "[2.21.0, )" } diff --git a/src/WorkflowManager/Storage/packages.lock.json b/src/WorkflowManager/Storage/packages.lock.json old mode 100755 new mode 100644 index 382416b00..cff5b2f40 --- a/src/WorkflowManager/Storage/packages.lock.json +++ b/src/WorkflowManager/Storage/packages.lock.json @@ -233,6 +233,17 @@ "resolved": "1.1.0", "contentHash": "aOZA3BWfz9RXjpzt0sRJJMjAscAUm3Hoa4UWAfceV9UTYxgwZ1lZt5nO2myFf+/jetYQo4uTP7zS8sJY67BBxg==" }, + "Monai.Deploy.Messaging": { + "type": "Transitive", + "resolved": "1.0.5", + "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", + "dependencies": { + "Ardalis.GuardClauses": "4.1.1", + "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", + "Newtonsoft.Json": "13.0.3", + "System.IO.Abstractions": "17.2.3" + } + }, "Monai.Deploy.Storage.S3Policy": { "type": "Transitive", "resolved": "0.2.18", @@ -661,19 +672,10 @@ "resolved": "4.5.0", "contentHash": "okurQJO6NRE/apDIP23ajJ0hpiNmJ+f0BwOlB/cSqTLQlw5upkf+5+96+iG2Jw40G1fCVCyPz/FhIABUjMR+RQ==" }, - "monai.deploy.messaging": { - "type": "Project", - "dependencies": { - "Ardalis.GuardClauses": "[4.1.1, )", - "Microsoft.Extensions.Diagnostics.HealthChecks": "[6.0.21, )", - "Newtonsoft.Json": "[13.0.3, )", - "System.IO.Abstractions": "[17.2.3, )" - } - }, "monai.deploy.workflowmanager.contracts": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Mongo.Migration": "[3.1.4, )", "MongoDB.Bson": "[2.21.0, )" } diff --git a/src/WorkflowManager/WorkflowExecuter/Monai.Deploy.WorkloadManager.WorkflowExecuter.csproj b/src/WorkflowManager/WorkflowExecuter/Monai.Deploy.WorkloadManager.WorkflowExecuter.csproj index 9ef0454c9..0e3789995 100644 --- a/src/WorkflowManager/WorkflowExecuter/Monai.Deploy.WorkloadManager.WorkflowExecuter.csproj +++ b/src/WorkflowManager/WorkflowExecuter/Monai.Deploy.WorkloadManager.WorkflowExecuter.csproj @@ -37,6 +37,10 @@ + + + + @@ -46,6 +50,7 @@ + diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index 7b4072c35..9e04ece02 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -60,6 +60,8 @@ public class WorkflowExecuterService : IWorkflowExecuterService private string TaskDispatchRoutingKey { get; } private string ExportRequestRoutingKey { get; } + private string ExternalAppRoutingKey { get; } + private string ExportHL7RoutingKey { get; } private string ClinicalReviewTimeoutRoutingKey { get; } public WorkflowExecuterService( @@ -94,7 +96,7 @@ public WorkflowExecuterService( ClinicalReviewTimeoutRoutingKey = configuration.Value.Messaging.Topics.AideClinicalReviewCancelation; _migExternalAppPlugins = configuration.Value.MigExternalAppPlugins.ToList(); ExportRequestRoutingKey = $"{configuration.Value.Messaging.Topics.ExportRequestPrefix}.{configuration.Value.Messaging.DicomAgents.ScuAgentName}"; - + ExternalAppRoutingKey = configuration.Value.Messaging.Topics.ExternalAppRequest; _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _workflowRepository = workflowRepository ?? throw new ArgumentNullException(nameof(workflowRepository)); _workflowInstanceRepository = workflowInstanceRepository ?? throw new ArgumentNullException(nameof(workflowInstanceRepository)); @@ -329,6 +331,7 @@ await SwitchTasksAsync(task, routerFunc: () => HandleTaskDestinations(workflowInstance, workflow, task, correlationId), exportFunc: () => HandleDicomExportAsync(workflow, workflowInstance, task, correlationId), externalFunc: () => HandleExternalAppAsync(workflow, workflowInstance, task, correlationId), + exportHl7Func: () => HandleHl7ExportAsync(workflow, workflowInstance, task, correlationId), notCreatedStatusFunc: () => { _logger.TaskPreviouslyDispatched(workflowInstance.PayloadId, task.TaskId); @@ -342,13 +345,15 @@ private static Task SwitchTasksAsync(TaskExecution task, Func routerFunc, Func exportFunc, Func externalFunc, + Func exportHl7Func, Func notCreatedStatusFunc, Func defaultFunc) => task switch { { TaskType: TaskTypeConstants.RouterTask } => routerFunc(), - { TaskType: TaskTypeConstants.ExportTask } => exportFunc(), + { TaskType: TaskTypeConstants.DicomExportTask } => exportFunc(), { TaskType: TaskTypeConstants.ExternalAppTask } => externalFunc(), + { TaskType: TaskTypeConstants.HL7ExportTask } => exportHl7Func(), { Status: var s } when s != TaskExecutionStatus.Created => notCreatedStatusFunc(), _ => defaultFunc() }; @@ -574,8 +579,44 @@ private async Task HandleExternalAppAsync(WorkflowRevision workflow, WorkflowIns private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId, List? plugins = null) { plugins ??= new List(); + var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId); + + if (exportList is null || artifactValues is null) + { + return; + } + + await ExportDicomRequest(workflowInstance, task, exportList, artifactValues, correlationId, plugins); + } + + private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId) + { var exportList = workflow.Workflow?.Tasks?.FirstOrDefault(t => t.Id == task.TaskId)?.ExportDestinations.Select(e => e.Name).ToArray(); + if (exportList is null || !exportList.Any()) + { + exportList = null; + } + + var artifactValues = await GetArtifactValues(workflow, workflowInstance, task, exportList, correlationId); + + if (artifactValues.IsNullOrEmpty()) + { + artifactValues = null; + } + return (exportList, artifactValues); + //await DispatchDicomExport(workflowInstance, task, exportList, artifactValues, correlationId, plugins); + } + + private async Task ExportDicomRequest(WorkflowInstance workflowInstance, TaskExecution task, string[] exportDestinations, string[] artifactValues, string correlationId, List plugins) + { + var jsonMesssage = GetJsonExportMessage(workflowInstance, task, exportDestinations, artifactValues, correlationId, plugins); + + await _messageBrokerPublisherService.Publish(ExportRequestRoutingKey, jsonMesssage.ToMessage()); + await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched); + } + private async Task GetArtifactValues(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string[]? exportList, string correlationId) + { var artifactValues = GetDicomExports(workflow, task, exportList); var files = new List(); @@ -604,10 +645,43 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns _logger.ExportFilesNotFound(task.TaskId, workflowInstance.Id); await CompleteTask(task, workflowInstance, correlationId, TaskExecutionStatus.Failed); + } + return artifactValues; + } + private async Task HandleHl7ExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId) + { + var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId); + + if (exportList is null || artifactValues is null) + { return; } - await DispatchDicomExport(workflowInstance, task, exportList, artifactValues, correlationId, plugins); + + await ExportHl7Request(workflowInstance, task, exportList, artifactValues, correlationId); + + } + private async Task ExportHl7Request(WorkflowInstance workflowInstance, TaskExecution task, string[] exportDestinations, string[] artifactValues, string correlationId) + { + var jsonMesssage = GetJsonExportMessage(workflowInstance, task, exportDestinations, artifactValues, correlationId, new List()); + + await _messageBrokerPublisherService.Publish(ExportHL7RoutingKey, jsonMesssage.ToMessage()); + await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched); + } + + private JsonMessage GetJsonExportMessage( + WorkflowInstance workflowInstance, + TaskExecution task, + string[] exportDestinations, + string[] artifactValues, + string correlationId, + List plugins) + { + _logger.LogMigExport(task.TaskId, string.Join(",", exportDestinations), artifactValues.Length, string.Join(",", plugins)); + var exportRequestEvent = EventMapper.ToExportRequestEvent(artifactValues, exportDestinations, task.TaskId, workflowInstance.Id, correlationId); + exportRequestEvent.PayloadId = workflowInstance.PayloadId; + var jsonMesssage = new JsonMessage(exportRequestEvent, MessageBrokerConfiguration.WorkflowManagerApplicationId, exportRequestEvent.CorrelationId, Guid.NewGuid().ToString()); + return jsonMesssage; } private string[] GetDicomExports(WorkflowRevision workflow, TaskExecution task, string[]? exportDestinations) @@ -638,17 +712,7 @@ private string[] GetDicomExports(WorkflowRevision workflow, TaskExecution task, return new List(task.InputArtifacts.Values).ToArray(); } - private async Task DispatchDicomExport(WorkflowInstance workflowInstance, TaskExecution task, string[]? exportDestinations, string[] artifactValues, string correlationId, List plugins) - { - if (exportDestinations is null || !exportDestinations.Any()) - { - return false; - } - _logger.LogMigExport(task.TaskId, string.Join(",", exportDestinations), artifactValues.Length, string.Join(",", plugins)); - await ExportRequest(workflowInstance, task, exportDestinations, artifactValues, correlationId, plugins); - return await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched); - } private async Task HandleOutputArtifacts(WorkflowInstance workflowInstance, List outputs, TaskExecution task, WorkflowRevision workflowRevision) { @@ -724,7 +788,7 @@ private async Task DispatchTaskDestinations(WorkflowInstance workflowInsta continue; } - if (string.Equals(taskExec!.TaskType, TaskTypeConstants.ExportTask, StringComparison.InvariantCultureIgnoreCase)) + if (string.Equals(taskExec!.TaskType, TaskTypeConstants.DicomExportTask, StringComparison.InvariantCultureIgnoreCase)) { await HandleDicomExportAsync(workflow, workflowInstance, taskExec!, correlationId); @@ -738,6 +802,13 @@ private async Task DispatchTaskDestinations(WorkflowInstance workflowInsta continue; } + if (string.Equals(taskExec!.TaskType, TaskTypeConstants.HL7ExportTask, StringComparison.InvariantCultureIgnoreCase)) + { + await HandleHl7ExportAsync(workflow, workflowInstance, taskExec!, correlationId); + + continue; + } + processed &= await DispatchTask(workflowInstance, workflow, taskExec!, correlationId); if (processed is false) @@ -881,15 +952,7 @@ private async Task DispatchTask(WorkflowInstance workflowInstance, Workflo } } - private async Task ExportRequest(WorkflowInstance workflowInstance, TaskExecution taskExec, string[] exportDestinations, IList dicomImages, string correlationId, List plugins) - { - var exportRequestEvent = EventMapper.ToExportRequestEvent(dicomImages, exportDestinations, taskExec.TaskId, workflowInstance.Id, correlationId, plugins); - exportRequestEvent.PayloadId = workflowInstance.PayloadId; - var jsonMesssage = new JsonMessage(exportRequestEvent, MessageBrokerConfiguration.WorkflowManagerApplicationId, exportRequestEvent.CorrelationId, Guid.NewGuid().ToString()); - await _messageBrokerPublisherService.Publish(ExportRequestRoutingKey, jsonMesssage.ToMessage()); - return true; - } private async Task ClinicalReviewTimeOutEvent(WorkflowInstance workflowInstance, TaskExecution taskExec, string correlationId) { diff --git a/src/WorkflowManager/WorkflowExecuter/packages.lock.json b/src/WorkflowManager/WorkflowExecuter/packages.lock.json index bcd3406c9..0266cb757 100644 --- a/src/WorkflowManager/WorkflowExecuter/packages.lock.json +++ b/src/WorkflowManager/WorkflowExecuter/packages.lock.json @@ -2,6 +2,18 @@ "version": 1, "dependencies": { "net6.0": { + "Monai.Deploy.Messaging": { + "type": "Direct", + "requested": "[1.0.5, )", + "resolved": "1.0.5", + "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", + "dependencies": { + "Ardalis.GuardClauses": "4.1.1", + "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", + "Newtonsoft.Json": "13.0.3", + "System.IO.Abstractions": "17.2.3" + } + }, "Ardalis.GuardClauses": { "type": "Transitive", "resolved": "4.1.1", @@ -757,15 +769,6 @@ "resolved": "0.6.2", "contentHash": "jPao/LdUNLUz8rn3H1D8W7wQbZsRZM0iayvWI4xGejJg3XJHT56gcmYdgmCGPdJF1UEBqUjucCRrFB+4HbJsbw==" }, - "monai.deploy.messaging": { - "type": "Project", - "dependencies": { - "Ardalis.GuardClauses": "[4.1.1, )", - "Microsoft.Extensions.Diagnostics.HealthChecks": "[6.0.21, )", - "Newtonsoft.Json": "[13.0.3, )", - "System.IO.Abstractions": "[17.2.3, )" - } - }, "monai.deploy.workflowmanager.common": { "type": "Project", "dependencies": { @@ -777,7 +780,7 @@ "monai.deploy.workflowmanager.common.configuration": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Monai.Deploy.Storage": "[0.2.18, )" } }, @@ -800,7 +803,7 @@ "monai.deploy.workflowmanager.contracts": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Mongo.Migration": "[3.1.4, )", "MongoDB.Bson": "[2.21.0, )" } diff --git a/src/WorkflowManager/WorkflowManager/Monai.Deploy.WorkflowManager.csproj b/src/WorkflowManager/WorkflowManager/Monai.Deploy.WorkflowManager.csproj index f0cf8b3fe..50fd97dc0 100644 --- a/src/WorkflowManager/WorkflowManager/Monai.Deploy.WorkflowManager.csproj +++ b/src/WorkflowManager/WorkflowManager/Monai.Deploy.WorkflowManager.csproj @@ -42,6 +42,7 @@ + true @@ -69,7 +70,6 @@ - diff --git a/src/WorkflowManager/WorkflowManager/Validators/WorkflowValidator.cs b/src/WorkflowManager/WorkflowManager/Validators/WorkflowValidator.cs index 580bdac9f..99bf78a6c 100644 --- a/src/WorkflowManager/WorkflowManager/Validators/WorkflowValidator.cs +++ b/src/WorkflowManager/WorkflowManager/Validators/WorkflowValidator.cs @@ -346,6 +346,9 @@ private void TaskTypeSpecificValidation(Workflow workflow, TaskObject currentTas case Email: ValidateEmailTask(currentTask); break; + case HL7ExportTask: + ValidateHL7ExportTask(workflow, currentTask); + break; } } @@ -616,6 +619,23 @@ private void ValidateExportTask(Workflow workflow, TaskObject currentTask) ValidateInputs(currentTask); } + private void ValidateHL7ExportTask(Workflow workflow, TaskObject currentTask) + { + if (currentTask.ExportDestinations.Any() is false) + { + Errors.Add($"Task: '{currentTask.Id}' does not contain a destination."); + } + + CheckDestinationInMigDestinations(currentTask, workflow.InformaticsGateway); + + if (currentTask.ExportDestinations.Length != currentTask.ExportDestinations.Select(t => t.Name).Distinct().Count()) + { + Errors.Add($"Task: '{currentTask.Id}' contains duplicate destinations."); + } + + ValidateInputs(currentTask); + } + private void ValidateExternalAppTask(Workflow workflow, TaskObject currentTask) { if (currentTask.ExportDestinations.Any() is false) diff --git a/src/WorkflowManager/WorkflowManager/packages.lock.json b/src/WorkflowManager/WorkflowManager/packages.lock.json old mode 100755 new mode 100644 index 1e2ef4eed..426336806 --- a/src/WorkflowManager/WorkflowManager/packages.lock.json +++ b/src/WorkflowManager/WorkflowManager/packages.lock.json @@ -23,6 +23,17 @@ "Newtonsoft.Json.Bson": "1.0.2" } }, + "Monai.Deploy.Messaging.RabbitMQ": { + "type": "Direct", + "requested": "[1.0.5, )", + "resolved": "1.0.5", + "contentHash": "L+BWU5Xq1ARjFRcpnefDJGuG52Zw4Iz3qql1tn8lYfqoC4B37fAUVz6k7Ar7v1OUwPo/JR8q4OP2IIMpqpKRRA==", + "dependencies": { + "Monai.Deploy.Messaging": "1.0.5", + "Polly": "7.2.4", + "RabbitMQ.Client": "6.5.0" + } + }, "Monai.Deploy.Security": { "type": "Direct", "requested": "[0.1.3, )", @@ -472,6 +483,17 @@ "System.Reactive.Linq": "5.0.0" } }, + "Monai.Deploy.Messaging": { + "type": "Transitive", + "resolved": "1.0.5", + "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", + "dependencies": { + "Ardalis.GuardClauses": "4.1.1", + "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", + "Newtonsoft.Json": "13.0.3", + "System.IO.Abstractions": "17.2.3" + } + }, "Monai.Deploy.Storage": { "type": "Transitive", "resolved": "0.2.18", @@ -1026,23 +1048,6 @@ "resolved": "0.6.2", "contentHash": "jPao/LdUNLUz8rn3H1D8W7wQbZsRZM0iayvWI4xGejJg3XJHT56gcmYdgmCGPdJF1UEBqUjucCRrFB+4HbJsbw==" }, - "monai.deploy.messaging": { - "type": "Project", - "dependencies": { - "Ardalis.GuardClauses": "[4.1.1, )", - "Microsoft.Extensions.Diagnostics.HealthChecks": "[6.0.21, )", - "Newtonsoft.Json": "[13.0.3, )", - "System.IO.Abstractions": "[17.2.3, )" - } - }, - "monai.deploy.messaging.rabbitmq": { - "type": "Project", - "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", - "Polly": "[7.2.4, )", - "RabbitMQ.Client": "[6.5.0, )" - } - }, "monai.deploy.workflowmanager.common": { "type": "Project", "dependencies": { @@ -1054,7 +1059,7 @@ "monai.deploy.workflowmanager.common.configuration": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Monai.Deploy.Storage": "[0.2.18, )" } }, @@ -1077,7 +1082,7 @@ "monai.deploy.workflowmanager.contracts": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Mongo.Migration": "[3.1.4, )", "MongoDB.Bson": "[2.21.0, )" } @@ -1133,6 +1138,7 @@ "monai.deploy.workloadmanager.workflowexecuter": { "type": "Project", "dependencies": { + "Monai.Deploy.Messaging": "[1.0.5, )", "Monai.Deploy.WorkflowManager.Common": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Common.Configuration": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Common.Miscellaneous": "[1.0.0, )", diff --git a/tests/IntegrationTests/TaskManager.IntegrationTests/Monai.Deploy.WorkflowManager.TaskManager.IntegrationTests.csproj b/tests/IntegrationTests/TaskManager.IntegrationTests/Monai.Deploy.WorkflowManager.TaskManager.IntegrationTests.csproj index f79ef9076..50a196779 100644 --- a/tests/IntegrationTests/TaskManager.IntegrationTests/Monai.Deploy.WorkflowManager.TaskManager.IntegrationTests.csproj +++ b/tests/IntegrationTests/TaskManager.IntegrationTests/Monai.Deploy.WorkflowManager.TaskManager.IntegrationTests.csproj @@ -27,6 +27,8 @@ + + @@ -40,11 +42,9 @@ - - diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Monai.Deploy.WorkflowManager.WorkflowExecutor.IntegrationTests.csproj b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Monai.Deploy.WorkflowManager.WorkflowExecutor.IntegrationTests.csproj index fe4468e10..f871f144e 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Monai.Deploy.WorkflowManager.WorkflowExecutor.IntegrationTests.csproj +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Monai.Deploy.WorkflowManager.WorkflowExecutor.IntegrationTests.csproj @@ -34,6 +34,8 @@ + + @@ -46,14 +48,10 @@ + - - - - - PayloadApi.feature diff --git a/tests/UnitTests/TaskManager.Tests/Monai.Deploy.WorkflowManager.TaskManager.Tests.csproj b/tests/UnitTests/TaskManager.Tests/Monai.Deploy.WorkflowManager.TaskManager.Tests.csproj index 5bc6c5979..8ae2d15fa 100644 --- a/tests/UnitTests/TaskManager.Tests/Monai.Deploy.WorkflowManager.TaskManager.Tests.csproj +++ b/tests/UnitTests/TaskManager.Tests/Monai.Deploy.WorkflowManager.TaskManager.Tests.csproj @@ -25,6 +25,7 @@ + diff --git a/tests/UnitTests/WorkflowManager.Tests/packages.lock.json b/tests/UnitTests/WorkflowManager.Tests/packages.lock.json old mode 100755 new mode 100644 index 047823ab7..bcb898dff --- a/tests/UnitTests/WorkflowManager.Tests/packages.lock.json +++ b/tests/UnitTests/WorkflowManager.Tests/packages.lock.json @@ -508,6 +508,27 @@ "System.Reactive.Linq": "5.0.0" } }, + "Monai.Deploy.Messaging": { + "type": "Transitive", + "resolved": "1.0.5", + "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", + "dependencies": { + "Ardalis.GuardClauses": "4.1.1", + "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", + "Newtonsoft.Json": "13.0.3", + "System.IO.Abstractions": "17.2.3" + } + }, + "Monai.Deploy.Messaging.RabbitMQ": { + "type": "Transitive", + "resolved": "1.0.5", + "contentHash": "L+BWU5Xq1ARjFRcpnefDJGuG52Zw4Iz3qql1tn8lYfqoC4B37fAUVz6k7Ar7v1OUwPo/JR8q4OP2IIMpqpKRRA==", + "dependencies": { + "Monai.Deploy.Messaging": "1.0.5", + "Polly": "7.2.4", + "RabbitMQ.Client": "6.5.0" + } + }, "Monai.Deploy.Security": { "type": "Transitive", "resolved": "0.1.3", @@ -1838,29 +1859,12 @@ "resolved": "0.6.2", "contentHash": "jPao/LdUNLUz8rn3H1D8W7wQbZsRZM0iayvWI4xGejJg3XJHT56gcmYdgmCGPdJF1UEBqUjucCRrFB+4HbJsbw==" }, - "monai.deploy.messaging": { - "type": "Project", - "dependencies": { - "Ardalis.GuardClauses": "[4.1.1, )", - "Microsoft.Extensions.Diagnostics.HealthChecks": "[6.0.21, )", - "Newtonsoft.Json": "[13.0.3, )", - "System.IO.Abstractions": "[17.2.3, )" - } - }, - "monai.deploy.messaging.rabbitmq": { - "type": "Project", - "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", - "Polly": "[7.2.4, )", - "RabbitMQ.Client": "[6.5.0, )" - } - }, "monai.deploy.workflowmanager": { "type": "Project", "dependencies": { "AspNetCore.HealthChecks.MongoDb": "[6.0.2, )", "Microsoft.AspNetCore.Mvc.NewtonsoftJson": "[6.0.22, )", - "Monai.Deploy.Messaging.RabbitMQ": "[0.1.0, )", + "Monai.Deploy.Messaging.RabbitMQ": "[1.0.5, )", "Monai.Deploy.Security": "[0.1.3, )", "Monai.Deploy.Storage.MinIO": "[0.2.18, )", "Monai.Deploy.WorkflowManager.Common.Configuration": "[1.0.0, )", @@ -1888,7 +1892,7 @@ "monai.deploy.workflowmanager.common.configuration": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Monai.Deploy.Storage": "[0.2.18, )" } }, @@ -1911,7 +1915,7 @@ "monai.deploy.workflowmanager.contracts": { "type": "Project", "dependencies": { - "Monai.Deploy.Messaging": "[0.1.0, )", + "Monai.Deploy.Messaging": "[1.0.5, )", "Mongo.Migration": "[3.1.4, )", "MongoDB.Bson": "[2.21.0, )" } @@ -1967,6 +1971,7 @@ "monai.deploy.workloadmanager.workflowexecuter": { "type": "Project", "dependencies": { + "Monai.Deploy.Messaging": "[1.0.5, )", "Monai.Deploy.WorkflowManager.Common": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Common.Configuration": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Common.Miscellaneous": "[1.0.0, )", From 2d7e91d30658a735c6f46d201b65ed39b4080002 Mon Sep 17 00:00:00 2001 From: Neil South Date: Fri, 24 Nov 2023 17:28:56 +0000 Subject: [PATCH 2/8] adding tests Signed-off-by: Neil South --- .../Configuration/ConfigurationValidator.cs | 1 + .../Services/WorkflowExecuterService.cs | 30 +- .../WorkflowManager/appsettings.json | 3 +- .../Services/WorkflowExecuterServiceTests.cs | 521 +++++++++++++++++- 4 files changed, 534 insertions(+), 21 deletions(-) diff --git a/src/Common/Configuration/ConfigurationValidator.cs b/src/Common/Configuration/ConfigurationValidator.cs index b1ba52a89..487afa20f 100644 --- a/src/Common/Configuration/ConfigurationValidator.cs +++ b/src/Common/Configuration/ConfigurationValidator.cs @@ -58,6 +58,7 @@ public bool IsTopicsValid(MessageBrokerConfigurationKeys configurationKeys) valid &= IsStringValueNotNull(nameof(configurationKeys.WorkflowRequest), configurationKeys.WorkflowRequest); valid &= IsStringValueNotNull(nameof(configurationKeys.ExportRequestPrefix), configurationKeys.ExportRequestPrefix); valid &= IsStringValueNotNull(nameof(configurationKeys.TaskDispatchRequest), configurationKeys.TaskDispatchRequest); + valid &= IsStringValueNotNull(nameof(configurationKeys.ExportHL7), configurationKeys.ExportHL7); return valid; } diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index 9e04ece02..60d969104 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -97,6 +97,7 @@ public WorkflowExecuterService( _migExternalAppPlugins = configuration.Value.MigExternalAppPlugins.ToList(); ExportRequestRoutingKey = $"{configuration.Value.Messaging.Topics.ExportRequestPrefix}.{configuration.Value.Messaging.DicomAgents.ScuAgentName}"; ExternalAppRoutingKey = configuration.Value.Messaging.Topics.ExternalAppRequest; + ExportHL7RoutingKey = configuration.Value.Messaging.Topics.ExportHL7; _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _workflowRepository = workflowRepository ?? throw new ArgumentNullException(nameof(workflowRepository)); _workflowInstanceRepository = workflowInstanceRepository ?? throw new ArgumentNullException(nameof(workflowInstanceRepository)); @@ -586,7 +587,11 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns return; } - await ExportDicomRequest(workflowInstance, task, exportList, artifactValues, correlationId, plugins); + var exportRequestEvent = GetExportRequestEvent(workflowInstance, task, exportList, artifactValues, correlationId, plugins); + var jsonMesssage = new JsonMessage(exportRequestEvent, MessageBrokerConfiguration.WorkflowManagerApplicationId, exportRequestEvent.CorrelationId, Guid.NewGuid().ToString()); + + await _messageBrokerPublisherService.Publish(ExportRequestRoutingKey, jsonMesssage.ToMessage()); + await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched); } private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId) @@ -604,15 +609,6 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns artifactValues = null; } return (exportList, artifactValues); - //await DispatchDicomExport(workflowInstance, task, exportList, artifactValues, correlationId, plugins); - } - - private async Task ExportDicomRequest(WorkflowInstance workflowInstance, TaskExecution task, string[] exportDestinations, string[] artifactValues, string correlationId, List plugins) - { - var jsonMesssage = GetJsonExportMessage(workflowInstance, task, exportDestinations, artifactValues, correlationId, plugins); - - await _messageBrokerPublisherService.Publish(ExportRequestRoutingKey, jsonMesssage.ToMessage()); - await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched); } private async Task GetArtifactValues(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string[]? exportList, string correlationId) @@ -658,18 +654,15 @@ private async Task HandleHl7ExportAsync(WorkflowRevision workflow, WorkflowInsta return; } - await ExportHl7Request(workflowInstance, task, exportList, artifactValues, correlationId); - - } - private async Task ExportHl7Request(WorkflowInstance workflowInstance, TaskExecution task, string[] exportDestinations, string[] artifactValues, string correlationId) - { - var jsonMesssage = GetJsonExportMessage(workflowInstance, task, exportDestinations, artifactValues, correlationId, new List()); + var exportRequestEvent = GetExportRequestEvent(workflowInstance, task, exportList, artifactValues, correlationId, new List()); + exportRequestEvent.Target!.DataService = DataService.HL7; + var jsonMesssage = new JsonMessage(exportRequestEvent, MessageBrokerConfiguration.WorkflowManagerApplicationId, exportRequestEvent.CorrelationId, Guid.NewGuid().ToString()); await _messageBrokerPublisherService.Publish(ExportHL7RoutingKey, jsonMesssage.ToMessage()); await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched); } - private JsonMessage GetJsonExportMessage( + private ExportRequestEvent GetExportRequestEvent( WorkflowInstance workflowInstance, TaskExecution task, string[] exportDestinations, @@ -680,8 +673,7 @@ private JsonMessage GetJsonExportMessage( _logger.LogMigExport(task.TaskId, string.Join(",", exportDestinations), artifactValues.Length, string.Join(",", plugins)); var exportRequestEvent = EventMapper.ToExportRequestEvent(artifactValues, exportDestinations, task.TaskId, workflowInstance.Id, correlationId); exportRequestEvent.PayloadId = workflowInstance.PayloadId; - var jsonMesssage = new JsonMessage(exportRequestEvent, MessageBrokerConfiguration.WorkflowManagerApplicationId, exportRequestEvent.CorrelationId, Guid.NewGuid().ToString()); - return jsonMesssage; + return exportRequestEvent; } private string[] GetDicomExports(WorkflowRevision workflow, TaskExecution task, string[]? exportDestinations) diff --git a/src/WorkflowManager/WorkflowManager/appsettings.json b/src/WorkflowManager/WorkflowManager/appsettings.json index 555e34085..959e66d7f 100755 --- a/src/WorkflowManager/WorkflowManager/appsettings.json +++ b/src/WorkflowManager/WorkflowManager/appsettings.json @@ -58,7 +58,8 @@ "exportComplete": "md.export.complete", "exportRequestPrefix": "md.export.request", "callbackRequest": "md.tasks.callback", - "aideClinicalReviewRequest": "aide.clinical_review.request" + "aideClinicalReviewRequest": "aide.clinical_review.request", + "exportHl7": "md.export.hl7" }, "dicomAgents": { "dicomWebAgentName": "monaidicomweb", diff --git a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs index 9b726b4a6..ee73982c0 100644 --- a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs +++ b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs @@ -90,7 +90,7 @@ public WorkflowExecuterServiceTests() PerTaskTypeTimeoutMinutes = new Dictionary { { "taskType", _timeoutForTypeTask } }, Messaging = new MessageBrokerConfiguration { - Topics = new MessageBrokerConfigurationKeys { TaskDispatchRequest = "md.task.dispatch", ExportRequestPrefix = "md.export.request" }, + Topics = new MessageBrokerConfigurationKeys { TaskDispatchRequest = "md.task.dispatch", ExportRequestPrefix = "md.export.request", ExportHL7 = "md.export.hl7" }, DicomAgents = new DicomAgentConfiguration { DicomWebAgentName = "monaidicomweb" } }, MigExternalAppPlugins = new List { { "examplePlugin" } }.ToArray() @@ -3204,6 +3204,525 @@ public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_Updat _workflowInstanceRepository.Verify(w => w.UpdateTaskOutputArtifactsAsync(It.IsAny(), It.IsAny(), It.IsAny>()), Times.Once()); } + [Fact] + public async Task ProcessPayload_WithExportTask_NoExportsFails() + { + var workflowId1 = Guid.NewGuid().ToString(); + var workflowId2 = Guid.NewGuid().ToString(); + var workflowRequest = new WorkflowRequestEvent + { + Bucket = "testbucket", + DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" }, + CorrelationId = Guid.NewGuid().ToString(), + Timestamp = DateTime.UtcNow, + Workflows = new List + { + workflowId1.ToString() + } + }; + + var workflows = new List + { + new WorkflowRevision + { + Id = Guid.NewGuid().ToString(), + WorkflowId = workflowId1, + Revision = 1, + Workflow = new Workflow + { + Name = "Workflowname1", + Description = "Workflowdesc1", + Version = "1", + InformaticsGateway = new InformaticsGateway + { + AeTitle = "aetitle", + ExportDestinations = new string[] { "PROD_PACS" } + }, + Tasks = new TaskObject[] + { + new TaskObject { + Id = Guid.NewGuid().ToString(), + Type = "export", + Description = "taskdesc", + Artifacts = new ArtifactMap + { + Input = new Artifact[] { new Artifact { Name = "dicomexport", Value = "{{ context.input }}" } } + }, + TaskDestinations = Array.Empty(), + ExportDestinations = new ExportDestination[] + { + } + } + } + } + } + }; + + _workflowRepository.Setup(w => w.GetByWorkflowsIdsAsync(new List { workflowId1.ToString() })).ReturnsAsync(workflows); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflowId1.ToString())).ReturnsAsync(workflows[0]); + _workflowInstanceRepository.Setup(w => w.CreateAsync(It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.UpdateTasksAsync(It.IsAny(), It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.GetByWorkflowsIdsAsync(It.IsAny>())).ReturnsAsync(new List()); + _workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(true); + var dcmInfo = new Dictionary() { { "dicomexport", "/dcm" } }; + _artifactMapper.Setup(a => a.TryConvertArtifactVariablesToPath(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), out dcmInfo)).Returns(true); + + _messageBrokerPublisherService.Setup(m => m.Publish(It.IsAny(), It.IsAny())); + + var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() }); + + _messageBrokerPublisherService.Verify(w => w.Publish($"{_configuration.Value.Messaging.Topics.ExportRequestPrefix}.{_configuration.Value.Messaging.DicomAgents.ScuAgentName}", It.IsAny()), Times.Exactly(0)); + + Assert.True(result); + +#pragma warning restore CS8604 // Possible null reference argument. + } + + [Fact] + public async Task ProcessPayload_WithHl7ExportTask_DispatchesExportHl7() + { + var workflowId1 = Guid.NewGuid().ToString(); + var workflowId2 = Guid.NewGuid().ToString(); + var workflowRequest = new WorkflowRequestEvent + { + Bucket = "testbucket", + DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" }, + CorrelationId = Guid.NewGuid().ToString(), + Timestamp = DateTime.UtcNow, + Workflows = new List + { + workflowId1.ToString() + } + }; + + var workflows = new List + { + new WorkflowRevision + { + Id = Guid.NewGuid().ToString(), + WorkflowId = workflowId1, + Revision = 1, + Workflow = new Workflow + { + Name = "Workflowname1", + Description = "Workflowdesc1", + Version = "1", + InformaticsGateway = new InformaticsGateway + { + AeTitle = "aetitle", + ExportDestinations = new string[] { "PROD_PACS" } + }, + Tasks = new TaskObject[] + { + new TaskObject { + Id = Guid.NewGuid().ToString(), + Type = "export_hl7", + Description = "taskdesc", + Artifacts = new ArtifactMap + { + Input = new Artifact[] { new Artifact { Name = "dicomexport", Value = "{{ context.input }}" } } + }, + TaskDestinations = Array.Empty(), + ExportDestinations = new ExportDestination[] + { + new ExportDestination + { + Name = "PROD_PACS" + } + } + } + } + } + } + }; + + _workflowRepository.Setup(w => w.GetByWorkflowsIdsAsync(new List { workflowId1.ToString() })).ReturnsAsync(workflows); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflowId1.ToString())).ReturnsAsync(workflows[0]); + _workflowInstanceRepository.Setup(w => w.CreateAsync(It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.UpdateTasksAsync(It.IsAny(), It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.GetByWorkflowsIdsAsync(It.IsAny>())).ReturnsAsync(new List()); + _workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(true); + var dcmInfo = new Dictionary() { { "dicomexport", "/dcm" } }; + _artifactMapper.Setup(a => a.TryConvertArtifactVariablesToPath(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), out dcmInfo)).Returns(true); + _storageService.Setup(w => w.ListObjectsAsync(workflowRequest.Bucket, "/dcm", true, It.IsAny())) + .ReturnsAsync(new List() + { + new VirtualFileInfo("testfile.dcm", "/dcm/testfile.dcm", "test", ulong.MaxValue) + }); + + Message? messageSent = null; + _messageBrokerPublisherService.Setup(m => m.Publish(It.IsAny(), It.IsAny())) + .Callback((string topic, Message m) => messageSent = m); + + var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() }); + + _messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.ExportHL7, It.IsAny()), Times.Exactly(1)); + + Assert.True(result); + Assert.NotNull(messageSent); +#pragma warning disable CS8604 // Possible null reference argument. + var body = Encoding.UTF8.GetString(messageSent?.Body); + var exportMessageBody = JsonConvert.DeserializeObject(body); + Assert.Empty(exportMessageBody!.PluginAssemblies); + + var exportEventMessage = messageSent.ConvertTo(); + Assert.NotNull(exportEventMessage.Target); + Assert.Equal(DataService.HL7, exportEventMessage.Target.DataService); + +#pragma warning restore CS8604 // Possible null reference argument. + } + + [Fact] + public async Task ProcessPayload_WithHl7ExportTask_NoExportsFails() + { + var workflowId1 = Guid.NewGuid().ToString(); + var workflowId2 = Guid.NewGuid().ToString(); + var workflowRequest = new WorkflowRequestEvent + { + Bucket = "testbucket", + DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" }, + CorrelationId = Guid.NewGuid().ToString(), + Timestamp = DateTime.UtcNow, + Workflows = new List + { + workflowId1.ToString() + } + }; + + var workflows = new List + { + new WorkflowRevision + { + Id = Guid.NewGuid().ToString(), + WorkflowId = workflowId1, + Revision = 1, + Workflow = new Workflow + { + Name = "Workflowname1", + Description = "Workflowdesc1", + Version = "1", + InformaticsGateway = new InformaticsGateway + { + AeTitle = "aetitle", + ExportDestinations = new string[] { "PROD_PACS" } + }, + Tasks = new TaskObject[] + { + new TaskObject { + Id = Guid.NewGuid().ToString(), + Type = "export_hl7", + Description = "taskdesc", + Artifacts = new ArtifactMap + { + Input = new Artifact[] { new Artifact { Name = "dicomexport", Value = "{{ context.input }}" } } + }, + TaskDestinations = Array.Empty(), + ExportDestinations = new ExportDestination[] + { + } + } + } + } + } + }; + + _workflowRepository.Setup(w => w.GetByWorkflowsIdsAsync(new List { workflowId1.ToString() })).ReturnsAsync(workflows); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflowId1.ToString())).ReturnsAsync(workflows[0]); + _workflowInstanceRepository.Setup(w => w.CreateAsync(It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.UpdateTasksAsync(It.IsAny(), It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.GetByWorkflowsIdsAsync(It.IsAny>())).ReturnsAsync(new List()); + _workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(true); + var dcmInfo = new Dictionary() { { "dicomexport", "/dcm" } }; + _artifactMapper.Setup(a => a.TryConvertArtifactVariablesToPath(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), out dcmInfo)).Returns(true); + + _messageBrokerPublisherService.Setup(m => m.Publish(It.IsAny(), It.IsAny())); + + var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() }); + + _messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.ExportHL7, It.IsAny()), Times.Exactly(0)); + + Assert.True(result); + +#pragma warning restore CS8604 // Possible null reference argument. + } + + [Fact] + public async Task ProcessPayload_WithInvalidHl7ExportTask_DoesNotDispatchExportHl7() + { + // because this test has no _artifactMapper.Setup(a => a.TryConvertArtifactVariablesToPath + // it returns no matching artifacts. hence the failure ! + var workflowId1 = Guid.NewGuid().ToString(); + var workflowId2 = Guid.NewGuid().ToString(); + var workflowRequest = new WorkflowRequestEvent + { + Bucket = "testbucket", + DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" }, + CorrelationId = Guid.NewGuid().ToString(), + Timestamp = DateTime.UtcNow, + Workflows = new List + { + workflowId1.ToString() + } + }; + + var workflows = new List + { + new WorkflowRevision + { + Id = Guid.NewGuid().ToString(), + WorkflowId = workflowId1, + Revision = 1, + Workflow = new Workflow + { + Name = "Workflowname1", + Description = "Workflowdesc1", + Version = "1", + InformaticsGateway = new InformaticsGateway + { + AeTitle = "aetitle", + ExportDestinations = new string[] { "PROD_PACS" } + }, + Tasks = new TaskObject[] + { + new TaskObject { + Id = Guid.NewGuid().ToString(), + Type = "export_hl7", + Description = "taskdesc", + Artifacts = new ArtifactMap + { + Input = new Artifact[] { new Artifact { Name = "dicomexport", Value = "{{ context.input }}" } } + }, + TaskDestinations = Array.Empty(), + ExportDestinations = new ExportDestination[] + { + new ExportDestination + { + Name = "PROD_PINS" + } + } + } + } + } + } + }; + + _workflowRepository.Setup(w => w.GetByWorkflowsIdsAsync(new List { workflowId1.ToString() })).ReturnsAsync(workflows); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflowId1.ToString())).ReturnsAsync(workflows[0]); + _workflowInstanceRepository.Setup(w => w.CreateAsync(It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.UpdateTasksAsync(It.IsAny(), It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.GetByWorkflowsIdsAsync(It.IsAny>())).ReturnsAsync(new List()); + _workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(true); + _artifactMapper.Setup(a => a.ConvertArtifactVariablesToPath(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(new Dictionary() { { "dicomexport", "/dcm" } }); + + var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() }); + + _messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.ExportHL7, It.IsAny()), Times.Exactly(0)); + Assert.True(result); + } + + [Fact] + public async Task ProcessTaskUpdate_ValidTaskUpdateEventWithExportHl7TaskDestination_ReturnsTrue() + { + var workflowInstanceId = Guid.NewGuid().ToString(); + var taskId = Guid.NewGuid().ToString(); + + var updateEvent = new TaskUpdateEvent + { + WorkflowInstanceId = workflowInstanceId, + TaskId = "pizza", + ExecutionId = Guid.NewGuid().ToString(), + Status = TaskExecutionStatus.Succeeded, + Reason = FailureReason.None, + Message = "This is a message", + Metadata = new Dictionary(), + CorrelationId = Guid.NewGuid().ToString() + }; + + var workflowId = Guid.NewGuid().ToString(); + + var workflow = new WorkflowRevision + { + Id = Guid.NewGuid().ToString(), + WorkflowId = workflowId, + Revision = 1, + Workflow = new Workflow + { + Name = "Workflowname2", + Description = "Workflowdesc2", + Version = "1", + InformaticsGateway = new InformaticsGateway + { + AeTitle = "aetitle", + ExportDestinations = new string[] { "PROD_PACS" } + }, + Tasks = new TaskObject[] + { + new TaskObject { + Id = "pizza", + Type = "type", + Description = "taskdesc", + TaskDestinations = new TaskDestination[] + { + new TaskDestination + { + Name = "exporttaskid" + }, + } + }, + new TaskObject { + Id = "exporttaskid", + Type = "export_hl7", + Description = "taskdesc", + Artifacts = new ArtifactMap + { + Input = new Artifact[] { new Artifact { Name = "dicomexport", Value = "{{ context.input }}" } } + }, + TaskDestinations = Array.Empty(), + ExportDestinations = new ExportDestination[] + { + new ExportDestination + { + Name = "PROD_PACS" + } + } + } + } + } + }; + + var workflowInstance = new WorkflowInstance + { + Id = workflowInstanceId, + WorkflowId = workflowId, + WorkflowName = workflow.Workflow.Name, + PayloadId = Guid.NewGuid().ToString(), + Status = Status.Created, + BucketId = "bucket", + Tasks = new List + { + new TaskExecution + { + TaskId = "pizza", + Status = TaskExecutionStatus.Dispatched + } + } + }; + + _workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(workflowInstance.Id)).ReturnsAsync(workflowInstance); + _workflowInstanceRepository.Setup(w => w.UpdateTasksAsync(workflowInstance.Id, It.IsAny>())).ReturnsAsync(true); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflowInstance.WorkflowId)).ReturnsAsync(workflow); + _payloadService.Setup(p => p.GetByIdAsync(It.IsAny())).ReturnsAsync(new Payload { PatientDetails = new PatientDetails { } }); + var expectedDcmValue = new Dictionary { { "dicomexport", "/dcm" } }; + _artifactMapper.Setup(a => a.TryConvertArtifactVariablesToPath(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), out expectedDcmValue)).Returns(true); + _storageService.Setup(w => w.ListObjectsAsync(It.IsAny(), "/dcm", true, It.IsAny())) + .ReturnsAsync(new List() + { + new VirtualFileInfo("testfile.dcm", "/dcm/testfile.dcm", "test", ulong.MaxValue) + }); + + var response = await WorkflowExecuterService.ProcessTaskUpdate(updateEvent); + + _messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.ExportHL7, It.IsAny()), Times.Exactly(1)); + + response.Should().BeTrue(); + } + + [Fact] + public async Task ProcessTaskUpdate_ValidTaskUpdateEventWithExportHl7TaskDestination_NoExportDestinations_DoesNotDispatchExport() + { + var workflowInstanceId = Guid.NewGuid().ToString(); + var taskId = Guid.NewGuid().ToString(); + + var updateEvent = new TaskUpdateEvent + { + WorkflowInstanceId = workflowInstanceId, + TaskId = "pizza", + ExecutionId = Guid.NewGuid().ToString(), + Status = TaskExecutionStatus.Succeeded, + Reason = FailureReason.None, + Message = "This is a message", + Metadata = new Dictionary(), + CorrelationId = Guid.NewGuid().ToString() + }; + + var workflowId = Guid.NewGuid().ToString(); + + var workflow = new WorkflowRevision + { + Id = Guid.NewGuid().ToString(), + WorkflowId = workflowId, + Revision = 1, + Workflow = new Workflow + { + Name = "Workflowname2", + Description = "Workflowdesc2", + Version = "1", + InformaticsGateway = new InformaticsGateway + { + AeTitle = "aetitle" + }, + Tasks = new TaskObject[] + { + new TaskObject { + Id = "pizza", + Type = "type", + Description = "taskdesc", + TaskDestinations = new TaskDestination[] + { + new TaskDestination + { + Name = "exporttaskid" + }, + } + }, + new TaskObject { + Id = "exporttaskid", + Type = "export_hl7", + Description = "taskdesc", + Artifacts = new ArtifactMap + { + Input = new Artifact[] { new Artifact { Name = "dicomexport", Value = "{{ context.input }}" } } + }, + TaskDestinations = Array.Empty() + } + } + } + }; + + var workflowInstance = new WorkflowInstance + { + Id = workflowInstanceId, + WorkflowId = workflowId, + WorkflowName = workflow.Workflow.Name, + PayloadId = Guid.NewGuid().ToString(), + Status = Status.Created, + BucketId = "bucket", + Tasks = new List + { + new TaskExecution + { + TaskId = "pizza", + Status = TaskExecutionStatus.Dispatched + } + } + }; + + _workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(workflowInstance.Id)).ReturnsAsync(workflowInstance); + _workflowInstanceRepository.Setup(w => w.UpdateTasksAsync(workflowInstance.Id, It.IsAny>())).ReturnsAsync(true); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflowInstance.WorkflowId)).ReturnsAsync(workflow); + _payloadService.Setup(p => p.GetByIdAsync(It.IsAny())).ReturnsAsync(new Payload { PatientDetails = new PatientDetails { } }); + _artifactMapper.Setup(a => a.ConvertArtifactVariablesToPath(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(new Dictionary { { "dicomexport", "/dcm" } }); + + var response = await WorkflowExecuterService.ProcessTaskUpdate(updateEvent); + + _messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.ExportHL7, It.IsAny()), Times.Exactly(0)); + + response.Should().BeTrue(); + } + } + #pragma warning restore CS8625 // Cannot convert null literal to non-nullable reference type. } From 4b9aafad7f6dbc33134ec9f8bb0bd162eb37aff3 Mon Sep 17 00:00:00 2001 From: Neil South Date: Fri, 24 Nov 2023 17:41:31 +0000 Subject: [PATCH 3/8] fix up merge Signed-off-by: Neil South --- .../Services/WorkflowExecuterService.cs | 3 +-- .../WorkflowExecuter/packages.lock.json | 11 ----------- .../Services/WorkflowExecuterServiceTests.cs | 7 +++---- 3 files changed, 4 insertions(+), 17 deletions(-) diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index bc5f7fcb2..549690607 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -352,7 +352,6 @@ private static Task SwitchTasksAsync(TaskExecution task, Func externalFunc, Func exportHl7Func, Func notCreatedStatusFunc, - Func exportHl7Func, Func defaultFunc) => task switch { @@ -706,7 +705,7 @@ private ExportRequestEvent GetExportRequestEvent( List plugins) { _logger.LogMigExport(task.TaskId, string.Join(",", exportDestinations), artifactValues.Length, string.Join(",", plugins)); - var exportRequestEvent = EventMapper.ToExportRequestEvent(artifactValues, exportDestinations, task.TaskId, workflowInstance.Id, correlationId); + var exportRequestEvent = EventMapper.ToExportRequestEvent(artifactValues, exportDestinations, task.TaskId, workflowInstance.Id, correlationId, plugins); exportRequestEvent.PayloadId = workflowInstance.PayloadId; return exportRequestEvent; } diff --git a/src/WorkflowManager/WorkflowExecuter/packages.lock.json b/src/WorkflowManager/WorkflowExecuter/packages.lock.json index 269d62a26..0266cb757 100644 --- a/src/WorkflowManager/WorkflowExecuter/packages.lock.json +++ b/src/WorkflowManager/WorkflowExecuter/packages.lock.json @@ -280,17 +280,6 @@ "System.Security.Principal.Windows": "5.0.0" } }, - "Monai.Deploy.Messaging": { - "type": "Transitive", - "resolved": "1.0.5-rc0006", - "contentHash": "Yr6Ix8AeKdciz7t9aeteYuDAiNpmv3FmpF9bvdvjVh46gBazf+HBdvXdbWWXgzNTd3yevsQGBKazQXN9ecqwog==", - "dependencies": { - "Ardalis.GuardClauses": "4.1.1", - "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", - "Newtonsoft.Json": "13.0.3", - "System.IO.Abstractions": "17.2.3" - } - }, "Monai.Deploy.Storage": { "type": "Transitive", "resolved": "0.2.18", diff --git a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs index d00d73108..7b1fadc36 100755 --- a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs +++ b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs @@ -2685,7 +2685,7 @@ public async Task ProcessPayload_WithExternalAppTask_Dispatches() var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() }); - _messageBrokerPublisherService.Verify(w => w.Publish($"{_configuration.Value.Messaging.Topics.ExternalAppRequest}", It.IsAny()), Times.Exactly(1)); + _messageBrokerPublisherService.Verify(w => w.Publish($"{_configuration.Value.Messaging.Topics.ExportRequestPrefix}.{_configuration.Value.Messaging.DicomAgents.ScuAgentName}", It.IsAny()), Times.Exactly(1)); Assert.True(result); Assert.NotNull(messageSent); @@ -3184,7 +3184,6 @@ public async Task ArtifactReceveid_Valid_ReturnesTrue() Assert.True(result); } - [Fact] public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_UpdateTaskOutputArtifactsAsync() { @@ -3198,7 +3197,7 @@ public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_Updat var workflowInstance = new WorkflowInstance { WorkflowId = "789", Tasks = new List() - { new TaskExecution() { TaskId = "456" } } + { new TaskExecution() { TaskId = "not456" } } }; _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))! .ReturnsAsync(workflowInstance); @@ -3213,7 +3212,7 @@ public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_Updat .ReturnsAsync(workflowTemplate); _storageService.Setup(s => s.VerifyObjectsExistAsync(It.IsAny(), It.IsAny>(), It.IsAny())) - .ReturnsAsync(new Dictionary { { $"{artifactPath}", true } }); + .ReturnsAsync(new Dictionary { { $"{message.PayloadId}/{artifactPath}", true } }); //previously received artifacts _artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id)) From 595048ff2c663a8c853f0f4e3590aababf0bc772 Mon Sep 17 00:00:00 2001 From: Neil South Date: Fri, 24 Nov 2023 18:14:01 +0000 Subject: [PATCH 4/8] fix tests Signed-off-by: Neil South --- .../Services/WorkflowExecuterServiceTests.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs index 7b1fadc36..c849fb622 100755 --- a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs +++ b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs @@ -2685,7 +2685,7 @@ public async Task ProcessPayload_WithExternalAppTask_Dispatches() var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() }); - _messageBrokerPublisherService.Verify(w => w.Publish($"{_configuration.Value.Messaging.Topics.ExportRequestPrefix}.{_configuration.Value.Messaging.DicomAgents.ScuAgentName}", It.IsAny()), Times.Exactly(1)); + _messageBrokerPublisherService.Verify(w => w.Publish($"{_configuration.Value.Messaging.Topics.ExternalAppRequest}", It.IsAny()), Times.Exactly(1)); Assert.True(result); Assert.NotNull(messageSent); @@ -3192,12 +3192,12 @@ public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_Updat var message = new ArtifactsReceivedEvent { WorkflowInstanceId = "123", TaskId = "456", - Artifacts = new List() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = artifactPath } } + Artifacts = new List() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = $"{new Guid()}/{artifactPath}" } } }; var workflowInstance = new WorkflowInstance { WorkflowId = "789", Tasks = new List() - { new TaskExecution() { TaskId = "not456" } } + { new TaskExecution() { TaskId = "456" } } }; _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))! .ReturnsAsync(workflowInstance); From 92a7f9867c9c0a59e50b85d55dfae3bc3a9951da Mon Sep 17 00:00:00 2001 From: Neil South Date: Thu, 30 Nov 2023 11:20:30 +0000 Subject: [PATCH 5/8] adding config Signed-off-by: Neil South --- .../Configuration/MessageBrokerConfigurationKeys.cs | 8 ++++++++ src/WorkflowManager/WorkflowManager/appsettings.json | 3 ++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/Common/Configuration/MessageBrokerConfigurationKeys.cs b/src/Common/Configuration/MessageBrokerConfigurationKeys.cs index f5742f471..53e1d5ab4 100644 --- a/src/Common/Configuration/MessageBrokerConfigurationKeys.cs +++ b/src/Common/Configuration/MessageBrokerConfigurationKeys.cs @@ -105,5 +105,13 @@ public class MessageBrokerConfigurationKeys /// [ConfigurationKeyName("exportHl7")] public string ExportHL7 { get; set; } = "md.export.hl7"; + + + /// + /// Gets or sets the topic for publishing export complete requests. + /// Defaults to `md_export_complete`. + /// + [ConfigurationKeyName("exportHl7Complete")] + public string ExportHl7Complete { get; set; } = "md.export.hl7complete"; } } diff --git a/src/WorkflowManager/WorkflowManager/appsettings.json b/src/WorkflowManager/WorkflowManager/appsettings.json index 3908f5f68..502dabf97 100755 --- a/src/WorkflowManager/WorkflowManager/appsettings.json +++ b/src/WorkflowManager/WorkflowManager/appsettings.json @@ -59,7 +59,8 @@ "exportRequestPrefix": "md.export.request", "callbackRequest": "md.tasks.callback", "aideClinicalReviewRequest": "aide.clinical_review.request", - "exportHl7": "md.export.hl7" + "exportHl7": "md.export.hl7", + "exportHl7Complete": "md.export.hl7complete" }, "dicomAgents": { "dicomWebAgentName": "monaidicomweb", From 3082d6db34e291c01c446931b2e2643807c106de Mon Sep 17 00:00:00 2001 From: Neil South Date: Fri, 1 Dec 2023 12:06:03 +0000 Subject: [PATCH 6/8] HL7export complete Signed-off-by: Neil South --- .../Configuration/ConfigurationValidator.cs | 5 +++++ .../MessageBrokerConfigurationKeys.cs | 7 +++++++ .../Services/PayloadListenerService.cs | 20 +++++++++++++++++++ .../WorkflowExecuter/Common/ArtifactMapper.cs | 2 +- .../Services/WorkflowExecuterService.cs | 4 ++-- .../WorkflowManager/appsettings.json | 7 +++++++ 6 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/Common/Configuration/ConfigurationValidator.cs b/src/Common/Configuration/ConfigurationValidator.cs index 487afa20f..d86270b40 100644 --- a/src/Common/Configuration/ConfigurationValidator.cs +++ b/src/Common/Configuration/ConfigurationValidator.cs @@ -58,7 +58,12 @@ public bool IsTopicsValid(MessageBrokerConfigurationKeys configurationKeys) valid &= IsStringValueNotNull(nameof(configurationKeys.WorkflowRequest), configurationKeys.WorkflowRequest); valid &= IsStringValueNotNull(nameof(configurationKeys.ExportRequestPrefix), configurationKeys.ExportRequestPrefix); valid &= IsStringValueNotNull(nameof(configurationKeys.TaskDispatchRequest), configurationKeys.TaskDispatchRequest); +<<<<<<< Updated upstream valid &= IsStringValueNotNull(nameof(configurationKeys.ExportHL7), configurationKeys.ExportHL7); +||||||| constructed merge base +======= + valid &= IsStringValueNotNull(nameof(configurationKeys.ExportHL7Complete), configurationKeys.ExportHL7Complete); +>>>>>>> Stashed changes return valid; } diff --git a/src/Common/Configuration/MessageBrokerConfigurationKeys.cs b/src/Common/Configuration/MessageBrokerConfigurationKeys.cs index 53e1d5ab4..900261209 100644 --- a/src/Common/Configuration/MessageBrokerConfigurationKeys.cs +++ b/src/Common/Configuration/MessageBrokerConfigurationKeys.cs @@ -34,6 +34,13 @@ public class MessageBrokerConfigurationKeys [ConfigurationKeyName("exportComplete")] public string ExportComplete { get; set; } = "md.export.complete"; + /// + /// Gets or sets the topic for publishing workflow requests. + /// Defaults to `md.export.complete`. + /// + [ConfigurationKeyName("exportHL7Complete")] + public string ExportHL7Complete { get; set; } = "md.export.hl7complete"; + /// /// Gets or sets the topic for publishing workflow requests. /// Defaults to `md.export.request`. diff --git a/src/WorkflowManager/PayloadListener/Services/PayloadListenerService.cs b/src/WorkflowManager/PayloadListener/Services/PayloadListenerService.cs index 02b4497c8..96162aa55 100644 --- a/src/WorkflowManager/PayloadListener/Services/PayloadListenerService.cs +++ b/src/WorkflowManager/PayloadListener/Services/PayloadListenerService.cs @@ -43,6 +43,7 @@ public class PayloadListenerService : IHostedService, IMonaiService, IDisposable public string TaskStatusUpdateRoutingKey { get; set; } public string ExportCompleteRoutingKey { get; set; } public string ArtifactRecievedRoutingKey { get; set; } + public string ExportHL7CompleteRoutingKey { get; set; } protected int Concurrency { get; set; } public ServiceStatus Status { get; set; } = ServiceStatus.Unknown; public string ServiceName => "Payload Listener Service"; @@ -67,6 +68,7 @@ public PayloadListenerService( WorkflowRequestRoutingKey = configuration.Value.Messaging.Topics.WorkflowRequest; ExportCompleteRoutingKey = configuration.Value.Messaging.Topics.ExportComplete; ArtifactRecievedRoutingKey = configuration.Value.Messaging.Topics.ArtifactRecieved; + ExportHL7CompleteRoutingKey = configuration.Value.Messaging.Topics.ExportHL7Complete; Concurrency = 2; @@ -110,6 +112,9 @@ private void SetupPolling() _messageSubscriber.SubscribeAsync(ArtifactRecievedRoutingKey, ArtifactRecievedRoutingKey, OnArtifactReceivedtReceivedCallbackAsync); _logger.EventSubscription(ServiceName, ArtifactRecievedRoutingKey); + + _messageSubscriber.SubscribeAsync(ExportHL7CompleteRoutingKey, ExportHL7CompleteRoutingKey, OnExportHL7CompleteReceivedCallback); + _logger.EventSubscription(ServiceName, ExportHL7CompleteRoutingKey); } private async Task OnWorkflowRequestReceivedCallbackAsync(MessageReceivedEventArgs eventArgs) @@ -156,6 +161,21 @@ private async Task OnExportCompleteReceivedCallback(MessageReceivedEventArgs eve } + private async Task OnExportHL7CompleteReceivedCallback(MessageReceivedEventArgs eventArgs) + { + using var loggerScope = _logger.BeginScope(new Common.Miscellaneous.LoggingDataDictionary + { + ["correlationId"] = eventArgs.Message.CorrelationId, + ["source"] = eventArgs.Message.ApplicationId, + ["messageId"] = eventArgs.Message.MessageId, + ["messageDescription"] = eventArgs.Message.MessageDescription, + }); + + _logger.ExportCompleteReceived(); + await _eventPayloadListenerService.ExportCompletePayload(eventArgs); + + } + private async Task OnArtifactReceivedtReceivedCallbackAsync(MessageReceivedEventArgs eventArgs) { diff --git a/src/WorkflowManager/WorkflowExecuter/Common/ArtifactMapper.cs b/src/WorkflowManager/WorkflowExecuter/Common/ArtifactMapper.cs index 524922a99..00f19cfe6 100755 --- a/src/WorkflowManager/WorkflowExecuter/Common/ArtifactMapper.cs +++ b/src/WorkflowManager/WorkflowExecuter/Common/ArtifactMapper.cs @@ -181,7 +181,7 @@ private async Task> ConvertVariableStringToPath(Art var artifactName = variableWords[4]; var outputArtifact = task.OutputArtifacts?.FirstOrDefault(a => a.Key == artifactName); - if (!outputArtifact.HasValue) + if (!outputArtifact.HasValue || string.IsNullOrEmpty(outputArtifact.Value.Value)) { return default; } diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index 549690607..38f8840b4 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -275,7 +275,7 @@ private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message var currentTask = workflowInstance.Tasks?.Find(t => t.TaskId == taskId); - currentTask!.OutputArtifacts = validArtifacts; // added here are the parent function saves the object ! + currentTask!.OutputArtifacts = validArtifacts; // adding the actual paths here, the parent function does the saving of the changes _logger.LogDebug($"adding files to workflowInstance {workflowInstance.Id} :Task {taskId} : {JsonConvert.SerializeObject(validArtifacts)}"); await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts); @@ -495,7 +495,7 @@ public async Task ProcessExportComplete(ExportCompleteEvent message, strin await _workflowInstanceService.UpdateExportCompleteMetadataAsync(workflowInstance.Id, task.ExecutionId, message.FileStatuses); var succeededFileCount = message.FileStatuses.Count(f => f.Value == FileExportStatus.Success); - var totalFileCount = message.FileStatuses.Count(); + var totalFileCount = message.FileStatuses.Count; if (message.Status.Equals(ExportStatus.Success) && TaskExecutionStatus.Succeeded.IsTaskExecutionStatusUpdateValid(task.Status)) diff --git a/src/WorkflowManager/WorkflowManager/appsettings.json b/src/WorkflowManager/WorkflowManager/appsettings.json index 502dabf97..202fe99f7 100755 --- a/src/WorkflowManager/WorkflowManager/appsettings.json +++ b/src/WorkflowManager/WorkflowManager/appsettings.json @@ -58,9 +58,16 @@ "exportComplete": "md.export.complete", "exportRequestPrefix": "md.export.request", "callbackRequest": "md.tasks.callback", +<<<<<<< Updated upstream "aideClinicalReviewRequest": "aide.clinical_review.request", "exportHl7": "md.export.hl7", "exportHl7Complete": "md.export.hl7complete" +||||||| constructed merge base + "aideClinicalReviewRequest": "aide.clinical_review.request" +======= + "aideClinicalReviewRequest": "aide.clinical_review.request", + "exportHL7Complete": "md.export.hl7complete" +>>>>>>> Stashed changes }, "dicomAgents": { "dicomWebAgentName": "monaidicomweb", From d6920d7682c2fac261a509b454e7812165a7caf2 Mon Sep 17 00:00:00 2001 From: Neil South Date: Fri, 1 Dec 2023 12:56:04 +0000 Subject: [PATCH 7/8] mangled appsettings Signed-off-by: Neil South --- src/WorkflowManager/WorkflowManager/appsettings.json | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/WorkflowManager/WorkflowManager/appsettings.json b/src/WorkflowManager/WorkflowManager/appsettings.json index 202fe99f7..502dabf97 100755 --- a/src/WorkflowManager/WorkflowManager/appsettings.json +++ b/src/WorkflowManager/WorkflowManager/appsettings.json @@ -58,16 +58,9 @@ "exportComplete": "md.export.complete", "exportRequestPrefix": "md.export.request", "callbackRequest": "md.tasks.callback", -<<<<<<< Updated upstream "aideClinicalReviewRequest": "aide.clinical_review.request", "exportHl7": "md.export.hl7", "exportHl7Complete": "md.export.hl7complete" -||||||| constructed merge base - "aideClinicalReviewRequest": "aide.clinical_review.request" -======= - "aideClinicalReviewRequest": "aide.clinical_review.request", - "exportHL7Complete": "md.export.hl7complete" ->>>>>>> Stashed changes }, "dicomAgents": { "dicomWebAgentName": "monaidicomweb", From 1b4851d36ef9d1ae277ef6b48ba747ce1a427632 Mon Sep 17 00:00:00 2001 From: Neil South Date: Fri, 1 Dec 2023 12:59:46 +0000 Subject: [PATCH 8/8] extra logging Signed-off-by: Neil South --- src/WorkflowManager/Logging/Log.200000.Workflow.cs | 3 +++ .../WorkflowExecuter/Services/WorkflowExecuterService.cs | 1 + 2 files changed, 4 insertions(+) diff --git a/src/WorkflowManager/Logging/Log.200000.Workflow.cs b/src/WorkflowManager/Logging/Log.200000.Workflow.cs index e69836004..fff85817a 100644 --- a/src/WorkflowManager/Logging/Log.200000.Workflow.cs +++ b/src/WorkflowManager/Logging/Log.200000.Workflow.cs @@ -108,5 +108,8 @@ public static partial class Log [LoggerMessage(EventId = 210007, Level = LogLevel.Information, Message = "Exporting to MIG task Id {taskid}, export destination {destination} number of files {fileCount} Mig data plugins {plugins}.")] public static partial void LogMigExport(this ILogger logger, string taskid, string destination, int fileCount, string plugins); + + [LoggerMessage(EventId = 200018, Level = LogLevel.Error, Message = "ExportList or Artifacts are empty! workflowInstanceId {workflowInstanceId} TaskId {taskId}")] + public static partial void ExportListOrArtifactsAreEmpty(this ILogger logger, string taskId, string workflowInstanceId); } } diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index 3378fee75..f7fc03b5c 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -685,6 +685,7 @@ private async Task HandleHl7ExportAsync(WorkflowRevision workflow, WorkflowInsta if (exportList is null || artifactValues is null) { + _logger.ExportListOrArtifactsAreEmpty(task.TaskId, workflowInstance.Id); return; }