Skip to content

Commit

Permalink
Add abilities for acks
Browse files Browse the repository at this point in the history
  • Loading branch information
dries-c committed Apr 7, 2024
1 parent 85b5ee7 commit 589afb5
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 41 deletions.
15 changes: 10 additions & 5 deletions ProxyNetworkInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Exception;
use libproxy\data\LatencyData;
use libproxy\data\TickSyncPacket;
use libproxy\protocol\AckPacket;
use libproxy\protocol\DisconnectPacket;
use libproxy\protocol\ForwardPacket;
use libproxy\protocol\LoginPacket;
Expand Down Expand Up @@ -223,6 +224,14 @@ private function onPacketReceive(string $buffer): void
}
$this->receiveBytes += strlen($pk->payload);
break;
case AckPacket::NETWORK_ID:
/** @var AckPacket $pk */
if (($session = $this->getSession($socketId)) === null || !(fn() => $this->connected)->call($session)) {
break; // might be data arriving from the client after the server has closed the connection
}

$session->handleAckReceipt($pk->receiptId);
break;
}
} catch (PacketHandlingException|BinaryDataException $exception) {
$this->close($socketId, 'Error handling a Packet (Server)');
Expand Down Expand Up @@ -274,7 +283,7 @@ public function getSession(int $socketId): ?NetworkSession
return $this->sessions[$socketId] ?? null;
}

public function putPacket(int $socketId, ProxyPacket $pk, int $receiptId = null): void
public function putPacket(int $socketId, ProxyPacket $pk): void
{
$serializer = new ProxyPacketSerializer();
$serializer->putLInt($socketId);
Expand All @@ -289,10 +298,6 @@ public function putPacket(int $socketId, ProxyPacket $pk, int $receiptId = null)
} catch (Error $exception) {
$this->server->getLogger()->debug('Packet was send while the client was already shut down');
}

if ($receiptId !== null) { // TODO: check if QUIC supports acks on specific data ;l
$this->getSession($socketId)?->handleAckReceipt($receiptId);
}
}

public function createSession(int $socketId, string $ip, int $port): NetworkSession
Expand Down
10 changes: 8 additions & 2 deletions ProxyPacketSender.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@


use libproxy\protocol\ForwardPacket;
use libproxy\protocol\ForwardReceiptPacket;
use pocketmine\network\mcpe\PacketSender;

class ProxyPacketSender implements PacketSender
Expand All @@ -29,10 +30,15 @@ public function __construct(int $socketId, ProxyNetworkInterface $handler)
public function send(string $payload, bool $immediate, ?int $receiptId): void
{
if (!$this->closed) {
$pk = new ForwardPacket();
if ($receiptId === null) {
$pk = new ForwardPacket();
} else {
$pk = new ForwardReceiptPacket();
$pk->receiptId = $receiptId;
}
$pk->payload = $payload;

$this->handler->putPacket($this->socketId, $pk, $receiptId);
$this->handler->putPacket($this->socketId, $pk);
}
}

Expand Down
87 changes: 54 additions & 33 deletions ProxyServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
namespace libproxy;

use ErrorException;
use libproxy\protocol\AckPacket;
use libproxy\protocol\DisconnectPacket;
use libproxy\protocol\ForwardPacket;
use libproxy\protocol\ForwardReceiptPacket;
use libproxy\protocol\LoginPacket;
use libproxy\protocol\ProxyPacket;
use libproxy\protocol\ProxyPacketPool;
Expand Down Expand Up @@ -257,10 +259,29 @@ private function pushSockets(): void
/** @var ForwardPacket $pk */
$this->sendPayload($streamIdentifier, $pk->payload);
break;
case ForwardReceiptPacket::NETWORK_ID:
/** @var ForwardReceiptPacket $pk */
$this->sendPayloadWithReceipt($streamIdentifier, $pk->payload, $pk->receiptId);
break;
}
}
}

private function sendPayloadWithReceipt(int $streamIdentifier, string $payload, int $receiptId): void
{
if (($writer = $this->getStreamWriter($streamIdentifier)) === null) {
$this->shutdownStream($streamIdentifier, 'stream not found', false);
return;
}

$writer->writeWithPromise(Binary::writeInt(strlen($payload)) . $payload)->onResult(function() use ($streamIdentifier, $receiptId): void{
$pk = new AckPacket();
$pk->receiptId = $receiptId;

$this->sendToMainBuffer($streamIdentifier, $pk);
});
}

/**
* Sends a payload to the client
*/
Expand All @@ -277,40 +298,40 @@ private function sendPayload(int $streamIdentifier, string $payload): void
/**
* Sends a data packet to the main thread.
*/
private function sendDataPacketToMain(int $socketIdentifier, string $payload): void
private function sendDataPacketToMain(int $streamIdentifier, string $payload): void
{
$pk = new ForwardPacket();
$pk->payload = $payload;

$this->sendToMainBuffer($socketIdentifier, $pk);
$this->sendToMainBuffer($streamIdentifier, $pk);
}

