Skip to content

Commit

Permalink
Merge pull request #884 from Project-MONAI/nds-externalAppPayload
Browse files Browse the repository at this point in the history
Nds external app payload
  • Loading branch information
neildsouth authored Oct 5, 2023
2 parents a4c63ac + b44a423 commit f8e1f11
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/Common/Configuration/WorkflowManagerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class WorkflowManagerOptions : PagedOptions
public string DicomTagsDisallowed { get; set; } = string.Empty;

[ConfigurationKeyName("migExternalAppPlugins")]
public List<string> MigExternalAppPlugins { get; set; }
public string[] MigExternalAppPlugins { get; set; }

public WorkflowManagerOptions()
{
Expand Down
3 changes: 3 additions & 0 deletions src/WorkflowManager/Logging/Log.200000.Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,8 @@ public static partial class Log

[LoggerMessage(EventId = 210006, Level = LogLevel.Information, Message = "Attached PatientMetadata {metadata}.")]
public static partial void AttachedPatientMetadataToTaskExec(this ILogger logger, string metadata);

[LoggerMessage(EventId = 210007, Level = LogLevel.Information, Message = "Exporting to MIG task Id {taskid}, export destination {destination} number of files {fileCount} Mig data plugins {plugins}.")]
public static partial void LogMigExport(this ILogger logger, string taskid, string destination, int fileCount, string plugins);
}
}
3 changes: 3 additions & 0 deletions src/WorkflowManager/Logging/Log.500000.Messaging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,8 @@ public static partial class Log

[LoggerMessage(EventId = 500016, Level = LogLevel.Debug, Message = "Export complete message received.")]
public static partial void ExportCompleteReceived(this ILogger logger);

[LoggerMessage(EventId = 200017, Level = LogLevel.Debug, Message = "Workflow continuation event so not creating payload.")]
public static partial void WorkflowContinuation(this ILogger logger);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,29 @@ public async Task ReceiveWorkflowPayload(MessageReceivedEventArgs message)
return;
}

var payload = await PayloadService.CreateAsync(requestEvent);
if (payload is null)
if (string.IsNullOrWhiteSpace(requestEvent.WorkflowInstanceId) || string.IsNullOrWhiteSpace(requestEvent.TaskId))
{
Logger.WorkflowRequestRequeuePayloadCreateError(message.Message.MessageId);
await _messageSubscriber.RequeueWithDelay(message.Message);

return;
}
var payload = await PayloadService.CreateAsync(requestEvent);
if (payload is null)
{
Logger.WorkflowRequestRequeuePayloadCreateError(message.Message.MessageId);
await _messageSubscriber.RequeueWithDelay(message.Message);

if (!await WorkflowExecuterService.ProcessPayload(requestEvent, payload))
{
Logger.WorkflowRequestRequeuePayloadProcessError(message.Message.MessageId);
await _messageSubscriber.RequeueWithDelay(message.Message);
return;
}

return;
if (!await WorkflowExecuterService.ProcessPayload(requestEvent, payload))
{
Logger.WorkflowRequestRequeuePayloadProcessError(message.Message.MessageId);
await _messageSubscriber.RequeueWithDelay(message.Message);

return;
}
}
else
{
Logger.WorkflowContinuation();
}

_messageSubscriber.Acknowledge(message.Message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ private void SetupPolling()
_logger.EventSubscription(ServiceName, TaskStatusUpdateRoutingKey);

_messageSubscriber.SubscribeAsync(ExportCompleteRoutingKey, ExportCompleteRoutingKey, OnExportCompleteReceivedCallback);
_logger.EventSubscription(ServiceName, ExportCompleteRoutingKey); }
_logger.EventSubscription(ServiceName, ExportCompleteRoutingKey);
}
private async Task OnWorkflowRequestReceivedCallbackAsync(MessageReceivedEventArgs eventArgs)
{

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public WorkflowExecuterService(
_defaultPerTaskTypeTimeoutMinutes = configuration.Value.PerTaskTypeTimeoutMinutes;
TaskDispatchRoutingKey = configuration.Value.Messaging.Topics.TaskDispatchRequest;
TaskTimeoutRoutingKey = configuration.Value.Messaging.Topics.AideClinicalReviewCancelation;
_migExternalAppPlugins = configuration.Value.MigExternalAppPlugins;
_migExternalAppPlugins = configuration.Value.MigExternalAppPlugins.ToList();
ExportRequestRoutingKey = $"{configuration.Value.Messaging.Topics.ExportRequestPrefix}.{configuration.Value.Messaging.DicomAgents.ScuAgentName}";

_logger = logger ?? throw new ArgumentNullException(nameof(logger));
Expand Down Expand Up @@ -482,6 +482,7 @@ private async Task<bool> UpdateWorkflowInstanceStatus(WorkflowInstance workflowI

return true;
}

private async Task HandleExternalAppAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId)
{
var plugins = _migExternalAppPlugins;
Expand Down Expand Up @@ -524,7 +525,6 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns

return;
}

await DispatchDicomExport(workflowInstance, task, exportList, artifactValues, correlationId, plugins);
}

