From efd09c3f9d44c651a17cbaa121d0075b5d985c20 Mon Sep 17 00:00:00 2001 From: Dries C Date: Sat, 10 Feb 2024 14:44:10 +0100 Subject: [PATCH] Implement new packet compression --- MultiCompressor.php | 79 --------------------------------------- ProxyNetworkInterface.php | 79 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 77 insertions(+), 81 deletions(-) delete mode 100644 MultiCompressor.php diff --git a/MultiCompressor.php b/MultiCompressor.php deleted file mode 100644 index 592fced..0000000 --- a/MultiCompressor.php +++ /dev/null @@ -1,79 +0,0 @@ -zlibCompressor = ZlibCompressor::getInstance(); - } - - public function getCompressionThreshold(): ?int - { - return $this->zlibCompressor->getCompressionThreshold(); - } - - public function getNetworkId(): int - { - return CompressionAlgorithm::ZLIB; - } - - public function decompress(string $payload): string - { - $stream = new BinaryStream($payload); - - try { - $method = $stream->getByte(); - - try { - $result = match ($method) { - self::METHOD_ZLIB => $this->zlibCompressor->decompress($stream->getRemaining()), - self::METHOD_ZSTD => zstd_uncompress($stream->getRemaining()), - default => throw new DecompressionException("Decompression method not found"), - }; - } catch (ErrorException $exception) { - throw new DecompressionException('Failed to decompress data', 0, $exception); - } - } catch (BinaryDataException $exception) { - throw new DecompressionException("Decompression method is invalid"); - } - - if ($result === false) { - throw new DecompressionException("Failed to decompress data"); - } - - return $result; - } - - /** - * @param string $payload - * @return string - */ - public function compress(string $payload): string - { - return $this->zlibCompressor->compress($payload); - } -} diff --git a/ProxyNetworkInterface.php b/ProxyNetworkInterface.php index d89e15c..9096bed 100644 --- a/ProxyNetworkInterface.php +++ b/ProxyNetworkInterface.php @@ -6,6 +6,7 @@ namespace libproxy; use Error; +use ErrorException; use Exception; use libproxy\data\LatencyData; use libproxy\data\TickSyncPacket; @@ -17,12 +18,17 @@ use libproxy\protocol\ProxyPacketSerializer; use pmmp\thread\Thread as NativeThread; use pmmp\thread\ThreadSafeArray; +use pocketmine\network\mcpe\compression\DecompressionException; +use pocketmine\network\mcpe\compression\ZlibCompressor; use pocketmine\network\mcpe\convert\TypeConverter; use pocketmine\network\mcpe\EntityEventBroadcaster; use pocketmine\network\mcpe\NetworkSession; use pocketmine\network\mcpe\PacketBroadcaster; +use pocketmine\network\mcpe\protocol\PacketDecodeException; use pocketmine\network\mcpe\protocol\PacketPool; +use pocketmine\network\mcpe\protocol\serializer\PacketBatch; use pocketmine\network\mcpe\protocol\serializer\PacketSerializerContext; +use pocketmine\network\mcpe\protocol\types\CompressionAlgorithm; use pocketmine\network\mcpe\raklib\PthreadsChannelReader; use pocketmine\network\mcpe\raklib\PthreadsChannelWriter; use pocketmine\network\NetworkInterface; @@ -32,12 +38,16 @@ use pocketmine\Server; use pocketmine\snooze\SleeperHandlerEntry; use pocketmine\thread\ThreadCrashException; +use pocketmine\timings\Timings; use pocketmine\utils\Binary; use pocketmine\utils\BinaryDataException; +use pocketmine\utils\BinaryStream; use Socket; use ThreadedArray; use WeakMap; +use function base64_encode; use function bin2hex; +use function ord; use function socket_close; use function socket_create_pair; use function socket_last_error; @@ -46,6 +56,7 @@ use function strlen; use function substr; use function trim; +use function zstd_uncompress; use const AF_INET; use const AF_UNIX; use const SOCK_STREAM; @@ -214,7 +225,7 @@ private function onPacketReceive(string $buffer): void break; // might be data arriving from the client after the server has closed the connection } - $session->handleEncoded($pk->payload); + $this->handleEncoded($session, $pk->payload); $this->receiveBytes += strlen($pk->payload); break; } @@ -225,6 +236,70 @@ private function onPacketReceive(string $buffer): void } } + /** + * @throws PacketHandlingException + */ + public function handleEncoded(NetworkSession $session, string $payload): void + { + if (!$session->isConnected()) { + return; + } + + Timings::$playerNetworkReceive->startTiming(); + try { + (fn() => $this->packetBatchLimiter->decrement())->call($session); + + if (strlen($payload) < 1) { + throw new PacketHandlingException("No bytes in payload"); + } + + Timings::$playerNetworkReceiveDecompress->startTiming(); + $compressionType = ord($payload[0]); + $compressed = substr($payload, 1); + + try { + $decompressed = match ($compressionType) { + CompressionAlgorithm::NONE => $compressed, + CompressionAlgorithm::ZLIB => $session->getCompressor()->decompress($compressed), + CompressionAlgorithm::NONE - 1 => ($d = zstd_uncompress($compressed)) === false ? throw new DecompressionException("Failed to decompress packet") : $d, + default => throw new PacketHandlingException("Packet compressed with unexpected compression type $compressionType") + }; + } catch (ErrorException|DecompressionException $e) { + $session->getLogger()->debug("Failed to decompress packet: " . base64_encode($compressed)); + throw PacketHandlingException::wrap($e, "Compressed packet batch decode error"); + } finally { + Timings::$playerNetworkReceiveDecompress->stopTiming(); + } + + try { + $stream = new BinaryStream($decompressed); + $count = 0; + foreach (PacketBatch::decodeRaw($stream) as $buffer) { + (fn() => $this->gamePacketLimiter->decrement())->call($session); + if (++$count > 100) { + throw new PacketHandlingException("Too many packets in batch"); + } + $packet = PacketPool::getInstance()->getPacket($buffer); + if ($packet === null) { + $session->getLogger()->debug("Unknown packet: " . base64_encode($buffer)); + throw new PacketHandlingException("Unknown packet received"); + } + try { + $session->handleDataPacket($packet, $buffer); + } catch (PacketHandlingException $e) { + $session->getLogger()->debug($packet->getName() . ": " . base64_encode($buffer)); + throw PacketHandlingException::wrap($e, "Error processing " . $packet->getName()); + } + } + } catch (PacketDecodeException|BinaryDataException $e) { + $session->getLogger()->logException($e); + throw PacketHandlingException::wrap($e, "Packet batch decode error"); + } + } finally { + Timings::$playerNetworkReceive->stopTiming(); + } + } + public function tick(): void { if (!$this->proxy->isRunning()) { @@ -295,7 +370,7 @@ public function createSession(int $socketId, string $ip, int $port): NetworkSess new ProxyPacketSender($socketId, $this), $this->packetBroadcaster, $this->entityEventBroadcaster, - MultiCompressor::getInstance(), + ZlibCompressor::getInstance(), TypeConverter::getInstance(), $ip, $port