/**
* Returns the protocol ID for the given socket identifier.
*/
private function getProtocolId(int $socketIdentifier): int
private function getProtocolId(int $streamIdentifier): int
{
return $this->protocolId[$socketIdentifier] ?? ProtocolInfo::CURRENT_PROTOCOL;
return $this->protocolId[$streamIdentifier] ?? ProtocolInfo::CURRENT_PROTOCOL;
}

/**
* Sends a data packet to the client using a single packet in a batch.
*/
private function sendDataPacket(int $socketIdentifier, BedrockPacket $packet): void
private function sendDataPacket(int $streamIdentifier, BedrockPacket $packet): void
{
$packetSerializer = PacketSerializer::encoder($protocolId = $this->getProtocolId($socketIdentifier));
$packetSerializer = PacketSerializer::encoder($protocolId = $this->getProtocolId($streamIdentifier));
$packet->encode($packetSerializer);

$stream = new BinaryStream();
PacketBatch::encodeRaw($stream, [$packetSerializer->getBuffer()]);
$payload = ($protocolId >= ProtocolInfo::PROTOCOL_1_20_60 ? chr(CompressionAlgorithm::ZLIB) : '') . ZlibCompressor::getInstance()->compress($stream->getBuffer());

$this->sendPayload($socketIdentifier, $payload);
$this->sendPayload($streamIdentifier, $payload);
}

