Skip to content

Commit

Permalink
Handle ping pong on thread
Browse files Browse the repository at this point in the history
  • Loading branch information
dries-c committed Mar 5, 2024
1 parent 0af6716 commit 537f2ce
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 127 deletions.
40 changes: 0 additions & 40 deletions ProxyListener.php

This file was deleted.

87 changes: 11 additions & 76 deletions ProxyNetworkInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
namespace libproxy;

use Error;
use ErrorException;
use Exception;
use libproxy\data\LatencyData;
use libproxy\data\TickSyncPacket;
Expand All @@ -18,16 +17,12 @@
use libproxy\protocol\ProxyPacketSerializer;
use pmmp\thread\Thread as NativeThread;
use pmmp\thread\ThreadSafeArray;
use pocketmine\network\mcpe\compression\DecompressionException;
use pocketmine\network\mcpe\compression\ZlibCompressor;
use pocketmine\network\mcpe\convert\TypeConverter;
use pocketmine\network\mcpe\EntityEventBroadcaster;
use pocketmine\network\mcpe\NetworkSession;
use pocketmine\network\mcpe\PacketBroadcaster;
use pocketmine\network\mcpe\protocol\PacketDecodeException;
use pocketmine\network\mcpe\protocol\PacketPool;
use pocketmine\network\mcpe\protocol\serializer\PacketBatch;
use pocketmine\network\mcpe\protocol\types\CompressionAlgorithm;
use pocketmine\network\mcpe\raklib\PthreadsChannelReader;
use pocketmine\network\mcpe\raklib\PthreadsChannelWriter;
use pocketmine\network\NetworkInterface;
Expand All @@ -37,16 +32,13 @@
use pocketmine\Server;
use pocketmine\snooze\SleeperHandlerEntry;
use pocketmine\thread\ThreadCrashException;
use pocketmine\timings\Timings;
use pocketmine\utils\Binary;
use pocketmine\utils\BinaryDataException;
use pocketmine\utils\BinaryStream;
use Socket;
use ThreadedArray;
use WeakMap;
use function base64_encode;
use function bin2hex;
use function ord;
use function socket_close;
use function socket_create_pair;
use function socket_last_error;
Expand All @@ -55,7 +47,6 @@
use function strlen;
use function substr;
use function trim;
use function zstd_uncompress;
use const AF_INET;
use const AF_UNIX;
use const SOCK_STREAM;
Expand Down Expand Up @@ -148,8 +139,6 @@ public function __construct(PluginBase $plugin, int $port, ?string $composerPath
$this->sendBytes = 0;
$this->receiveBytes = 0;
}), 20, 20);

$server->getPluginManager()->registerEvents(new ProxyListener(), $plugin);
}

public static function handleRawLatency(NetworkSession $session, int $upstream, int $downstream): void
Expand Down Expand Up @@ -217,82 +206,28 @@ private function onPacketReceive(string $buffer): void
break;
case ForwardPacket::NETWORK_ID:
/** @var ForwardPacket $pk */
if (($session = $this->getSession($socketId)) === null) {
if (($session = $this->getSession($socketId)) === null || !(fn() => $this->connected)->call($session)) {
break; // might be data arriving from the client after the server has closed the connection
}

$this->handleEncoded($session, $pk->payload);
$this->receiveBytes += strlen($pk->payload);
break;
}
} catch (PacketHandlingException|BinaryDataException $exception) {
$this->close($socketId, 'Error handling a Packet (Server)');

$this->server->getLogger()->logException($exception);
}
}

/**
* @throws PacketHandlingException
*/
public function handleEncoded(NetworkSession $session, string $payload): void
{
if (!(fn() => $this->connected)->call($session)) {
return;
}

Timings::$playerNetworkReceive->startTiming();
try {
(fn() => $this->packetBatchLimiter->decrement())->call($session);

if (strlen($payload) < 1) {
throw new PacketHandlingException("No bytes in payload");
}

Timings::$playerNetworkReceiveDecompress->startTiming();
$compressionType = ord($payload[0]);
$compressed = substr($payload, 1);

try {
$decompressed = match ($compressionType) {
CompressionAlgorithm::NONE => $compressed,
CompressionAlgorithm::ZLIB => $session->getCompressor()->decompress($compressed),
CompressionAlgorithm::NONE - 1 => ($d = zstd_uncompress($compressed)) === false ? throw new DecompressionException("Failed to decompress packet") : $d,
default => throw new PacketHandlingException("Packet compressed with unexpected compression type $compressionType")
};
} catch (ErrorException|DecompressionException $e) {
$session->getLogger()->debug("Failed to decompress packet: " . base64_encode($compressed));
throw PacketHandlingException::wrap($e, "Compressed packet batch decode error");
} finally {
Timings::$playerNetworkReceiveDecompress->stopTiming();
}

try {
$stream = new BinaryStream($decompressed);
$count = 0;
foreach (PacketBatch::decodeRaw($stream) as $buffer) {
(fn() => $this->gamePacketLimiter->decrement())->call($session);
if (++$count > 100) {
throw new PacketHandlingException("Too many packets in batch");
}
$packet = PacketPool::getInstance()->getPacket($buffer);
$packet = PacketPool::getInstance()->getPacket($pk->payload);
if ($packet === null) {
$session->getLogger()->debug("Unknown packet: " . base64_encode($buffer));
$session->getLogger()->debug("Unknown packet: " . base64_encode($pk->payload));
throw new PacketHandlingException("Unknown packet received");
}
try {
$session->handleDataPacket($packet, $buffer);
$session->handleDataPacket($packet, $pk->payload);
} catch (PacketHandlingException $e) {
$session->getLogger()->debug($packet->getName() . ": " . base64_encode($buffer));
$session->getLogger()->debug($packet->getName() . ": " . base64_encode($pk->payload));
throw PacketHandlingException::wrap($e, "Error processing " . $packet->getName());
}
}
} catch (PacketDecodeException|BinaryDataException $e) {
$session->getLogger()->logException($e);
throw PacketHandlingException::wrap($e, "Packet batch decode error");
$this->receiveBytes += strlen($pk->payload);
break;
}
} finally {
Timings::$playerNetworkReceive->stopTiming();
} catch (PacketHandlingException|BinaryDataException $exception) {
$this->close($socketId, 'Error handling a Packet (Server)');

$this->server->getLogger()->logException($exception);
}
}

Expand Down
Loading

0 comments on commit 537f2ce

Please sign in to comment.