From bc06b2a9c0470e7898a12e4f1377514aea9903ad Mon Sep 17 00:00:00 2001 From: Dingping Zhang Date: Fri, 3 Sep 2021 02:24:01 +0800 Subject: [PATCH] refactor!(core): redesign all api. (#12) --- HandyIpc.Core/ClientBuilder.cs | 14 -- HandyIpc.Core/Configuration.cs | 46 ------ .../{Client.cs => ContainerClient.cs} | 9 +- HandyIpc.Core/ContainerClientBuilder.cs | 39 +++++ HandyIpc.Core/ContainerClientExtensions.cs | 7 + HandyIpc.Core/ContainerRegistryExtensions.cs | 36 +++++ HandyIpc.Core/ContainerServer.cs | 108 +++++++++++++ ...erBuilder.cs => ContainerServerBuilder.cs} | 51 ++++-- HandyIpc.Core/Core/ReceiverBase.cs | 14 -- HandyIpc.Core/Core/SenderBase.cs | 15 -- HandyIpc.Core/Extensions.cs | 33 +--- HandyIpc.Core/HandyIpcBuilder.cs | 9 -- HandyIpc.Core/IClient.cs | 18 +-- HandyIpc.Core/IClientBuilder.cs | 7 - HandyIpc.Core/IClientConfiguration.cs | 16 ++ HandyIpc.Core/IConfiguration.cs | 26 +-- HandyIpc.Core/IConnection.cs | 17 ++ HandyIpc.Core/IContainerClient.cs | 19 +++ ...erverRegistry.cs => IContainerRegistry.cs} | 6 +- HandyIpc.Core/IContainerServer.cs | 16 ++ HandyIpc.Core/IServer.cs | 12 +- HandyIpc.Core/IServerBuilder.cs | 7 - HandyIpc.Core/IServerConfiguration.cs | 16 ++ HandyIpc.Core/Implementation/AsyncPool.cs | 12 +- HandyIpc.Core/Implementation/Pool.cs | 12 +- HandyIpc.Core/Implementation/PoolBase.cs | 39 +++++ HandyIpc.Core/Implementation/RentedValue.cs | 2 +- .../Sender.cs | 70 ++++---- HandyIpc.Core/Server.cs | 65 -------- HandyIpc.Core/StreamConnection.cs | 35 ++++ HandyIpc.Core/StreamExtensions.cs | 99 ++++++++++++ HandyIpc.Generator/ClientProxy.cs | 5 +- HandyIpc.NamedPipe/Extensions.cs | 106 +----------- HandyIpc.NamedPipe/NamedPipeIpcClient.cs | 29 ++++ HandyIpc.NamedPipe/NamedPipeIpcServer.cs | 30 ++++ HandyIpc.NamedPipe/NamedPipeReceiver.cs | 75 --------- HandyIpc.Serializer.Json/Extensions.cs | 2 +- HandyIpc.Socket/Extensions.cs | 115 +------------ HandyIpc.Socket/TcpIpcClient.cs | 32 ++++ HandyIpc.Socket/TcpIpcServer.cs | 25 +++ HandyIpc.Socket/TcpReceiver.cs | 88 ---------- HandyIpc.Socket/TcpSender.cs | 153 ------------------ HandyIpc.Tests/EndToEndTestFixture.cs | 23 ++- 43 files changed, 694 insertions(+), 864 deletions(-) delete mode 100644 HandyIpc.Core/ClientBuilder.cs delete mode 100644 HandyIpc.Core/Configuration.cs rename HandyIpc.Core/{Client.cs => ContainerClient.cs} (75%) create mode 100644 HandyIpc.Core/ContainerClientBuilder.cs create mode 100644 HandyIpc.Core/ContainerClientExtensions.cs create mode 100644 HandyIpc.Core/ContainerRegistryExtensions.cs create mode 100644 HandyIpc.Core/ContainerServer.cs rename HandyIpc.Core/{ServerBuilder.cs => ContainerServerBuilder.cs} (63%) delete mode 100644 HandyIpc.Core/Core/ReceiverBase.cs delete mode 100644 HandyIpc.Core/Core/SenderBase.cs delete mode 100644 HandyIpc.Core/HandyIpcBuilder.cs delete mode 100644 HandyIpc.Core/IClientBuilder.cs create mode 100644 HandyIpc.Core/IClientConfiguration.cs create mode 100644 HandyIpc.Core/IConnection.cs create mode 100644 HandyIpc.Core/IContainerClient.cs rename HandyIpc.Core/{IServerRegistry.cs => IContainerRegistry.cs} (82%) create mode 100644 HandyIpc.Core/IContainerServer.cs delete mode 100644 HandyIpc.Core/IServerBuilder.cs create mode 100644 HandyIpc.Core/IServerConfiguration.cs create mode 100644 HandyIpc.Core/Implementation/PoolBase.cs rename HandyIpc.NamedPipe/NamedPipeSender.cs => HandyIpc.Core/Sender.cs (57%) delete mode 100644 HandyIpc.Core/Server.cs create mode 100644 HandyIpc.Core/StreamConnection.cs create mode 100644 HandyIpc.Core/StreamExtensions.cs create mode 100644 HandyIpc.NamedPipe/NamedPipeIpcClient.cs create mode 100644 HandyIpc.NamedPipe/NamedPipeIpcServer.cs delete mode 100644 HandyIpc.NamedPipe/NamedPipeReceiver.cs create mode 100644 HandyIpc.Socket/TcpIpcClient.cs create mode 100644 HandyIpc.Socket/TcpIpcServer.cs delete mode 100644 HandyIpc.Socket/TcpReceiver.cs delete mode 100644 HandyIpc.Socket/TcpSender.cs diff --git a/HandyIpc.Core/ClientBuilder.cs b/HandyIpc.Core/ClientBuilder.cs deleted file mode 100644 index 7312cb8..0000000 --- a/HandyIpc.Core/ClientBuilder.cs +++ /dev/null @@ -1,14 +0,0 @@ -using HandyIpc.Core; - -namespace HandyIpc -{ - internal class ClientBuilder : Configuration, IClientBuilder - { - public IClient Build() - { - SenderBase sender = SenderFactory(); - sender.SetLogger(LoggerFactory()); - return new Client(sender, SerializerFactory()); - } - } -} diff --git a/HandyIpc.Core/Configuration.cs b/HandyIpc.Core/Configuration.cs deleted file mode 100644 index 36f4bc0..0000000 --- a/HandyIpc.Core/Configuration.cs +++ /dev/null @@ -1,46 +0,0 @@ -using System; -using HandyIpc.Core; - -namespace HandyIpc -{ - internal abstract class Configuration : IConfiguration - { - protected Func ReceiverFactory { get; private set; } = () => throw new InvalidOperationException( - $"Must invoke the IHubBuilder.Use(Func<{nameof(ReceiverBase)}> factory) method " + - "to register a factory before invoking the Build method."); - - protected Func SenderFactory { get; private set; } = () => throw new InvalidOperationException( - $"Must invoke the IHubBuilder.Use(Func<{nameof(SenderBase)}> factory) method " + - "to register a factory before invoking the Build method."); - - protected Func SerializerFactory { get; private set; } = () => throw new InvalidOperationException( - $"Must invoke the IHubBuilder.Use(Func<{nameof(ISerializer)}> factory) method " + - "to register a factory before invoking the Build method."); - - protected Func LoggerFactory { get; private set; } = () => new DebugLogger(); - - public IConfiguration Use(Func factory) - { - SenderFactory = factory; - return this; - } - - public IConfiguration Use(Func factory) - { - ReceiverFactory = factory; - return this; - } - - public IConfiguration Use(Func factory) - { - SerializerFactory = factory; - return this; - } - - public IConfiguration Use(Func factory) - { - LoggerFactory = factory; - return this; - } - } -} diff --git a/HandyIpc.Core/Client.cs b/HandyIpc.Core/ContainerClient.cs similarity index 75% rename from HandyIpc.Core/Client.cs rename to HandyIpc.Core/ContainerClient.cs index 4d80ee9..a680ad7 100644 --- a/HandyIpc.Core/Client.cs +++ b/HandyIpc.Core/ContainerClient.cs @@ -4,15 +4,15 @@ namespace HandyIpc { - internal sealed class Client : IClient + internal sealed class ContainerClient : IContainerClient { - private readonly SenderBase _sender; + private readonly Sender _sender; private readonly ISerializer _serializer; private readonly ConcurrentDictionary _typeInstanceMapping = new(); private bool _isDisposed; - public Client(SenderBase sender, ISerializer serializer) + public ContainerClient(Sender sender, ISerializer serializer) { _sender = sender; _serializer = serializer; @@ -22,7 +22,7 @@ public T Resolve(string key) { if (_isDisposed) { - throw new ObjectDisposedException(nameof(IClient)); + throw new ObjectDisposedException(nameof(IContainerClient)); } return (T)_typeInstanceMapping.GetOrAdd(typeof(T), interfaceType => @@ -37,6 +37,7 @@ public void Dispose() _isDisposed = true; _typeInstanceMapping.Clear(); + _sender.Dispose(); } } } diff --git a/HandyIpc.Core/ContainerClientBuilder.cs b/HandyIpc.Core/ContainerClientBuilder.cs new file mode 100644 index 0000000..b8a6115 --- /dev/null +++ b/HandyIpc.Core/ContainerClientBuilder.cs @@ -0,0 +1,39 @@ +using System; +using HandyIpc.Core; + +namespace HandyIpc +{ + public class ContainerClientBuilder : IClientConfiguration + { + private Func _clientFactory = () => throw new InvalidOperationException( + $"Must invoke the {nameof(IServerConfiguration)}.Use(Func<{nameof(IClient)}> factory) method " + + "to register a factory before invoking the Build method."); + private Func _serializerFactory = () => throw new InvalidOperationException( + $"Must invoke the {nameof(IServerConfiguration)}.Use(Func<{nameof(ISerializer)}> factory) method " + + "to register a factory before invoking the Build method."); + private Func _loggerFactory = () => new DebugLogger(); + + public IClientConfiguration Use(Func factory) + { + _serializerFactory = factory; + return this; + } + + public IClientConfiguration Use(Func factory) + { + _loggerFactory = factory; + return this; + } + + public IClientConfiguration Use(Func factory) + { + _clientFactory = factory; + return this; + } + + public IContainerClient Build() + { + return new ContainerClient(new Sender(_clientFactory()), _serializerFactory()); + } + } +} diff --git a/HandyIpc.Core/ContainerClientExtensions.cs b/HandyIpc.Core/ContainerClientExtensions.cs new file mode 100644 index 0000000..871f944 --- /dev/null +++ b/HandyIpc.Core/ContainerClientExtensions.cs @@ -0,0 +1,7 @@ +namespace HandyIpc +{ + public static class ContainerClientExtensions + { + public static T Resolve(this IContainerClient client) => client.Resolve(typeof(T).GetDefaultKey()); + } +} diff --git a/HandyIpc.Core/ContainerRegistryExtensions.cs b/HandyIpc.Core/ContainerRegistryExtensions.cs new file mode 100644 index 0000000..5f88c67 --- /dev/null +++ b/HandyIpc.Core/ContainerRegistryExtensions.cs @@ -0,0 +1,36 @@ +using System; + +namespace HandyIpc +{ + public static class ContainerRegistryExtensions + { + public static IContainerRegistry Register(this IContainerRegistry registry, string? key = null) + where TInterface : class + where TImpl : TInterface, new() + { + return registry.Register(() => new TImpl(), key); + } + + public static IContainerRegistry Register(this IContainerRegistry registry, Func factory, string? key = null) + where TInterface : class + { + key ??= typeof(TInterface).GetDefaultKey(); + return registry.Register(typeof(TInterface), factory, key); + } + + public static IContainerRegistry Register(this IContainerRegistry registry, Type interfaceType, Type classType, string? key = null) + { + key ??= interfaceType.GetDefaultKey(); + return classType.ContainsGenericParameters + ? registry.Register(interfaceType, GenericFactory, key) + : registry.Register(interfaceType, () => Activator.CreateInstance(classType), key); + + // Local Method + object GenericFactory(Type[] genericTypes) + { + var constructedClassType = classType.MakeGenericType(genericTypes); + return Activator.CreateInstance(constructedClassType); + } + } + } +} diff --git a/HandyIpc.Core/ContainerServer.cs b/HandyIpc.Core/ContainerServer.cs new file mode 100644 index 0000000..a89b786 --- /dev/null +++ b/HandyIpc.Core/ContainerServer.cs @@ -0,0 +1,108 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using HandyIpc.Core; + +namespace HandyIpc +{ + internal sealed class ContainerServer : IContainerServer + { + private readonly IServer _server; + private readonly Middleware _middleware; + private readonly ISerializer _serializer; + private readonly ILogger _logger; + + private CancellationTokenSource? _cancellationTokenSource; + + public bool IsRunning { get; private set; } + + public ContainerServer(IServer server, Middleware middleware, ISerializer serializer, ILogger logger) + { + _server = server; + _middleware = middleware; + _serializer = serializer; + _logger = logger; + } + + public void Start() + { + if (_cancellationTokenSource is null or { IsCancellationRequested: true }) + { + _cancellationTokenSource = new CancellationTokenSource(); + } + +#pragma warning disable 4014 + // Async run the server without waiting. + StartAsync(_cancellationTokenSource.Token); +#pragma warning restore 4014 + + IsRunning = true; + } + + public void Stop() + { + _cancellationTokenSource?.Cancel(); + IsRunning = false; + } + + public void Dispose() + { + Stop(); + _server.Dispose(); + } + + private async Task StartAsync(CancellationToken token) + { + while (!token.IsCancellationRequested) + { + IConnection connection = await _server.WaitForConnectionAsync(); + RequestHandler handler = _middleware.ToHandler(_serializer, _logger); + + if (token.IsCancellationRequested) + { + break; + } + + // Do not await the request handler, and go to await next stream connection directly. +#pragma warning disable 4014 + HandleRequestAsync(connection, handler, token); +#pragma warning restore 4014 + } + } + + private async Task HandleRequestAsync(IConnection connection, RequestHandler handler, CancellationToken token) + { + try + { + while (true) + { + if (token.IsCancellationRequested) + { + break; + } + + byte[] buffer = await connection.ReadAsync(token); + if (buffer.Length == 0) + { + continue; + } + + byte[] output = await handler(buffer); + await connection.WriteAsync(output, token); + } + } + catch (OperationCanceledException) + { + // Ignore + } + catch (Exception e) + { + _logger.Error("Unexpected exception occurred when starting the server instance.", e); + } + finally + { + connection.Dispose(); + } + } + } +} diff --git a/HandyIpc.Core/ServerBuilder.cs b/HandyIpc.Core/ContainerServerBuilder.cs similarity index 63% rename from HandyIpc.Core/ServerBuilder.cs rename to HandyIpc.Core/ContainerServerBuilder.cs index 56cbba4..03c1faf 100644 --- a/HandyIpc.Core/ServerBuilder.cs +++ b/HandyIpc.Core/ContainerServerBuilder.cs @@ -4,24 +4,50 @@ namespace HandyIpc { - internal class ServerBuilder : Configuration, IServerBuilder + public class ContainerServerBuilder : IServerConfiguration, IContainerRegistry { private readonly List<(string key, Type type, Func factory)> _interfaceMap = new(); private readonly List<(string key, Type type, Func factory)> _genericInterfaceMap = new(); - public IServerRegistry Register(Type interfaceType, Func factory, string key) + private Func _serverFactory = () => throw new InvalidOperationException( + $"Must invoke the {nameof(IServerConfiguration)}.Use(Func<{nameof(IServer)}> factory) method " + + "to register a factory before invoking the Build method."); + private Func _serializerFactory = () => throw new InvalidOperationException( + $"Must invoke the {nameof(IServerConfiguration)}.Use(Func<{nameof(ISerializer)}> factory) method " + + "to register a factory before invoking the Build method."); + private Func _loggerFactory = () => new DebugLogger(); + + public IServerConfiguration Use(Func factory) + { + _serializerFactory = factory; + return this; + } + + public IServerConfiguration Use(Func factory) + { + _loggerFactory = factory; + return this; + } + + public IServerConfiguration Use(Func factory) + { + _serverFactory = factory; + return this; + } + + public IContainerRegistry Register(Type interfaceType, Func factory, string key) { _interfaceMap.Add((key, interfaceType, factory)); return this; } - public IServerRegistry Register(Type interfaceType, Func factory, string key) + public IContainerRegistry Register(Type interfaceType, Func factory, string key) { _genericInterfaceMap.Add((key, interfaceType, factory)); return this; } - public IServer Build() + public IContainerServer Build() { Dictionary map = new(); foreach (var (key, type, factory) in _interfaceMap) @@ -39,20 +65,13 @@ public IServer Build() map.Add(key, methodDispatcher); } - Middleware middleware = BuildBasicMiddleware().Then(Middlewares.GetInterfaceMiddleware(map)); - - ReceiverBase receiver = ReceiverFactory(); - ILogger logger = LoggerFactory(); - receiver.SetLogger(logger); - return new Server(receiver, middleware, SerializerFactory(), logger); - } - - private static Middleware BuildBasicMiddleware() - { - return Middlewares.Compose( + Middleware middleware = Middlewares.Compose( Middlewares.Heartbeat, Middlewares.ExceptionHandler, - Middlewares.RequestParser); + Middlewares.RequestParser, + Middlewares.GetInterfaceMiddleware(map)); + + return new ContainerServer(_serverFactory(), middleware, _serializerFactory(), _loggerFactory()); } private static IMethodDispatcher CreateDispatcher(Type interfaceType, Func factory) diff --git a/HandyIpc.Core/Core/ReceiverBase.cs b/HandyIpc.Core/Core/ReceiverBase.cs deleted file mode 100644 index 0c421de..0000000 --- a/HandyIpc.Core/Core/ReceiverBase.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System.Threading; -using System.Threading.Tasks; - -namespace HandyIpc.Core -{ - public abstract class ReceiverBase - { - protected ILogger Logger { get; private set; } = null!; - - internal void SetLogger(ILogger logger) => Logger = logger; - - public abstract Task StartAsync(RequestHandler handler, CancellationToken token); - } -} diff --git a/HandyIpc.Core/Core/SenderBase.cs b/HandyIpc.Core/Core/SenderBase.cs deleted file mode 100644 index 21dac25..0000000 --- a/HandyIpc.Core/Core/SenderBase.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System.Threading.Tasks; - -namespace HandyIpc.Core -{ - public abstract class SenderBase - { - protected ILogger Logger { get; private set; } = null!; - - internal void SetLogger(ILogger logger) => Logger = logger; - - public abstract byte[] Invoke(byte[] requestBytes); - - public abstract Task InvokeAsync(byte[] requestBytes); - } -} diff --git a/HandyIpc.Core/Extensions.cs b/HandyIpc.Core/Extensions.cs index 11716d9..15581d4 100644 --- a/HandyIpc.Core/Extensions.cs +++ b/HandyIpc.Core/Extensions.cs @@ -5,39 +5,8 @@ namespace HandyIpc { - public static class Extensions + internal static class Extensions { - public static IServerRegistry Register(this IServerRegistry registry, string? key = null) - where TInterface : class - where TImpl : TInterface, new() - { - return registry.Register(() => new TImpl(), key); - } - - public static IServerRegistry Register(this IServerRegistry registry, Func factory, string? key = null) - where TInterface : class - { - key ??= typeof(TInterface).GetDefaultKey(); - return registry.Register(typeof(TInterface), factory, key); - } - - public static IServerRegistry Register(this IServerRegistry registry, Type interfaceType, Type classType, string? key = null) - { - key ??= interfaceType.GetDefaultKey(); - return classType.ContainsGenericParameters - ? registry.Register(interfaceType, GenericFactory, key) - : registry.Register(interfaceType, () => Activator.CreateInstance(classType), key); - - // Local Method - object GenericFactory(Type[] genericTypes) - { - var constructedClassType = classType.MakeGenericType(genericTypes); - return Activator.CreateInstance(constructedClassType); - } - } - - public static T Resolve(this IClient client) => client.Resolve(typeof(T).GetDefaultKey()); - internal static string GetDefaultKey(this Type interfaceType) { if (interfaceType.IsGenericType) diff --git a/HandyIpc.Core/HandyIpcBuilder.cs b/HandyIpc.Core/HandyIpcBuilder.cs deleted file mode 100644 index 3a15b11..0000000 --- a/HandyIpc.Core/HandyIpcBuilder.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace HandyIpc -{ - public static class HandyIpcBuilder - { - public static IServerBuilder CreateServerBuilder() => new ServerBuilder(); - - public static IClientBuilder CreateClientBuilder() => new ClientBuilder(); - } -} diff --git a/HandyIpc.Core/IClient.cs b/HandyIpc.Core/IClient.cs index 4aae703..e50b8c5 100644 --- a/HandyIpc.Core/IClient.cs +++ b/HandyIpc.Core/IClient.cs @@ -1,19 +1,11 @@ -using System; +using System.Threading.Tasks; namespace HandyIpc { - /// - /// It represents a IPC client instances. - /// - public interface IClient : IDisposable + public interface IClient { - /// - /// Gets or adds a IPC client instance with the specified interface type, - /// which is a proxy of remote server. - /// - /// The specified interface type. - /// A key for mark the instance is registered. - /// An IPC client proxy singleton of the type. - T Resolve(string key); + IConnection Connect(); + + Task ConnectAsync(); } } diff --git a/HandyIpc.Core/IClientBuilder.cs b/HandyIpc.Core/IClientBuilder.cs deleted file mode 100644 index e3fce6b..0000000 --- a/HandyIpc.Core/IClientBuilder.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace HandyIpc -{ - public interface IClientBuilder : IConfiguration - { - IClient Build(); - } -} diff --git a/HandyIpc.Core/IClientConfiguration.cs b/HandyIpc.Core/IClientConfiguration.cs new file mode 100644 index 0000000..e3a9122 --- /dev/null +++ b/HandyIpc.Core/IClientConfiguration.cs @@ -0,0 +1,16 @@ +using System; + +namespace HandyIpc +{ + public interface IClientConfiguration : IConfiguration + { + /// + /// Use a provider for the underlying communication. + /// + /// + /// A factory of the type for RMI (remote method invocation). + /// + /// The interface instance itself. + IClientConfiguration Use(Func factory); + } +} diff --git a/HandyIpc.Core/IConfiguration.cs b/HandyIpc.Core/IConfiguration.cs index 033e17c..920cf94 100644 --- a/HandyIpc.Core/IConfiguration.cs +++ b/HandyIpc.Core/IConfiguration.cs @@ -1,40 +1,22 @@ -using System; +using System; using HandyIpc.Core; namespace HandyIpc { - public interface IConfiguration + public interface IConfiguration where T : IConfiguration { - /// - /// Use a provider for the underlying communication. - /// - /// - /// A factory of the type for RMI (remote method invocation). - /// - /// The interface instance itself. - IConfiguration Use(Func factory); - - /// - /// Use a provider for the underlying communication. - /// - /// - /// A factory of the type for RMI (remote method invocation). - /// - /// The interface instance itself. - IConfiguration Use(Func factory); - /// /// Use a provider for serialization and deserialization. /// /// A factory for creating the instance. /// The interface instance itself. - IConfiguration Use(Func factory); + T Use(Func factory); /// /// Use a provider for logging. /// /// A factory for creating the instance. /// The interface instance itself. - IConfiguration Use(Func factory); + T Use(Func factory); } } diff --git a/HandyIpc.Core/IConnection.cs b/HandyIpc.Core/IConnection.cs new file mode 100644 index 0000000..4e1aa4d --- /dev/null +++ b/HandyIpc.Core/IConnection.cs @@ -0,0 +1,17 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace HandyIpc +{ + public interface IConnection : IDisposable + { + void Write(byte[] bytes); + + Task WriteAsync(byte[] bytes, CancellationToken token); + + byte[] Read(); + + Task ReadAsync(CancellationToken token); + } +} diff --git a/HandyIpc.Core/IContainerClient.cs b/HandyIpc.Core/IContainerClient.cs new file mode 100644 index 0000000..72f8072 --- /dev/null +++ b/HandyIpc.Core/IContainerClient.cs @@ -0,0 +1,19 @@ +using System; + +namespace HandyIpc +{ + /// + /// It represents a IPC client instances. + /// + public interface IContainerClient : IDisposable + { + /// + /// Gets or adds a IPC client instance with the specified interface type, + /// which is a proxy of remote server. + /// + /// The specified interface type. + /// A key for mark the instance is registered. + /// An IPC client proxy singleton of the type. + T Resolve(string key); + } +} diff --git a/HandyIpc.Core/IServerRegistry.cs b/HandyIpc.Core/IContainerRegistry.cs similarity index 82% rename from HandyIpc.Core/IServerRegistry.cs rename to HandyIpc.Core/IContainerRegistry.cs index 0ad138a..ce11944 100644 --- a/HandyIpc.Core/IServerRegistry.cs +++ b/HandyIpc.Core/IContainerRegistry.cs @@ -2,7 +2,7 @@ namespace HandyIpc { - public interface IServerRegistry + public interface IContainerRegistry { /// /// Registers a mapping between the non-generic contract interface and the non-generic concrete service class, @@ -12,7 +12,7 @@ public interface IServerRegistry /// The factory of the non-generic concrete service class. /// A key for mark the instance is registered. /// A token to stop the running service instance. - IServerRegistry Register(Type interfaceType, Func factory, string key); + IContainerRegistry Register(Type interfaceType, Func factory, string key); /// /// Registers a mapping between the generic contract interface and the generic concrete service class, @@ -22,6 +22,6 @@ public interface IServerRegistry /// The factory of the generic concrete service class. /// A key for mark the instance is registered. /// A token to stop the running service instance. - IServerRegistry Register(Type interfaceType, Func factory, string key); + IContainerRegistry Register(Type interfaceType, Func factory, string key); } } diff --git a/HandyIpc.Core/IContainerServer.cs b/HandyIpc.Core/IContainerServer.cs new file mode 100644 index 0000000..e3ff7d4 --- /dev/null +++ b/HandyIpc.Core/IContainerServer.cs @@ -0,0 +1,16 @@ +using System; + +namespace HandyIpc +{ + /// + /// It represents a IPC server instances. + /// + public interface IContainerServer : IDisposable + { + bool IsRunning { get; } + + void Start(); + + void Stop(); + } +} diff --git a/HandyIpc.Core/IServer.cs b/HandyIpc.Core/IServer.cs index 5b95ff4..7f6f7d2 100644 --- a/HandyIpc.Core/IServer.cs +++ b/HandyIpc.Core/IServer.cs @@ -1,16 +1,10 @@ -using System; +using System; +using System.Threading.Tasks; namespace HandyIpc { - /// - /// It represents a IPC server instances. - /// public interface IServer : IDisposable { - bool IsRunning { get; } - - void Start(); - - void Stop(); + Task WaitForConnectionAsync(); } } diff --git a/HandyIpc.Core/IServerBuilder.cs b/HandyIpc.Core/IServerBuilder.cs deleted file mode 100644 index 8d2d334..0000000 --- a/HandyIpc.Core/IServerBuilder.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace HandyIpc -{ - public interface IServerBuilder : IConfiguration, IServerRegistry - { - IServer Build(); - } -} diff --git a/HandyIpc.Core/IServerConfiguration.cs b/HandyIpc.Core/IServerConfiguration.cs new file mode 100644 index 0000000..366cfa3 --- /dev/null +++ b/HandyIpc.Core/IServerConfiguration.cs @@ -0,0 +1,16 @@ +using System; + +namespace HandyIpc +{ + public interface IServerConfiguration : IConfiguration + { + /// + /// Use a provider for the underlying communication. + /// + /// + /// A factory of the type for RMI (remote method invocation). + /// + /// The interface instance itself. + IServerConfiguration Use(Func factory); + } +} diff --git a/HandyIpc.Core/Implementation/AsyncPool.cs b/HandyIpc.Core/Implementation/AsyncPool.cs index 751e9fc..ae48991 100644 --- a/HandyIpc.Core/Implementation/AsyncPool.cs +++ b/HandyIpc.Core/Implementation/AsyncPool.cs @@ -1,14 +1,12 @@ using System; -using System.Collections.Concurrent; using System.Threading.Tasks; namespace HandyIpc.Implementation { - public class AsyncPool + internal sealed class AsyncPool : PoolBase where TValue : IDisposable { private readonly Func> _factory; private readonly Func> _checkValue; - private readonly ConcurrentBag _cache = new(); public AsyncPool(Func> factory, Func>? checkValue = null) { @@ -18,19 +16,21 @@ public AsyncPool(Func> factory, Func>? checkValu public async Task> RentAsync() { + CheckDisposed("AsyncPool"); + TValue value = await TakeOrCreateValue(); return new RentedValue(value, ReturnValue); // Local method - void ReturnValue(TValue rentedValue) => _cache.Add(rentedValue); + void ReturnValue(TValue rentedValue) => Cache.Add(rentedValue); } private async Task TakeOrCreateValue() { TValue result; - while (!_cache.TryTake(out result) || !await _checkValue(result)) + while (!Cache.TryTake(out result) || !await _checkValue(result)) { - _cache.Add(await _factory()); + Cache.Add(await _factory()); } return result; diff --git a/HandyIpc.Core/Implementation/Pool.cs b/HandyIpc.Core/Implementation/Pool.cs index 157d191..aec7306 100644 --- a/HandyIpc.Core/Implementation/Pool.cs +++ b/HandyIpc.Core/Implementation/Pool.cs @@ -1,13 +1,11 @@ using System; -using System.Collections.Concurrent; namespace HandyIpc.Implementation { - public class Pool + internal sealed class Pool : PoolBase where TValue : IDisposable { private readonly Func _factory; private readonly Func _checkValue; - private readonly ConcurrentBag _cache = new(); public Pool(Func factory, Func? checkValue = null) { @@ -17,19 +15,21 @@ public Pool(Func factory, Func? checkValue = null) public RentedValue Rent() { + CheckDisposed("Pool"); + TValue value = TakeOrCreateValue(); return new RentedValue(value, ReturnValue); // Local method - void ReturnValue(TValue rentedValue) => _cache.Add(rentedValue); + void ReturnValue(TValue rentedValue) => Cache.Add(rentedValue); } private TValue TakeOrCreateValue() { TValue result; - while (!_cache.TryTake(out result) || !_checkValue(result)) + while (!Cache.TryTake(out result) || !_checkValue(result)) { - _cache.Add(_factory()); + Cache.Add(_factory()); } return result; diff --git a/HandyIpc.Core/Implementation/PoolBase.cs b/HandyIpc.Core/Implementation/PoolBase.cs new file mode 100644 index 0000000..64b1468 --- /dev/null +++ b/HandyIpc.Core/Implementation/PoolBase.cs @@ -0,0 +1,39 @@ +using System; +using System.Collections.Concurrent; + +namespace HandyIpc.Implementation +{ + internal abstract class PoolBase : IDisposable where TValue : IDisposable + { + protected readonly ConcurrentBag Cache = new(); + + private bool _isDisposed; + + protected void CheckDisposed(string objectName) + { + if (_isDisposed) + { + throw new ObjectDisposedException(objectName); + } + } + + protected virtual void Dispose(bool disposing) + { + _isDisposed = true; + + if (disposing) + { + while (Cache.TryTake(out TValue item)) + { + item.Dispose(); + } + } + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + } +} diff --git a/HandyIpc.Core/Implementation/RentedValue.cs b/HandyIpc.Core/Implementation/RentedValue.cs index e544d4d..b7e95a1 100644 --- a/HandyIpc.Core/Implementation/RentedValue.cs +++ b/HandyIpc.Core/Implementation/RentedValue.cs @@ -2,7 +2,7 @@ namespace HandyIpc.Implementation { - public readonly struct RentedValue : IDisposable + internal readonly struct RentedValue : IDisposable { private readonly Action _dispose; diff --git a/HandyIpc.NamedPipe/NamedPipeSender.cs b/HandyIpc.Core/Sender.cs similarity index 57% rename from HandyIpc.NamedPipe/NamedPipeSender.cs rename to HandyIpc.Core/Sender.cs index f46bff4..1706b3b 100644 --- a/HandyIpc.NamedPipe/NamedPipeSender.cs +++ b/HandyIpc.Core/Sender.cs @@ -1,44 +1,48 @@ using System; -using System.IO.Pipes; using System.Threading; using System.Threading.Tasks; using HandyIpc.Core; using HandyIpc.Implementation; -namespace HandyIpc.NamedPipe +namespace HandyIpc { - internal class NamedPipeSender : SenderBase + public sealed class Sender : IDisposable { - private readonly string _pipeName; + private readonly IClient _client; private readonly Pool _clientPool; private readonly AsyncPool _asyncClientPool; - public NamedPipeSender(string pipeName) + internal Sender(IClient client) { - _pipeName = pipeName; + _client = client; _clientPool = new Pool(CreateClient, CheckClient); _asyncClientPool = new AsyncPool(CreateAsyncClient, CheckAsyncClient); } - public override byte[] Invoke(byte[] requestBytes) + public byte[] Invoke(byte[] bytes) { using RentedValue invokeOwner = _clientPool.Rent(); - byte[] response = invokeOwner.Value.Invoke(requestBytes); + byte[] response = invokeOwner.Value.Invoke(bytes); return response; } - public override async Task InvokeAsync(byte[] requestBytes) + public async Task InvokeAsync(byte[] bytes) { using RentedValue invokeOwner = await _asyncClientPool.RentAsync(); - byte[] response = await invokeOwner.Value.InvokeAsync(requestBytes, CancellationToken.None); + byte[] response = await invokeOwner.Value.InvokeAsync(bytes, CancellationToken.None); return response; } private ClientItem CreateClient() { - var stream = new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut); - stream.Connect(); - return new ClientItem(stream); + IConnection connection = _client.Connect(); + return new ClientItem(connection); + } + + private async Task CreateAsyncClient() + { + IConnection connection = await _client.ConnectAsync(); + return new AsyncClientItem(connection); } private static bool CheckClient(ClientItem item) @@ -55,13 +59,6 @@ private static bool CheckClient(ClientItem item) } } - private async Task CreateAsyncClient() - { - var stream = new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut); - await stream.ConnectAsync(); - return new AsyncClientItem(stream); - } - private static async Task CheckAsyncClient(AsyncClientItem item) { try @@ -78,51 +75,54 @@ private static async Task CheckAsyncClient(AsyncClientItem item) private sealed class ClientItem : IDisposable { - private readonly NamedPipeClientStream _stream; + private readonly IConnection _connection; - public ClientItem(NamedPipeClientStream stream) => _stream = stream; + public ClientItem(IConnection connection) => _connection = connection; public byte[] Invoke(byte[] input) { try { - _stream.Write(input, 0, input.Length); - _stream.Flush(); - return _stream.ReadAllBytes(); + _connection.Write(input); + return _connection.Read(); } catch { - _stream.Dispose(); + _connection.Dispose(); throw; } } - public void Dispose() => _stream.Dispose(); + public void Dispose() => _connection.Dispose(); } private sealed class AsyncClientItem : IDisposable { - private readonly NamedPipeClientStream _stream; + private readonly IConnection _connection; - public AsyncClientItem(NamedPipeClientStream stream) => _stream = stream; + public AsyncClientItem(IConnection connection) => _connection = connection; public async Task InvokeAsync(byte[] input, CancellationToken token) { try { - token.ThrowIfCancellationRequested(); - await _stream.WriteAsync(input, 0, input.Length, token); - await _stream.FlushAsync(token); - return await _stream.ReadAllBytesAsync(token); + await _connection.WriteAsync(input, token); + return await _connection.ReadAsync(token); } catch { - _stream.Dispose(); + _connection.Dispose(); throw; } } - public void Dispose() => _stream.Dispose(); + public void Dispose() => _connection.Dispose(); + } + + public void Dispose() + { + _clientPool.Dispose(); + _asyncClientPool.Dispose(); } } } diff --git a/HandyIpc.Core/Server.cs b/HandyIpc.Core/Server.cs deleted file mode 100644 index 31bd5f3..0000000 --- a/HandyIpc.Core/Server.cs +++ /dev/null @@ -1,65 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using HandyIpc.Core; - -namespace HandyIpc -{ - internal sealed class Server : IServer - { - private readonly ReceiverBase _receiver; - private readonly Middleware _middleware; - private readonly ISerializer _serializer; - private readonly ILogger _logger; - - private CancellationTokenSource? _cancellationTokenSource; - - public bool IsRunning { get; private set; } - - public Server(ReceiverBase receiver, Middleware middleware, ISerializer serializer, ILogger logger) - { - _receiver = receiver; - _middleware = middleware; - _serializer = serializer; - _logger = logger; - } - - public void Start() - { - if (_cancellationTokenSource is null or { IsCancellationRequested: true }) - { - _cancellationTokenSource = new CancellationTokenSource(); - } - -#pragma warning disable 4014 - // Async run the server without waiting. - CatchException(_receiver.StartAsync(_middleware.ToHandler(_serializer, _logger), _cancellationTokenSource.Token)); -#pragma warning restore 4014 - - IsRunning = true; - } - - public void Stop() - { - _cancellationTokenSource?.Cancel(); - IsRunning = false; - } - - public void Dispose() - { - Stop(); - } - - private async Task CatchException(Task task) - { - try - { - await task; - } - catch (Exception e) - { - _logger.Error("Unexpected exception occurred when starting the server instance.", e); - } - } - } -} diff --git a/HandyIpc.Core/StreamConnection.cs b/HandyIpc.Core/StreamConnection.cs new file mode 100644 index 0000000..05e2d54 --- /dev/null +++ b/HandyIpc.Core/StreamConnection.cs @@ -0,0 +1,35 @@ +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace HandyIpc +{ + public sealed class StreamConnection : IConnection + { + private readonly Stream _stream; + + public StreamConnection(Stream stream) => _stream = stream; + + public void Write(byte[] bytes) + { + _stream.Write(bytes, 0, bytes.Length); + } + + public Task WriteAsync(byte[] bytes, CancellationToken token) + { + return _stream.WriteAsync(bytes, 0, bytes.Length, token); + } + + public byte[] Read() + { + return _stream.ReadAllBytes(); + } + + public Task ReadAsync(CancellationToken token) + { + return _stream.ReadAllBytesAsync(token); + } + + public void Dispose() => _stream.Dispose(); + } +} diff --git a/HandyIpc.Core/StreamExtensions.cs b/HandyIpc.Core/StreamExtensions.cs new file mode 100644 index 0000000..6b56737 --- /dev/null +++ b/HandyIpc.Core/StreamExtensions.cs @@ -0,0 +1,99 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace HandyIpc +{ + internal static class StreamExtensions + { + private const int BatchBufferSize = 8 * 1024; + + internal static byte[] ReadAllBytes(this Stream self) + { + // TODO: Refactoring by System.IO.Pipelines, ArrayPool or stackalloc and so on. + var collector = new List(); + while (true) + { + byte[] bytes = new byte[BatchBufferSize]; + int actualCount = self.Read(bytes, 0, BatchBufferSize); + + if (CollectBytes(collector, bytes, actualCount)) + { + break; + } + } + + return ConcatBytesList(collector); + } + + internal static async Task ReadAllBytesAsync(this Stream self, CancellationToken token) + { + var collector = new List(); + while (true) + { + if (token.IsCancellationRequested) + { + break; + } + + byte[] bytes = new byte[BatchBufferSize]; + int actualCount = await self.ReadAsync(bytes, 0, BatchBufferSize, token); + + if (CollectBytes(collector, bytes, actualCount)) + { + break; + } + } + + return ConcatBytesList(collector); + } + + /// + /// Fill a bytes list, and return a bool value to indicate whether the process has been completed.
+ /// I known this method is so ugly: It modifies external collection, but it was extracted to be able to + /// reuse code in async and sync methods. + ///
+ /// The filled bytes list. + /// The buffered bytes. + /// The actual length of the buffered bytes (). + /// The bool value indicate whether it has been completed. + private static bool CollectBytes(ICollection collector, byte[] bytes, int actualCount) + { + if (actualCount == 0) + { + return true; + } + + if (actualCount < BatchBufferSize) + { + byte[] tailBytes = new byte[actualCount]; + Array.Copy(bytes, tailBytes, actualCount); + collector.Add(tailBytes); + return true; + } + + collector.Add(bytes); + return false; + } + + private static byte[] ConcatBytesList(IReadOnlyList bytesList) + { + int totalSize = bytesList.Sum(item => item.Length); + byte[] totalBytes = new byte[totalSize]; + int offset = 0; + // "for" performance is higher than "foreach". + // ReSharper disable once ForCanBeConvertedToForeach + for (int i = 0; i < bytesList.Count; i++) + { + byte[] itemBytes = bytesList[i]; + itemBytes.CopyTo(totalBytes, offset); + offset += itemBytes.Length; + } + + return totalBytes; + } + } +} diff --git a/HandyIpc.Generator/ClientProxy.cs b/HandyIpc.Generator/ClientProxy.cs index 53dae0c..953762e 100644 --- a/HandyIpc.Generator/ClientProxy.cs +++ b/HandyIpc.Generator/ClientProxy.cs @@ -16,6 +16,7 @@ public static string Generate(INamedTypeSymbol @interface, IReadOnlyCollection - /// Uses the Named Pipe as the underlying communication technology for building the IPC hub instance. - ///
- /// An factory instance. - /// The name of the named pipe. - /// The factory instance itself. - public static IConfiguration UseNamedPipe(this IConfiguration self, string pipeName) - { - return self - .Use(() => new NamedPipeSender(pipeName)) - .Use(() => new NamedPipeReceiver(pipeName)); - } - - internal static byte[] ReadAllBytes(this Stream self) + public static IServerConfiguration UseNamedPipe(this IServerConfiguration self, string pipeName) { - // TODO: Refactoring by System.IO.Pipelines, ArrayPool or stackalloc and so on. - var collector = new List(); - while (true) - { - byte[] bytes = new byte[BatchBufferSize]; - int actualCount = self.Read(bytes, 0, BatchBufferSize); - - if (CollectBytes(collector, bytes, actualCount)) - { - break; - } - } - - return ConcatBytesList(collector); + return self.Use(() => new NamedPipeIpcServer(pipeName)); } - internal static async Task ReadAllBytesAsync(this Stream self, CancellationToken token) + public static IClientConfiguration UseNamedPipe(this IClientConfiguration self, string pipeName) { - // TODO: Refactoring by System.IO.Pipelines, ArrayPool or stackalloc and so on. - var collector = new List(); - while (true) - { - if (token.IsCancellationRequested) - { - break; - } - - byte[] bytes = new byte[BatchBufferSize]; - int actualCount = await self.ReadAsync(bytes, 0, BatchBufferSize, token); - - if (CollectBytes(collector, bytes, actualCount)) - { - break; - } - } - - return ConcatBytesList(collector); - } - - /// - /// Fill a bytes list, and return a bool value to indicate whether the process has been completed.
- /// I known this method is so ugly: It modifies external collection, but it was extracted to be able to - /// reuse code in async and sync methods. - ///
- /// The filled bytes list. - /// The buffered bytes. - /// The actual length of the buffered bytes (). - /// The bool value indicate whether it has been completed. - private static bool CollectBytes(ICollection collector, byte[] bytes, int actualCount) - { - if (actualCount == 0) - { - return true; - } - - if (actualCount < BatchBufferSize) - { - byte[] tailBytes = new byte[actualCount]; - Array.Copy(bytes, tailBytes, actualCount); - collector.Add(tailBytes); - return true; - } - - collector.Add(bytes); - return false; - } - - private static byte[] ConcatBytesList(IReadOnlyList bytesList) - { - int totalSize = bytesList.Sum(item => item.Length); - byte[] totalBytes = new byte[totalSize]; - int offset = 0; - // "for" performance is higher than "foreach". - // ReSharper disable once ForCanBeConvertedToForeach - for (int i = 0; i < bytesList.Count; i++) - { - byte[] itemBytes = bytesList[i]; - itemBytes.CopyTo(totalBytes, offset); - offset += itemBytes.Length; - } - - return totalBytes; + return self.Use(() => new NamedPipeIpcClient(pipeName)); } } } diff --git a/HandyIpc.NamedPipe/NamedPipeIpcClient.cs b/HandyIpc.NamedPipe/NamedPipeIpcClient.cs new file mode 100644 index 0000000..869d72d --- /dev/null +++ b/HandyIpc.NamedPipe/NamedPipeIpcClient.cs @@ -0,0 +1,29 @@ +using System.IO.Pipes; +using System.Threading.Tasks; + +namespace HandyIpc.NamedPipe +{ + internal class NamedPipeIpcClient : IClient + { + private readonly string _pipeName; + + public NamedPipeIpcClient(string pipeName) + { + _pipeName = pipeName; + } + + public IConnection Connect() + { + var stream = new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut); + stream.Connect(); + return new StreamConnection(stream); + } + + public async Task ConnectAsync() + { + var stream = new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut); + await stream.ConnectAsync(); + return new StreamConnection(stream); + } + } +} diff --git a/HandyIpc.NamedPipe/NamedPipeIpcServer.cs b/HandyIpc.NamedPipe/NamedPipeIpcServer.cs new file mode 100644 index 0000000..ac2bd5b --- /dev/null +++ b/HandyIpc.NamedPipe/NamedPipeIpcServer.cs @@ -0,0 +1,30 @@ +using System.IO.Pipes; +using System.Threading; +using System.Threading.Tasks; + +namespace HandyIpc.NamedPipe +{ + internal class NamedPipeIpcServer : IServer + { + private readonly string _pipeName; + private readonly CancellationTokenSource _source = new(); + + public NamedPipeIpcServer(string pipeName) + { + _pipeName = pipeName; + } + + public async Task WaitForConnectionAsync() + { + var stream = new NamedPipeServerStream(_pipeName, PipeDirection.InOut, NamedPipeServerStream.MaxAllowedServerInstances); + await stream.WaitForConnectionAsync(_source.Token); + return new StreamConnection(stream); + } + + public void Dispose() + { + _source.Cancel(); + _source.Dispose(); + } + } +} diff --git a/HandyIpc.NamedPipe/NamedPipeReceiver.cs b/HandyIpc.NamedPipe/NamedPipeReceiver.cs deleted file mode 100644 index 7ce92ff..0000000 --- a/HandyIpc.NamedPipe/NamedPipeReceiver.cs +++ /dev/null @@ -1,75 +0,0 @@ -using System; -using System.IO.Pipes; -using System.Threading; -using System.Threading.Tasks; -using HandyIpc.Core; - -namespace HandyIpc.NamedPipe -{ - internal class NamedPipeReceiver : ReceiverBase - { - private readonly string _pipeName; - - public NamedPipeReceiver(string pipeName) => _pipeName = pipeName; - - public override async Task StartAsync(RequestHandler handler, CancellationToken token) - { - while (!token.IsCancellationRequested) - { - try - { - var stream = await CreateServerStreamAsync(_pipeName, token); - - if (token.IsCancellationRequested) - { - break; - } - - // Do not await the request handler, and go to await next stream connection directly. -#pragma warning disable 4014 - HandleRequestAsync(stream, handler, token); -#pragma warning restore 4014 - } - catch (OperationCanceledException) - { - // Ignore - } - catch (Exception e) - { - Logger.Error($"An unexpected exception occurred in the server (Id: {_pipeName}).", e); - } - } - } - - private static async Task CreateServerStreamAsync(string pipeName, CancellationToken token) - { - var stream = new NamedPipeServerStream(pipeName, PipeDirection.InOut, NamedPipeServerStream.MaxAllowedServerInstances); - await stream.WaitForConnectionAsync(token); - return stream; - } - - private static async Task HandleRequestAsync(NamedPipeServerStream stream, RequestHandler handler, CancellationToken token) - { - using (stream) - { - while (true) - { - if (!stream.IsConnected || token.IsCancellationRequested) - { - break; - } - - byte[] buffer = await stream.ReadAllBytesAsync(token); - if (buffer.Length == 0) - { - continue; - } - - byte[] output = await handler(buffer); - await stream.WriteAsync(output, 0, output.Length, token); - await stream.FlushAsync(token); - } - } - } - } -} diff --git a/HandyIpc.Serializer.Json/Extensions.cs b/HandyIpc.Serializer.Json/Extensions.cs index 9fcad29..ba048ba 100644 --- a/HandyIpc.Serializer.Json/Extensions.cs +++ b/HandyIpc.Serializer.Json/Extensions.cs @@ -13,7 +13,7 @@ public static class Extensions TypeNameHandling = TypeNameHandling.Auto, }; - public static IConfiguration UseJsonSerializer(this IConfiguration self) + public static IConfiguration UseJsonSerializer(this IConfiguration self) where T : IConfiguration { return self.Use(() => new JsonSerializer()); } diff --git a/HandyIpc.Socket/Extensions.cs b/HandyIpc.Socket/Extensions.cs index 3808dec..d35dc5f 100644 --- a/HandyIpc.Socket/Extensions.cs +++ b/HandyIpc.Socket/Extensions.cs @@ -1,124 +1,17 @@ -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; using System.Net; -using System.Threading; -using System.Threading.Tasks; namespace HandyIpc.Socket { public static class Extensions { - private const int BatchBufferSize = 4 * 1024; - - private static readonly char[] IdentifierSplitter = { ':', ' ' }; - - public static IConfiguration UseTcp(this IConfiguration self, IPAddress ip, int port) - { - return self - .Use(() => new TcpSender(ip, port)) - .Use(() => new TcpReceiver(ip, port)); - } - - internal static (IPAddress ip, int port) ToIpEndPoint(this string connectionString) - { - const int ipIndex = 0; - const int portIndex = 1; - - string[] address = connectionString.Split(IdentifierSplitter, StringSplitOptions.RemoveEmptyEntries); - if (address.Length != 2) - { - throw new FormatException($"Invalid identifier. The correct format is 'x.x.x.x:port', rather than '{connectionString}'"); - } - - return (IPAddress.Parse(address[ipIndex]), int.Parse(address[portIndex])); - } - - internal static byte[] ReadAllBytes(this Stream self) - { - // TODO: Refactoring by System.IO.Pipelines, ArrayPool or stackalloc and so on. - var collector = new List(); - while (true) - { - byte[] bytes = new byte[BatchBufferSize]; - int actualCount = self.Read(bytes, 0, BatchBufferSize); - - if (CollectBytes(collector, bytes, actualCount)) - { - break; - } - } - - return ConcatBytesList(collector); - } - - internal static async Task ReadAllBytesAsync(this Stream self, CancellationToken token) - { - // TODO: Refactoring by System.IO.Pipelines, ArrayPool or stackalloc and so on. - var collector = new List(); - while (true) - { - if (token.IsCancellationRequested) - { - break; - } - - byte[] bytes = new byte[BatchBufferSize]; - int actualCount = await self.ReadAsync(bytes, 0, BatchBufferSize, token); - - if (CollectBytes(collector, bytes, actualCount)) - { - break; - } - } - - return ConcatBytesList(collector); - } - - /// - /// Fill a bytes list, and return a bool value to indicate whether the process has been completed.
- /// I known this method is so ugly: It modifies external collection, but it was extracted to be able to - /// reuse code in async and sync methods. - ///
- /// The filled bytes list. - /// The buffered bytes. - /// The actual length of the buffered bytes (). - /// The bool value indicate whether it has been completed. - private static bool CollectBytes(ICollection collector, byte[] bytes, int actualCount) + public static IServerConfiguration UseTcp(this IServerConfiguration self, IPAddress ip, int port) { - if (actualCount == 0) - { - return true; - } - - if (actualCount < BatchBufferSize) - { - byte[] tailBytes = new byte[actualCount]; - Array.Copy(bytes, tailBytes, actualCount); - collector.Add(tailBytes); - return true; - } - - collector.Add(bytes); - return false; + return self.Use(() => new TcpIpcServer(ip, port)); } - private static byte[] ConcatBytesList(IReadOnlyList bytesList) + public static IClientConfiguration UseTcp(this IClientConfiguration self, IPAddress ip, int port) { - int totalSize = bytesList.Sum(item => item.Length); - byte[] totalBytes = new byte[totalSize]; - int offset = 0; - // "for" performance is higher than "foreach". - // ReSharper disable once ForCanBeConvertedToForeach - for (int i = 0; i < bytesList.Count; i++) - { - byte[] itemBytes = bytesList[i]; - itemBytes.CopyTo(totalBytes, offset); - offset += itemBytes.Length; - } - - return totalBytes; + return self.Use(() => new TcpIpcClient(ip, port)); } } } diff --git a/HandyIpc.Socket/TcpIpcClient.cs b/HandyIpc.Socket/TcpIpcClient.cs new file mode 100644 index 0000000..78aaac1 --- /dev/null +++ b/HandyIpc.Socket/TcpIpcClient.cs @@ -0,0 +1,32 @@ +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; + +namespace HandyIpc.Socket +{ + internal class TcpIpcClient : IClient + { + private readonly IPAddress _ip; + private readonly int _port; + + public TcpIpcClient(IPAddress ip, int port) + { + _ip = ip; + _port = port; + } + + public IConnection Connect() + { + TcpClient client = new(); + client.Connect(_ip, _port); + return new StreamConnection(client.GetStream()); + } + + public async Task ConnectAsync() + { + TcpClient client = new(); + await client.ConnectAsync(_ip, _port); + return new StreamConnection(client.GetStream()); + } + } +} diff --git a/HandyIpc.Socket/TcpIpcServer.cs b/HandyIpc.Socket/TcpIpcServer.cs new file mode 100644 index 0000000..36f0a75 --- /dev/null +++ b/HandyIpc.Socket/TcpIpcServer.cs @@ -0,0 +1,25 @@ +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; + +namespace HandyIpc.Socket +{ + internal class TcpIpcServer : IServer + { + private readonly TcpListener _tcpListener; + + public TcpIpcServer(IPAddress ip, int port) + { + _tcpListener = new TcpListener(ip, port); + _tcpListener.Start(); + } + + public async Task WaitForConnectionAsync() + { + TcpClient client = await _tcpListener.AcceptTcpClientAsync(); + return new StreamConnection(client.GetStream()); + } + + public void Dispose() => _tcpListener.Stop(); + } +} diff --git a/HandyIpc.Socket/TcpReceiver.cs b/HandyIpc.Socket/TcpReceiver.cs deleted file mode 100644 index 237ffae..0000000 --- a/HandyIpc.Socket/TcpReceiver.cs +++ /dev/null @@ -1,88 +0,0 @@ -using System; -using System.Net; -using System.Net.Sockets; -using System.Threading; -using System.Threading.Tasks; -using HandyIpc.Core; - -namespace HandyIpc.Socket -{ - internal class TcpReceiver : ReceiverBase - { - private readonly IPAddress _ip; - private readonly int _port; - - public TcpReceiver(IPAddress ip, int port) - { - _ip = ip; - _port = port; - } - - public override async Task StartAsync(RequestHandler handler, CancellationToken token) - { - TcpListener listener = CreateTcpListener(_ip, _port); - while (!token.IsCancellationRequested) - { - try - { - TcpClient client = await listener.AcceptTcpClientAsync(); - - if (token.IsCancellationRequested) - { - break; - } - - // Do not await the request handler, and go to await next stream connection directly. -#pragma warning disable 4014 - HandleRequestAsync(client, handler, token); -#pragma warning restore 4014 - } - catch (OperationCanceledException) - { - // Ignore - } - catch (Exception e) - { - Logger.Error($"An unexpected exception occurred in the server (IP: {_ip}, Port: {_port}).", e); - } - } - - listener.Stop(); - } - - private static TcpListener CreateTcpListener(IPAddress ip, int port) - { - var listener = new TcpListener(ip, port); - listener.Start(); - return listener; - } - - private static async Task HandleRequestAsync(TcpClient client, RequestHandler handler, CancellationToken token) - { - using (client) - { - while (true) - { - if (!client.Connected || token.IsCancellationRequested) - { - break; - } - - NetworkStream stream = client.GetStream(); - - byte[] buffer = await stream.ReadAllBytesAsync(token); - - if (buffer.Length == 0) - { - continue; - } - - byte[] output = await handler(buffer); - - await stream.WriteAsync(output, 0, output.Length, token); - await stream.FlushAsync(token); - } - } - } - } -} diff --git a/HandyIpc.Socket/TcpSender.cs b/HandyIpc.Socket/TcpSender.cs deleted file mode 100644 index 407f3f0..0000000 --- a/HandyIpc.Socket/TcpSender.cs +++ /dev/null @@ -1,153 +0,0 @@ -using System; -using System.Net; -using System.Net.Sockets; -using System.Threading; -using System.Threading.Tasks; -using HandyIpc.Core; -using HandyIpc.Implementation; - -namespace HandyIpc.Socket -{ - internal class TcpSender : SenderBase - { - private readonly IPAddress _ip; - private readonly int _port; - private readonly Pool _clientPool; - private readonly AsyncPool _asyncClientPool; - - public TcpSender(IPAddress ip, int port) - { - _ip = ip; - _port = port; - _clientPool = new Pool(CreateClient, CheckClient); - _asyncClientPool = new AsyncPool(CreateAsyncClient, CheckAsyncClient); - } - - public override byte[] Invoke(byte[] requestBytes) - { - using RentedValue invokeOwner = _clientPool.Rent(); - byte[] response = invokeOwner.Value.Invoke(requestBytes); - return response; - } - - public override async Task InvokeAsync(byte[] requestBytes) - { - using RentedValue invokeOwner = await _asyncClientPool.RentAsync(); - byte[] response = await invokeOwner.Value.InvokeAsync(requestBytes, CancellationToken.None); - return response; - } - - private ClientItem CreateClient() - { - try - { - TcpClient client = new(); - client.Connect(_ip, _port); - return new ClientItem(client); - } - catch (Exception e) - { - Logger.Error("Unexpected exception occurred when sending message by creating tcp client.", e); - throw; - } - } - - private bool CheckClient(ClientItem item) - { - try - { - byte[] response = item.Invoke(Signals.Empty); - return response.IsEmpty(); - } - catch (Exception e) - { - Logger.Error("Unexpected exception occurred when sending message by tcp client.", e); - item.Dispose(); - return false; - } - } - - private async Task CreateAsyncClient() - { - try - { - TcpClient client = new(); - await client.ConnectAsync(_ip, _port); - return new AsyncClientItem(client); - } - catch (Exception e) - { - Logger.Error("Unexpected exception occurred when sending ping by creating tcp client.", e); - throw; - } - } - - private async Task CheckAsyncClient(AsyncClientItem item) - { - try - { - byte[] response = await item.InvokeAsync(Signals.Empty, CancellationToken.None); - return response.IsEmpty(); - } - catch (Exception e) - { - Logger.Error("Unexpected exception occurred when sending ping by tcp client.", e); - item.Dispose(); - return false; - } - } - - private sealed class ClientItem : IDisposable - { - private readonly TcpClient _client; - - public ClientItem(TcpClient client) => _client = client; - - public byte[] Invoke(byte[] input) - { - try - { - NetworkStream stream = _client.GetStream(); - stream.Write(input, 0, input.Length); - stream.Flush(); - return stream.ReadAllBytes(); - } - catch - { - _client.Dispose(); - throw; - } - } - - public void Dispose() => _client.Dispose(); - } - - private sealed class AsyncClientItem : IDisposable - { - private readonly TcpClient _client; - - public AsyncClientItem(TcpClient client) => _client = client; - - public async Task InvokeAsync(byte[] input, CancellationToken token) - { - try - { - NetworkStream stream = _client.GetStream(); - - token.ThrowIfCancellationRequested(); - - await stream.WriteAsync(input, 0, input.Length, token); - await stream.FlushAsync(token); - return await stream.ReadAllBytesAsync(token); - } - catch - { - _client.Dispose(); - throw; - } - } - - public void Dispose() => _client.Dispose(); - } - } -} diff --git a/HandyIpc.Tests/EndToEndTestFixture.cs b/HandyIpc.Tests/EndToEndTestFixture.cs index f00ee40..69262ca 100644 --- a/HandyIpc.Tests/EndToEndTestFixture.cs +++ b/HandyIpc.Tests/EndToEndTestFixture.cs @@ -1,6 +1,7 @@ using System; using System.Net; using HandyIpc; +using HandyIpc.NamedPipe; using HandyIpc.Serializer.Json; using HandyIpc.Socket; using HandyIpcTests.Implementations; @@ -10,25 +11,29 @@ namespace HandyIpcTests { public sealed class EndToEndTestFixture : IDisposable { - private readonly IServer _server; + private readonly IContainerServer _server; - public IClient Client { get; } + public IContainerClient Client { get; } public EndToEndTestFixture() { - IClientBuilder clientBuilder = HandyIpcBuilder.CreateClientBuilder(); + ContainerClientBuilder clientBuilder = new(); clientBuilder - .UseJsonSerializer() - .UseTcp(IPAddress.Loopback, 10086); + .UseTcp(IPAddress.Loopback, 10086) + //.UseNamedPipe("ec57043f-465c-4766-ae49-b9b1ee9ac571") + .UseJsonSerializer(); Client = clientBuilder.Build(); - IServerBuilder serverBuilder = HandyIpcBuilder.CreateServerBuilder(); + ContainerServerBuilder serverBuilder = new(); serverBuilder - .UseJsonSerializer() - .UseTcp(IPAddress.Loopback, 10086); + .UseTcp(IPAddress.Loopback, 10086) + //.UseNamedPipe("ec57043f-465c-4766-ae49-b9b1ee9ac571") + .UseJsonSerializer(); + serverBuilder .Register() .Register(typeof(IGenericTest<,>), typeof(GenericTest<,>)); + _server = serverBuilder.Build(); _server.Start(); } @@ -36,6 +41,8 @@ public EndToEndTestFixture() public void Dispose() { Client.Dispose(); + + _server.Stop(); _server.Dispose(); } }