private function decodePacket(int $socketIdentifier, BedrockPacket $packet, string $buffer): void
private function decodePacket(int $streamIdentifier, BedrockPacket $packet, string $buffer): void
{
$stream = PacketSerializer::decoder($this->protocolId[$socketIdentifier] ?? ProtocolInfo::CURRENT_PROTOCOL, $buffer, 0);
$stream = PacketSerializer::decoder($this->protocolId[$streamIdentifier] ?? ProtocolInfo::CURRENT_PROTOCOL, $buffer, 0);
try {
$packet->decode($stream);
} catch (PacketDecodeException $e) {
Expand All @@ -327,41 +348,41 @@ private function decodePacket(int $socketIdentifier, BedrockPacket $packet, stri
*
* @return bool whether the packet was handled successfully
*/
private function handleDataPacket(int $socketIdentifier, BedrockPacket $packet, string $buffer): bool
private function handleDataPacket(int $streamIdentifier, BedrockPacket $packet, string $buffer): bool
{
if ($packet->pid() == NetworkStackLatencyPacket::NETWORK_ID) {
/** @var NetworkStackLatencyPacket $packet USED FOR PING CALCULATIONS */
$this->decodePacket($socketIdentifier, $packet, $buffer);
$this->decodePacket($streamIdentifier, $packet, $buffer);

if ($packet->timestamp === 0 && $packet->needResponse) {
try {
$this->sendDataPacket($socketIdentifier, NetworkStackLatencyPacket::response(0));
$this->sendDataPacket($streamIdentifier, NetworkStackLatencyPacket::response(0));
} catch (PacketHandlingException $e) {
// ignore, client probably disconnected
}
return true;
}
} else if ($packet->pid() === RequestNetworkSettingsPacket::NETWORK_ID) {
/** @var RequestNetworkSettingsPacket $packet USED TO GET PROTOCOLID */
$this->decodePacket($socketIdentifier, $packet, $buffer);
$this->decodePacket($streamIdentifier, $packet, $buffer);

$this->protocolId[$socketIdentifier] = $packet->getProtocolVersion();
$this->protocolId[$streamIdentifier] = $packet->getProtocolVersion();
}

return false;
}

/**
* @param int $socketIdentifier
* @param int $streamIdentifier
* @param string $payload
* @return void
* @see NetworkSession::handleEncoded($payload)
*
*/
private function onFullDataReceive(int $socketIdentifier, string $payload): void
private function onFullDataReceive(int $streamIdentifier, string $payload): void
{
try {
$this->getBatchPacketLimiter($socketIdentifier)->decrement();
$this->getBatchPacketLimiter($streamIdentifier)->decrement();

if (strlen($payload) < 1) {
throw new PacketHandlingException("No bytes in payload");
Expand All @@ -386,7 +407,7 @@ private function onFullDataReceive(int $socketIdentifier, string $payload): void
$stream = new BinaryStream($decompressed);
$count = 0;
foreach (PacketBatch::decodeRaw($stream) as $buffer) {
$this->getGamePacketLimiter($socketIdentifier)->decrement();
$this->getGamePacketLimiter($streamIdentifier)->decrement();
if (++$count > 100) {
throw new PacketHandlingException("Too many packets in batch");
}
Expand All @@ -396,8 +417,8 @@ private function onFullDataReceive(int $socketIdentifier, string $payload): void
throw new PacketHandlingException("Unknown packet received");
}
try {
if (!$this->handleDataPacket($socketIdentifier, $packet, $buffer)) {
$this->sendDataPacketToMain($socketIdentifier, $buffer);
if (!$this->handleDataPacket($streamIdentifier, $packet, $buffer)) {
$this->sendDataPacketToMain($streamIdentifier, $buffer);
}
} catch (PacketHandlingException $e) {
$this->logger->debug($packet->getName() . ": " . base64_encode($buffer));
Expand All @@ -410,22 +431,22 @@ private function onFullDataReceive(int $socketIdentifier, string $payload): void
}
} catch (PacketHandlingException $e) {
$this->logger->logException($e);
$this->shutdownStream($socketIdentifier, "invalid packet", false);
$this->shutdownStream($streamIdentifier, "invalid packet", false);
}
}

private function onDataReceive(int $socketIdentifier, string $data): void
private function onDataReceive(int $streamIdentifier, string $data): void
{
if (isset($this->socketBuffer[$socketIdentifier])) {
$this->socketBuffer[$socketIdentifier] .= $data;
if (isset($this->socketBuffer[$streamIdentifier])) {
$this->socketBuffer[$streamIdentifier] .= $data;
} else {
$this->socketBuffer[$socketIdentifier] = $data;
$this->socketBuffer[$streamIdentifier] = $data;
}

while (true) {
$buffer = $this->socketBuffer[$socketIdentifier];
$buffer = $this->socketBuffer[$streamIdentifier];
$length = strlen($buffer);
$lengthNeeded = $this->socketBufferLengthNeeded[$socketIdentifier] ?? null;
$lengthNeeded = $this->socketBufferLengthNeeded[$streamIdentifier] ?? null;

if ($lengthNeeded === null) {
if ($length < 4) { // first 4 bytes are the length of the packet
Expand All @@ -434,18 +455,18 @@ private function onDataReceive(int $socketIdentifier, string $data): void
try {
$packetLength = Binary::readInt(substr($buffer, 0, 4));
} catch (BinaryDataException $exception) {
$this->shutdownStream($socketIdentifier, 'invalid packet', false);
$this->shutdownStream($streamIdentifier, 'invalid packet', false);
return;
}

$this->socketBufferLengthNeeded[$socketIdentifier] = $packetLength;
$this->socketBuffer[$socketIdentifier] = substr($buffer, 4);
$this->socketBufferLengthNeeded[$streamIdentifier] = $packetLength;
$this->socketBuffer[$streamIdentifier] = substr($buffer, 4);
}
} else if ($length >= $lengthNeeded) {
$this->onFullDataReceive($socketIdentifier, substr($buffer, 0, $lengthNeeded));
$this->onFullDataReceive($streamIdentifier, substr($buffer, 0, $lengthNeeded));

$this->socketBuffer[$socketIdentifier] = substr($buffer, $lengthNeeded);
unset($this->socketBufferLengthNeeded[$socketIdentifier]);
$this->socketBuffer[$streamIdentifier] = substr($buffer, $lengthNeeded);
unset($this->socketBufferLengthNeeded[$streamIdentifier]);
} else {
return; // wait for more data
}
Expand Down
24 changes: 24 additions & 0 deletions protocol/AckPacket.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

declare(strict_types=1);


namespace libproxy\protocol;


class AckPacket extends ProxyPacket
{
public const NETWORK_ID = ProxyProtocolInfo::ACK_PACKET;

public int $receiptId;

public function encodePayload(ProxyPacketSerializer $out): void
{
$out->putUnsignedVarInt($this->receiptId);
}

public function decodePayload(ProxyPacketSerializer $in): void
{
$this->receiptId = $in->getUnsignedVarInt();
}
}
28 changes: 28 additions & 0 deletions protocol/ForwardReceiptPacket.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);


namespace libproxy\protocol;


class ForwardReceiptPacket extends ForwardPacket
{
public const NETWORK_ID = ProxyProtocolInfo::FORWARD_RECEIPT_PACKET;

public int $receiptId;

public function encodePayload(ProxyPacketSerializer $out): void
{
$out->putUnsignedVarInt($this->receiptId);

parent::encodePayload($out);
}

public function decodePayload(ProxyPacketSerializer $in): void
{
$this->receiptId = $in->getUnsignedVarInt();

parent::decodePayload($in);
}
}
4 changes: 3 additions & 1 deletion protocol/ProxyPacketPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ class ProxyPacketPool

public function __construct()
{
$this->pool = new SplFixedArray(256);
$this->pool = new SplFixedArray(5);

$this->registerPacket(new LoginPacket());
$this->registerPacket(new DisconnectPacket());
$this->registerPacket(new ForwardPacket());
$this->registerPacket(new ForwardReceiptPacket());
$this->registerPacket(new AckPacket());
}

public function registerPacket(ProxyPacket $packet): void
Expand Down
2 changes: 2 additions & 0 deletions protocol/ProxyProtocolInfo.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ final class ProxyProtocolInfo
public const LOGIN_PACKET = 0x01;
public const DISCONNECT_PACKET = 0x02;
public const FORWARD_PACKET = 0x03;
public const FORWARD_RECEIPT_PACKET = 0x04;
public const ACK_PACKET = 0x05;
}

0 comments on commit 589afb5

Please sign in to comment.