diff --git a/ProxyListener.php b/ProxyListener.php deleted file mode 100644 index 501e6f9..0000000 --- a/ProxyListener.php +++ /dev/null @@ -1,40 +0,0 @@ -getOrigin(); - $packet = $event->getPacket(); - - /** @var NetworkStackLatencyPacket $packet USED FOR PING CALCULATIONS */ - if ($packet->pid() == NetworkStackLatencyPacket::NETWORK_ID) { - if ($packet->timestamp === 0 && $packet->needResponse) { - if (($player = $origin->getPlayer()) !== null && $player->isConnected()) { - $origin->sendDataPacket(NetworkStackLatencyPacket::response(0)); - } - $event->cancel(); - } - } - } -} diff --git a/ProxyNetworkInterface.php b/ProxyNetworkInterface.php index 1503048..ca926a0 100644 --- a/ProxyNetworkInterface.php +++ b/ProxyNetworkInterface.php @@ -6,7 +6,6 @@ namespace libproxy; use Error; -use ErrorException; use Exception; use libproxy\data\LatencyData; use libproxy\data\TickSyncPacket; @@ -18,16 +17,12 @@ use libproxy\protocol\ProxyPacketSerializer; use pmmp\thread\Thread as NativeThread; use pmmp\thread\ThreadSafeArray; -use pocketmine\network\mcpe\compression\DecompressionException; use pocketmine\network\mcpe\compression\ZlibCompressor; use pocketmine\network\mcpe\convert\TypeConverter; use pocketmine\network\mcpe\EntityEventBroadcaster; use pocketmine\network\mcpe\NetworkSession; use pocketmine\network\mcpe\PacketBroadcaster; -use pocketmine\network\mcpe\protocol\PacketDecodeException; use pocketmine\network\mcpe\protocol\PacketPool; -use pocketmine\network\mcpe\protocol\serializer\PacketBatch; -use pocketmine\network\mcpe\protocol\types\CompressionAlgorithm; use pocketmine\network\mcpe\raklib\PthreadsChannelReader; use pocketmine\network\mcpe\raklib\PthreadsChannelWriter; use pocketmine\network\NetworkInterface; @@ -37,16 +32,13 @@ use pocketmine\Server; use pocketmine\snooze\SleeperHandlerEntry; use pocketmine\thread\ThreadCrashException; -use pocketmine\timings\Timings; use pocketmine\utils\Binary; use pocketmine\utils\BinaryDataException; -use pocketmine\utils\BinaryStream; use Socket; use ThreadedArray; use WeakMap; use function base64_encode; use function bin2hex; -use function ord; use function socket_close; use function socket_create_pair; use function socket_last_error; @@ -55,7 +47,6 @@ use function strlen; use function substr; use function trim; -use function zstd_uncompress; use const AF_INET; use const AF_UNIX; use const SOCK_STREAM; @@ -148,8 +139,6 @@ public function __construct(PluginBase $plugin, int $port, ?string $composerPath $this->sendBytes = 0; $this->receiveBytes = 0; }), 20, 20); - - $server->getPluginManager()->registerEvents(new ProxyListener(), $plugin); } public static function handleRawLatency(NetworkSession $session, int $upstream, int $downstream): void @@ -217,82 +206,28 @@ private function onPacketReceive(string $buffer): void break; case ForwardPacket::NETWORK_ID: /** @var ForwardPacket $pk */ - if (($session = $this->getSession($socketId)) === null) { + if (($session = $this->getSession($socketId)) === null || !(fn() => $this->connected)->call($session)) { break; // might be data arriving from the client after the server has closed the connection } - $this->handleEncoded($session, $pk->payload); - $this->receiveBytes += strlen($pk->payload); - break; - } - } catch (PacketHandlingException|BinaryDataException $exception) { - $this->close($socketId, 'Error handling a Packet (Server)'); - - $this->server->getLogger()->logException($exception); - } - } - - /** - * @throws PacketHandlingException - */ - public function handleEncoded(NetworkSession $session, string $payload): void - { - if (!(fn() => $this->connected)->call($session)) { - return; - } - - Timings::$playerNetworkReceive->startTiming(); - try { - (fn() => $this->packetBatchLimiter->decrement())->call($session); - - if (strlen($payload) < 1) { - throw new PacketHandlingException("No bytes in payload"); - } - - Timings::$playerNetworkReceiveDecompress->startTiming(); - $compressionType = ord($payload[0]); - $compressed = substr($payload, 1); - - try { - $decompressed = match ($compressionType) { - CompressionAlgorithm::NONE => $compressed, - CompressionAlgorithm::ZLIB => $session->getCompressor()->decompress($compressed), - CompressionAlgorithm::NONE - 1 => ($d = zstd_uncompress($compressed)) === false ? throw new DecompressionException("Failed to decompress packet") : $d, - default => throw new PacketHandlingException("Packet compressed with unexpected compression type $compressionType") - }; - } catch (ErrorException|DecompressionException $e) { - $session->getLogger()->debug("Failed to decompress packet: " . base64_encode($compressed)); - throw PacketHandlingException::wrap($e, "Compressed packet batch decode error"); - } finally { - Timings::$playerNetworkReceiveDecompress->stopTiming(); - } - - try { - $stream = new BinaryStream($decompressed); - $count = 0; - foreach (PacketBatch::decodeRaw($stream) as $buffer) { - (fn() => $this->gamePacketLimiter->decrement())->call($session); - if (++$count > 100) { - throw new PacketHandlingException("Too many packets in batch"); - } - $packet = PacketPool::getInstance()->getPacket($buffer); + $packet = PacketPool::getInstance()->getPacket($pk->payload); if ($packet === null) { - $session->getLogger()->debug("Unknown packet: " . base64_encode($buffer)); + $session->getLogger()->debug("Unknown packet: " . base64_encode($pk->payload)); throw new PacketHandlingException("Unknown packet received"); } try { - $session->handleDataPacket($packet, $buffer); + $session->handleDataPacket($packet, $pk->payload); } catch (PacketHandlingException $e) { - $session->getLogger()->debug($packet->getName() . ": " . base64_encode($buffer)); + $session->getLogger()->debug($packet->getName() . ": " . base64_encode($pk->payload)); throw PacketHandlingException::wrap($e, "Error processing " . $packet->getName()); } - } - } catch (PacketDecodeException|BinaryDataException $e) { - $session->getLogger()->logException($e); - throw PacketHandlingException::wrap($e, "Packet batch decode error"); + $this->receiveBytes += strlen($pk->payload); + break; } - } finally { - Timings::$playerNetworkReceive->stopTiming(); + } catch (PacketHandlingException|BinaryDataException $exception) { + $this->close($socketId, 'Error handling a Packet (Server)'); + + $this->server->getLogger()->logException($exception); } } diff --git a/ProxyServer.php b/ProxyServer.php index 4a65dc7..17e8cf0 100644 --- a/ProxyServer.php +++ b/ProxyServer.php @@ -12,19 +12,36 @@ use libproxy\protocol\ProxyPacketPool; use libproxy\protocol\ProxyPacketSerializer; use pmmp\thread\ThreadSafeArray; +use pocketmine\network\mcpe\compression\DecompressionException; +use pocketmine\network\mcpe\compression\ZlibCompressor; +use pocketmine\network\mcpe\NetworkSession; +use pocketmine\network\mcpe\PacketRateLimiter; +use pocketmine\network\mcpe\protocol\NetworkStackLatencyPacket; +use pocketmine\network\mcpe\protocol\Packet as BedrockPacket; +use pocketmine\network\mcpe\protocol\PacketDecodeException; +use pocketmine\network\mcpe\protocol\PacketPool; +use pocketmine\network\mcpe\protocol\ProtocolInfo; +use pocketmine\network\mcpe\protocol\RequestNetworkSettingsPacket; +use pocketmine\network\mcpe\protocol\serializer\PacketBatch; +use pocketmine\network\mcpe\protocol\serializer\PacketSerializer; +use pocketmine\network\mcpe\protocol\types\CompressionAlgorithm; use pocketmine\network\mcpe\raklib\PthreadsChannelReader; use pocketmine\network\mcpe\raklib\SnoozeAwarePthreadsChannelWriter; use pocketmine\network\PacketHandlingException; use pocketmine\snooze\SleeperHandlerEntry; -use pocketmine\snooze\SleeperNotifier; use pocketmine\thread\log\AttachableThreadSafeLogger; use pocketmine\utils\Binary; use pocketmine\utils\BinaryDataException; +use pocketmine\utils\BinaryStream; use pocketmine\utils\Utils; use raklib\generic\SocketException; use Socket; +use function base64_encode; +use function bin2hex; +use function chr; use function count; use function min; +use function ord; use function socket_accept; use function socket_close; use function socket_getpeername; @@ -37,7 +54,9 @@ use function socket_strerror; use function socket_write; use function strlen; +use function substr; use function trim; +use function zstd_uncompress; use const MSG_DONTWAIT; use const MSG_WAITALL; use const SO_LINGER; @@ -70,6 +89,14 @@ class ProxyServer /** @var Socket[] */ private array $sockets = []; + + /** @phpstan-var array */ + private array $gamePacketLimiter = []; + /** @phpstan-var array */ + private array $batchPacketLimiter = []; + /** @phpstan-var array */ + private array $protocolId = []; + /** @phpstan-var array> */ private array $socketBuffer = []; @@ -109,7 +136,13 @@ private function closeSocket(int $socketId, string $reason, bool $fromMain = fal $this->logger->debug('Socket is not connected anymore.'); } socket_close($socket); - unset($this->sockets[$socketId], $this->socketBuffer[$socketId]); + unset( + $this->sockets[$socketId], + $this->socketBuffer[$socketId], + $this->gamePacketLimiter[$socketId], + $this->batchPacketLimiter[$socketId], + $this->protocolId[$socketId] + ); } $this->logger->debug("Disconnected socket with id " . $socketId); @@ -127,6 +160,16 @@ public function getSocket(int $socketId): ?Socket return $this->sockets[$socketId] ?? null; } + private function getGamePacketLimiter(int $streamIdentifier): PacketRateLimiter + { + return $this->gamePacketLimiter[$streamIdentifier] ??= new PacketRateLimiter("Game Packets", 2, 100); + } + + private function getBatchPacketLimiter(int $streamIdentifier): PacketRateLimiter + { + return $this->batchPacketLimiter[$streamIdentifier] ??= new PacketRateLimiter("Batch Packets", 2, 100); + } + private function putPacket(int $socketId, ProxyPacket $pk): void { $serializer = new ProxyPacketSerializer(); @@ -190,17 +233,7 @@ private function pushSockets(): void break; case ForwardPacket::NETWORK_ID: /** @var ForwardPacket $pk */ - if (($socket = $this->getSocket($socketId)) === null) { - throw new PacketHandlingException('Socket with id (' . $socketId . ") doesn't exist."); - } - - try { - if (socket_write($socket, Binary::writeInt(strlen($pk->payload)) . $pk->payload) === false) { - throw new PacketHandlingException('client disconnect'); - } - } catch (ErrorException $exception) { - throw PacketHandlingException::wrap($exception, 'client disconnect'); - } + $this->sendPayload($socketId, $pk->payload); break; } } catch (PacketHandlingException $exception) { @@ -209,6 +242,156 @@ private function pushSockets(): void } } + /** + * Sends a payload to the client + */ + private function sendPayload(int $socketId, string $payload): void + { + if (($socket = $this->getSocket($socketId)) === null) { + throw new PacketHandlingException('Socket with id (' . $socketId . ") doesn't exist."); + } + + try { + if (socket_write($socket, Binary::writeInt(strlen($payload)) . $payload) === false) { + throw new PacketHandlingException('client disconnect'); + } + } catch (ErrorException $exception) { + throw PacketHandlingException::wrap($exception, 'client disconnect'); + } + } + + /** + * Sends a data packet to the main thread. + */ + private function sendDataPacketToMain(int $socketId, string $payload): void + { + $pk = new ForwardPacket(); + $pk->payload = $payload; + + $this->putPacket($socketId, $pk); + } + + /** + * Returns the protocol ID for the given socket identifier. + */ + private function getProtocolId(int $socketId): int + { + return $this->protocolId[$socketId] ?? ProtocolInfo::CURRENT_PROTOCOL; + } + + /** + * Sends a data packet to the client using a single packet in a batch. + */ + private function sendDataPacket(int $socketId, BedrockPacket $packet): void + { + $packetSerializer = PacketSerializer::encoder($protocolId = $this->getProtocolId($socketId)); + $packet->encode($packetSerializer); + + $stream = new BinaryStream(); + PacketBatch::encodeRaw($stream, [$packetSerializer->getBuffer()]); + $payload = ($protocolId >= ProtocolInfo::PROTOCOL_1_20_60 ? chr(CompressionAlgorithm::NONE) : '') . $stream->getBuffer(); + + $this->sendPayload($socketId, $payload); + } + + private function decodePacket(int $socketId, BedrockPacket $packet, string $buffer): void + { + $stream = PacketSerializer::decoder($this->protocolId[$socketId] ?? ProtocolInfo::CURRENT_PROTOCOL, $buffer, 0); + try { + $packet->decode($stream); + } catch (PacketDecodeException $e) { + throw PacketHandlingException::wrap($e); + } + if (!$stream->feof()) { + $remains = substr($stream->getBuffer(), $stream->getOffset()); + $this->logger->debug("Still " . strlen($remains) . " bytes unread in " . $packet->getName() . ": " . bin2hex($remains)); + } + } + + /** + * Returns true if the packet was handled successfully, false if it should be sent to the main thread. + * + * @return bool whether the packet was handled successfully + */ + private function handleDataPacket(int $socketId, BedrockPacket $packet, string $buffer): bool + { + if ($packet->pid() == NetworkStackLatencyPacket::NETWORK_ID) { + /** @var NetworkStackLatencyPacket $packet USED FOR PING CALCULATIONS */ + $this->decodePacket($socketId, $packet, $buffer); + + if ($packet->timestamp === 0 && $packet->needResponse) { + $this->sendDataPacket($socketId, NetworkStackLatencyPacket::response(0)); + return true; + } + } else if ($packet->pid() === RequestNetworkSettingsPacket::NETWORK_ID) { + /** @var RequestNetworkSettingsPacket $packet USED TO GET PROTOCOLID */ + $this->decodePacket($socketId, $packet, $buffer); + + $this->protocolId[$socketId] = $packet->getProtocolVersion(); + } + + return false; + } + + /** + * @see NetworkSession::handleEncoded($payload) + */ + private function onFullDataReceive(int $socketId, string $payload): void + { + try { + $this->getBatchPacketLimiter($socketId)->decrement(); + + if (strlen($payload) < 1) { + throw new PacketHandlingException("No bytes in payload"); + } + + $compressionType = ord($payload[0]); + $compressed = substr($payload, 1); + + try { + $decompressed = match ($compressionType) { + CompressionAlgorithm::NONE => $compressed, + CompressionAlgorithm::ZLIB => ZlibCompressor::getInstance()->decompress($compressed), + CompressionAlgorithm::NONE - 1 => ($d = zstd_uncompress($compressed)) === false ? throw new DecompressionException("Failed to decompress packet") : $d, + default => throw new PacketHandlingException("Packet compressed with unexpected compression type $compressionType") + }; + } catch (ErrorException|DecompressionException $e) { + $this->logger->debug("Failed to decompress packet: " . base64_encode($compressed)); + throw PacketHandlingException::wrap($e, "Compressed packet batch decode error"); + } + + try { + $stream = new BinaryStream($decompressed); + $count = 0; + foreach (PacketBatch::decodeRaw($stream) as $buffer) { + $this->getGamePacketLimiter($socketId)->decrement(); + if (++$count > 100) { + throw new PacketHandlingException("Too many packets in batch"); + } + $packet = PacketPool::getInstance()->getPacket($buffer); + if ($packet === null) { + $this->logger->debug("Unknown packet: " . base64_encode($buffer)); + throw new PacketHandlingException("Unknown packet received"); + } + try { + if (!$this->handleDataPacket($socketId, $packet, $buffer)) { + $this->sendDataPacketToMain($socketId, $buffer); + } + } catch (PacketHandlingException $e) { + $this->logger->debug($packet->getName() . ": " . base64_encode($buffer)); + throw PacketHandlingException::wrap($e, "Error processing " . $packet->getName()); + } + } + } catch (PacketDecodeException|BinaryDataException $e) { + $this->logger->logException($e); + throw PacketHandlingException::wrap($e, "Packet batch decode error"); + } + } catch (PacketHandlingException $e) { + $this->logger->logException($e); + $this->closeSocket($socketId, $e->getMessage()); + } + } + private function onServerSocketReceive(): void { $socket = socket_accept($this->serverSocket); @@ -273,11 +456,7 @@ private function onSocketReceive(int $socketId): void // A null frame data indicates that there is not enough bytes to read. if ($rawFrameData !== null) { unset($this->socketBuffer[$socketId]); - - $pk = new ForwardPacket(); - $pk->payload = $rawFrameData; - - $this->putPacket($socketId, $pk); + $this->onFullDataReceive($socketId, $rawFrameData); } }