From 23848177aee8d6dc3f8be8a58a6638afc75cd082 Mon Sep 17 00:00:00 2001 From: Neil South Date: Wed, 24 Jul 2024 18:06:16 +0100 Subject: [PATCH] rejig services for better seperation Signed-off-by: Neil South --- .github/.gitversion.yml | 8 +- doc/dependency_decisions.yml | 1 + src/InformaticsGateway/Program.cs | 4 +- .../Services/Export/Hl7ExportService.cs | 10 +- .../Services/HealthLevel7/MllpService.cs | 266 +---------------- .../Services/HealthLevel7/MllpServiceHost.cs | 269 ++++++++++++++++++ .../Repositories/MonaiServiceLocatorTest.cs | 4 +- .../Services/Export/ExportHl7ServiceTests.cs | 19 +- .../Services/HealthLevel7/MllpServiceTest.cs | 28 +- 9 files changed, 316 insertions(+), 293 deletions(-) mode change 100644 => 100755 .github/.gitversion.yml create mode 100755 src/InformaticsGateway/Services/HealthLevel7/MllpServiceHost.cs diff --git a/.github/.gitversion.yml b/.github/.gitversion.yml old mode 100644 new mode 100755 index 6486d5775..56e2c3c07 --- a/.github/.gitversion.yml +++ b/.github/.gitversion.yml @@ -18,13 +18,13 @@ branches: main: tag: '' release: - tag: rc + tag: 'rc' develop: - tag: beta + tag: 'beta' feature: - tag: alpha.{BranchName} + tag: 'alpha.{BranchName}' pull-request: - tag: pr + tag: 'pr' ignore: sha: [] diff --git a/doc/dependency_decisions.yml b/doc/dependency_decisions.yml index d9608f850..bdfc6e842 100755 --- a/doc/dependency_decisions.yml +++ b/doc/dependency_decisions.yml @@ -588,6 +588,7 @@ - 8.0.2 - 8.0.3 - 8.0.6 + - 8.0.7 :when: 2022-10-14T23:37:16.793Z :who: mocsharp :why: MIT (https://github.com/dotnet/runtime/raw/main/LICENSE.TXT) diff --git a/src/InformaticsGateway/Program.cs b/src/InformaticsGateway/Program.cs index a3352862a..c74ffb94b 100755 --- a/src/InformaticsGateway/Program.cs +++ b/src/InformaticsGateway/Program.cs @@ -36,6 +36,7 @@ using Monai.Deploy.InformaticsGateway.Services.DicomWeb; using Monai.Deploy.InformaticsGateway.Services.Export; using Monai.Deploy.InformaticsGateway.Services.Fhir; +using Monai.Deploy.InformaticsGateway.Services.HealthLevel7; using Monai.Deploy.InformaticsGateway.Services.Http; using Monai.Deploy.InformaticsGateway.Services.Scp; using Monai.Deploy.InformaticsGateway.Services.Scu; @@ -138,7 +139,6 @@ internal static IHostBuilder CreateHostBuilder(string[] args) => services.AddSingleton(); services.AddSingleton(); - var timeout = TimeSpan.FromSeconds(hostContext.Configuration.GetValue("InformaticsGateway:dicomWeb:clientTimeout", DicomWebConfiguration.DefaultClientTimeout)); services .AddHttpClient("dicomweb", configure => configure.Timeout = timeout) @@ -163,7 +163,7 @@ internal static IHostBuilder CreateHostBuilder(string[] args) => services.AddHostedService(); services.AddHostedService(); services.AddHostedService(); - services.AddHostedService(); + services.AddHostedService(); services.AddHostedService(); }) diff --git a/src/InformaticsGateway/Services/Export/Hl7ExportService.cs b/src/InformaticsGateway/Services/Export/Hl7ExportService.cs index 4e6e7ba47..0c6abf91d 100755 --- a/src/InformaticsGateway/Services/Export/Hl7ExportService.cs +++ b/src/InformaticsGateway/Services/Export/Hl7ExportService.cs @@ -28,9 +28,9 @@ using Monai.Deploy.InformaticsGateway.Configuration; using Monai.Deploy.InformaticsGateway.Database.Api.Repositories; using Monai.Deploy.InformaticsGateway.Logging; -using Monai.Deploy.InformaticsGateway.Api.Mllp; using Monai.Deploy.Messaging.Common; using Polly; +using Monai.Deploy.InformaticsGateway.Api.Mllp; namespace Monai.Deploy.InformaticsGateway.Services.Export { @@ -38,27 +38,28 @@ internal class Hl7ExportService : ExportServiceBase { private readonly ILogger _logger; private readonly InformaticsGatewayConfiguration _configuration; - private readonly IMllpService _mllpService; protected override ushort Concurrency { get; } public override string RoutingKey { get; } public override string ServiceName => "DICOM Export HL7 Service"; + private readonly IMllpService _mllpService; public Hl7ExportService( ILogger logger, IServiceScopeFactory serviceScopeFactory, IOptions configuration, - IDicomToolkit dicomToolkit) + IDicomToolkit dicomToolkit, + IMllpService mllpService) : base(logger, configuration, serviceScopeFactory, dicomToolkit) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _configuration = configuration.Value ?? throw new ArgumentNullException(nameof(configuration)); - _mllpService = serviceScopeFactory.CreateScope().ServiceProvider.GetRequiredService(); RoutingKey = $"{configuration.Value.Messaging.Topics.ExportHL7}"; ExportCompleteTopic = $"{configuration.Value.Messaging.Topics.ExportHl7Complete}"; Concurrency = _configuration.Dicom.Scu.MaximumNumberOfAssociations; + _mllpService = mllpService ?? throw new ArgumentNullException(nameof(mllpService)); } @@ -159,6 +160,5 @@ protected override Task ExecuteOutputDataEngineCallbac { return Task.FromResult(exportDataRequest); } - } } diff --git a/src/InformaticsGateway/Services/HealthLevel7/MllpService.cs b/src/InformaticsGateway/Services/HealthLevel7/MllpService.cs index c8163d6dc..3cbc42c2e 100755 --- a/src/InformaticsGateway/Services/HealthLevel7/MllpService.cs +++ b/src/InformaticsGateway/Services/HealthLevel7/MllpService.cs @@ -1,267 +1,28 @@ -/* - * Copyright 2022-2023 MONAI Consortium - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.IO.Abstractions; -using System.Linq; +using System; using System.Net; using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks; -using Ardalis.GuardClauses; using HL7.Dotnetcore; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using Monai.Deploy.InformaticsGateway.Api.PlugIns; -using Monai.Deploy.InformaticsGateway.Api.Rest; -using Monai.Deploy.InformaticsGateway.Api.Storage; -using Monai.Deploy.InformaticsGateway.Common; +using Monai.Deploy.InformaticsGateway.Api.Mllp; using Monai.Deploy.InformaticsGateway.Configuration; -using Monai.Deploy.InformaticsGateway.Database.Api.Repositories; using Monai.Deploy.InformaticsGateway.Logging; -using Monai.Deploy.InformaticsGateway.Services.Common; -using Monai.Deploy.InformaticsGateway.Services.Connectors; -using Monai.Deploy.InformaticsGateway.Services.HealthLevel7; -using Monai.Deploy.InformaticsGateway.Services.Storage; -using Monai.Deploy.Messaging.Events; -namespace Monai.Deploy.InformaticsGateway.Api.Mllp +namespace Monai.Deploy.InformaticsGateway.Services.HealthLevel7 { - internal sealed class MllpService : IMllpService, IHostedService, IDisposable, IMonaiService + internal class MllpService : IMllpService { - private const int SOCKET_OPERATION_CANCELLED = 125; - private bool _disposedValue; - private readonly ITcpListener _tcpListener; - private readonly IMllpClientFactory _mllpClientFactory; - private readonly IObjectUploadQueue _uploadQueue; - private readonly IPayloadAssembler _payloadAssembler; - private readonly IFileSystem _fileSystem; - private readonly ILoggerFactory _logginFactory; - private readonly ILogger _logger; - private readonly IOptions _configuration; - private readonly IStorageInfoProvider _storageInfoProvider; - private readonly ConcurrentDictionary _activeTasks; - private readonly IMllpExtract _mIIpExtract; - private readonly IInputHL7DataPlugInEngine _inputHL7DataPlugInEngine; - private readonly IHl7ApplicationConfigRepository _hl7ApplicationConfigRepository; - private DateTime _lastConfigRead = new(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc); - - public int ActiveConnections - { - get - { - return _activeTasks.Count; - } - } - - public ServiceStatus Status { get; set; } = ServiceStatus.Unknown; - - public string ServiceName => "HL7 Service"; - - public MllpService(IServiceScopeFactory serviceScopeFactory, IOptions configuration) - { - ArgumentNullException.ThrowIfNull(serviceScopeFactory, nameof(serviceScopeFactory)); - - _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration)); - - var serviceScope = serviceScopeFactory.CreateScope(); - _logginFactory = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(ILoggerFactory)); - _logger = _logginFactory.CreateLogger(); - var tcpListenerFactory = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(ITcpListenerFactory)); - _tcpListener = tcpListenerFactory.CreateTcpListener(System.Net.IPAddress.Any, _configuration.Value.Hl7.Port); - _mllpClientFactory = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IMllpClientFactory)); - _uploadQueue = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IObjectUploadQueue)); - _payloadAssembler = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IPayloadAssembler)); - _fileSystem = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IFileSystem)); - _storageInfoProvider = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IStorageInfoProvider)); - _mIIpExtract = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IMllpExtract)); - _activeTasks = new ConcurrentDictionary(); - _inputHL7DataPlugInEngine = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IInputHL7DataPlugInEngine)); - _hl7ApplicationConfigRepository = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IHl7ApplicationConfigRepository)); - } - - public Task StartAsync(CancellationToken cancellationToken) - { - var task = Task.Run(async () => - { - _tcpListener.Start(); - await BackgroundProcessing(cancellationToken).ConfigureAwait(true); - }, CancellationToken.None); - - Status = ServiceStatus.Running; - _logger.ServiceRunning(ServiceName); - _logger.Hl7ListeningOnPort(_configuration.Value.Hl7.Port); - - if (task.IsCompleted) - return task; - return Task.CompletedTask; - } - - public Task StopAsync(CancellationToken cancellationToken) - { - _logger.ServiceStopping(ServiceName); - _tcpListener.Stop(); - Status = ServiceStatus.Stopped; - return Task.CompletedTask; - } - - private async Task BackgroundProcessing(CancellationToken cancellationToken) - { - while (!cancellationToken.IsCancellationRequested) - { - IMllpClient? mllpClient = null; - try - { - WaitUntilAvailable(_configuration.Value.Hl7.MaximumNumberOfConnections); - var client = await _tcpListener.AcceptTcpClientAsync(cancellationToken).ConfigureAwait(false); - _logger.ClientConnected(); - - if (!_storageInfoProvider.HasSpaceAvailableToStore) - { - _logger.Hl7DisconnectedDueToLowStorageSpace(_storageInfoProvider.AvailableFreeSpace); - client.Close(); - await Task.Delay(5000, cancellationToken).ConfigureAwait(false); - continue; - } - - mllpClient = _mllpClientFactory.CreateClient(client, _configuration.Value.Hl7, _logginFactory.CreateLogger()); - _ = mllpClient.Start(OnDisconnect, cancellationToken); - _activeTasks.TryAdd(mllpClient.ClientId, mllpClient); - } - catch (System.Net.Sockets.SocketException ex) - { - _logger.Hl7SocketException(ex.Message); - - if (mllpClient is not null) - { - mllpClient.Dispose(); - _activeTasks.Remove(mllpClient.ClientId, out _); - } - - if (ex.ErrorCode == SOCKET_OPERATION_CANCELLED) - { - break; - } - } - catch (Exception ex) - { - _logger.ServiceInvalidOrCancelled(ServiceName, ex); - } - } - Status = ServiceStatus.Cancelled; - _logger.ServiceCancelled(ServiceName); - } - - private async Task OnDisconnect(IMllpClient client, MllpClientResult result) - { - Guard.Against.Null(client, nameof(client)); - Guard.Against.Null(result, nameof(result)); - - await ConfigurePlugInEngine().ConfigureAwait(false); - - try - { - foreach (var message in result.Messages) - { - var newMessage = message; - var hl7Filemetadata = new Hl7FileStorageMetadata(client.ClientId.ToString(), DataService.HL7, client.ClientIp); - var configItem = await _mIIpExtract.GetConfigItem(message).ConfigureAwait(false); - if (configItem is not null) - { - await _inputHL7DataPlugInEngine.ExecutePlugInsAsync(message, hl7Filemetadata, configItem).ConfigureAwait(false); - newMessage = await _mIIpExtract.ExtractInfo(hl7Filemetadata, message, configItem).ConfigureAwait(false); - - _logger.HL7MessageAfterPluginProcessing(newMessage.HL7Message, hl7Filemetadata.CorrelationId); - } - _logger.Hl7MessageReceieved(newMessage.HL7Message); - await hl7Filemetadata.SetDataStream(newMessage.HL7Message, _configuration.Value.Storage.TemporaryDataStorage, _fileSystem, _configuration.Value.Storage.LocalTemporaryStoragePath).ConfigureAwait(false); - var payloadId = await _payloadAssembler.Queue(client.ClientId.ToString(), hl7Filemetadata, new DataOrigin { DataService = DataService.HL7, Source = client.ClientIp, Destination = FileStorageMetadata.IpAddress() }).ConfigureAwait(false); - hl7Filemetadata.PayloadId ??= payloadId.ToString(); - _uploadQueue.Queue(hl7Filemetadata); - } - } - catch (Exception ex) - { - _logger.ErrorHandlingHl7Results(ex); - } - finally - { - _activeTasks.Remove(client.ClientId, out _); - _logger.Hl7ClientRemoved(client.ClientId); - client.Dispose(); - } - } - private async Task ConfigurePlugInEngine() - { - var configs = await _hl7ApplicationConfigRepository.GetAllAsync().ConfigureAwait(false); - if (configs is not null && configs.Count is not 0 && configs.Max(c => c.LastModified) > _lastConfigRead) - { - var pluginAssemblies = new List(); - foreach (var config in configs.Where(p => p.PlugInAssemblies?.Count > 0)) - { - try - { - pluginAssemblies.AddRange(config.PlugInAssemblies.Where(p => string.IsNullOrWhiteSpace(p) is false && pluginAssemblies.Contains(p) is false)); - } - catch (Exception ex) - { - _logger.HL7PluginLoadingExceptions(ex); - } - } - if (pluginAssemblies.Count is not 0) - { - _inputHL7DataPlugInEngine.Configure(pluginAssemblies); - } - } - _lastConfigRead = DateTime.UtcNow; - } - - private void WaitUntilAvailable(int maximumNumberOfConnections) - { - var count = 0; - while (ActiveConnections >= maximumNumberOfConnections) - { - if (++count % 25 == 1) - { - _logger.MaxedOutHl7Connections(maximumNumberOfConnections); - } - Thread.Sleep(100); - } - } + private readonly ILogger _logger; + private readonly InformaticsGatewayConfiguration _configuration; - private void Dispose(bool disposing) + public MllpService(ILogger logger, IOptions configuration) { - if (!_disposedValue) - { - if (disposing) - { - foreach (var client in _activeTasks.Values) - { - client.Dispose(); - } - } - - _disposedValue = true; - } + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _configuration = configuration?.Value ?? throw new ArgumentNullException(nameof(configuration)); ; } public async Task SendMllp(IPAddress address, int port, string hl7Message, CancellationToken cancellationToken) @@ -312,7 +73,7 @@ private async Task WriteMessage(byte[] sendMessageByteBuffer, IPAddress address, private async Task EnsureAck(NetworkStream networkStream) { using var s_cts = new CancellationTokenSource(); - s_cts.CancelAfter(_configuration.Value.Hl7.ClientTimeoutMilliseconds); + s_cts.CancelAfter(_configuration.Hl7.ClientTimeoutMilliseconds); var buffer = new byte[2048]; // get the SentHl7Message @@ -344,12 +105,5 @@ private async Task EnsureAck(NetworkStream networkStream) } throw new Hl7SendException("ACK message contains no ACK!"); } - - public void Dispose() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: true); - GC.SuppressFinalize(this); - } } } diff --git a/src/InformaticsGateway/Services/HealthLevel7/MllpServiceHost.cs b/src/InformaticsGateway/Services/HealthLevel7/MllpServiceHost.cs new file mode 100755 index 000000000..4b90666c5 --- /dev/null +++ b/src/InformaticsGateway/Services/HealthLevel7/MllpServiceHost.cs @@ -0,0 +1,269 @@ +/* + * Copyright 2022-2023 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO.Abstractions; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Ardalis.GuardClauses; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Monai.Deploy.InformaticsGateway.Api.PlugIns; +using Monai.Deploy.InformaticsGateway.Api.Rest; +using Monai.Deploy.InformaticsGateway.Api.Storage; +using Monai.Deploy.InformaticsGateway.Common; +using Monai.Deploy.InformaticsGateway.Configuration; +using Monai.Deploy.InformaticsGateway.Database.Api.Repositories; +using Monai.Deploy.InformaticsGateway.Logging; +using Monai.Deploy.InformaticsGateway.Services.Common; +using Monai.Deploy.InformaticsGateway.Services.Connectors; +using Monai.Deploy.InformaticsGateway.Services.Storage; +using Monai.Deploy.Messaging.Events; + +namespace Monai.Deploy.InformaticsGateway.Api.Mllp +{ + internal sealed class MllpServiceHost : IHostedService, IDisposable, IMonaiService + { + private const int SOCKET_OPERATION_CANCELLED = 125; + private bool _disposedValue; + private readonly ITcpListener _tcpListener; + private readonly IMllpClientFactory _mllpClientFactory; + private readonly IObjectUploadQueue _uploadQueue; + private readonly IPayloadAssembler _payloadAssembler; + private readonly IFileSystem _fileSystem; + private readonly ILoggerFactory _logginFactory; + private readonly ILogger _logger; + private readonly IOptions _configuration; + private readonly IStorageInfoProvider _storageInfoProvider; + private readonly ConcurrentDictionary _activeTasks; + private readonly IMllpExtract _mIIpExtract; + private readonly IInputHL7DataPlugInEngine _inputHL7DataPlugInEngine; + private readonly IHl7ApplicationConfigRepository _hl7ApplicationConfigRepository; + private DateTime _lastConfigRead = new(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc); + + public int ActiveConnections + { + get + { + return _activeTasks.Count; + } + } + + public ServiceStatus Status { get; set; } = ServiceStatus.Unknown; + + public string ServiceName => "HL7 Service"; + + public MllpServiceHost(IServiceScopeFactory serviceScopeFactory, IOptions configuration) + { + ArgumentNullException.ThrowIfNull(serviceScopeFactory, nameof(serviceScopeFactory)); + + _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration)); + + var serviceScope = serviceScopeFactory.CreateScope(); + _logginFactory = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(ILoggerFactory)); + _logger = _logginFactory.CreateLogger(); + var tcpListenerFactory = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(ITcpListenerFactory)); + _tcpListener = tcpListenerFactory.CreateTcpListener(System.Net.IPAddress.Any, _configuration.Value.Hl7.Port); + _mllpClientFactory = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IMllpClientFactory)); + _uploadQueue = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IObjectUploadQueue)); + _payloadAssembler = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IPayloadAssembler)); + _fileSystem = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IFileSystem)); + _storageInfoProvider = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IStorageInfoProvider)); + _mIIpExtract = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IMllpExtract)); + _activeTasks = new ConcurrentDictionary(); + _inputHL7DataPlugInEngine = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IInputHL7DataPlugInEngine)); + _hl7ApplicationConfigRepository = serviceScope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IHl7ApplicationConfigRepository)); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + var task = Task.Run(async () => + { + _tcpListener.Start(); + await BackgroundProcessing(cancellationToken).ConfigureAwait(true); + }, CancellationToken.None); + + Status = ServiceStatus.Running; + _logger.ServiceRunning(ServiceName); + _logger.Hl7ListeningOnPort(_configuration.Value.Hl7.Port); + + if (task.IsCompleted) + return task; + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + _logger.ServiceStopping(ServiceName); + _tcpListener.Stop(); + Status = ServiceStatus.Stopped; + return Task.CompletedTask; + } + + private async Task BackgroundProcessing(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + IMllpClient? mllpClient = null; + try + { + WaitUntilAvailable(_configuration.Value.Hl7.MaximumNumberOfConnections); + var client = await _tcpListener.AcceptTcpClientAsync(cancellationToken).ConfigureAwait(false); + _logger.ClientConnected(); + + if (!_storageInfoProvider.HasSpaceAvailableToStore) + { + _logger.Hl7DisconnectedDueToLowStorageSpace(_storageInfoProvider.AvailableFreeSpace); + client.Close(); + await Task.Delay(5000, cancellationToken).ConfigureAwait(false); + continue; + } + + mllpClient = _mllpClientFactory.CreateClient(client, _configuration.Value.Hl7, _logginFactory.CreateLogger()); + _ = mllpClient.Start(OnDisconnect, cancellationToken); + _activeTasks.TryAdd(mllpClient.ClientId, mllpClient); + } + catch (System.Net.Sockets.SocketException ex) + { + _logger.Hl7SocketException(ex.Message); + + if (mllpClient is not null) + { + mllpClient.Dispose(); + _activeTasks.Remove(mllpClient.ClientId, out _); + } + + if (ex.ErrorCode == SOCKET_OPERATION_CANCELLED) + { + break; + } + } + catch (Exception ex) + { + _logger.ServiceInvalidOrCancelled(ServiceName, ex); + } + } + Status = ServiceStatus.Cancelled; + _logger.ServiceCancelled(ServiceName); + } + + private async Task OnDisconnect(IMllpClient client, MllpClientResult result) + { + Guard.Against.Null(client, nameof(client)); + Guard.Against.Null(result, nameof(result)); + + await ConfigurePlugInEngine().ConfigureAwait(false); + + try + { + foreach (var message in result.Messages) + { + var newMessage = message; + var hl7Filemetadata = new Hl7FileStorageMetadata(client.ClientId.ToString(), DataService.HL7, client.ClientIp); + var configItem = await _mIIpExtract.GetConfigItem(message).ConfigureAwait(false); + if (configItem is not null) + { + await _inputHL7DataPlugInEngine.ExecutePlugInsAsync(message, hl7Filemetadata, configItem).ConfigureAwait(false); + newMessage = await _mIIpExtract.ExtractInfo(hl7Filemetadata, message, configItem).ConfigureAwait(false); + + _logger.HL7MessageAfterPluginProcessing(newMessage.HL7Message, hl7Filemetadata.CorrelationId); + } + _logger.Hl7MessageReceieved(newMessage.HL7Message); + await hl7Filemetadata.SetDataStream(newMessage.HL7Message, _configuration.Value.Storage.TemporaryDataStorage, _fileSystem, _configuration.Value.Storage.LocalTemporaryStoragePath).ConfigureAwait(false); + var payloadId = await _payloadAssembler.Queue(client.ClientId.ToString(), hl7Filemetadata, new DataOrigin { DataService = DataService.HL7, Source = client.ClientIp, Destination = FileStorageMetadata.IpAddress() }).ConfigureAwait(false); + hl7Filemetadata.PayloadId ??= payloadId.ToString(); + _uploadQueue.Queue(hl7Filemetadata); + } + } + catch (Exception ex) + { + _logger.ErrorHandlingHl7Results(ex); + } + finally + { + _activeTasks.Remove(client.ClientId, out _); + _logger.Hl7ClientRemoved(client.ClientId); + client.Dispose(); + } + } + + private async Task ConfigurePlugInEngine() + { + var configs = await _hl7ApplicationConfigRepository.GetAllAsync().ConfigureAwait(false); + if (configs is not null && configs.Count is not 0 && configs.Max(c => c.LastModified) > _lastConfigRead) + { + var pluginAssemblies = new List(); + foreach (var config in configs.Where(p => p.PlugInAssemblies?.Count > 0)) + { + try + { + pluginAssemblies.AddRange(config.PlugInAssemblies.Where(p => string.IsNullOrWhiteSpace(p) is false && pluginAssemblies.Contains(p) is false)); + } + catch (Exception ex) + { + _logger.HL7PluginLoadingExceptions(ex); + } + } + if (pluginAssemblies.Count is not 0) + { + _inputHL7DataPlugInEngine.Configure(pluginAssemblies); + } + } + _lastConfigRead = DateTime.UtcNow; + } + + private void WaitUntilAvailable(int maximumNumberOfConnections) + { + var count = 0; + while (ActiveConnections >= maximumNumberOfConnections) + { + if (++count % 25 == 1) + { + _logger.MaxedOutHl7Connections(maximumNumberOfConnections); + } + Thread.Sleep(100); + } + } + + private void Dispose(bool disposing) + { + if (!_disposedValue) + { + if (disposing) + { + foreach (var client in _activeTasks.Values) + { + client.Dispose(); + } + } + + _disposedValue = true; + } + } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } +} diff --git a/src/InformaticsGateway/Test/Repositories/MonaiServiceLocatorTest.cs b/src/InformaticsGateway/Test/Repositories/MonaiServiceLocatorTest.cs index 24595d0c6..0202492e3 100755 --- a/src/InformaticsGateway/Test/Repositories/MonaiServiceLocatorTest.cs +++ b/src/InformaticsGateway/Test/Repositories/MonaiServiceLocatorTest.cs @@ -62,9 +62,7 @@ public void GetMonaiServices() var result = serviceLocator.GetMonaiServices(); Assert.Collection(result, - items => items.ServiceName.Equals("IMllpService"), items => items.ServiceName.Equals("IPayloadService")); - } [Fact(DisplayName = "GetServiceStatus")] @@ -73,7 +71,7 @@ public void GetServiceStatus() var serviceLocator = new MonaiServiceLocator(_serviceProvider.Object); var result = serviceLocator.GetServiceStatus(); - Assert.Equal(2, result.Count); + Assert.Equal(1, result.Count); foreach (var svc in result.Keys) { Assert.Equal(ServiceStatus.Running, result[svc]); diff --git a/src/InformaticsGateway/Test/Services/Export/ExportHl7ServiceTests.cs b/src/InformaticsGateway/Test/Services/Export/ExportHl7ServiceTests.cs index 0897a68a2..27677cce7 100755 --- a/src/InformaticsGateway/Test/Services/Export/ExportHl7ServiceTests.cs +++ b/src/InformaticsGateway/Test/Services/Export/ExportHl7ServiceTests.cs @@ -97,10 +97,11 @@ public ExportHl7ServiceTests() [RetryFact(1, 250, DisplayName = "Constructor - throws on null params")] public void Constructor_ThrowsOnNullParams() { - Assert.Throws(() => new Hl7ExportService(null, null, null, null)); - Assert.Throws(() => new Hl7ExportService(_logger.Object, null, null, null)); - Assert.Throws(() => new Hl7ExportService(_logger.Object, _serviceScopeFactory.Object, null, null)); - Assert.Throws(() => new Hl7ExportService(_logger.Object, _serviceScopeFactory.Object, _configuration, null)); + Assert.Throws(() => new Hl7ExportService(null, null, null, null, null)); + Assert.Throws(() => new Hl7ExportService(_logger.Object, null, null, null, null)); + Assert.Throws(() => new Hl7ExportService(_logger.Object, _serviceScopeFactory.Object, null, null, null)); + Assert.Throws(() => new Hl7ExportService(_logger.Object, _serviceScopeFactory.Object, _configuration, null, null)); + Assert.Throws(() => new Hl7ExportService(_logger.Object, _serviceScopeFactory.Object, _configuration, _dicomToolkit.Object, null)); } @@ -123,7 +124,7 @@ public async Task ShallFailWhenNoDestinationIsDefined() _storageService.Setup(p => p.GetObjectAsync(It.IsAny(), It.IsAny(), It.IsAny())) .ReturnsAsync(new MemoryStream(Encoding.UTF8.GetBytes("test"))); - var service = new Hl7ExportService(_logger.Object, _serviceScopeFactory.Object, _configuration, _dicomToolkit.Object); + var service = new Hl7ExportService(_logger.Object, _serviceScopeFactory.Object, _configuration, _dicomToolkit.Object, _mllpService.Object); var dataflowCompleted = new ManualResetEvent(false); service.ReportActionCompleted += (sender, args) => @@ -168,7 +169,7 @@ public async Task ShallFailWhenDestinationIsNotConfigured() _repository.Setup(p => p.FindByNameAsync(It.IsAny(), It.IsAny())).ReturnsAsync(default(HL7DestinationEntity)); - var service = new Hl7ExportService(_logger.Object, _serviceScopeFactory.Object, _configuration, _dicomToolkit.Object); + var service = new Hl7ExportService(_logger.Object, _serviceScopeFactory.Object, _configuration, _dicomToolkit.Object, _mllpService.Object); var dataflowCompleted = new ManualResetEvent(false); service.ReportActionCompleted += (sender, args) => @@ -221,7 +222,7 @@ public async Task No_Ack_Sent() _repository.Setup(p => p.FindByNameAsync(It.IsAny(), It.IsAny())).ReturnsAsync(destination); _dicomToolkit.Setup(p => p.Load(It.IsAny())).Returns(InstanceGenerator.GenerateDicomFile(sopInstanceUid: sopInstanceUid)); - var service = new Hl7ExportService(_logger.Object, _serviceScopeFactory.Object, _configuration, _dicomToolkit.Object); + var service = new Hl7ExportService(_logger.Object, _serviceScopeFactory.Object, _configuration, _dicomToolkit.Object, _mllpService.Object); var dataflowCompleted = new ManualResetEvent(false); service.ReportActionCompleted += (sender, args) => @@ -258,7 +259,7 @@ public async Task Error_Loading_HL7_Content() _extAppScpLogger.Invocations.Clear(); var sopInstanceUid = DicomUIDGenerator.GenerateDerivedFromUUID().UID; var destination = new HL7DestinationEntity { HostIp = "192.168.0.0", Port = _port }; - var service = new Hl7ExportService(_logger.Object, _serviceScopeFactory.Object, _configuration, _dicomToolkit.Object); + var service = new Hl7ExportService(_logger.Object, _serviceScopeFactory.Object, _configuration, _dicomToolkit.Object, _mllpService.Object); _messagePublisherService.Setup(p => p.Publish(It.IsAny(), It.IsAny())); _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); @@ -310,7 +311,7 @@ public async Task Success_After_Hl7Send() _extAppScpLogger.Invocations.Clear(); var sopInstanceUid = DicomUIDGenerator.GenerateDerivedFromUUID().UID; var destination = new HL7DestinationEntity { HostIp = "192.168.0.0", Port = _port }; - var service = new Hl7ExportService(_logger.Object, _serviceScopeFactory.Object, _configuration, _dicomToolkit.Object); + var service = new Hl7ExportService(_logger.Object, _serviceScopeFactory.Object, _configuration, _dicomToolkit.Object, _mllpService.Object); _messagePublisherService.Setup(p => p.Publish(It.IsAny(), It.IsAny())); _messageSubscriberService.Setup(p => p.Acknowledge(It.IsAny())); diff --git a/src/InformaticsGateway/Test/Services/HealthLevel7/MllpServiceTest.cs b/src/InformaticsGateway/Test/Services/HealthLevel7/MllpServiceTest.cs index 371e7ab69..363161662 100755 --- a/src/InformaticsGateway/Test/Services/HealthLevel7/MllpServiceTest.cs +++ b/src/InformaticsGateway/Test/Services/HealthLevel7/MllpServiceTest.cs @@ -56,7 +56,7 @@ public class MllpServiceTest private readonly Mock _fileSystem; private readonly CancellationTokenSource _cancellationTokenSource; private readonly Mock _serviceScope; - private readonly Mock> _logger; + private readonly Mock> _logger; private readonly IServiceProvider _serviceProvider; private readonly Mock _storageInfoProvider; private readonly Mock _mIIpExtract = new Mock(); @@ -79,7 +79,7 @@ public MllpServiceTest() _cancellationTokenSource = new CancellationTokenSource(); _serviceScope = new Mock(); - _logger = new Mock>(); + _logger = new Mock>(); _serviceScopeFactory.Setup(p => p.CreateScope()).Returns(_serviceScope.Object); @@ -109,16 +109,16 @@ public MllpServiceTest() [RetryFact(10, 250)] public void GivenAMllpService_WhenInitialized_ExpectParametersToBeValidated() { - Assert.Throws(() => new MllpService(null, null)); - Assert.Throws(() => new MllpService(_serviceScopeFactory.Object, null)); + Assert.Throws(() => new MllpServiceHost(null, null)); + Assert.Throws(() => new MllpServiceHost(_serviceScopeFactory.Object, null)); - new MllpService(_serviceScopeFactory.Object, _options); + new MllpServiceHost(_serviceScopeFactory.Object, _options); } [RetryFact(5, 250)] public void GivenAMllpService_WhenStartAsyncIsCalled_ExpectServiceStartupNormally() { - var service = new MllpService(_serviceScopeFactory.Object, _options); + var service = new MllpServiceHost(_serviceScopeFactory.Object, _options); var task = service.StartAsync(_cancellationTokenSource.Token); Assert.NotNull(task); @@ -129,7 +129,7 @@ public void GivenAMllpService_WhenStartAsyncIsCalled_ExpectServiceStartupNormall public void GivenAMllpService_WhenStopAsyncIsCalled_ExpectServiceStopsNormally() { _tcpListener.Setup(p => p.Stop()); - var service = new MllpService(_serviceScopeFactory.Object, _options); + var service = new MllpServiceHost(_serviceScopeFactory.Object, _options); var task = service.StopAsync(_cancellationTokenSource.Token); Assert.NotNull(task); @@ -172,7 +172,7 @@ public async Task GivenTcpConnections_WhenConnectsAndDisconnectsFromMllpService_ } }); - var service = new MllpService(_serviceScopeFactory.Object, _options); + var service = new MllpServiceHost(_serviceScopeFactory.Object, _options); _ = service.StartAsync(_cancellationTokenSource.Token); Assert.True(checkEvent.Wait(3000)); @@ -208,7 +208,7 @@ public async Task GivenAMllpService_WhenMaximumConnectionLimitIsConfigure_Expect _tcpListener.Setup(p => p.AcceptTcpClientAsync(It.IsAny())) .Returns(ValueTask.FromResult((new Mock()).Object)); - var service = new MllpService(_serviceScopeFactory.Object, _options); + var service = new MllpServiceHost(_serviceScopeFactory.Object, _options); _ = service.StartAsync(_cancellationTokenSource.Token); checkEvent.Wait(); @@ -242,7 +242,7 @@ public async Task GivenConnectedTcpClients_WhenDisconnects_ExpectServiceToDispos _tcpListener.Setup(p => p.AcceptTcpClientAsync(It.IsAny())) .Returns(ValueTask.FromResult((new Mock()).Object)); - var service = new MllpService(_serviceScopeFactory.Object, _options); + var service = new MllpServiceHost(_serviceScopeFactory.Object, _options); _ = service.StartAsync(_cancellationTokenSource.Token); Assert.True(checkEvent.Wait(3000)); @@ -267,7 +267,7 @@ public async Task GivenATcpClientWithHl7Messages_WhenStorageSpaceIsLow_ExpectToD _tcpListener.Setup(p => p.AcceptTcpClientAsync(It.IsAny())) .Returns(ValueTask.FromResult(clientAdapter.Object)); - var service = new MllpService(_serviceScopeFactory.Object, _options); + var service = new MllpServiceHost(_serviceScopeFactory.Object, _options); _ = service.StartAsync(_cancellationTokenSource.Token); _cancellationTokenSource.CancelAfter(400); @@ -311,7 +311,7 @@ public async Task GivenATcpClientWithHl7Messages_WhenDisconnected_ExpectMessageT _tcpListener.Setup(p => p.AcceptTcpClientAsync(It.IsAny())) .Returns(ValueTask.FromResult((new Mock()).Object)); - var service = new MllpService(_serviceScopeFactory.Object, _options); + var service = new MllpServiceHost(_serviceScopeFactory.Object, _options); _ = service.StartAsync(_cancellationTokenSource.Token); Assert.True(checkEvent.Wait(3000)); @@ -358,7 +358,7 @@ public async Task GivenATcpClientWithHl7Messages_WhenDisconnected_ExpectMessageT _tcpListener.Setup(p => p.AcceptTcpClientAsync(It.IsAny())) .Returns(ValueTask.FromResult((new Mock()).Object)); - var service = new MllpService(_serviceScopeFactory.Object, _options); + var service = new MllpServiceHost(_serviceScopeFactory.Object, _options); _ = service.StartAsync(_cancellationTokenSource.Token); Assert.True(checkEvent.Wait(3000)); @@ -403,7 +403,7 @@ public async Task GivenATcpClientWithHl7Messages_ShouldntAdddBlankPlugin() _tcpListener.Setup(p => p.AcceptTcpClientAsync(It.IsAny())) .Returns(ValueTask.FromResult((new Mock()).Object)); - var service = new MllpService(_serviceScopeFactory.Object, _options); + var service = new MllpServiceHost(_serviceScopeFactory.Object, _options); _ = service.StartAsync(_cancellationTokenSource.Token); Assert.True(checkEvent.Wait(3000));