diff --git a/src/Api/Storage/FileStorageMetadata.cs b/src/Api/Storage/FileStorageMetadata.cs index 06b66bff8..ec3e3b310 100644 --- a/src/Api/Storage/FileStorageMetadata.cs +++ b/src/Api/Storage/FileStorageMetadata.cs @@ -116,5 +116,7 @@ public virtual void SetFailed() { File.SetFailed(); } + + public string? PayloadId { get; set; } } } diff --git a/src/Configuration/ConfigurationValidator.cs b/src/Configuration/ConfigurationValidator.cs index c19d175d4..f54381be2 100644 --- a/src/Configuration/ConfigurationValidator.cs +++ b/src/Configuration/ConfigurationValidator.cs @@ -120,7 +120,7 @@ private bool IsValidDirectory(string source, string directory) private bool IsValidBucketName(string source, string bucketName) { var valid = IsNotNullOrWhiteSpace(source, bucketName); - var regex = new Regex("(?=^.{3,63}$)(^[a-z0-9]+[a-z0-9\\-]+[a-z0-9]+$)"); + var regex = new Regex("(?=^.{3,63}$)(^[a-z0-9]+[a-z0-9\\-]+[a-z0-9]+$)", new RegexOptions(), TimeSpan.FromSeconds(5)); if (!regex.IsMatch(bucketName)) { valid = false; diff --git a/src/Database/EntityFramework/Test/SourceApplicationEntityRepositoryTest.cs b/src/Database/EntityFramework/Test/SourceApplicationEntityRepositoryTest.cs index 536e165df..ad6487ef8 100644 --- a/src/Database/EntityFramework/Test/SourceApplicationEntityRepositoryTest.cs +++ b/src/Database/EntityFramework/Test/SourceApplicationEntityRepositoryTest.cs @@ -119,6 +119,7 @@ public async Task GivenAAETitleName_WhenFindByAETAsyncIsCalled_ExpectItToReturnM Assert.Equal("AET1", actual.FirstOrDefault()!.Name); actual = await store.FindByAETAsync("AET6").ConfigureAwait(false); + Assert.NotNull(actual); Assert.Empty(actual); } diff --git a/src/Database/MongoDB/Integration.Test/SourceApplicationEntityRepositoryTest.cs b/src/Database/MongoDB/Integration.Test/SourceApplicationEntityRepositoryTest.cs index 3760f2d7b..83ec01f0f 100644 --- a/src/Database/MongoDB/Integration.Test/SourceApplicationEntityRepositoryTest.cs +++ b/src/Database/MongoDB/Integration.Test/SourceApplicationEntityRepositoryTest.cs @@ -124,6 +124,7 @@ public async Task GivenAETitle_WhenFindByAETitleAsyncIsCalled_ExpectItToReturnMa Assert.Equal("AET1", actual.FirstOrDefault()!.Name); actual = await store.FindByAETAsync("AET6").ConfigureAwait(false); + Assert.NotNull(actual); Assert.Empty(actual); } diff --git a/src/InformaticsGateway/Logging/Log.4000.ObjectUploadService.cs b/src/InformaticsGateway/Logging/Log.4000.ObjectUploadService.cs index e6e9957c4..afdad8c78 100644 --- a/src/InformaticsGateway/Logging/Log.4000.ObjectUploadService.cs +++ b/src/InformaticsGateway/Logging/Log.4000.ObjectUploadService.cs @@ -27,8 +27,8 @@ public static partial class Log [LoggerMessage(EventId = 4001, Level = LogLevel.Debug, Message = "Upload statistics: {threads} threads, {seconds} seconds.")] public static partial void UploadStats(this ILogger logger, int threads, double seconds); - [LoggerMessage(EventId = 4002, Level = LogLevel.Debug, Message = "Uploading file to temporary store at {filePath}.")] - public static partial void UploadingFileToTemporaryStore(this ILogger logger, string filePath); + [LoggerMessage(EventId = 4002, Level = LogLevel.Debug, Message = "Uploading file to storeage at {filePath}.")] + public static partial void UploadingFileToStoreage(this ILogger logger, string filePath); [LoggerMessage(EventId = 4003, Level = LogLevel.Information, Message = "Instance queued for upload {identifier}. Items in queue {count} using memory {memoryUsageKb}KB.")] public static partial void InstanceAddedToUploadQueue(this ILogger logger, string identifier, int count, double memoryUsageKb); @@ -36,11 +36,11 @@ public static partial class Log [LoggerMessage(EventId = 4004, Level = LogLevel.Debug, Message = "Error removing objects that are pending upload during startup.")] public static partial void ErrorRemovingPendingUploadObjects(this ILogger logger, Exception ex); - [LoggerMessage(EventId = 4005, Level = LogLevel.Error, Message = "Error uploading temporary store. Waiting {timeSpan} before next retry. Retry attempt {retryCount}.")] + [LoggerMessage(EventId = 4005, Level = LogLevel.Error, Message = "Error uploading storeage. Waiting {timeSpan} before next retry. Retry attempt {retryCount}.")] public static partial void ErrorUploadingFileToTemporaryStore(this ILogger logger, TimeSpan timespan, int retryCount, Exception ex); - [LoggerMessage(EventId = 4006, Level = LogLevel.Information, Message = "File uploaded to temporary store at {filePath}.")] - public static partial void UploadedFileToTemporaryStore(this ILogger logger, string filePath); + [LoggerMessage(EventId = 4006, Level = LogLevel.Information, Message = "File uploaded to storeage at {filePath}.")] + public static partial void UploadedFileToStoreage(this ILogger logger, string filePath); [LoggerMessage(EventId = 4007, Level = LogLevel.Debug, Message = "Items in queue {count}.")] public static partial void InstanceInUploadQueue(this ILogger logger, int count); diff --git a/src/InformaticsGateway/Services/Connectors/DataRetrievalService.cs b/src/InformaticsGateway/Services/Connectors/DataRetrievalService.cs index 88cf26757..c664e9aeb 100644 --- a/src/InformaticsGateway/Services/Connectors/DataRetrievalService.cs +++ b/src/InformaticsGateway/Services/Connectors/DataRetrievalService.cs @@ -198,8 +198,11 @@ private async Task NotifyNewInstance(InferenceRequest inferenceRequest, Dictiona { retrievedFiles[key].SetWorkflows(inferenceRequest.Application.Id); } + var FileMeta = retrievedFiles[key]; + + var payloadId = await _payloadAssembler.Queue(inferenceRequest.TransactionId, retrievedFiles[key]).ConfigureAwait(false); + retrievedFiles[key].PayloadId = payloadId.ToString(); _uploadQueue.Queue(retrievedFiles[key]); - await _payloadAssembler.Queue(inferenceRequest.TransactionId, retrievedFiles[key]).ConfigureAwait(false); } } diff --git a/src/InformaticsGateway/Services/Connectors/IPayloadAssembler.cs b/src/InformaticsGateway/Services/Connectors/IPayloadAssembler.cs index 2a5ebde02..5206e90d7 100644 --- a/src/InformaticsGateway/Services/Connectors/IPayloadAssembler.cs +++ b/src/InformaticsGateway/Services/Connectors/IPayloadAssembler.cs @@ -14,6 +14,7 @@ * limitations under the License. */ +using System; using System.Threading; using System.Threading.Tasks; using Monai.Deploy.InformaticsGateway.Api.Storage; @@ -30,7 +31,7 @@ internal interface IPayloadAssembler /// /// The bucket group the file belongs to. /// Path to the file to be added to the payload bucket. - Task Queue(string bucket, FileStorageMetadata file); + Task Queue(string bucket, FileStorageMetadata file); /// /// Queue a new file for the spcified payload bucket. @@ -38,7 +39,7 @@ internal interface IPayloadAssembler /// The bucket group the file belongs to. /// Path to the file to be added to the payload bucket. /// Number of seconds to wait for additional files. - Task Queue(string bucket, FileStorageMetadata file, uint timeout); + Task Queue(string bucket, FileStorageMetadata file, uint timeout); /// /// Dequeue a payload from the queue for the message broker to notify subscribers. diff --git a/src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs b/src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs index 265e4dbd0..83bf3973e 100644 --- a/src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs +++ b/src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs @@ -87,7 +87,7 @@ private async Task RemovePendingPayloads() /// /// Name of the bucket where the file would be added to /// Instance to be queued - public async Task Queue(string bucket, FileStorageMetadata file) => await Queue(bucket, file, DEFAULT_TIMEOUT).ConfigureAwait(false); + public async Task Queue(string bucket, FileStorageMetadata file) => await Queue(bucket, file, DEFAULT_TIMEOUT).ConfigureAwait(false); /// /// Queues a new instance of . @@ -95,7 +95,7 @@ private async Task RemovePendingPayloads() /// Name of the bucket where the file would be added to /// Instance to be queued /// Number of seconds the bucket shall wait before sending the payload to be processed. Note: timeout cannot be modified once the bucket is created. - public async Task Queue(string bucket, FileStorageMetadata file, uint timeout) + public async Task Queue(string bucket, FileStorageMetadata file, uint timeout) { Guard.Against.Null(file); @@ -106,6 +106,7 @@ public async Task Queue(string bucket, FileStorageMetadata file, uint timeout) var payload = await CreateOrGetPayload(bucket, file.CorrelationId, timeout).ConfigureAwait(false); payload.Add(file); _logger.FileAddedToBucket(payload.Key, payload.Count); + return payload.PayloadId; } /// diff --git a/src/InformaticsGateway/Services/Connectors/PayloadMoveActionHandler.cs b/src/InformaticsGateway/Services/Connectors/PayloadMoveActionHandler.cs index bfc65d558..0906f8f4f 100644 --- a/src/InformaticsGateway/Services/Connectors/PayloadMoveActionHandler.cs +++ b/src/InformaticsGateway/Services/Connectors/PayloadMoveActionHandler.cs @@ -77,7 +77,6 @@ public async Task MoveFilesAsync(Payload payload, ActionBlock moveQueue var stopwatch = Stopwatch.StartNew(); try { - await Move(payload, cancellationToken).ConfigureAwait(false); await NotifyIfCompleted(payload, notificationQueue, cancellationToken).ConfigureAwait(false); } catch (Exception ex) @@ -127,149 +126,6 @@ private async Task NotifyIfCompleted(Payload payload, ActionBlock notif } } - private async Task Move(Payload payload, CancellationToken cancellationToken) - { - Guard.Against.Null(payload); - - _logger.MovingFIlesInPayload(payload.PayloadId, _options.Value.Storage.StorageServiceBucketName); - - var options = new ParallelOptions - { - CancellationToken = cancellationToken, - MaxDegreeOfParallelism = _options.Value.Storage.ConcurrentUploads - }; - - var exceptions = new List(); - await Parallel.ForEachAsync(payload.Files, options, async (file, cancellationToke) => - { - try - { - switch (file) - { - case DicomFileStorageMetadata dicom: - if (!string.IsNullOrWhiteSpace(dicom.JsonFile.TemporaryPath)) - { - await MoveFile(payload.PayloadId, dicom.Id, dicom.JsonFile, cancellationToken).ConfigureAwait(false); - } - break; - } - - await MoveFile(payload.PayloadId, file.Id, file.File, cancellationToken).ConfigureAwait(false); - } - catch (Exception ex) - { - exceptions.Add(ex); - } - }).ConfigureAwait(false); - - if (exceptions.Any()) - { - throw new AggregateException(exceptions); - } - } - - private async Task MoveFile(Guid payloadId, string identity, StorageObjectMetadata file, CancellationToken cancellationToken) - { - Guard.Against.NullOrWhiteSpace(identity); - Guard.Against.Null(file); - - if (file.IsMoveCompleted) - { - _logger.AlreadyMoved(payloadId, file.UploadPath); - return; - } - - _logger.MovingFileToPayloadDirectory(payloadId, file.UploadPath); - - try - { - await _storageService.CopyObjectAsync( - file.TemporaryBucketName, - file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath), - _options.Value.Storage.StorageServiceBucketName, - file.GetPayloadPath(payloadId), - cancellationToken).ConfigureAwait(false); - - await VerifyFileExists(payloadId, file, cancellationToken).ConfigureAwait(false); - } - catch (StorageObjectNotFoundException ex) when (ex.Message.Contains("Not found", StringComparison.OrdinalIgnoreCase)) // TODO: StorageLib shall not throw any errors from MINIO - { - // when file cannot be found on the Storage Service, we assume file has been moved previously by verifying the file exists on destination. - _logger.FileMissingInPayload(payloadId, file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath), ex); - await VerifyFileExists(payloadId, file, cancellationToken).ConfigureAwait(false); - } - catch (StorageConnectionException ex) - { - _logger.StorageServiceConnectionError(ex); - throw new PayloadNotifyException(PayloadNotifyException.FailureReason.ServiceUnavailable); - } - catch (Exception ex) - { - _logger.PayloadMoveException(ex); - await LogFilesInMinIo(file.TemporaryBucketName, cancellationToken).ConfigureAwait(false); - throw new FileMoveException(file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath), file.UploadPath, ex); - } - - try - { - _logger.DeletingFileFromTemporaryBbucket(file.TemporaryBucketName, identity, file.TemporaryPath); - await _storageService.RemoveObjectAsync(file.TemporaryBucketName, file.GetTempStoragPath(_options.Value.Storage.RemoteTemporaryStoragePath), cancellationToken).ConfigureAwait(false); - } - catch (Exception) - { - _logger.ErrorDeletingFileAfterMoveComplete(file.TemporaryBucketName, identity, file.TemporaryPath); - } - finally - { - file.SetMoved(_options.Value.Storage.StorageServiceBucketName); - } - } - - private async Task VerifyFileExists(Guid payloadId, StorageObjectMetadata file, CancellationToken cancellationToken) - { - await Policy - .Handle() - .WaitAndRetryAsync( - _options.Value.Storage.Retries.RetryDelays, - (exception, timeSpan, retryCount, context) => - { - _logger.ErrorUploadingFileToTemporaryStore(timeSpan, retryCount, exception); - }) - .ExecuteAsync(async () => - { - var internalCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - internalCancellationTokenSource.CancelAfter(_options.Value.Storage.StorageServiceListTimeout); - var exists = await _storageService.VerifyObjectExistsAsync(_options.Value.Storage.StorageServiceBucketName, file.GetPayloadPath(payloadId), cancellationToken).ConfigureAwait(false); - if (!exists) - { - _logger.FileMovedVerificationFailure(payloadId, file.UploadPath); - throw new PayloadNotifyException(PayloadNotifyException.FailureReason.MoveFailure, false); - } - }) - .ConfigureAwait(false); - } - - private async Task LogFilesInMinIo(string bucketName, CancellationToken cancellationToken) - { - try - { - var internalCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - internalCancellationTokenSource.CancelAfter(_options.Value.Storage.StorageServiceListTimeout); - var listingResults = await _storageService.ListObjectsAsync(bucketName, recursive: true, cancellationToken: internalCancellationTokenSource.Token).ConfigureAwait(false); - _logger.FilesFounddOnStorageService(bucketName, listingResults.Count); - var files = new List(); - foreach (var item in listingResults) - { - files.Add(item.FilePath); - } - _logger.FileFounddOnStorageService(bucketName, string.Join(Environment.NewLine, files)); - } - catch (Exception ex) - { - _logger.ErrorListingFilesOnStorageService(ex); - } - } - private async Task UpdatePayloadState(Payload payload, Exception ex, CancellationToken cancellationToken = default) { Guard.Against.Null(payload); diff --git a/src/InformaticsGateway/Services/DicomWeb/IStreamsWriter.cs b/src/InformaticsGateway/Services/DicomWeb/IStreamsWriter.cs index 7f9cbb2df..1c3cfad60 100644 --- a/src/InformaticsGateway/Services/DicomWeb/IStreamsWriter.cs +++ b/src/InformaticsGateway/Services/DicomWeb/IStreamsWriter.cs @@ -168,12 +168,14 @@ private async Task SaveInstance(Stream stream, string studyInstanceUid, string w { dicomInfo.SetWorkflows(workflowName); } + // for DICOMweb, use correlation ID as the grouping key + var payloadId = await _payloadAssembler.Queue(correlationId, dicomInfo, _configuration.Value.DicomWeb.Timeout).ConfigureAwait(false); + dicomInfo.PayloadId = payloadId.ToString(); await dicomInfo.SetDataStreams(dicomFile, dicomFile.ToJson(_configuration.Value.Dicom.WriteDicomJson, _configuration.Value.Dicom.ValidateDicomOnSerialization), _configuration.Value.Storage.TemporaryDataStorage, _fileSystem, _configuration.Value.Storage.LocalTemporaryStoragePath).ConfigureAwait(false); _uploadQueue.Queue(dicomInfo); - // for DICOMweb, use correlation ID as the grouping key - await _payloadAssembler.Queue(correlationId, dicomInfo, _configuration.Value.DicomWeb.Timeout).ConfigureAwait(false); + _logger.QueuedStowInstance(); AddSuccess(null, uids); diff --git a/src/InformaticsGateway/Services/Fhir/FhirService.cs b/src/InformaticsGateway/Services/Fhir/FhirService.cs index f1cc9eab6..45b706ce9 100644 --- a/src/InformaticsGateway/Services/Fhir/FhirService.cs +++ b/src/InformaticsGateway/Services/Fhir/FhirService.cs @@ -87,8 +87,9 @@ public async Task StoreAsync(HttpRequest request, string correl throw new FhirStoreException(correlationId, $"Provided resource is of type '{content.InternalResourceType}' but request targeted type '{resourceType}'.", IssueType.Invalid); } + var payloadId = await _payloadAssembler.Queue(correlationId, content.Metadata, Resources.PayloadAssemblerTimeout).ConfigureAwait(false); + content.Metadata.PayloadId = payloadId.ToString(); _uploadQueue.Queue(content.Metadata); - await _payloadAssembler.Queue(correlationId, content.Metadata, Resources.PayloadAssemblerTimeout).ConfigureAwait(false); _logger.QueuedStowInstance(); content.StatusCode = StatusCodes.Status201Created; diff --git a/src/InformaticsGateway/Services/HealthLevel7/MllpService.cs b/src/InformaticsGateway/Services/HealthLevel7/MllpService.cs index 8e2ae602d..5746a9ba6 100644 --- a/src/InformaticsGateway/Services/HealthLevel7/MllpService.cs +++ b/src/InformaticsGateway/Services/HealthLevel7/MllpService.cs @@ -170,8 +170,9 @@ private async Task OnDisconnect(IMllpClient client, MllpClientResult result) { var hl7Fileetadata = new Hl7FileStorageMetadata(client.ClientId.ToString()); await hl7Fileetadata.SetDataStream(message.HL7Message, _configuration.Value.Storage.TemporaryDataStorage, _fileSystem, _configuration.Value.Storage.LocalTemporaryStoragePath).ConfigureAwait(false); + var payloadId = await _payloadAssembler.Queue(client.ClientId.ToString(), hl7Fileetadata).ConfigureAwait(false); + hl7Fileetadata.PayloadId = payloadId.ToString(); _uploadQueue.Queue(hl7Fileetadata); - await _payloadAssembler.Queue(client.ClientId.ToString(), hl7Fileetadata).ConfigureAwait(false); } } catch (Exception ex) diff --git a/src/InformaticsGateway/Services/Scp/ApplicationEntityHandler.cs b/src/InformaticsGateway/Services/Scp/ApplicationEntityHandler.cs index 34d7337ff..b2d9ca5af 100644 --- a/src/InformaticsGateway/Services/Scp/ApplicationEntityHandler.cs +++ b/src/InformaticsGateway/Services/Scp/ApplicationEntityHandler.cs @@ -113,12 +113,14 @@ public async Task HandleInstanceAsync(DicomCStoreRequest request, string calledA } await dicomInfo.SetDataStreams(request.File, request.File.ToJson(_dicomJsonOptions, _validateDicomValueOnJsonSerialization), _options.Value.Storage.TemporaryDataStorage, _fileSystem, _options.Value.Storage.LocalTemporaryStoragePath).ConfigureAwait(false); - _uploadQueue.Queue(dicomInfo); var dicomTag = FellowOakDicom.DicomTag.Parse(_configuration.Grouping); _logger.QueueInstanceUsingDicomTag(dicomTag); var key = request.Dataset.GetSingleValue(dicomTag); - await _payloadAssembler.Queue(key, dicomInfo, _configuration.Timeout).ConfigureAwait(false); + + var payloadid = await _payloadAssembler.Queue(key, dicomInfo, _configuration.Timeout).ConfigureAwait(false); + dicomInfo.PayloadId = payloadid.ToString(); + _uploadQueue.Queue(dicomInfo); } private bool AcceptsSopClass(string sopClassUid) diff --git a/src/InformaticsGateway/Services/Storage/ObjectUploadService.cs b/src/InformaticsGateway/Services/Storage/ObjectUploadService.cs index 89159e3a2..fb30a5d30 100644 --- a/src/InformaticsGateway/Services/Storage/ObjectUploadService.cs +++ b/src/InformaticsGateway/Services/Storage/ObjectUploadService.cs @@ -158,12 +158,12 @@ private async Task ProcessObject(int thread, FileStorageMetadata blob) case DicomFileStorageMetadata dicom: if (!string.IsNullOrWhiteSpace(dicom.JsonFile.TemporaryPath)) { - await UploadFileAndConfirm(dicom.Id, dicom.JsonFile, dicom.Source, dicom.Workflows, _cancellationTokenSource.Token).ConfigureAwait(false); + await UploadFileAndConfirm(dicom.Id, dicom.JsonFile, dicom.Source, dicom.Workflows, blob.PayloadId, _cancellationTokenSource.Token).ConfigureAwait(false); } break; } - await UploadFileAndConfirm(blob.Id, blob.File, blob.Source, blob.Workflows, _cancellationTokenSource.Token).ConfigureAwait(false); + await UploadFileAndConfirm(blob.Id, blob.File, blob.Source, blob.Workflows, blob.PayloadId, _cancellationTokenSource.Token).ConfigureAwait(false); } catch (Exception ex) { @@ -177,7 +177,7 @@ private async Task ProcessObject(int thread, FileStorageMetadata blob) } } - private async Task UploadFileAndConfirm(string identifier, StorageObjectMetadata storageObjectMetadata, string source, List workflows, CancellationToken cancellationToken) + private async Task UploadFileAndConfirm(string identifier, StorageObjectMetadata storageObjectMetadata, string source, List workflows, string payloadId, CancellationToken cancellationToken) { Guard.Against.NullOrWhiteSpace(identifier); Guard.Against.Null(storageObjectMetadata); @@ -192,12 +192,14 @@ private async Task UploadFileAndConfirm(string identifier, StorageObjectMetadata var count = 3; do { - await UploadFile(storageObjectMetadata, source, workflows, cancellationToken).ConfigureAwait(false); + await UploadFile(storageObjectMetadata, source, workflows, payloadId, cancellationToken).ConfigureAwait(false); if (count-- <= 0) { throw new FileUploadException($"Failed to upload file after retries {identifier}."); } - } while (!(await VerifyExists(storageObjectMetadata.GetTempStoragPath(_configuration.Value.Storage.RemoteTemporaryStoragePath), cancellationToken).ConfigureAwait(false))); + } while (!( + await VerifyExists(storageObjectMetadata.GetPayloadPath(Guid.Parse(payloadId)), cancellationToken).ConfigureAwait(false) + )); } private async Task VerifyExists(string path, CancellationToken cancellationToken) @@ -214,16 +216,16 @@ private async Task VerifyExists(string path, CancellationToken cancellatio { var internalCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); internalCancellationTokenSource.CancelAfter(_configuration.Value.Storage.StorageServiceListTimeout); - var exists = await _storageService.VerifyObjectExistsAsync(_configuration.Value.Storage.TemporaryStorageBucket, path).ConfigureAwait(false); + var exists = await _storageService.VerifyObjectExistsAsync(_configuration.Value.Storage.StorageServiceBucketName, path).ConfigureAwait(false); _logger.VerifyFileExists(path, exists); return exists; }) .ConfigureAwait(false); } - private async Task UploadFile(StorageObjectMetadata storageObjectMetadata, string source, List workflows, CancellationToken cancellationToken) + private async Task UploadFile(StorageObjectMetadata storageObjectMetadata, string source, List workflows, string payloadId, CancellationToken cancellationToken) { - _logger.UploadingFileToTemporaryStore(storageObjectMetadata.TemporaryPath); + _logger.UploadingFileToStoreage(storageObjectMetadata.TemporaryPath); var metadata = new Dictionary { { FileMetadataKeys.Source, source }, @@ -242,17 +244,21 @@ await Policy { if (storageObjectMetadata.IsUploaded) { return; } + var bucket = _configuration.Value.Storage.StorageServiceBucketName; + var path = storageObjectMetadata.GetPayloadPath(Guid.Parse(payloadId)); + storageObjectMetadata.Data.Seek(0, System.IO.SeekOrigin.Begin); await _storageService.PutObjectAsync( - _configuration.Value.Storage.TemporaryStorageBucket, - storageObjectMetadata.GetTempStoragPath(_configuration.Value.Storage.RemoteTemporaryStoragePath), + bucket, + path, storageObjectMetadata.Data, storageObjectMetadata.Data.Length, storageObjectMetadata.ContentType, metadata, cancellationToken).ConfigureAwait(false); - storageObjectMetadata.SetUploaded(_configuration.Value.Storage.TemporaryStorageBucket); - _logger.UploadedFileToTemporaryStore(storageObjectMetadata.TemporaryPath); + storageObjectMetadata.SetUploaded(_configuration.Value.Storage.TemporaryStorageBucket); // deletes local file and sets uploaded to true + _logger.UploadedFileToStoreage(storageObjectMetadata.TemporaryPath); + storageObjectMetadata.SetMoved(_configuration.Value.Storage.StorageServiceBucketName); // set bucket, date moved, and move complete }) .ConfigureAwait(false); } diff --git a/src/InformaticsGateway/Test/Services/Connectors/PayloadMoveActionHandlerTest.cs b/src/InformaticsGateway/Test/Services/Connectors/PayloadMoveActionHandlerTest.cs index 748229209..2bf054a68 100644 --- a/src/InformaticsGateway/Test/Services/Connectors/PayloadMoveActionHandlerTest.cs +++ b/src/InformaticsGateway/Test/Services/Connectors/PayloadMoveActionHandlerTest.cs @@ -190,25 +190,31 @@ public async Task GivenAPayload_WhenAllFilesAreMove_ExpectPayloadToBeAddedToNoti }); var correlationId = Guid.NewGuid(); + var file1 = new DicomFileStorageMetadata(correlationId.ToString(), Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), Guid.NewGuid().ToString()); + file1.File.SetMoved("test"); + file1.JsonFile.SetMoved("test"); + var file2 = new FhirFileStorageMetadata(correlationId.ToString(), Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), Api.Rest.FhirStorageFormat.Json); + file2.File.SetMoved("test"); var payload = new Payload("key", correlationId.ToString(), 0) { RetryCount = 3, State = Payload.PayloadState.Move, Files = new List { - new DicomFileStorageMetadata(correlationId.ToString(), Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), Guid.NewGuid().ToString()), - new FhirFileStorageMetadata(correlationId.ToString(), Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), Api.Rest.FhirStorageFormat.Json), + file1, + file2, }, }; + var handler = new PayloadMoveActionHandler(_serviceScopeFactory.Object, _logger.Object, _options); await handler.MoveFilesAsync(payload, moveAction, notifyAction, _cancellationTokenSource.Token); Assert.True(notifyEvent.Wait(TimeSpan.FromSeconds(5))); - _storageService.Verify(p => p.CopyObjectAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.AtLeast(2)); - _storageService.Verify(p => p.RemoveObjectAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.AtLeast(2)); + //_storageService.Verify(p => p.CopyObjectAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.AtLeast(2)); + //_storageService.Verify(p => p.RemoveObjectAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.AtLeast(2)); } } } diff --git a/src/InformaticsGateway/Test/Services/Storage/ObjectUploadServiceTest.cs b/src/InformaticsGateway/Test/Services/Storage/ObjectUploadServiceTest.cs index 7aa593aab..b601a5876 100644 --- a/src/InformaticsGateway/Test/Services/Storage/ObjectUploadServiceTest.cs +++ b/src/InformaticsGateway/Test/Services/Storage/ObjectUploadServiceTest.cs @@ -119,7 +119,7 @@ public async Task GivenADicomFileStorageMetadata_WhenQueuedForUpload_ExpectTwoFi _storageService.Verify(p => p.PutObjectAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny>(), It.IsAny()), Times.Exactly(2)); } - [RetryFact(10, 250)] + [RetryFact(10, 25000)] public async Task GivenAFhirFileStorageMetadata_WhenQueuedForUpload_ExpectSingleFileToBeUploaded() { var countdownEvent = new CountdownEvent(1); @@ -146,6 +146,7 @@ private async Task GenerateFhirFileStorageMetadata() var file = new FhirFileStorageMetadata(Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), FhirStorageFormat.Json); await file.SetDataStream("[]", TemporaryDataStorageLocation.Memory); + file.PayloadId = Guid.NewGuid().ToString(); return file; } @@ -166,6 +167,7 @@ private async Task GenerateDicomFileStorageMetadata() }; var dicomFile = new DicomFile(dataset); await file.SetDataStreams(dicomFile, "[]", TemporaryDataStorageLocation.Memory); + file.PayloadId = Guid.NewGuid().ToString(); return file; } } diff --git a/tests/Integration.Test/Common/DicomCStoreDataClient.cs b/tests/Integration.Test/Common/DicomCStoreDataClient.cs index 848590694..5b4400860 100644 --- a/tests/Integration.Test/Common/DicomCStoreDataClient.cs +++ b/tests/Integration.Test/Common/DicomCStoreDataClient.cs @@ -62,7 +62,7 @@ public async Task SendAsync(DataProvider dataProvider, params object[] args) var failureStatus = new List(); for (int i = 0; i < associations; i++) { - var files = dataProvider.DicomSpecs.Files.Skip(i * filesPerAssociations).Take(filesPerAssociations).ToList(); + var files = dataProvider.DicomSpecs.Files.Skip(i * filesPerAssociations).Take(filesPerAssociations).ToList(); // .Take(20) if (i + 1 == associations && dataProvider.DicomSpecs.Files.Count > (i + 1) * filesPerAssociations) { files.AddRange(dataProvider.DicomSpecs.Files.Skip(i * filesPerAssociations)); diff --git a/tests/Integration.Test/Drivers/RabbitMqConsumer.cs b/tests/Integration.Test/Drivers/RabbitMqConsumer.cs index 38f07d716..e82f53d1a 100644 --- a/tests/Integration.Test/Drivers/RabbitMqConsumer.cs +++ b/tests/Integration.Test/Drivers/RabbitMqConsumer.cs @@ -45,10 +45,10 @@ public RabbitMqConsumer(RabbitMQMessageSubscriberService subscriberService, stri _outputHelper = outputHelper ?? throw new ArgumentNullException(nameof(outputHelper)); _messages = new ConcurrentBag(); - subscriberService.Subscribe( + subscriberService.SubscribeAsync( queueName, queueName, - (eventArgs) => + async (eventArgs) => { _outputHelper.WriteLine($"Message received from queue {queueName} for {queueName}."); _messages.Add(eventArgs.Message); diff --git a/tests/Integration.Test/StepDefinitions/ExportServicesStepDefinitions.cs b/tests/Integration.Test/StepDefinitions/ExportServicesStepDefinitions.cs index 6d62a6723..ecdce7b2e 100644 --- a/tests/Integration.Test/StepDefinitions/ExportServicesStepDefinitions.cs +++ b/tests/Integration.Test/StepDefinitions/ExportServicesStepDefinitions.cs @@ -36,7 +36,7 @@ namespace Monai.Deploy.InformaticsGateway.Integration.Test.StepDefinitions [CollectionDefinition("SpecFlowNonParallelizableFeatures", DisableParallelization = true)] public class DicomDimseScuServicesStepDefinitions { - internal static readonly TimeSpan DicomScpWaitTimeSpan = TimeSpan.FromMinutes(3); + internal static readonly TimeSpan DicomScpWaitTimeSpan = TimeSpan.FromMinutes(20); private readonly InformaticsGatewayConfiguration _informaticsGatewayConfiguration; private readonly Configurations _configuration; private readonly DicomScp _dicomServer; diff --git a/tests/Integration.Test/StepDefinitions/SharedDefinitions.cs b/tests/Integration.Test/StepDefinitions/SharedDefinitions.cs index 896923162..da3f396d2 100644 --- a/tests/Integration.Test/StepDefinitions/SharedDefinitions.cs +++ b/tests/Integration.Test/StepDefinitions/SharedDefinitions.cs @@ -26,7 +26,7 @@ namespace Monai.Deploy.InformaticsGateway.Integration.Test.StepDefinitions [CollectionDefinition("SpecFlowNonParallelizableFeatures", DisableParallelization = true)] public class SharedDefinitions { - internal static readonly TimeSpan MessageWaitTimeSpan = TimeSpan.FromMinutes(10); + internal static readonly TimeSpan MessageWaitTimeSpan = TimeSpan.FromMinutes(3); private readonly InformaticsGatewayConfiguration _informaticsGatewayConfiguration; private readonly RabbitMqConsumer _receivedMessages; private readonly Assertions _assertions;