Skip to content

Commit

Permalink
feat(core): deferring the implementation of identifier to downstream (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
DingpingZhang committed Aug 10, 2021
1 parent 040b509 commit a3138b3
Show file tree
Hide file tree
Showing 20 changed files with 102 additions and 136 deletions.
9 changes: 1 addition & 8 deletions HandyIpc.Core/ClientHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,7 @@ public T Resolve<T>()
return (T)_typeInstanceMapping.GetOrAdd(typeof(T), key =>
{
Type type = key.GetClientType();
if (key.IsGenericType)
{
key = key.GetGenericTypeDefinition();
}
string identifier = key.ResolveIdentifier();
return Activator.CreateInstance(type, _sender, _serializer, identifier);
return Activator.CreateInstance(type, _sender, _serializer);
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion HandyIpc.Core/Core/ReceiverBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public virtual Middleware BuildMiddleware(Type interfaceType, Func<Type[], objec
return BuildBasicMiddleware().Then(genericDispatcher);
}

public abstract Task StartAsync(string identifier, RequestHandler handler, CancellationToken token);
public abstract Task StartAsync(RequestHandler handler, CancellationToken token);

private static Middleware BuildBasicMiddleware()
{
Expand Down
4 changes: 2 additions & 2 deletions HandyIpc.Core/Core/SenderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ public abstract class SenderBase

internal void SetLogger(ILogger logger) => Logger = logger;

public abstract byte[] Invoke(string identifier, byte[] requestBytes);
public abstract byte[] Invoke(byte[] requestBytes);

public abstract Task<byte[]> InvokeAsync(string identifier, byte[] requestBytes);
public abstract Task<byte[]> InvokeAsync(byte[] requestBytes);
}
}
22 changes: 0 additions & 22 deletions HandyIpc.Core/Extensions.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
using System;
using System.Linq;
using System.Reflection;
using System.Security.Cryptography;
using System.Text;

namespace HandyIpc
{
Expand Down Expand Up @@ -49,24 +45,6 @@ internal static Type GetServerProxyType(this Type interfaceType)
return GetAutoGeneratedType(interfaceType, "ServerProxy");
}

internal static string ResolveIdentifier(this Type interfaceType)
{
Guards.ThrowIfNot(interfaceType.IsInterface, "The type must be interface type.", nameof(interfaceType));

var attribute = interfaceType.GetCustomAttribute<IpcContractAttribute>(false);
string identifier = attribute.Identifier;

if (string.IsNullOrEmpty(identifier))
{
using var sha256 = new SHA256CryptoServiceProvider();
byte[] buffer = Encoding.UTF8.GetBytes(interfaceType.AssemblyQualifiedName!);
byte[] sha256Bytes = sha256.ComputeHash(buffer);
identifier = string.Concat(sha256Bytes.Select(item => item.ToString("X2")));
}

return identifier;
}

private static Type GetAutoGeneratedType(Type interfaceType, string category)
{
string typeName;
Expand Down
26 changes: 11 additions & 15 deletions HandyIpc.Core/Implementation/AsyncPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,36 @@

namespace HandyIpc.Implementation
{
public class AsyncPool<TKey, TValue>
public class AsyncPool<TValue>
{
private readonly Func<TKey, Task<TValue>> _factory;
private readonly Func<Task<TValue>> _factory;
private readonly Func<TValue, Task<bool>> _checkValue;
private readonly ConcurrentDictionary<TKey, ConcurrentBag<TValue>> _cache = new();
private readonly ConcurrentBag<TValue> _cache = new();

public AsyncPool(Func<TKey, Task<TValue>> factory, Func<TValue, Task<bool>>? checkValue = null)
public AsyncPool(Func<Task<TValue>> factory, Func<TValue, Task<bool>>? checkValue = null)
{
_factory = factory;
_checkValue = checkValue ?? (_ => Task.FromResult(true));
}

public async Task<IRentedValue<TValue>> RentAsync(TKey key)
public async Task<IRentedValue<TValue>> RentAsync()
{
TValue value = await TakeOrCreateValue(key);
return new RentedValue<TKey, TValue>(key, value, ReturnValue);
TValue value = await TakeOrCreateValue();
return new RentedValue<TValue>(value, ReturnValue);

// Local method
void ReturnValue(TKey rentedKey, TValue rentedValue) => GetBag(rentedKey).Add(rentedValue);
void ReturnValue(TValue rentedValue) => _cache.Add(rentedValue);
}

private async Task<TValue> TakeOrCreateValue(TKey key)
private async Task<TValue> TakeOrCreateValue()
{
ConcurrentBag<TValue> bag = GetBag(key);

TValue result;
while (!bag.TryTake(out result) || !await _checkValue(result))
while (!_cache.TryTake(out result) || !await _checkValue(result))
{
bag.Add(await _factory(key));
_cache.Add(await _factory());
}

return result;
}

private ConcurrentBag<TValue> GetBag(TKey key) => _cache.GetOrAdd(key, _ => new ConcurrentBag<TValue>());
}
}
26 changes: 11 additions & 15 deletions HandyIpc.Core/Implementation/Pool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,36 @@

namespace HandyIpc.Implementation
{
public class Pool<TKey, TValue>
public class Pool<TValue>
{
private readonly Func<TKey, TValue> _factory;
private readonly Func<TValue> _factory;
private readonly Func<TValue, bool> _checkValue;
private readonly ConcurrentDictionary<TKey, ConcurrentBag<TValue>> _cache = new();
private readonly ConcurrentBag<TValue> _cache = new();

public Pool(Func<TKey, TValue> factory, Func<TValue, bool>? checkValue = null)
public Pool(Func<TValue> factory, Func<TValue, bool>? checkValue = null)
{
_factory = factory;
_checkValue = checkValue ?? (_ => true);
}

public IRentedValue<TValue> Rent(TKey key)
public IRentedValue<TValue> Rent()
{
TValue value = TakeOrCreateValue(key);
return new RentedValue<TKey, TValue>(key, value, ReturnValue);
TValue value = TakeOrCreateValue();
return new RentedValue<TValue>(value, ReturnValue);

// Local method
void ReturnValue(TKey rentedKey, TValue rentedValue) => GetBag(rentedKey).Add(rentedValue);
void ReturnValue(TValue rentedValue) => _cache.Add(rentedValue);
}

private TValue TakeOrCreateValue(TKey key)
private TValue TakeOrCreateValue()
{
ConcurrentBag<TValue> bag = GetBag(key);

TValue result;
while (!bag.TryTake(out result) || !_checkValue(result))
while (!_cache.TryTake(out result) || !_checkValue(result))
{
bag.Add(_factory(key));
_cache.Add(_factory());
}

return result;
}

private ConcurrentBag<TValue> GetBag(TKey key) => _cache.GetOrAdd(key, _ => new ConcurrentBag<TValue>());
}
}
10 changes: 4 additions & 6 deletions HandyIpc.Core/Implementation/RentedValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,18 @@

namespace HandyIpc.Implementation
{
internal sealed class RentedValue<TKey, TValue> : IRentedValue<TValue>
internal sealed class RentedValue<TValue> : IRentedValue<TValue>
{
private readonly TKey _key;
private readonly Action<TKey, TValue> _dispose;
private readonly Action<TValue> _dispose;

public TValue Value { get; }

public RentedValue(TKey key, TValue value, Action<TKey, TValue> dispose)
public RentedValue(TValue value, Action<TValue> dispose)
{
_key = key;
_dispose = dispose;
Value = value;
}

public void Dispose() => _dispose(_key, Value);
public void Dispose() => _dispose(Value);
}
}
1 change: 0 additions & 1 deletion HandyIpc.Core/IpcContractAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,5 @@ namespace HandyIpc
[AttributeUsage(AttributeTargets.Interface)]
public sealed class IpcContractAttribute : Attribute
{
public string Identifier { get; set; } = string.Empty;
}
}
3 changes: 1 addition & 2 deletions HandyIpc.Core/ServerHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,11 @@ private void StopAndRemoveInterface(Type interfaceType)

private void StartInterface(Type interfaceType, Middleware middleware)
{
string identifier = interfaceType.ResolveIdentifier();
var source = new CancellationTokenSource();

#pragma warning disable 4014
// Async run the server without waiting.
CatchException(_receiver.StartAsync(identifier, middleware.ToHandler(_serializer, _logger), source.Token));
CatchException(_receiver.StartAsync(middleware.ToHandler(_serializer, _logger), source.Token));
#pragma warning restore 4014

_runningInterfaces.Add(interfaceType, source);
Expand Down
8 changes: 3 additions & 5 deletions HandyIpc.Generator/ClientProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ public class {nameof(ClientProxy)}{className}{typeParameters} : {interfaceType}
{{
private readonly SenderBase _sender;
private readonly ISerializer _serializer;
private readonly string _identifier;
public {nameof(ClientProxy)}{className}(SenderBase sender, ISerializer serializer, string identifier)
public {nameof(ClientProxy)}{className}(SenderBase sender, ISerializer serializer)
{{
_sender = sender;
_serializer = serializer;
_identifier = identifier;
}}
{methods.For(method =>
{
Expand Down Expand Up @@ -78,9 +76,9 @@ public class {nameof(ClientProxy)}{className}{typeParameters} : {interfaceType}
}};
")}
{Text(isAwaitable ? @"
var responseBytes = await _sender.InvokeAsync(_identifier, request.ToBytes());
var responseBytes = await _sender.InvokeAsync(request.ToBytes());
" : @"
var responseBytes = _sender.Invoke(_identifier, request.ToBytes());
var responseBytes = _sender.Invoke(request.ToBytes());
"
)}
{Text(isAwaitable ? $@"
Expand Down
6 changes: 3 additions & 3 deletions HandyIpc.NamedPipe/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ public static class Extensions
/// </summary>
/// <param name="self">An factory instance.</param>
/// <returns>The factory instance itself.</returns>
public static IHubBuilder UseNamedPipe(this IHubBuilder self)
public static IHubBuilder UseNamedPipe(this IHubBuilder self, string pipeName)
{
return self
.Use(() => new NamedPipeSender())
.Use(() => new NamedPipeReceiver());
.Use(() => new NamedPipeSender(pipeName))
.Use(() => new NamedPipeReceiver(pipeName));
}

internal static byte[] ReadAllBytes(this PipeStream self)
Expand Down
10 changes: 7 additions & 3 deletions HandyIpc.NamedPipe/NamedPipeReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ namespace HandyIpc.NamedPipe
{
internal class NamedPipeReceiver : ReceiverBase
{
public override async Task StartAsync(string identifier, RequestHandler handler, CancellationToken token)
private readonly string _pipeName;

public NamedPipeReceiver(string pipeName) => _pipeName = pipeName;

public override async Task StartAsync(RequestHandler handler, CancellationToken token)
{
while (!token.IsCancellationRequested)
{
try
{
var stream = await CreateServerStreamAsync(identifier, token);
var stream = await CreateServerStreamAsync(_pipeName, token);

if (token.IsCancellationRequested)
{
Expand All @@ -32,7 +36,7 @@ public override async Task StartAsync(string identifier, RequestHandler handler,
}
catch (Exception e)
{
Logger.Error($"An unexpected exception occurred in the server (Id: {identifier}).", e);
Logger.Error($"An unexpected exception occurred in the server (Id: {_pipeName}).", e);
}
}
}
Expand Down
28 changes: 15 additions & 13 deletions HandyIpc.NamedPipe/NamedPipeSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,34 @@ namespace HandyIpc.NamedPipe
{
internal class NamedPipeSender : SenderBase
{
private readonly Pool<string, ClientItem> _clientPool;
private readonly AsyncPool<string, AsyncClientItem> _asyncClientPool;
private readonly string _pipeName;
private readonly Pool<ClientItem> _clientPool;
private readonly AsyncPool<AsyncClientItem> _asyncClientPool;

public NamedPipeSender()
public NamedPipeSender(string pipeName)
{
_clientPool = new Pool<string, ClientItem>(CreateClient, CheckClient);
_asyncClientPool = new AsyncPool<string, AsyncClientItem>(CreateAsyncClient, CheckAsyncClient);
_pipeName = pipeName;
_clientPool = new Pool<ClientItem>(CreateClient, CheckClient);
_asyncClientPool = new AsyncPool<AsyncClientItem>(CreateAsyncClient, CheckAsyncClient);
}

public override byte[] Invoke(string pipeName, byte[] requestBytes)
public override byte[] Invoke(byte[] requestBytes)
{
using var invokeOwner = _clientPool.Rent(pipeName);
using IRentedValue<ClientItem> invokeOwner = _clientPool.Rent();
byte[] response = invokeOwner.Value.Invoke(requestBytes);
return response;
}

public override async Task<byte[]> InvokeAsync(string pipeName, byte[] requestBytes)
public override async Task<byte[]> InvokeAsync(byte[] requestBytes)
{
using var invokeOwner = await _asyncClientPool.RentAsync(pipeName);
using IRentedValue<AsyncClientItem> invokeOwner = await _asyncClientPool.RentAsync();
byte[] response = await invokeOwner.Value.InvokeAsync(requestBytes, CancellationToken.None);
return response;
}

private static ClientItem CreateClient(string pipeName)
private ClientItem CreateClient()
{
var stream = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut);
var stream = new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut);
stream.Connect();
return new ClientItem(stream);
}
Expand All @@ -53,9 +55,9 @@ private static bool CheckClient(ClientItem item)
}
}

private static async Task<AsyncClientItem> CreateAsyncClient(string pipeName)
private async Task<AsyncClientItem> CreateAsyncClient()
{
var stream = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut);
var stream = new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut);
await stream.ConnectAsync();
return new AsyncClientItem(stream);
}
Expand Down
6 changes: 3 additions & 3 deletions HandyIpc.Socket/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ public static class Extensions

private static readonly char[] IdentifierSplitter = { ':', ' ' };

public static IHubBuilder UseTcp(this IHubBuilder self)
public static IHubBuilder UseTcp(this IHubBuilder self, IPAddress ip, int port)
{
return self
.Use(() => new TcpSender())
.Use(() => new TcpReceiver());
.Use(() => new TcpSender(ip, port))
.Use(() => new TcpReceiver(ip, port));
}

internal static (IPAddress ip, int port) ToIpEndPoint(this string connectionString)
Expand Down
Loading

0 comments on commit a3138b3

Please sign in to comment.