Skip to content

Commit

Permalink
Implement new packet compression
Browse files Browse the repository at this point in the history
  • Loading branch information
dries-c committed Feb 10, 2024
1 parent 20fa49d commit efd09c3
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 81 deletions.
79 changes: 0 additions & 79 deletions MultiCompressor.php

This file was deleted.

79 changes: 77 additions & 2 deletions ProxyNetworkInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
namespace libproxy;

use Error;
use ErrorException;
use Exception;
use libproxy\data\LatencyData;
use libproxy\data\TickSyncPacket;
Expand All @@ -17,12 +18,17 @@
use libproxy\protocol\ProxyPacketSerializer;
use pmmp\thread\Thread as NativeThread;
use pmmp\thread\ThreadSafeArray;
use pocketmine\network\mcpe\compression\DecompressionException;
use pocketmine\network\mcpe\compression\ZlibCompressor;
use pocketmine\network\mcpe\convert\TypeConverter;
use pocketmine\network\mcpe\EntityEventBroadcaster;
use pocketmine\network\mcpe\NetworkSession;
use pocketmine\network\mcpe\PacketBroadcaster;
use pocketmine\network\mcpe\protocol\PacketDecodeException;
use pocketmine\network\mcpe\protocol\PacketPool;
use pocketmine\network\mcpe\protocol\serializer\PacketBatch;
use pocketmine\network\mcpe\protocol\serializer\PacketSerializerContext;
use pocketmine\network\mcpe\protocol\types\CompressionAlgorithm;
use pocketmine\network\mcpe\raklib\PthreadsChannelReader;
use pocketmine\network\mcpe\raklib\PthreadsChannelWriter;
use pocketmine\network\NetworkInterface;
Expand All @@ -32,12 +38,16 @@
use pocketmine\Server;
use pocketmine\snooze\SleeperHandlerEntry;
use pocketmine\thread\ThreadCrashException;
use pocketmine\timings\Timings;
use pocketmine\utils\Binary;
use pocketmine\utils\BinaryDataException;
use pocketmine\utils\BinaryStream;
use Socket;
use ThreadedArray;
use WeakMap;
use function base64_encode;
use function bin2hex;
use function ord;
use function socket_close;
use function socket_create_pair;
use function socket_last_error;
Expand All @@ -46,6 +56,7 @@
use function strlen;
use function substr;
use function trim;
use function zstd_uncompress;
use const AF_INET;
use const AF_UNIX;
use const SOCK_STREAM;
Expand Down Expand Up @@ -214,7 +225,7 @@ private function onPacketReceive(string $buffer): void
break; // might be data arriving from the client after the server has closed the connection
}

$session->handleEncoded($pk->payload);
$this->handleEncoded($session, $pk->payload);
$this->receiveBytes += strlen($pk->payload);
break;
}
Expand All @@ -225,6 +236,70 @@ private function onPacketReceive(string $buffer): void
}
}

/**
* @throws PacketHandlingException
*/
public function handleEncoded(NetworkSession $session, string $payload): void
{
if (!$session->isConnected()) {
return;
}

Timings::$playerNetworkReceive->startTiming();
try {
(fn() => $this->packetBatchLimiter->decrement())->call($session);

if (strlen($payload) < 1) {
throw new PacketHandlingException("No bytes in payload");
}

Timings::$playerNetworkReceiveDecompress->startTiming();
$compressionType = ord($payload[0]);
$compressed = substr($payload, 1);

try {
$decompressed = match ($compressionType) {
CompressionAlgorithm::NONE => $compressed,
CompressionAlgorithm::ZLIB => $session->getCompressor()->decompress($compressed),
CompressionAlgorithm::NONE - 1 => ($d = zstd_uncompress($compressed)) === false ? throw new DecompressionException("Failed to decompress packet") : $d,
default => throw new PacketHandlingException("Packet compressed with unexpected compression type $compressionType")
};
} catch (ErrorException|DecompressionException $e) {
$session->getLogger()->debug("Failed to decompress packet: " . base64_encode($compressed));
throw PacketHandlingException::wrap($e, "Compressed packet batch decode error");
} finally {
Timings::$playerNetworkReceiveDecompress->stopTiming();
}

try {
$stream = new BinaryStream($decompressed);
$count = 0;
foreach (PacketBatch::decodeRaw($stream) as $buffer) {
(fn() => $this->gamePacketLimiter->decrement())->call($session);
if (++$count > 100) {
throw new PacketHandlingException("Too many packets in batch");
}
$packet = PacketPool::getInstance()->getPacket($buffer);
if ($packet === null) {
$session->getLogger()->debug("Unknown packet: " . base64_encode($buffer));
throw new PacketHandlingException("Unknown packet received");
}
try {
$session->handleDataPacket($packet, $buffer);
} catch (PacketHandlingException $e) {
$session->getLogger()->debug($packet->getName() . ": " . base64_encode($buffer));
throw PacketHandlingException::wrap($e, "Error processing " . $packet->getName());
}
}
} catch (PacketDecodeException|BinaryDataException $e) {
$session->getLogger()->logException($e);
throw PacketHandlingException::wrap($e, "Packet batch decode error");
}
} finally {
Timings::$playerNetworkReceive->stopTiming();
}
}

public function tick(): void
{
if (!$this->proxy->isRunning()) {
Expand Down Expand Up @@ -295,7 +370,7 @@ public function createSession(int $socketId, string $ip, int $port): NetworkSess
new ProxyPacketSender($socketId, $this),
$this->packetBroadcaster,
$this->entityEventBroadcaster,
MultiCompressor::getInstance(),
ZlibCompressor::getInstance(),
TypeConverter::getInstance(),
$ip,
$port
Expand Down

0 comments on commit efd09c3

Please sign in to comment.