Expand Down Expand Up @@ -563,6 +563,7 @@ private async Task<bool> DispatchDicomExport(WorkflowInstance workflowInstance,
return false;
}

_logger.LogMigExport(task.TaskId, string.Join(",", exportDestinations), artifactValues.Length, string.Join(",", plugins));
await ExportRequest(workflowInstance, task, exportDestinations, artifactValues, correlationId, plugins);
return await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched);
}
Expand Down Expand Up @@ -812,6 +813,7 @@ private async Task<bool> TimeOutEvent(WorkflowInstance workflowInstance, TaskExe
var exportRequestEvent = EventMapper.GenerateTaskCancellationEvent("", taskExec.ExecutionId, workflowInstance.Id, taskExec.TaskId, FailureReason.TimedOut, "Timed out");
var jsonMesssage = new JsonMessage<TaskCancellationEvent>(exportRequestEvent, MessageBrokerConfiguration.WorkflowManagerApplicationId, correlationId, Guid.NewGuid().ToString());

_logger.TaskTimedOut(taskExec.TaskId, workflowInstance.Id, taskExec.Timeout);
await _messageBrokerPublisherService.Publish(TaskTimeoutRoutingKey, jsonMesssage.ToMessage());
return true;
}
Expand Down
39 changes: 21 additions & 18 deletions src/WorkflowManager/WorkflowManager/appsettings.Local.json
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,25 @@
"scuAgentName": "monaiscu"
}
},
"dicomTagsDisallowed": "PatientName,PatientID,IssuerOfPatientID,TypeOfPatientID,IssuerOfPatientIDQualifiersSequence,SourcePatientGroupIdentificationSequence,GroupOfPatientsIdentificationSequence,SubjectRelativePositionInImage,PatientBirthDate,PatientBirthTime,PatientBirthDateInAlternativeCalendar,PatientDeathDateInAlternativeCalendar,PatientAlternativeCalendar,PatientSex,PatientInsurancePlanCodeSequence,PatientPrimaryLanguageCodeSequence,PatientPrimaryLanguageModifierCodeSequence,QualityControlSubject,QualityControlSubjectTypeCodeSequence,StrainDescription,StrainNomenclature,StrainStockNumber,StrainSourceRegistryCodeSequence,StrainStockSequence,StrainSource,StrainAdditionalInformation,StrainCodeSequence,GeneticModificationsSequence,GeneticModificationsDescription,GeneticModificationsNomenclature,GeneticModificationsCodeSequence,OtherPatientIDsRETIRED,OtherPatientNames,OtherPatientIDsSequence,PatientBirthName,PatientAge,PatientSize,PatientSizeCodeSequence,PatientBodyMassIndex,MeasuredAPDimension,MeasuredLateralDimension,PatientWeight,PatientAddress,InsurancePlanIdentificationRETIRED,PatientMotherBirthName,MilitaryRank,BranchOfService,MedicalRecordLocatorRETIRED,ReferencedPatientPhotoSequence,MedicalAlerts,Allergies,CountryOfResidence,RegionOfResidence,PatientTelephoneNumbers,PatientTelecomInformation,EthnicGroup,Occupation,SmokingStatus,AdditionalPatientHistory,PregnancyStatus,LastMenstrualDate,PatientReligiousPreference,PatientSpeciesDescription,PatientSpeciesCodeSequence,PatientSexNeutered,AnatomicalOrientationType,PatientBreedDescription,PatientBreedCodeSequence,BreedRegistrationSequence,BreedRegistrationNumber,BreedRegistryCodeSequence,ResponsiblePerson,ResponsiblePersonRole,ResponsibleOrganization,PatientComments,ExaminedBodyThickness"
},
"InformaticsGateway": {
"apiHost": "http://localhost:5010",
"username": "aide",
"password": "example"
},
"Kestrel": {
"EndPoints": {
"Http": {
"Url": "http://localhost:5000"
}
"dicomTagsDisallowed": "PatientName,PatientID,IssuerOfPatientID,TypeOfPatientID,IssuerOfPatientIDQualifiersSequence,SourcePatientGroupIdentificationSequence,GroupOfPatientsIdentificationSequence,SubjectRelativePositionInImage,PatientBirthDate,PatientBirthTime,PatientBirthDateInAlternativeCalendar,PatientDeathDateInAlternativeCalendar,PatientAlternativeCalendar,PatientSex,PatientInsurancePlanCodeSequence,PatientPrimaryLanguageCodeSequence,PatientPrimaryLanguageModifierCodeSequence,QualityControlSubject,QualityControlSubjectTypeCodeSequence,StrainDescription,StrainNomenclature,StrainStockNumber,StrainSourceRegistryCodeSequence,StrainStockSequence,StrainSource,StrainAdditionalInformation,StrainCodeSequence,GeneticModificationsSequence,GeneticModificationsDescription,GeneticModificationsNomenclature,GeneticModificationsCodeSequence,OtherPatientIDsRETIRED,OtherPatientNames,OtherPatientIDsSequence,PatientBirthName,PatientAge,PatientSize,PatientSizeCodeSequence,PatientBodyMassIndex,MeasuredAPDimension,MeasuredLateralDimension,PatientWeight,PatientAddress,InsurancePlanIdentificationRETIRED,PatientMotherBirthName,MilitaryRank,BranchOfService,MedicalRecordLocatorRETIRED,ReferencedPatientPhotoSequence,MedicalAlerts,Allergies,CountryOfResidence,RegionOfResidence,PatientTelephoneNumbers,PatientTelecomInformation,EthnicGroup,Occupation,SmokingStatus,AdditionalPatientHistory,PregnancyStatus,LastMenstrualDate,PatientReligiousPreference,PatientSpeciesDescription,PatientSpeciesCodeSequence,PatientSexNeutered,AnatomicalOrientationType,PatientBreedDescription,PatientBreedCodeSequence,BreedRegistrationSequence,BreedRegistrationNumber,BreedRegistryCodeSequence,ResponsiblePerson,ResponsiblePersonRole,ResponsibleOrganization,PatientComments,ExaminedBodyThickness",
"migExternalAppPlugins": [
"Monai.Deploy.InformaticsGateway.PlugIns.RemoteAppExecution.DicomDeidentifier, Monai.Deploy.InformaticsGateway.PlugIns.RemoteAppExecution, Version=0.0.0.0"
],
"InformaticsGateway": {
"apiHost": "http://localhost:5010",
"username": "aide",
"password": "example"
},
"LogHttpRequestQuery": false,
"LogHttpRequestBody": false,
"LogHttpResponseBody": true
},
"AllowedHosts": "*"
}
"Kestrel": {
"EndPoints": {
"Http": {
"Url": "http://localhost:5000"
}
},
"LogHttpRequestQuery": false,
"LogHttpRequestBody": false,
"LogHttpResponseBody": true
},
"AllowedHosts": "*"
}
}
2 changes: 1 addition & 1 deletion src/WorkflowManager/WorkflowManager/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
}
},
"dicomTagsDisallowed": "PatientName,PatientID,IssuerOfPatientID,TypeOfPatientID,IssuerOfPatientIDQualifiersSequence,SourcePatientGroupIdentificationSequence,GroupOfPatientsIdentificationSequence,SubjectRelativePositionInImage,PatientBirthDate,PatientBirthTime,PatientBirthDateInAlternativeCalendar,PatientDeathDateInAlternativeCalendar,PatientAlternativeCalendar,PatientSex,PatientInsurancePlanCodeSequence,PatientPrimaryLanguageCodeSequence,PatientPrimaryLanguageModifierCodeSequence,QualityControlSubject,QualityControlSubjectTypeCodeSequence,StrainDescription,StrainNomenclature,StrainStockNumber,StrainSourceRegistryCodeSequence,StrainStockSequence,StrainSource,StrainAdditionalInformation,StrainCodeSequence,GeneticModificationsSequence,GeneticModificationsDescription,GeneticModificationsNomenclature,GeneticModificationsCodeSequence,OtherPatientIDsRETIRED,OtherPatientNames,OtherPatientIDsSequence,PatientBirthName,PatientAge,PatientSize,PatientSizeCodeSequence,PatientBodyMassIndex,MeasuredAPDimension,MeasuredLateralDimension,PatientWeight,PatientAddress,InsurancePlanIdentificationRETIRED,PatientMotherBirthName,MilitaryRank,BranchOfService,MedicalRecordLocatorRETIRED,ReferencedPatientPhotoSequence,MedicalAlerts,Allergies,CountryOfResidence,RegionOfResidence,PatientTelephoneNumbers,PatientTelecomInformation,EthnicGroup,Occupation,SmokingStatus,AdditionalPatientHistory,PregnancyStatus,LastMenstrualDate,PatientReligiousPreference,PatientSpeciesDescription,PatientSpeciesCodeSequence,PatientSexNeutered,AnatomicalOrientationType,PatientBreedDescription,PatientBreedCodeSequence,BreedRegistrationSequence,BreedRegistrationNumber,BreedRegistryCodeSequence,ResponsiblePerson,ResponsiblePersonRole,ResponsibleOrganization,PatientComments,ExaminedBodyThickness",
"migExternalAppPlugins": "Monai.Deploy.InformaticsGateway.ExecutionPlugins.ExternalAppOutgoing, Monai.Deploy.InformaticsGateway, Version=0.0.0.0"
"migExternalAppPlugins": [ "Monai.Deploy.InformaticsGateway.PlugIns.RemoteAppExecution.DicomDeidentifier, Monai.Deploy.InformaticsGateway.PlugIns.RemoteAppExecution, Version=0.0.0.0" ]
},
"InformaticsGateway": {
"apiHost": "http://localhost:5010",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,28 @@ public void ExportCompletePayload_ErrorIsThrown_MessageIsRejectedAndRequeued()
_mockMessageBrokerSubscriberService.VerifyNoOtherCalls();
}

