diff --git a/MultiCompressor.php b/MultiCompressor.php deleted file mode 100644 index c13898e..0000000 --- a/MultiCompressor.php +++ /dev/null @@ -1,73 +0,0 @@ -zlibCompressor = ZlibCompressor::getInstance(); - } - - public function getCompressionThreshold(): ?int - { - return $this->zlibCompressor->getCompressionThreshold(); - } - - public function decompress(string $payload): string - { - $stream = new BinaryStream($payload); - - try { - $method = $stream->getByte(); - - try { - $result = match ($method) { - self::METHOD_ZLIB => $this->zlibCompressor->decompress($stream->getRemaining()), - self::METHOD_ZSTD => zstd_uncompress($stream->getRemaining()), - default => throw new DecompressionException("Decompression method not found"), - }; - } catch (ErrorException $exception) { - throw new DecompressionException('Failed to decompress data', 0, $exception); - } - } catch (BinaryDataException $exception) { - throw new DecompressionException("Decompression method is invalid"); - } - - if ($result === false) { - throw new DecompressionException("Failed to decompress data"); - } - - return $result; - } - - /** - * @param string $payload - * @return string - */ - public function compress(string $payload): string - { - return $this->zlibCompressor->compress($payload); - } -} diff --git a/ProxyListener.php b/ProxyListener.php index 69afbec..501e6f9 100644 --- a/ProxyListener.php +++ b/ProxyListener.php @@ -27,41 +27,14 @@ public function onDataPacketReceive(DataPacketReceiveEvent $event): void $origin = $event->getOrigin(); $packet = $event->getPacket(); - switch ($packet->pid()) { - case NetworkStackLatencyPacket::NETWORK_ID: - /** @var NetworkStackLatencyPacket $packet USED FOR PING CALCULATIONS */ - if ($packet->timestamp === 0 && $packet->needResponse) { - if (($player = $origin->getPlayer()) !== null && $player->isConnected()) { - $origin->sendDataPacket(NetworkStackLatencyPacket::response(0)); - } - $event->cancel(); + /** @var NetworkStackLatencyPacket $packet USED FOR PING CALCULATIONS */ + if ($packet->pid() == NetworkStackLatencyPacket::NETWORK_ID) { + if ($packet->timestamp === 0 && $packet->needResponse) { + if (($player = $origin->getPlayer()) !== null && $player->isConnected()) { + $origin->sendDataPacket(NetworkStackLatencyPacket::response(0)); } - break; - case RequestNetworkSettingsPacket::NETWORK_ID: - /** @var RequestNetworkSettingsPacket $packet USED TO SIMULATE VANILLA BEHAVIOUR, SINCE IT'S NOT USED BY US */ - $multiProtocol = method_exists($origin, 'setProtocolId'); - $protocolVersion = $packet->getProtocolVersion(); - - if (($multiProtocol && !in_array($protocolVersion, ProtocolInfo::ACCEPTED_PROTOCOL, true)) || !$multiProtocol && $protocolVersion !== ProtocolInfo::CURRENT_PROTOCOL) { - $origin->disconnectIncompatibleProtocol($protocolVersion); - return; - } - - if ($multiProtocol) { - $origin->setProtocolId($packet->getProtocolVersion()); - } - - $origin->sendDataPacket(NetworkSettingsPacket::create( - NetworkSettingsPacket::COMPRESS_EVERYTHING, - CompressionAlgorithm::ZLIB, - false, - 0, - 0 - ), true); - $event->cancel(); - break; + } } - } } diff --git a/ProxyNetworkInterface.php b/ProxyNetworkInterface.php index d89e15c..a8c5b83 100644 --- a/ProxyNetworkInterface.php +++ b/ProxyNetworkInterface.php @@ -6,6 +6,7 @@ namespace libproxy; use Error; +use ErrorException; use Exception; use libproxy\data\LatencyData; use libproxy\data\TickSyncPacket; @@ -17,12 +18,17 @@ use libproxy\protocol\ProxyPacketSerializer; use pmmp\thread\Thread as NativeThread; use pmmp\thread\ThreadSafeArray; +use pocketmine\network\mcpe\compression\DecompressionException; +use pocketmine\network\mcpe\compression\ZlibCompressor; use pocketmine\network\mcpe\convert\TypeConverter; use pocketmine\network\mcpe\EntityEventBroadcaster; use pocketmine\network\mcpe\NetworkSession; use pocketmine\network\mcpe\PacketBroadcaster; +use pocketmine\network\mcpe\protocol\PacketDecodeException; use pocketmine\network\mcpe\protocol\PacketPool; +use pocketmine\network\mcpe\protocol\serializer\PacketBatch; use pocketmine\network\mcpe\protocol\serializer\PacketSerializerContext; +use pocketmine\network\mcpe\protocol\types\CompressionAlgorithm; use pocketmine\network\mcpe\raklib\PthreadsChannelReader; use pocketmine\network\mcpe\raklib\PthreadsChannelWriter; use pocketmine\network\NetworkInterface; @@ -32,12 +38,16 @@ use pocketmine\Server; use pocketmine\snooze\SleeperHandlerEntry; use pocketmine\thread\ThreadCrashException; +use pocketmine\timings\Timings; use pocketmine\utils\Binary; use pocketmine\utils\BinaryDataException; +use pocketmine\utils\BinaryStream; use Socket; use ThreadedArray; use WeakMap; +use function base64_encode; use function bin2hex; +use function ord; use function socket_close; use function socket_create_pair; use function socket_last_error; @@ -46,6 +56,7 @@ use function strlen; use function substr; use function trim; +use function zstd_uncompress; use const AF_INET; use const AF_UNIX; use const SOCK_STREAM; @@ -214,7 +225,7 @@ private function onPacketReceive(string $buffer): void break; // might be data arriving from the client after the server has closed the connection } - $session->handleEncoded($pk->payload); + $this->handleEncoded($session, $pk->payload); $this->receiveBytes += strlen($pk->payload); break; } @@ -225,6 +236,70 @@ private function onPacketReceive(string $buffer): void } } + /** + * @throws PacketHandlingException + */ + public function handleEncoded(NetworkSession $session, string $payload): void + { + if (!(fn() => $this->connected)->call($session)) { + return; + } + + Timings::$playerNetworkReceive->startTiming(); + try { + (fn() => $this->packetBatchLimiter->decrement())->call($session); + + if (strlen($payload) < 1) { + throw new PacketHandlingException("No bytes in payload"); + } + + Timings::$playerNetworkReceiveDecompress->startTiming(); + $compressionType = ord($payload[0]); + $compressed = substr($payload, 1); + + try { + $decompressed = match ($compressionType) { + CompressionAlgorithm::NONE => $compressed, + CompressionAlgorithm::ZLIB => $session->getCompressor()->decompress($compressed), + CompressionAlgorithm::NONE - 1 => ($d = zstd_uncompress($compressed)) === false ? throw new DecompressionException("Failed to decompress packet") : $d, + default => throw new PacketHandlingException("Packet compressed with unexpected compression type $compressionType") + }; + } catch (ErrorException|DecompressionException $e) { + $session->getLogger()->debug("Failed to decompress packet: " . base64_encode($compressed)); + throw PacketHandlingException::wrap($e, "Compressed packet batch decode error"); + } finally { + Timings::$playerNetworkReceiveDecompress->stopTiming(); + } + + try { + $stream = new BinaryStream($decompressed); + $count = 0; + foreach (PacketBatch::decodeRaw($stream) as $buffer) { + (fn() => $this->gamePacketLimiter->decrement())->call($session); + if (++$count > 100) { + throw new PacketHandlingException("Too many packets in batch"); + } + $packet = PacketPool::getInstance()->getPacket($buffer); + if ($packet === null) { + $session->getLogger()->debug("Unknown packet: " . base64_encode($buffer)); + throw new PacketHandlingException("Unknown packet received"); + } + try { + $session->handleDataPacket($packet, $buffer); + } catch (PacketHandlingException $e) { + $session->getLogger()->debug($packet->getName() . ": " . base64_encode($buffer)); + throw PacketHandlingException::wrap($e, "Error processing " . $packet->getName()); + } + } + } catch (PacketDecodeException|BinaryDataException $e) { + $session->getLogger()->logException($e); + throw PacketHandlingException::wrap($e, "Packet batch decode error"); + } + } finally { + Timings::$playerNetworkReceive->stopTiming(); + } + } + public function tick(): void { if (!$this->proxy->isRunning()) { @@ -295,20 +370,13 @@ public function createSession(int $socketId, string $ip, int $port): NetworkSess new ProxyPacketSender($socketId, $this), $this->packetBroadcaster, $this->entityEventBroadcaster, - MultiCompressor::getInstance(), + ZlibCompressor::getInstance(), TypeConverter::getInstance(), $ip, $port ); $this->sessions[$socketId] = $session; - - // Set the LoginPacketHandler, since compression is handled by the proxy - (function (): void { - /** @noinspection PhpUndefinedFieldInspection */ - $this->onSessionStartSuccess(); - })->call($session); - return $session; } diff --git a/ProxyThread.php b/ProxyThread.php index d70100e..986e634 100644 --- a/ProxyThread.php +++ b/ProxyThread.php @@ -15,7 +15,6 @@ use Socket; use function gc_enable; use function ini_set; -use function register_shutdown_function; use function socket_bind; use function socket_create; use function socket_last_error; @@ -134,10 +133,13 @@ private function createServerSocket(): Socket if (!socket_listen($serverSocket, 10)) { throw new RuntimeException("Failed to listen to socket: " . socket_strerror(socket_last_error($serverSocket))); } - if (!socket_set_option($serverSocket, SOL_SOCKET, SO_SNDBUF, 8 * 1024 * 1024) || !socket_set_option($serverSocket, SOL_SOCKET, SO_RCVBUF, 8 * 1024 * 1024) || !socket_set_option($serverSocket, SOL_TCP, TCP_NODELAY, 1)) { + if (!socket_set_option($serverSocket, SOL_TCP, TCP_NODELAY, 1)) { throw new RuntimeException("Failed to set option on socket: " . socket_strerror(socket_last_error($serverSocket))); } + socket_set_option($serverSocket, SOL_SOCKET, SO_SNDBUF, 8 * 1024 * 1024); // best effort + socket_set_option($serverSocket, SOL_SOCKET, SO_RCVBUF, 8 * 1024 * 1024); + return $serverSocket; } }