Skip to content

Commit

Permalink
Merge pull request #947 from Denrage/feat/arcdps_bridge_rewrite
Browse files Browse the repository at this point in the history
Arcdps Integration Rewrite
  • Loading branch information
dlamkins authored Mar 31, 2024
2 parents 6703703 + 2d066a1 commit 190d133
Show file tree
Hide file tree
Showing 31 changed files with 2,978 additions and 148 deletions.
41 changes: 8 additions & 33 deletions Blish HUD/GameServices/ArcDps/Common/CommonFields.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

namespace Blish_HUD.ArcDps.Common {

public class CommonFields {
private bool _enabled = false;

/// <summary>
/// Delegate which will be invoked in <see cref="CommonFields.PlayerAdded" /> and
Expand All @@ -15,11 +17,9 @@ public class CommonFields {
/// Contains every player in the current group or squad.
/// Key: Character Name, Value: Account Name
/// </summary>
public IReadOnlyDictionary<string, Player> PlayersInSquad => _playersInSquad;

private readonly ConcurrentDictionary<string, Player> _playersInSquad = new ConcurrentDictionary<string, Player>();

private bool _enabled;
public IReadOnlyDictionary<string, Player> PlayersInSquad => GameService.ArcDpsV2.Common.PlayersInSquad
.Select(x => new KeyValuePair<string, Player>(x.Key, new Player(x.Value.CharacterName, x.Value.AccountName, x.Value.Profession, x.Value.Elite, x.Value.Self)))
.ToDictionary(x=> x.Key, x => x.Value);

/// <summary>
/// Gets invoked whenever someone joins the squad or group.
Expand All @@ -35,37 +35,12 @@ public class CommonFields {
/// Activates the <see cref="CommonFields" /> service.
/// </summary>
public void Activate() {
GameService.ArcDpsV2.Common.PlayerAdded += player => PlayerAdded?.Invoke(new Player(player.CharacterName, player.AccountName, player.Profession, player.Elite, player.Self));
GameService.ArcDpsV2.Common.PlayerRemoved += player => PlayerRemoved?.Invoke(new Player(player.CharacterName, player.AccountName, player.Profession, player.Elite, player.Self));

if (_enabled) return;

_enabled = true;
GameService.ArcDps.RawCombatEvent += CombatHandler;
}

private void CombatHandler(object sender, RawCombatEventArgs args) {
if (args.CombatEvent.Ev != null) return;

/* notify tracking change */
if (args.CombatEvent.Src.Elite != 0) return;

/* add */
if (args.CombatEvent.Src.Profession != 0) {
if (_playersInSquad.ContainsKey(args.CombatEvent.Src.Name)) return;

string accountName = args.CombatEvent.Dst.Name.StartsWith(":")
? args.CombatEvent.Dst.Name.Substring(1)
: args.CombatEvent.Dst.Name;

var player = new Player(
args.CombatEvent.Src.Name, accountName,
args.CombatEvent.Dst.Profession, args.CombatEvent.Dst.Elite, args.CombatEvent.Dst.Self != 0
);

if (_playersInSquad.TryAdd(args.CombatEvent.Src.Name, player)) this.PlayerAdded?.Invoke(player);
}
/* remove */
else {
if (_playersInSquad.TryRemove(args.CombatEvent.Src.Name, out var player)) this.PlayerRemoved?.Invoke(player);
}
}

public struct Player {
Expand Down
7 changes: 7 additions & 0 deletions Blish HUD/GameServices/ArcDps/V2/ArcDpsBridgeVersion.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Blish_HUD.GameServices.ArcDps {
public enum ArcDpsBridgeVersion {
V1 = 0,
V2 = 1,
None = 100,
}
}
224 changes: 224 additions & 0 deletions Blish HUD/GameServices/ArcDps/V2/ArcDpsClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Blish_HUD.GameServices.ArcDps.V2;
using Blish_HUD.GameServices.ArcDps.V2.Processors;

namespace Blish_HUD.GameServices.ArcDps {

internal class ArcDpsClient : IArcDpsClient {
#if DEBUG
public static long Counter;
#endif

private static readonly Logger _logger = Logger.GetLogger<ArcDpsServiceV2>();
private readonly BlockingCollection<byte[]>[] messageQueues;
private readonly Dictionary<int, MessageProcessor> processors = new Dictionary<int, MessageProcessor>();
private readonly ArcDpsBridgeVersion arcDpsBridgeVersion;
private bool isConnected = false;
private NetworkStream networkStream;
private CancellationToken ct;
private bool disposedValue;

public event EventHandler<SocketError> Error;

public bool IsConnected => this.isConnected && this.Client.Connected;

public TcpClient Client { get; }

public event Action Disconnected;

public ArcDpsClient(ArcDpsBridgeVersion arcDpsBridgeVersion) {
this.arcDpsBridgeVersion = arcDpsBridgeVersion;

processors.Add(1, new ImGuiProcessor());

if (this.arcDpsBridgeVersion == ArcDpsBridgeVersion.V1) {
processors.Add(2, new LegacyCombatProcessor());
processors.Add(3, new LegacyCombatProcessor());
} else {
processors.Add(2, new CombatEventProcessor());
processors.Add(3, new CombatEventProcessor());
}

// hardcoded message queue size. One Collection per message type. This is done just for optimizations
this.messageQueues = new BlockingCollection<byte[]>[4];

this.Client = new TcpClient();
}

public void RegisterMessageTypeListener<T>(int type, Func<T, CancellationToken, Task> listener)
where T : struct {
var processor = (MessageProcessor<T>)this.processors[type];
if (messageQueues[type] == null) {
messageQueues[type] = new BlockingCollection<byte[]>();

try {
Task.Run(() => this.ProcessMessage(processor, messageQueues[type]));
} catch (OperationCanceledException) {
// NOP
}
}

processor.RegisterListener(listener);
}

private void ProcessMessage(MessageProcessor processor, BlockingCollection<byte[]> messageQueue) {
while (!ct.IsCancellationRequested) {
ct.ThrowIfCancellationRequested();
Task.Delay(1).Wait();
foreach (var item in messageQueue.GetConsumingEnumerable()) {
ct.ThrowIfCancellationRequested();
processor.Process(item, ct);
ArrayPool<byte>.Shared.Return(item);
}
}

ct.ThrowIfCancellationRequested();
}

/// <summary>
/// Initializes the client and connects to the arcdps "server"
/// </summary>
/// <param name="endpoint"></param>
/// <param name="ct">CancellationToken to cancel the whole client</param>
public void Initialize(IPEndPoint endpoint, CancellationToken ct) {
this.ct = ct;
this.Client.Connect(endpoint);
_logger.Info("Connected to arcdps endpoint on: " + endpoint.ToString());

this.networkStream = this.Client.GetStream();
this.isConnected = true;

try {
if (this.arcDpsBridgeVersion == ArcDpsBridgeVersion.V1) {
Task.Run(async () => await this.LegacyReceive(ct), ct);
} else {
Task.Run(async () => await this.Receive(ct), ct);
}
} catch (OperationCanceledException) {
// NOP
}
}

public void Disconnect() {
if (isConnected) {
if (this.Client.Connected) {
this.Client.Close();
this.Client.Dispose();
_logger.Info("Disconnected from arcdps endpoint");
}

this.isConnected = false;
this.Disconnected?.Invoke();
}
}

private async Task LegacyReceive(CancellationToken ct) {
_logger.Info($"Start Legacy Receive Task for {this.Client.Client.RemoteEndPoint?.ToString()}");
try {
var messageHeaderBuffer = new byte[9];
ArrayPool<byte> pool = ArrayPool<byte>.Shared;
while (this.Client.Connected) {
ct.ThrowIfCancellationRequested();

if (this.Client.Available == 0) {
await Task.Delay(1, ct);
}

ReadFromStream(this.networkStream, messageHeaderBuffer, 9);

// In V1 the message type is part of the message and therefor included in message length, so we subtract it here
var messageLength = Unsafe.ReadUnaligned<int>(ref messageHeaderBuffer[0]) - 1;
var messageType = messageHeaderBuffer[8];

var messageBuffer = pool.Rent(messageLength);
ReadFromStream(this.networkStream, messageBuffer, messageLength);

this.messageQueues[messageType]?.Add(messageBuffer);
#if DEBUG
Interlocked.Increment(ref Counter);
#endif

}
} catch (Exception ex) {
_logger.Error(ex.ToString());
this.Error?.Invoke(this, SocketError.SocketError);
this.Disconnect();
}

_logger.Info($"Legacy Receive Task for {this.Client.Client?.RemoteEndPoint?.ToString()} stopped");
}

private async Task Receive(CancellationToken ct) {
_logger.Info($"Start Receive Task for {this.Client.Client.RemoteEndPoint?.ToString()}");
try {
var messageHeaderBuffer = new byte[5];
ArrayPool<byte> pool = ArrayPool<byte>.Shared;
while (this.Client.Connected) {
ct.ThrowIfCancellationRequested();

if (this.Client.Available == 0) {
await Task.Delay(1, ct);
}

ReadFromStream(this.networkStream, messageHeaderBuffer, 5);

var messageLength = Unsafe.ReadUnaligned<int>(ref messageHeaderBuffer[0]);
var messageType = messageHeaderBuffer[4];

var messageBuffer = pool.Rent(messageLength);
ReadFromStream(this.networkStream, messageBuffer, messageLength);
this.messageQueues[messageType]?.Add(messageBuffer);
#if DEBUG
Interlocked.Increment(ref Counter);
#endif
}
} catch (Exception ex) {
_logger.Error(ex.ToString());
this.Error?.Invoke(this, SocketError.SocketError);
this.Disconnect();
}

_logger.Info($"Receive Task for {this.Client.Client?.RemoteEndPoint?.ToString()} stopped");
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void ReadFromStream(Stream stream, byte[] buffer, int length) {
int bytesRead = 0;
while (bytesRead != length) {
bytesRead += stream.Read(buffer, bytesRead, length - bytesRead);
}
}

protected virtual void Dispose(bool disposing) {
if (!disposedValue) {
if (disposing) {
Client.Dispose();
foreach (var item in messageQueues) {
if (item.Count != 0) {
foreach (var message in item) {
ArrayPool<byte>.Shared.Return(message);
}
}
}
networkStream.Dispose();
}

disposedValue = true;
}
}

public void Dispose() {
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
}
}
Loading

0 comments on commit 190d133

Please sign in to comment.