Skip to content

Commit

Permalink
Merge pull request #932 from Project-MONAI/AI-358
Browse files Browse the repository at this point in the history
Ai 358
  • Loading branch information
neildsouth authored Dec 21, 2023
2 parents 460e86a + 708ceb2 commit 84ee385
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public async Task AddOrUpdateItemAsync(string workflowInstanceId, string taskId,
}
else
{
item.Artifacts = item.Artifacts.Concat(existing.Artifacts).ToList();
item.Artifacts = item.Artifacts.Union(existing.Artifacts).ToList();
var update = Builders<ArtifactReceivedItems>.Update.Set(a => a.Artifacts, item.Artifacts);
await _artifactReceivedItemsCollection
.UpdateOneAsync(a => a.WorkflowInstanceId == workflowInstanceId && a.TaskId == taskId, update)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ await _artifactsRepository
return true;
}

private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject task, string taskId)
private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject taskTemplate, string taskId)
{
var artifactList = message.Artifacts.Select(a => $"{a.Path}").ToList();
var artifactsInStorage = (await _storageService.VerifyObjectsExistAsync(workflowInstance.BucketId, artifactList, default)) ?? new Dictionary<string, bool>();
Expand All @@ -263,22 +263,36 @@ private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message

var messageArtifactsInStorage = message.Artifacts.Where(m => artifactsInStorage.First(a => a.Value && a.Key == $"{m.Path}").Value).ToList();

var addedNew = false;
var validArtifacts = new Dictionary<string, string>();
foreach (var artifact in messageArtifactsInStorage)
{
var match = task.Artifacts.Output.FirstOrDefault(t => t.Type == artifact.Type);
var match = taskTemplate.Artifacts.Output.FirstOrDefault(t => t.Type == artifact.Type);
if (match is not null && validArtifacts.ContainsKey(match!.Name) is false)
{
validArtifacts.Add(match.Name, $"{artifact.Path}");

}
}

var currentTask = workflowInstance.Tasks?.Find(t => t.TaskId == taskId);

currentTask!.OutputArtifacts = validArtifacts; // adding the actual paths here, the parent function does the saving of the changes

_logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts));
await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts);
foreach (var artifact in validArtifacts)
{
if (currentTask?.OutputArtifacts.ContainsKey(artifact.Key) is false)
{
// adding the actual paths here, the parent function does the saving of the changes
currentTask?.OutputArtifacts.Add(artifact.Key, artifact.Value);
addedNew = true;
}
}

if (currentTask is not null && addedNew)
{
_logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts));
await _workflowInstanceRepository.UpdateTaskAsync(workflowInstance.Id, taskId, currentTask);
}
}

private async Task<bool> AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance,
Expand Down Expand Up @@ -511,9 +525,13 @@ public async Task<bool> ProcessExportComplete(ExportCompleteEvent message, strin
return false;
}

if (string.Compare(task.TaskType, ValidationConstants.ExportTaskType, true) == 0)
switch (task.TaskType)
{
return await HandleTaskDestinations(workflowInstance, workflow, task, correlationId);
case TaskTypeConstants.DicomExportTask:
case TaskTypeConstants.HL7ExportTask:
return await HandleTaskDestinations(workflowInstance, workflow, task, correlationId);
default:
break;
}
}

Expand Down Expand Up @@ -612,7 +630,12 @@ private async Task<bool> ExternalAppRequest(ExternalAppRequestEvent externalAppR
return true;
}

private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId, List<string>? plugins = null)
private async Task HandleDicomExportAsync(
WorkflowRevision workflow,
WorkflowInstance workflowInstance,
TaskExecution task,
string correlationId,
List<string>? plugins = null)
{
plugins ??= new List<string>();
var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId);
Expand All @@ -629,15 +652,20 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns
await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched);
}

