diff --git a/.idea/PHPWebSockets.iml b/.idea/PHPWebSockets.iml index c8ff473..e71b720 100644 --- a/.idea/PHPWebSockets.iml +++ b/.idea/PHPWebSockets.iml @@ -3,6 +3,7 @@ + diff --git a/VERSION b/VERSION index 005119b..437459c 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.4.1 +2.5.0 diff --git a/src/PHPWebSockets.php b/src/PHPWebSockets.php index 14b347f..19c960e 100755 --- a/src/PHPWebSockets.php +++ b/src/PHPWebSockets.php @@ -303,7 +303,15 @@ public static function IsValidCloseCode(int $code, bool $received = TRUE) : bool * @return bool */ public static function IsPriorityOpcode(int $opcode) : bool { - return self::IsControlOpcode($opcode); + + /* + * Note: + * We do not consider the opcode for CLOSE to be a priority since this would cause us to send the close frame before finishing our queue + * In those cases it can be possible for the remote site to read both a close and another frame at the same time and process them in that order + * That in return causes it to effectively receive a message from a closed connection + */ + + return $opcode !== self::OPCODE_CLOSE_CONNECTION && self::IsControlOpcode($opcode); } /** diff --git a/src/PHPWebSockets/AConnection.php b/src/PHPWebSockets/AConnection.php index be951d1..5e64d8e 100644 --- a/src/PHPWebSockets/AConnection.php +++ b/src/PHPWebSockets/AConnection.php @@ -52,6 +52,13 @@ abstract class AConnection implements IStreamContainer, LoggerAwareInterface { */ protected $_newMessageStreamCallback = NULL; + /** + * The timestamp at which we will close the connection if the remote doesn't reply with an disconnect, this will only be set if we initiated the disconnect + * + * @var float|null + */ + protected $_cleanDisconnectTimeout = NULL; + /** * If we've initiated the disconnect * @@ -297,12 +304,14 @@ protected function _handlePacket(string $newData) : \Generator { $this->_readBuffer .= $newData; } + $bufferLength = strlen($this->_readBuffer); + $orgBuffer = $this->_readBuffer; - $numBytes = strlen($this->_readBuffer); + $numBytes = $bufferLength; $framePos = 0; $pongs = []; - $this->_log(LogLevel::DEBUG, 'Handling packet, current buffer size: ' . strlen($this->_readBuffer)); + $this->_log(LogLevel::DEBUG, 'Handling packet, current buffer size: ' . $bufferLength); while ($framePos < $numBytes) { @@ -311,6 +320,12 @@ protected function _handlePacket(string $newData) : \Generator { break; } + if (!$this->isOpen()) { + $this->_log(LogLevel::WARNING, 'Got frame after close, dropping'); + + return; + } + if (!$this->_checkRSVBits($headers)) { $this->sendDisconnect(\PHPWebSockets::CLOSECODE_PROTOCOL_ERROR, 'Invalid RSV value'); @@ -405,6 +420,8 @@ protected function _handlePacket(string $newData) : \Generator { $this->sendDisconnect(\PHPWebSockets::CLOSECODE_UNSUPPORTED_PAYLOAD); $this->setCloseAfterWrite(); + yield new Update\Error(Update\Error::C_READ_NO_STREAM_FOR_NEW_MESSAGE, $this); + return; } @@ -613,6 +630,14 @@ public function handleWrite() : \Generator { */ public function beforeStreamSelect() : \Generator { + if ($this->_cleanDisconnectTimeout !== NULL && microtime(TRUE) >= $this->_cleanDisconnectTimeout) { + + $this->close(); + + yield new Update\Error(Update\Error::C_DISCONNECT_TIMEOUT, $this); + + } + if ($this->_shouldReportClose) { $this->_log(LogLevel::DEBUG, 'Reporting close'); @@ -739,13 +764,17 @@ public function waitUntilDisconnect(float $timeout = NULL) : bool { * * @param int $code * @param string $reason + * @param float $timeout * * @throws \Exception */ - public function sendDisconnect(int $code, string $reason = '') { + public function sendDisconnect(int $code, string $reason = '', float $timeout = 10.0) { if (!$this->_remoteSentDisconnect) { + $this->_weInitiateDisconnect = TRUE; + $this->_cleanDisconnectTimeout = microtime(TRUE) + $timeout; + } $this->_weSentDisconnect = TRUE; diff --git a/src/PHPWebSockets/Server.php b/src/PHPWebSockets/Server.php index 51aba36..8b939a1 100644 --- a/src/PHPWebSockets/Server.php +++ b/src/PHPWebSockets/Server.php @@ -274,13 +274,13 @@ public function acceptNewConnection() : \Generator { throw new \LogicException('This server has no accepting connection, unable to accept a new connection!'); } - $peername = ''; - $newStream = stream_socket_accept($this->_acceptingConnection->getStream(), $this->getSocketAcceptTimeout(), $peername); + $peerName = ''; + $newStream = stream_socket_accept($this->_acceptingConnection->getStream(), $this->getSocketAcceptTimeout(), $peerName); if (!$newStream) { throw new \RuntimeException('Unable to accept stream socket!'); } - $newConnection = new $this->_connectionClass($this, $newStream, $peername, $this->_connectionIndex); + $newConnection = new $this->_connectionClass($this, $newStream, $peerName, $this->_connectionIndex); $this->_connections[$this->_connectionIndex] = $newConnection; $this->_log(LogLevel::DEBUG, 'Got new connection: ' . $newConnection); diff --git a/src/PHPWebSockets/Server/Connection.php b/src/PHPWebSockets/Server/Connection.php index 1694588..e623381 100644 --- a/src/PHPWebSockets/Server/Connection.php +++ b/src/PHPWebSockets/Server/Connection.php @@ -188,7 +188,7 @@ public function handleRead() : \Generator { $responseCode = 0; if ($this->_doHandshake($rawHandshake, $responseCode)) { - yield new Update\Read(Update\Read::C_NEWCONNECTION, $this); + yield new Update\Read(Update\Read::C_NEW_CONNECTION, $this); } else { $this->writeRaw($this->_server->getErrorPageForCode($responseCode), FALSE); @@ -220,7 +220,7 @@ public function beforeStreamSelect() : \Generator { if (!$this->isAccepted() && $this->hasHandshake() && $this->getOpenedTimestamp() + $this->getAcceptTimeout() < time()) { yield new Update\Error(Update\Error::C_ACCEPT_TIMEOUT_PASSED, $this); - $this->deny(408); // Request Timeout + $this->deny(504); // Gateway Timeout } diff --git a/src/PHPWebSockets/Update/Error.php b/src/PHPWebSockets/Update/Error.php index d219f07..d43ad89 100644 --- a/src/PHPWebSockets/Update/Error.php +++ b/src/PHPWebSockets/Update/Error.php @@ -49,7 +49,9 @@ class Error extends AUpdate { C_WRITE = 12, C_ACCEPT_TIMEOUT_PASSED = 13, C_WRITE_INVALID_TARGET_STREAM = 14, - C_READ_DISCONNECT_DURING_HANDSHAKE = 15; + C_READ_DISCONNECT_DURING_HANDSHAKE = 15, + C_DISCONNECT_TIMEOUT = 16, + C_READ_NO_STREAM_FOR_NEW_MESSAGE = 17; /** * @deprecated Constant has the wrong name, use C_WRITE_INVALID_TARGET_STREAM instead @@ -81,6 +83,8 @@ public static function StringForCode(int $code) : string { self::C_WRITE => 'Write failure', self::C_ACCEPT_TIMEOUT_PASSED => 'Accept timeout passed', self::C_READ_DISCONNECT_DURING_HANDSHAKE => 'Disconnect during handshake', + self::C_DISCONNECT_TIMEOUT => 'The remote failed to respond in time to our disconnect', + self::C_READ_NO_STREAM_FOR_NEW_MESSAGE => 'No stream was returned by the newMessageStreamCallback', ]; return $codes[$code] ?? 'Unknown error code ' . $code; diff --git a/src/PHPWebSockets/Update/Read.php b/src/PHPWebSockets/Update/Read.php index 24ba187..eea77c9 100644 --- a/src/PHPWebSockets/Update/Read.php +++ b/src/PHPWebSockets/Update/Read.php @@ -36,7 +36,7 @@ class Read extends AUpdate { const C_UNKNOWN = 0, - C_NEWCONNECTION = 1, + C_NEW_CONNECTION = 1, C_READ = 2, C_PING = 3, C_PONG = 4, @@ -57,6 +57,11 @@ class Read extends AUpdate { */ const C_NEW_TCP_CONNECTION = self::C_NEW_SOCKET_CONNECTED; + /** + * @deprecated Use C_NEW_CONNECTION instead + */ + const C_NEWCONNECTION = self::C_NEW_CONNECTION; + /** * The message from the client * @@ -103,7 +108,7 @@ public static function StringForCode(int $code) : string { $codes = [ self::C_UNKNOWN => 'Unknown error', - self::C_NEWCONNECTION => 'New connection', + self::C_NEW_CONNECTION => 'New connection', self::C_READ => 'Read', self::C_PING => 'Ping', self::C_PONG => 'Pong', diff --git a/src/PHPWebSockets/UpdatesWrapper.php b/src/PHPWebSockets/UpdatesWrapper.php index 2212438..9f8b210 100644 --- a/src/PHPWebSockets/UpdatesWrapper.php +++ b/src/PHPWebSockets/UpdatesWrapper.php @@ -171,7 +171,7 @@ public function update(float $timeout = NULL, array $tempStreams = []) { $code = $update->getCode(); switch ($code) { - case Update\Read::C_NEWCONNECTION: + case Update\Read::C_NEW_CONNECTION: $this->_onNewConnection($update); break; case Update\Read::C_READ: diff --git a/tests/ServerTest.php b/tests/ServerTest.php index 9399b68..8910337 100644 --- a/tests/ServerTest.php +++ b/tests/ServerTest.php @@ -115,7 +115,7 @@ public function testServer() { $sourceObj = $update->getSourceObject(); $opcode = $update->getCode(); switch ($opcode) { - case Read::C_NEWCONNECTION: + case Read::C_NEW_CONNECTION: $sourceObj->accept(); diff --git a/tests/UpdatesWrapperTest.php b/tests/UpdatesWrapperTest.php index 4317c95..f701e6b 100644 --- a/tests/UpdatesWrapperTest.php +++ b/tests/UpdatesWrapperTest.php @@ -72,6 +72,10 @@ protected function setUp() { $this->assertContains($connection, $this->_connectionList); + if ($connection instanceof \PHPWebSockets\Server\Connection) { + $this->assertTrue($connection->getServer()->hasConnection($connection), 'Server doesn\'t have a reference to its connection!'); + } + unset($this->_connectionList[$connection->getResourceIndex()]); }); @@ -304,6 +308,66 @@ public function testWrapperClientRefuse() { $this->assertEmpty($this->_wsServer->getConnections(FALSE)); $this->assertEmpty($this->_connectionList); + $this->_refuseNextConnection = FALSE; + + \PHPWebSockets::Log(LogLevel::INFO, 'Test finished' . PHP_EOL); + + } + + public function testDoubleClose() { + + \PHPWebSockets::Log(LogLevel::INFO, 'Starting test..' . PHP_EOL); + + $this->assertEmpty($this->_wsServer->getConnections(FALSE)); + + $closeAt = microtime(TRUE) + 3.0; + $runUntil = $closeAt + 4.0; + + $didSendDisconnect = FALSE; + $didClose = FALSE; + + $descriptorSpec = [['pipe', 'r'], STDOUT, STDERR]; + $clientProcess = proc_open('./tests/Helpers/client.php --address=' . escapeshellarg(self::ADDRESS) . ' --message=' . escapeshellarg('Hello world') . ' --message-count=5', $descriptorSpec, $pipes, realpath(__DIR__ . '/../')); + + while (microtime(TRUE) <= $runUntil) { + + $this->_updatesWrapper->update(0.5, $c = $this->_wsServer->getConnections(TRUE)); + + if (!$didSendDisconnect) { + $this->assertTrue(proc_get_status($clientProcess)['running'] ?? FALSE); + } + + if ($didSendDisconnect && !$didClose) { + + /** @var \PHPWebSockets\AConnection $connection */ + $connection = reset($connections); + $connection->close(); + + $didClose = TRUE; + + } + + if (microtime(TRUE) >= $closeAt && !$didSendDisconnect) { + + $connections = $this->_wsServer->getConnections(FALSE); + + $this->assertNotEmpty($connections); + + \PHPWebSockets::Log(LogLevel::INFO, 'Sending disconnect + close'); + + /** @var \PHPWebSockets\AConnection $connection */ + $connection = reset($connections); + $connection->sendDisconnect(\PHPWebSockets::CLOSECODE_NORMAL); + + $didSendDisconnect = TRUE; + + } + + } + + $this->assertEmpty($this->_wsServer->getConnections(FALSE)); + $this->assertEmpty($this->_connectionList); + \PHPWebSockets::Log(LogLevel::INFO, 'Test finished' . PHP_EOL); }