Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ai 358 #932

Merged
merged 4 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,14 @@
}
else
{
item.Artifacts = item.Artifacts.Concat(existing.Artifacts).ToList();
item.Artifacts = item.Artifacts.Union(existing.Artifacts).ToList();
var update = Builders<ArtifactReceivedItems>.Update.Set(a => a.Artifacts, item.Artifacts);
await _artifactReceivedItemsCollection
.UpdateOneAsync(a => a.WorkflowInstanceId == workflowInstanceId && a.TaskId == taskId, update)
.ConfigureAwait(false);
}
}
catch (Exception ex)

Check warning on line 193 in src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs

View workflow job for this annotation

GitHub Actions / task-manager-integration-tests

The variable 'ex' is declared but never used

Check warning on line 193 in src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs

View workflow job for this annotation

GitHub Actions / unit-tests-and-codecov

The variable 'ex' is declared but never used

Check warning on line 193 in src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs

View workflow job for this annotation

GitHub Actions / unit-tests-and-codecov

The variable 'ex' is declared but never used

Check warning on line 193 in src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs

View workflow job for this annotation

GitHub Actions / docs

The variable 'ex' is declared but never used

Check warning on line 193 in src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs

View workflow job for this annotation

GitHub Actions / scan

The variable 'ex' is declared but never used

Check warning on line 193 in src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs

View workflow job for this annotation

GitHub Actions / analyze

The variable 'ex' is declared but never used

Check warning on line 193 in src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs

View workflow job for this annotation

GitHub Actions / workflow-executor-integration-tests

The variable 'ex' is declared but never used

Check warning on line 193 in src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs

View workflow job for this annotation

GitHub Actions / workflow-executor-integration-tests

The variable 'ex' is declared but never used

Check warning on line 193 in src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs

View workflow job for this annotation

GitHub Actions / docs

The variable 'ex' is declared but never used
{

throw;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ await _artifactsRepository
return true;
}

private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject task, string taskId)
private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject taskTemplate, string taskId)
{
var artifactList = message.Artifacts.Select(a => $"{a.Path}").ToList();
var artifactsInStorage = (await _storageService.VerifyObjectsExistAsync(workflowInstance.BucketId, artifactList, default)) ?? new Dictionary<string, bool>();
Expand All @@ -263,22 +263,36 @@ private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message

var messageArtifactsInStorage = message.Artifacts.Where(m => artifactsInStorage.First(a => a.Value && a.Key == $"{m.Path}").Value).ToList();

var addedNew = false;
var validArtifacts = new Dictionary<string, string>();
foreach (var artifact in messageArtifactsInStorage)
{
var match = task.Artifacts.Output.FirstOrDefault(t => t.Type == artifact.Type);
var match = taskTemplate.Artifacts.Output.FirstOrDefault(t => t.Type == artifact.Type);
if (match is not null && validArtifacts.ContainsKey(match!.Name) is false)
{
validArtifacts.Add(match.Name, $"{artifact.Path}");

}
}

var currentTask = workflowInstance.Tasks?.Find(t => t.TaskId == taskId);

currentTask!.OutputArtifacts = validArtifacts; // adding the actual paths here, the parent function does the saving of the changes

_logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts));
await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts);
foreach (var artifact in validArtifacts)
{
if (currentTask?.OutputArtifacts.ContainsKey(artifact.Key) is false)
{
// adding the actual paths here, the parent function does the saving of the changes
currentTask?.OutputArtifacts.Add(artifact.Key, artifact.Value);
addedNew = true;
}
}

if (currentTask is not null && addedNew)
{
_logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts));
await _workflowInstanceRepository.UpdateTaskAsync(workflowInstance.Id, taskId, currentTask);
}
}

private async Task<bool> AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance,
Expand Down Expand Up @@ -511,9 +525,13 @@ public async Task<bool> ProcessExportComplete(ExportCompleteEvent message, strin
return false;
}

if (string.Compare(task.TaskType, ValidationConstants.ExportTaskType, true) == 0)
switch (task.TaskType)
{
return await HandleTaskDestinations(workflowInstance, workflow, task, correlationId);
case TaskTypeConstants.DicomExportTask:
case TaskTypeConstants.HL7ExportTask:
return await HandleTaskDestinations(workflowInstance, workflow, task, correlationId);
default:
break;
}
}

Expand Down Expand Up @@ -612,7 +630,12 @@ private async Task<bool> ExternalAppRequest(ExternalAppRequestEvent externalAppR
return true;
}

private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId, List<string>? plugins = null)
private async Task HandleDicomExportAsync(
WorkflowRevision workflow,
WorkflowInstance workflowInstance,
TaskExecution task,
string correlationId,
List<string>? plugins = null)
{
plugins ??= new List<string>();
var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId);
Expand All @@ -629,15 +652,20 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns
await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched);
}