private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId)
private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts(
WorkflowRevision workflow,
WorkflowInstance workflowInstance,
TaskExecution task,
string correlationId,
bool enforceDcmOnly = true)
{
var exportList = workflow.Workflow?.Tasks?.FirstOrDefault(t => t.Id == task.TaskId)?.ExportDestinations.Select(e => e.Name).ToArray();
if (exportList is null || !exportList.Any())
{
exportList = null;
}

var artifactValues = await GetArtifactValues(workflow, workflowInstance, task, exportList, correlationId);
var artifactValues = await GetArtifactValues(workflow, workflowInstance, task, exportList, correlationId, enforceDcmOnly);

if (artifactValues.IsNullOrEmpty())
{
Expand All @@ -646,7 +674,12 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns
return (exportList, artifactValues);
}

private async Task<string[]> GetArtifactValues(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string[]? exportList, string correlationId)
private async Task<string[]> GetArtifactValues(
WorkflowRevision workflow, WorkflowInstance workflowInstance,
TaskExecution task,
string[]? exportList,
string correlationId,
bool enforceDcmOnly = true)
{
var artifactValues = GetDicomExports(workflow, task, exportList);

Expand All @@ -660,7 +693,7 @@ private async Task<string[]> GetArtifactValues(WorkflowRevision workflow, Workfl
artifact,
true);

var dcmFiles = objects?.Where(o => o.IsValidDicomFile())?.ToList();
var dcmFiles = objects?.Where(o => o.IsValidDicomFile() || enforceDcmOnly is false)?.ToList();

if (dcmFiles?.IsNullOrEmpty() is false)
{
Expand All @@ -681,7 +714,7 @@ private async Task<string[]> GetArtifactValues(WorkflowRevision workflow, Workfl

private async Task HandleHl7ExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId)
{
var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId);
var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId, false);

if (exportList is null || artifactValues is null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

using Monai.Deploy.Messaging.Events;
using Monai.Deploy.WorkflowManager.Common.Contracts.Constants;
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
using Monai.Deploy.WorkflowManager.Common.IntegrationTests.POCO;
#pragma warning disable CS8602 // Dereference of a possibly null reference.
Expand Down Expand Up @@ -209,7 +210,7 @@ public static WorkflowInstance CreateWorkflowInstance(string workflowName)
ExecutionId = Guid.NewGuid().ToString(),
TaskId = "7d7c8b83-6628-413c-9912-a89314e5e2d5",
OutputDirectory = "payloadId/workflows/workflowInstanceId/executionId/",
TaskType = "Export",
TaskType = TaskTypeConstants.DicomExportTask,
Status = TaskExecutionStatus.Dispatched
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

using Monai.Deploy.Messaging.Common;
using Monai.Deploy.WorkflowManager.Common.Contracts.Constants;
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
using Artifact = Monai.Deploy.WorkflowManager.Common.Contracts.Models.Artifact;
// ReSharper disable ArrangeObjectCreationWhenTypeEvident
Expand Down Expand Up @@ -2174,7 +2175,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_1",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 2",
ExportDestinations = new ExportDestination[]
{
Expand Down Expand Up @@ -2235,7 +2236,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_1",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 2",
ExportDestinations = new ExportDestination[]
{
Expand Down Expand Up @@ -2296,7 +2297,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_1",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 2",
ExportDestinations = new ExportDestination[]
{
Expand Down Expand Up @@ -2358,7 +2359,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_1",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 2",
ExportDestinations = new ExportDestination[]
{
Expand All @@ -2375,7 +2376,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_2",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 3",
ExportDestinations = new ExportDestination[]
{
Expand Down Expand Up @@ -2437,7 +2438,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_1",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 2",
ExportDestinations = new ExportDestination[]
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3184,44 +3184,7 @@ public async Task ArtifactReceveid_Valid_ReturnesTrue()

Assert.True(result);
}
[Fact]
public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_UpdateTaskOutputArtifactsAsync()
{
var artifactPath = "some path here";
//incoming artifacts
var message = new ArtifactsReceivedEvent
{
WorkflowInstanceId = "123", TaskId = "456",
Artifacts = new List<Messaging.Common.Artifact>() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = $"{new Guid()}/{artifactPath}" } }
};
var workflowInstance = new WorkflowInstance
{
WorkflowId = "789", Tasks = new List<TaskExecution>()
{ new TaskExecution() { TaskId = "456" } }
};
_workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))!
.ReturnsAsync(workflowInstance);
//expected artifacts
var templateArtifacts = new OutputArtifact[]
{
new OutputArtifact() { Type = ArtifactType.CT , Name = "CT scan"},
};
var taskTemplate = new TaskObject() { Id = "456", Artifacts = new ArtifactMap { Output = templateArtifacts } };
var workflowTemplate = new WorkflowRevision { Workflow = new Workflow { Tasks = new[] { taskTemplate } } };
_workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))!
.ReturnsAsync(workflowTemplate);

_storageService.Setup(s => s.VerifyObjectsExistAsync(It.IsAny<string>(), It.IsAny<List<string>>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new Dictionary<string, bool> { { $"{message.PayloadId}/{artifactPath}", true } });

//previously received artifacts
_artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id))
.ReturnsAsync((List<ArtifactReceivedItems>?)null);

var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message);

_workflowInstanceRepository.Verify(w => w.UpdateTaskOutputArtifactsAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Dictionary<string, string>>()), Times.Once());
}
[Fact]
public async Task ProcessPayload_WithExportTask_NoExportsFails()
{
Expand Down

0 comments on commit 84ee385

Please sign in to comment.