[Test]
public void ReceiveWorkflowPayload_With_WorkflowId_And_TaskID()
{

var exportRequestMessage = new WorkflowRequestEvent
{
CorrelationId = Guid.NewGuid().ToString(),
WorkflowInstanceId = Guid.NewGuid().ToString(),
TaskId = "exporttask"
};
var jsonMessage = new JsonMessage<WorkflowRequestEvent>(exportRequestMessage, MessageBrokerConfiguration.WorkflowManagerApplicationId, exportRequestMessage.CorrelationId);
var message = new MessageReceivedEventArgs(jsonMessage.ToMessage(), CancellationToken.None);

_mockEventPayloadValidator.Setup(p => p.ValidateWorkflowRequest(It.IsAny<WorkflowRequestEvent>())).Returns(true);

_workflowExecuterService.Setup(p => p.ProcessPayload(It.IsAny<WorkflowRequestEvent>(), It.IsAny<Payload>())).ReturnsAsync(true);

_eventPayloadReceiverService.ReceiveWorkflowPayload(message);

_payloadService.Verify(p => p.CreateAsync(It.IsAny<WorkflowRequestEvent>()), Times.Never());
}

private static MessageReceivedEventArgs CreateMessageReceivedEventArgs(string[] destinations)
{
var exportRequestMessage = new ExportRequestEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ public WorkflowExecuterServiceTests()
Topics = new MessageBrokerConfigurationKeys { TaskDispatchRequest = "md.task.dispatch", ExportRequestPrefix = "md.export.request" },
DicomAgents = new DicomAgentConfiguration { DicomWebAgentName = "monaidicomweb" }
},
MigExternalAppPlugins = new List<string> { { "examplePlugin" } }
});
MigExternalAppPlugins = new List<string> { { "examplePlugin" } }.ToArray()
}) ;

_storageConfiguration = Options.Create(new StorageServiceConfiguration() { Settings = new Dictionary<string, string> { { "bucket", "testbucket" }, { "endpoint", "localhost" }, { "securedConnection", "False" } } });

Expand Down

0 comments on commit f8e1f11

Please sign in to comment.