private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId)
private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts(
WorkflowRevision workflow,
WorkflowInstance workflowInstance,
TaskExecution task,
string correlationId,
bool enforceDcmOnly = true)
{
var exportList = workflow.Workflow?.Tasks?.FirstOrDefault(t => t.Id == task.TaskId)?.ExportDestinations.Select(e => e.Name).ToArray();
if (exportList is null || !exportList.Any())
{
exportList = null;
}

var artifactValues = await GetArtifactValues(workflow, workflowInstance, task, exportList, correlationId);
var artifactValues = await GetArtifactValues(workflow, workflowInstance, task, exportList, correlationId, enforceDcmOnly);

if (artifactValues.IsNullOrEmpty())
{
Expand All @@ -646,7 +674,12 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns
return (exportList, artifactValues);
}

private async Task<string[]> GetArtifactValues(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string[]? exportList, string correlationId)
private async Task<string[]> GetArtifactValues(
WorkflowRevision workflow, WorkflowInstance workflowInstance,
TaskExecution task,
string[]? exportList,
string correlationId,
bool enforceDcmOnly = true)
{
var artifactValues = GetDicomExports(workflow, task, exportList);

Expand All @@ -660,7 +693,7 @@ private async Task<string[]> GetArtifactValues(WorkflowRevision workflow, Workfl
artifact,
true);

var dcmFiles = objects?.Where(o => o.IsValidDicomFile())?.ToList();
var dcmFiles = objects?.Where(o => o.IsValidDicomFile() || enforceDcmOnly is false)?.ToList();

if (dcmFiles?.IsNullOrEmpty() is false)
{
Expand All @@ -681,7 +714,7 @@ private async Task<string[]> GetArtifactValues(WorkflowRevision workflow, Workfl

private async Task HandleHl7ExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId)
{
var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId);
var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId, false);

if (exportList is null || artifactValues is null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

using Monai.Deploy.Messaging.Events;
using Monai.Deploy.WorkflowManager.Common.Contracts.Constants;
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
using Monai.Deploy.WorkflowManager.Common.IntegrationTests.POCO;
#pragma warning disable CS8602 // Dereference of a possibly null reference.
Expand Down Expand Up @@ -209,7 +210,7 @@ public static WorkflowInstance CreateWorkflowInstance(string workflowName)
ExecutionId = Guid.NewGuid().ToString(),
TaskId = "7d7c8b83-6628-413c-9912-a89314e5e2d5",
OutputDirectory = "payloadId/workflows/workflowInstanceId/executionId/",
TaskType = "Export",
TaskType = TaskTypeConstants.DicomExportTask,
Status = TaskExecutionStatus.Dispatched
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

using Monai.Deploy.Messaging.Common;
using Monai.Deploy.WorkflowManager.Common.Contracts.Constants;
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
using Artifact = Monai.Deploy.WorkflowManager.Common.Contracts.Models.Artifact;
// ReSharper disable ArrangeObjectCreationWhenTypeEvident
Expand Down Expand Up @@ -2174,7 +2175,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_1",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 2",
ExportDestinations = new ExportDestination[]
{
Expand Down Expand Up @@ -2235,7 +2236,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_1",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 2",
ExportDestinations = new ExportDestination[]
{
Expand Down Expand Up @@ -2296,7 +2297,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_1",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 2",
ExportDestinations = new ExportDestination[]
{
Expand Down Expand Up @@ -2358,7 +2359,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_1",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 2",
ExportDestinations = new ExportDestination[]
{
Expand All @@ -2375,7 +2376,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_2",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 3",
ExportDestinations = new ExportDestination[]
{
Expand Down Expand Up @@ -2437,7 +2438,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_1",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 2",
ExportDestinations = new ExportDestination[]
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3184,44 +3184,7 @@ public async Task ArtifactReceveid_Valid_ReturnesTrue()

Assert.True(result);
}
[Fact]
public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_UpdateTaskOutputArtifactsAsync()
{
var artifactPath = "some path here";
//incoming artifacts
var message = new ArtifactsReceivedEvent
{
WorkflowInstanceId = "123", TaskId = "456",
Artifacts = new List<Messaging.Common.Artifact>() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = $"{new Guid()}/{artifactPath}" } }
};
var workflowInstance = new WorkflowInstance
{
WorkflowId = "789", Tasks = new List<TaskExecution>()
{ new TaskExecution() { TaskId = "456" } }
};
_workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))!
.ReturnsAsync(workflowInstance);
//expected artifacts
var templateArtifacts = new OutputArtifact[]
{
new OutputArtifact() { Type = ArtifactType.CT , Name = "CT scan"},
};
var taskTemplate = new TaskObject() { Id = "456", Artifacts = new ArtifactMap { Output = templateArtifacts } };
var workflowTemplate = new WorkflowRevision { Workflow = new Workflow { Tasks = new[] { taskTemplate } } };
_workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))!
.ReturnsAsync(workflowTemplate);

_storageService.Setup(s => s.VerifyObjectsExistAsync(It.IsAny<string>(), It.IsAny<List<string>>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new Dictionary<string, bool> { { $"{message.PayloadId}/{artifactPath}", true } });

//previously received artifacts
_artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id))
.ReturnsAsync((List<ArtifactReceivedItems>?)null);

var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message);

_workflowInstanceRepository.Verify(w => w.UpdateTaskOutputArtifactsAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Dictionary<string, string>>()), Times.Once());
}
[Fact]
public async Task ProcessPayload_WithExportTask_NoExportsFails()
{
Expand Down
Loading