From 994ba2a5dac72a41e0ae0b8e4035e6c7e5986e80 Mon Sep 17 00:00:00 2001 From: Evert Jan Hakvoort <31619091+EJTJ3@users.noreply.github.com> Date: Fri, 21 Oct 2022 05:13:19 -0700 Subject: [PATCH] chore!: bump php version to 8.1 (#2) --- composer.json | 5 +- src/Connection/ClientConnectionOptions.php | 111 +++++------ src/Connection/NatsConnection.php | 63 ++++--- src/Connection/NatsConnectionOption.php | 2 +- src/Connection/ServerCollection.php | 2 +- src/Constant/Nats.php | 10 +- src/Constant/NatsProtocolOperation.php | 38 ++-- src/Encoder/EncoderInterface.php | 10 +- src/Encoder/JsonEncoder.php | 5 +- .../NatsConnectionRefusedException.php | 4 +- .../NatsInvalidOperationException.php | 22 --- src/Logger/NullLogger.php | 2 + src/ServerInfo/ServerInfo.php | 177 +++++++++--------- src/Transport/NatsTransportInterface.php | 5 +- src/Transport/Stream/StreamTransport.php | 67 ++++--- src/Transport/TranssportOption.php | 16 +- tests/Constant/NatsProtocolOperationTest.php | 17 ++ 17 files changed, 260 insertions(+), 296 deletions(-) delete mode 100644 src/Exception/NatsInvalidOperationException.php create mode 100644 tests/Constant/NatsProtocolOperationTest.php diff --git a/composer.json b/composer.json index 0d76cff..3faf1cd 100644 --- a/composer.json +++ b/composer.json @@ -13,7 +13,7 @@ } ], "require": { - "php" : "^7.4 | ^8.0", + "php" : "^8.1", "nyholm/dsn": "^2.0", "psr/log": "^1.1", "ext-json": "*" @@ -21,9 +21,6 @@ "require-dev": { "phpunit/phpunit": "^9.0" }, - "suggest": { - "clue/socket-raw": "Allows using the new socket client" - }, "scripts": { "test": "vendor/bin/phpunit" } diff --git a/src/Connection/ClientConnectionOptions.php b/src/Connection/ClientConnectionOptions.php index e4318b3..b3be98e 100644 --- a/src/Connection/ClientConnectionOptions.php +++ b/src/Connection/ClientConnectionOptions.php @@ -11,66 +11,57 @@ */ final class ClientConnectionOptions { - /** - * Turns on +OK protocol acknowledgements. - */ - private bool $verbose; - - /** - * Turns on additional strict format checking, e.g. for properly formed subjects. - */ - private bool $pedantic; - - /** - * Indicates whether the client requires an SSL connection. - */ - private bool $tlsRequired; - - /** - * Client authorization token (if auth_required is set). - */ - private ?string $authToken; - - /** - * Connection username (if auth_required is set). - */ - private ?string $user; - - /** - * Connection password (if auth_required is set). - */ - private ?string $password; - - /** - * Optional client name. - */ - private ?string $name; - - /** - * Sending 0 (or absent) indicates client supports original protocol. - * Sending 1 indicates that the client supports dynamic reconfiguration of - * cluster topology changes by asynchronously receiving INFO messages with known servers it can reconnect to. - */ - private ?int $protocol; - - /** - * If set to true, the server (version 1.2.0+) will not send originating messages from this - * connection to its own subscriptions. Clients should set this to true only for server supporting - * this feature, which is when proto in the INFO protocol is set to at least 1. - */ - private bool $echo; - - public function __construct() + public function __construct( + /** + * Turns on +OK protocol acknowledgements. + */ + private bool $verbose = true, + + /** + * Turns on additional strict format checking, e.g. for properly formed subjects. + */ + private bool $pedantic = true, + + /** + * Indicates whether the client requires an SSL connection. + */ + private bool $tlsRequired = false, + + /** + * Client authorization token (if auth_required is set). + */ + private ?string $authToken = null, + + /** + * Connection username (if auth_required is set). + */ + private ?string $user = null, + + /** + * Connection password (if auth_required is set). + */ + private ?string $password = null, + + /** + * Optional client name. + */ + private ?string $name = null, + + /** + * Sending 0 (or absent) indicates client supports original protocol. + * Sending 1 indicates that the client supports dynamic reconfiguration of + * cluster topology changes by asynchronously receiving INFO messages with known servers it can reconnect to. + */ + private ?int $protocol = 0, + + /** + * If set to true, the server (version 1.2.0+) will not send originating messages from this + * connection to its own subscriptions. Clients should set this to true only for server supporting + * this feature, which is when proto in the INFO protocol is set to at least 1. + */ + private bool $echo = false, + ) { - $this->verbose = true; - $this->pedantic = true; - $this->tlsRequired = false; - $this->authToken = null; - $this->user = null; - $this->password = null; - $this->name = null; - $this->protocol = 0; - $this->echo = false; } public function isVerbose(): bool @@ -164,7 +155,7 @@ public function setEcho(bool $echo): void } /** - * @return array + * @return array */ public function toArray(): array { diff --git a/src/Connection/NatsConnection.php b/src/Connection/NatsConnection.php index 69d0153..fc9e354 100644 --- a/src/Connection/NatsConnection.php +++ b/src/Connection/NatsConnection.php @@ -9,7 +9,6 @@ use EJTJ3\PhpNats\Encoder\EncoderInterface; use EJTJ3\PhpNats\Encoder\JsonEncoder; use EJTJ3\PhpNats\Exception\NatsConnectionRefusedException; -use EJTJ3\PhpNats\Exception\NatsInvalidOperationException; use EJTJ3\PhpNats\Exception\NatsInvalidResponseException; use EJTJ3\PhpNats\Exception\TransportAlreadyConnectedException; use EJTJ3\PhpNats\Logger\NullLogger; @@ -18,6 +17,7 @@ use EJTJ3\PhpNats\Transport\Stream\StreamTransport; use EJTJ3\PhpNats\Transport\TranssportOption; use EJTJ3\PhpNats\Util\StringUtil; +use Exception; use Psr\Log\LoggerAwareInterface; use Psr\Log\LoggerInterface; @@ -42,22 +42,10 @@ final class NatsConnection implements LoggerAwareInterface public function __construct( NatsConnectionOptionInterface $connectionOptions, - NatsTransportInterface $transport = null, - EncoderInterface $encoder = null, - LoggerInterface $logger = null + NatsTransportInterface $transport = new StreamTransport(), + EncoderInterface $encoder = new JsonEncoder(), + LoggerInterface $logger = new NullLogger() ) { - if ($encoder === null) { - $encoder = new JsonEncoder(); - } - - if ($logger === null) { - $logger = new NullLogger(); - } - - if ($transport === null) { - $transport = new StreamTransport(); - } - $this->transport = $transport; $this->encoder = $encoder; $this->connectionOptions = $connectionOptions; @@ -67,6 +55,9 @@ public function __construct( $this->connected = false; } + /** + * @throws Exception + */ public function connect(): void { if ($this->transport->isConnected()) { @@ -82,7 +73,7 @@ public function connect(): void try { $this->transport->connect($transportOption); - } catch (\Exception $e) { + } catch (Exception $e) { $this->logger->error($e->getMessage()); continue; @@ -125,6 +116,9 @@ public function close(): void $this->transport->close(); } + /** + * @throws Exception + */ public function publish(string $subject, string $payload): void { if ($this->isConnected() === false) { @@ -133,26 +127,32 @@ public function publish(string $subject, string $payload): void $message = sprintf('%s %s', $subject, strlen($payload)); - $this->doWrite(NatsProtocolOperation::PUB, $message . Nats::CR_LF . $payload); + $this->doWrite(NatsProtocolOperation::Pub, $message . Nats::CR_LF . $payload); } private function isErrorResponse(string $response): bool { - return substr($response, 0, 4) === NatsProtocolOperation::ERR; + return NatsProtocolOperation::Err->isOperation(substr($response, 0, 4)); } + /** + * @throws Exception + */ public function ping(): void { - $this->doWrite(NatsProtocolOperation::PING); + $this->doWrite(NatsProtocolOperation::Ping, "ping"); } + /** + * @throws Exception + */ public function validatePing(): void { $this->ping(); $pingResponse = $this->getResponse(); - if ($pingResponse !== NatsProtocolOperation::PONG) { + if (!NatsProtocolOperation::Pong->isOperation($pingResponse)) { throw new NatsInvalidResponseException('Did not receive a pong from the server'); } } @@ -162,6 +162,9 @@ public function getServerInfo(): ?ServerInfo return $this->serverInfo; } + /** + * @throws Exception + */ private function doConnect(): void { if ($this->currentServer === null) { @@ -179,12 +182,12 @@ private function doConnect(): void $connectionOptions->setPassword($server->getPassword()); } - $this->doWrite(NatsProtocolOperation::CONNECT, $connectionOptions->toArray()); + $this->doWrite(NatsProtocolOperation::Connect, $connectionOptions->toArray()); if ($connectionOptions->isVerbose() === true) { $connectResponse = $this->getResponse(); - if ($connectResponse !== NatsProtocolOperation::ACK) { + if (NatsProtocolOperation::Ack->isOperation($connectResponse) === false) { throw new NatsInvalidResponseException('Nats did not send a normal response'); } } @@ -196,13 +199,13 @@ private function createServerInfo(): ServerInfo [$operation, $data] = explode(' ', $rawData); - if ($operation !== NatsProtocolOperation::INFO) { + if (!NatsProtocolOperation::Info->isOperation($operation)) { throw new NatsInvalidResponseException('Server information is not correct'); } $data = $this->encoder->decode($data); - $serverInfo = new ServerInfo($data); + $serverInfo = ServerInfo::fromData($data); $this->serverInfo = $serverInfo; @@ -230,18 +233,16 @@ private function getResponse(): string /** * @param array|string|null $payload + * + * @throws Exception */ - private function doWrite(string $operation, $payload = null): void + private function doWrite(NatsProtocolOperation $operation, array|string $payload): void { - if (!in_array($operation, NatsProtocolOperation::AVAILABLE_OPERATIONS, true)) { - throw NatsInvalidOperationException::withOperation($operation); - } - if (!is_string($payload)) { $payload = $this->encoder->encode($payload); } - $payload = sprintf('%s %s%s', $operation, $payload, Nats::CR_LF); + $payload = sprintf('%s %s%s', $operation->value, $payload, Nats::CR_LF); $this->transport->write($payload); } diff --git a/src/Connection/NatsConnectionOption.php b/src/Connection/NatsConnectionOption.php index 27cd395..d73a794 100644 --- a/src/Connection/NatsConnectionOption.php +++ b/src/Connection/NatsConnectionOption.php @@ -28,7 +28,7 @@ final class NatsConnectionOption implements NatsConnectionOptionInterface * @param array|Server|string|ServerCollection $servers */ public function __construct( - $servers = [], + array|Server|string|ServerCollection $servers = [], ?string $name = null, int $timeout = 5, bool $randomize = false diff --git a/src/Connection/ServerCollection.php b/src/Connection/ServerCollection.php index 88f04b6..1182157 100644 --- a/src/Connection/ServerCollection.php +++ b/src/Connection/ServerCollection.php @@ -14,7 +14,7 @@ final class ServerCollection private array $servers; /** - * @param array $servers + * @param Server[] $servers */ public function __construct(array $servers, bool $randomize = false) { diff --git a/src/Constant/Nats.php b/src/Constant/Nats.php index b232024..90c949d 100644 --- a/src/Constant/Nats.php +++ b/src/Constant/Nats.php @@ -7,13 +7,13 @@ /** * @author Evert Jan Hakvoort */ -final class Nats +abstract class Nats { - public const DEFAULT_PORT = 4222; + final public const DEFAULT_PORT = 4222; - public const CR_LF = "\r\n"; + final public const CR_LF = "\r\n"; - public const LANG = 'php'; + final public const LANG = 'php'; - public const VERSION = '0.0.1'; + final public const VERSION = '0.0.1'; } diff --git a/src/Constant/NatsProtocolOperation.php b/src/Constant/NatsProtocolOperation.php index 4a07f29..6a47ef5 100644 --- a/src/Constant/NatsProtocolOperation.php +++ b/src/Constant/NatsProtocolOperation.php @@ -9,7 +9,7 @@ * * @see https://docs.nats.io/reference/reference-protocols/nats-protocol */ -final class NatsProtocolOperation +enum NatsProtocolOperation: string { /** * Sent by server @@ -17,14 +17,14 @@ final class NatsProtocolOperation * * @see https://docs.nats.io/reference/reference-protocols/nats-protocol#info */ - public const INFO = 'INFO'; + case Info = 'INFO'; /** * Sent to client after initial TCP/IP connection. * * @see https://docs.nats.io/reference/reference-protocols/nats-protocol#connect */ - public const CONNECT = 'CONNECT'; + case Connect = 'CONNECT'; /** * Sent by client @@ -32,7 +32,7 @@ final class NatsProtocolOperation * * @see https://docs.nats.io/reference/reference-protocols/nats-protocol#pub */ - public const PUB = 'PUB'; + case Pub = 'PUB'; /** * Sent by client @@ -40,7 +40,7 @@ final class NatsProtocolOperation * * @see https://docs.nats.io/reference/reference-protocols/nats-protocol#sub */ - public const SUB = 'SUB'; + case Sub = 'SUB'; /** * Sent by client @@ -48,7 +48,7 @@ final class NatsProtocolOperation * * @see https://docs.nats.io/reference/reference-protocols/nats-protocol#unsub */ - public const UNSUB = 'UNSUB'; + case Unsub = 'UNSUB'; /** * Sent by server @@ -56,21 +56,21 @@ final class NatsProtocolOperation * * @see https://docs.nats.io/reference/reference-protocols/nats-protocol#msg */ - public const MSG = 'MSG'; + case Msg = 'MSG'; /** * PING keep-alive message. * * @see https://docs.nats.io/reference/reference-protocols/nats-protocol#pingpong */ - public const PING = 'PING'; + case Ping = 'PING'; /** * PONG keep-alive response. * * @see https://docs.nats.io/reference/reference-protocols/nats-protocol#pingpong */ - public const PONG = 'PONG'; + case Pong = 'PONG'; /** * Sent by Server @@ -78,7 +78,7 @@ final class NatsProtocolOperation * * @see https://docs.nats.io/reference/reference-protocols/nats-protocol#okerr */ - public const ACK = '+OK'; + case Ack = '+OK'; /** * Sent by Server @@ -86,18 +86,10 @@ final class NatsProtocolOperation * * @see https://docs.nats.io/reference/reference-protocols/nats-protocol#okerr */ - public const ERR = '-ERR'; + case Err = '-ERR'; - public const AVAILABLE_OPERATIONS = [ - self::INFO, - self::CONNECT, - self::PUB, - self::SUB, - self::UNSUB, - self::MSG, - self::PING, - self::PONG, - self::ACK, - self::ERR, - ]; + public function isOperation(string $value): bool + { + return $this->value === $value; + } } diff --git a/src/Encoder/EncoderInterface.php b/src/Encoder/EncoderInterface.php index 1c66860..6dc4980 100644 --- a/src/Encoder/EncoderInterface.php +++ b/src/Encoder/EncoderInterface.php @@ -9,13 +9,7 @@ */ interface EncoderInterface { - /** - * @param mixed $payload - */ - public function encode($payload): string; + public function encode(object|string|array $payload): string; - /** - * @return mixed - */ - public function decode(string $payload); + public function decode(string $payload): object|array; } diff --git a/src/Encoder/JsonEncoder.php b/src/Encoder/JsonEncoder.php index 8ede95a..5646bed 100644 --- a/src/Encoder/JsonEncoder.php +++ b/src/Encoder/JsonEncoder.php @@ -6,10 +6,7 @@ final class JsonEncoder implements EncoderInterface { - /** - * @param mixed $payload - */ - public function encode($payload): string + public function encode(object|string|array $payload): string { return json_encode($payload); } diff --git a/src/Exception/NatsConnectionRefusedException.php b/src/Exception/NatsConnectionRefusedException.php index 9d58a8d..9f58ab9 100644 --- a/src/Exception/NatsConnectionRefusedException.php +++ b/src/Exception/NatsConnectionRefusedException.php @@ -4,6 +4,8 @@ namespace EJTJ3\PhpNats\Exception; -final class NatsConnectionRefusedException extends \RuntimeException implements NatsExceptionInterface +use RuntimeException; + +final class NatsConnectionRefusedException extends RuntimeException implements NatsExceptionInterface { } diff --git a/src/Exception/NatsInvalidOperationException.php b/src/Exception/NatsInvalidOperationException.php deleted file mode 100644 index e51419d..0000000 --- a/src/Exception/NatsInvalidOperationException.php +++ /dev/null @@ -1,22 +0,0 @@ - $data + * @param array $response */ - public function __construct(array $data) + public static function fromData(array $response): self { - $this->serverId = $data['server_id']; - $this->version = $data['version']; - $this->go = $data['go'] ?? null; - $this->host = $data['host']; - $this->port = $data['port']; - $this->proto = $data['proto']; - $this->maxPayload = $data['max_payload']; - $this->authRequired = $data['auth_required'] ?? false; - $this->tlsRequired = $data['tls_required'] ?? false; - $this->tlsVerify = $data['tls_verify'] ?? false; - $this->connectUrls = $data['connect_urls'] ?? []; - $this->clientId = $data['client_id'] ?? null; - $this->lameDuckMode = $data['ldm'] ?? false; + return new self( + serverId: $response['server_id'], + version: $response['version'], + go: $response['go'] ?? null, + host: $response['host'], + port: $response['port'], + proto: $response['proto'], + maxPayload: $response['max_payload'], + tlsRequired: $response['tls_required'] ?? false, + tlsVerify: $response['tls_verify'] ?? false, + authRequired: $response['auth_required'] ?? false, + connectUrls: $response['connect_urls'] ?? [], + clientId: $response['client_id'] ?? null, + lameDuckMode: $response['ldm'] ?? false + ); } /** diff --git a/src/Transport/NatsTransportInterface.php b/src/Transport/NatsTransportInterface.php index 5d88a1d..10624b3 100644 --- a/src/Transport/NatsTransportInterface.php +++ b/src/Transport/NatsTransportInterface.php @@ -18,8 +18,5 @@ public function isConnected(): bool; public function write(string $payload): void; - /** - * @return string|false - */ - public function receive(int $length = 0); + public function receive(int $length = 0): bool|string; } diff --git a/src/Transport/Stream/StreamTransport.php b/src/Transport/Stream/StreamTransport.php index c410b71..f0d8d48 100644 --- a/src/Transport/Stream/StreamTransport.php +++ b/src/Transport/Stream/StreamTransport.php @@ -16,11 +16,15 @@ final class StreamTransport implements NatsTransportInterface */ private $stream; - /** - * @var int<0, max>, - */ - private int $chunkSize; - + public function __construct( + /** + * @var int<0, max> $chunkSize + */ + private readonly int $chunkSize = 1500, + ) { + $this->stream = null; + } + /** * Close will close the connection to the server. */ @@ -30,15 +34,6 @@ public function close(): void $this->stream = null; } - /** - * @param int<0, max> $chunkSize - */ - public function __construct(int $chunkSize = 1500) - { - $this->stream = null; - $this->chunkSize = $chunkSize; - } - public function isClosed(): bool { return $this->stream === null; @@ -60,7 +55,7 @@ public function connect(TransportOptionsInterface $option): void $errorCode = null; $errorMessage = null; - set_error_handler(static fn () => true); + set_error_handler(static fn() => true); $stream = stream_socket_client( $address, @@ -94,8 +89,8 @@ public function enableTls(): void if (!stream_socket_enable_crypto( $this->getStream(), true, - STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT) - ) { + STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT + )) { throw new NatsConnectionRefusedException('Failed to connect: Error negotiating crypto'); } @@ -109,7 +104,11 @@ public function write(string $payload): void { $length = strlen($payload); - while (true) { + if ($length === 0) { + return; + } + + do { $written = @fwrite($this->getStream(), $payload); if ($written === false) { @@ -124,32 +123,30 @@ public function write(string $payload): void if ($length > 0) { $payload = substr($payload, (0 - $length)); - } else { - break; } - } + } while ($length > 0); } public function receive(int $length = 0): string { - if ($length > 0) { - $chunkSize = $this->chunkSize; - $line = null; - $receivedBytes = 0; + if ($length <= 0) { + return fgets($this->getStream()); + } - while ($receivedBytes < $length) { - $bytesLeft = ($length - $receivedBytes); + $chunkSize = $this->chunkSize; + $line = ""; + $receivedBytes = 0; - if ($bytesLeft < $this->chunkSize) { - $chunkSize = $bytesLeft; - } + while ($receivedBytes < $length) { + $bytesLeft = ($length - $receivedBytes); - $readChunk = fread($this->getStream(), $chunkSize); - $receivedBytes += strlen($readChunk); - $line .= $readChunk; + if ($bytesLeft < $this->chunkSize) { + $chunkSize = $bytesLeft; } - } else { - $line = fgets($this->getStream()); + + $readChunk = fread($this->getStream(), $chunkSize); + $receivedBytes += strlen($readChunk); + $line .= $readChunk; } return $line; diff --git a/src/Transport/TranssportOption.php b/src/Transport/TranssportOption.php index 69755ac..03c2d6e 100644 --- a/src/Transport/TranssportOption.php +++ b/src/Transport/TranssportOption.php @@ -6,17 +6,11 @@ final class TranssportOption implements TransportOptionsInterface { - private int $timeout; - - private string $host; - - private int $port; - - public function __construct(string $host, int $port, int $timeout) - { - $this->timeout = $timeout; - $this->host = $host; - $this->port = $port; + public function __construct( + private readonly string $host, + private readonly int $port, + private readonly int $timeout + ) { } public function getTimeout(): int diff --git a/tests/Constant/NatsProtocolOperationTest.php b/tests/Constant/NatsProtocolOperationTest.php new file mode 100644 index 0000000..ae0fbce --- /dev/null +++ b/tests/Constant/NatsProtocolOperationTest.php @@ -0,0 +1,17 @@ +isOperation('PUB')); + self::assertFalse(NatsProtocolOperation::Pub->isOperation('PUS')); + } +}