From 9566c1d58a986016935a926c9f8bcd1f917902cc Mon Sep 17 00:00:00 2001 From: Evert Jan Hakvoort Date: Thu, 27 Jul 2023 11:19:44 +0200 Subject: [PATCH] chore: work in progress --- composer.json | 3 +- src/Connection/Acknowledgement.php | 9 + src/Connection/ClientConnectionOptions.php | 45 ++- src/Connection/Error.php | 9 + src/Connection/HMsg.php | 76 +++++ src/Connection/MessageInterface.php | 10 + src/Connection/Msg.php | 55 ++++ src/Connection/NatsConnection.php | 287 ++++++++++++++---- src/Connection/NatsConnectionOption.php | 2 +- src/Connection/NatsResponseInterface.php | 9 + src/Connection/Ping.php | 9 + src/Connection/Pong.php | 9 + src/Connection/Response.php | 52 ++++ src/Connection/Server.php | 3 +- src/{ServerInfo => Connection}/ServerInfo.php | 62 ++-- src/Connection/Subscription.php | 14 + src/Constant/Nats.php | 11 +- src/Constant/NatsProtocolOperation.php | 8 +- src/Exception/NatsReadException.php | 11 + src/Exception/NatsTimeoutException.php | 11 + src/Transport/NatsTransportInterface.php | 4 +- src/Transport/Stream/StreamTransport.php | 35 +-- src/Util/StringUtil.php | 19 +- .../ClientConnectionOptionsTest.php | 7 +- tests/Connection/NatsConnectionOptionTest.php | 3 +- tests/Connection/ServerCollectionTest.php | 3 +- tests/Util/StringUtilTest.php | 24 ++ 27 files changed, 643 insertions(+), 147 deletions(-) create mode 100644 src/Connection/Acknowledgement.php create mode 100644 src/Connection/Error.php create mode 100644 src/Connection/HMsg.php create mode 100644 src/Connection/MessageInterface.php create mode 100644 src/Connection/Msg.php create mode 100644 src/Connection/NatsResponseInterface.php create mode 100644 src/Connection/Ping.php create mode 100644 src/Connection/Pong.php create mode 100644 src/Connection/Response.php rename src/{ServerInfo => Connection}/ServerInfo.php (70%) create mode 100644 src/Connection/Subscription.php create mode 100644 src/Exception/NatsReadException.php create mode 100644 src/Exception/NatsTimeoutException.php diff --git a/composer.json b/composer.json index 3faf1cd..027ded2 100644 --- a/composer.json +++ b/composer.json @@ -8,8 +8,7 @@ }, "authors": [ { - "name": "Evert Jan Hakvoort", - "email": "evertjan@hakvoort.io" + "name": "EJTJ3" } ], "require": { diff --git a/src/Connection/Acknowledgement.php b/src/Connection/Acknowledgement.php new file mode 100644 index 0000000..d817a1e --- /dev/null +++ b/src/Connection/Acknowledgement.php @@ -0,0 +1,9 @@ +echo = $echo; } + public function isNoResponders(): bool + { + return $this->noResponders; + } + + public function setNoResponders(bool $noResponders): void + { + $this->noResponders = $noResponders; + } + + public function isHeaders(): bool + { + return $this->headers; + } + + public function setHeaders(bool $headers): void + { + $this->headers = $headers; + } + /** * @return array */ @@ -171,6 +200,8 @@ public function toArray(): array 'version' => Nats::VERSION, 'protocol' => $this->protocol, 'echo' => $this->echo, + 'no_responders' => $this->noResponders, + 'headers' => $this->headers, ]; } } diff --git a/src/Connection/Error.php b/src/Connection/Error.php new file mode 100644 index 0000000..f49677e --- /dev/null +++ b/src/Connection/Error.php @@ -0,0 +1,9 @@ +headers = []; + $this->payload = null; + } + + public function getPayload(): string + { + return $this->payload ?? ''; + } + + public function getHeader(string $key) + { + return $this->headers[$key] ?? null; + } + + public function setPayload(string $payload): void + { + $this->payload = $payload; + } + + public function setHeaders(string $headers): void + { + // Check if we have an inlined status. + $parts = explode(' ', $headers); + + if (isset($parts[1]) && strlen($parts[1]) === 3) { + $this->headers['status'] = (int) $parts[1]; + } + } + + // HMSG [reply-to] <#header bytes> <#total bytes>␍␊[headers]␍␊␍␊[payload]␍␊ + public static function create(string $protocolMessage): HMsg + { + $parts = StringUtil::explode($protocolMessage, 5); + + try { + return match (count($parts)) { + 4 => new self($parts[0], $parts[1], null, (int) $parts[2], (int) $parts[3]), + 5 => new self($parts[0], $parts[1], $parts[2], (int) $parts[3], (int) $parts[4]), + default => throw new InvalidArgumentException('Invalid msg') + }; + } catch (Exception $e) { + // Add own exception + throw $e; + } + } +} diff --git a/src/Connection/MessageInterface.php b/src/Connection/MessageInterface.php new file mode 100644 index 0000000..acb743a --- /dev/null +++ b/src/Connection/MessageInterface.php @@ -0,0 +1,10 @@ +payload = ''; + } + + /** + * @param string $protocolMessage MSG [reply-to] <#bytes>␍␊[payload]␍␊ + * + * @see https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-6 + */ + public static function create(string $protocolMessage): self + { + $parts = StringUtil::explode($protocolMessage, 4); + + return match (count($parts)) { + 3 => new self($parts[0], $parts[1], null, (int) $parts[2]), + 4 => new self($parts[0], $parts[1], $parts[2], (int) $parts[3]), + default => throw new InvalidArgumentException('Invalid format') + }; + } + + public function getPayload(): string + { + return $this->payload ?? ''; + } + + public function setPayload(string $payload): void + { + $this->payload = $payload; + } +} diff --git a/src/Connection/NatsConnection.php b/src/Connection/NatsConnection.php index fc9e354..7eb3484 100644 --- a/src/Connection/NatsConnection.php +++ b/src/Connection/NatsConnection.php @@ -10,49 +10,48 @@ use EJTJ3\PhpNats\Encoder\JsonEncoder; use EJTJ3\PhpNats\Exception\NatsConnectionRefusedException; use EJTJ3\PhpNats\Exception\NatsInvalidResponseException; -use EJTJ3\PhpNats\Exception\TransportAlreadyConnectedException; use EJTJ3\PhpNats\Logger\NullLogger; -use EJTJ3\PhpNats\ServerInfo\ServerInfo; use EJTJ3\PhpNats\Transport\NatsTransportInterface; use EJTJ3\PhpNats\Transport\Stream\StreamTransport; use EJTJ3\PhpNats\Transport\TranssportOption; use EJTJ3\PhpNats\Util\StringUtil; use Exception; +use InvalidArgumentException; +use LogicException; use Psr\Log\LoggerAwareInterface; use Psr\Log\LoggerInterface; -/** - * @author Evert Jan Hakvoort - */ final class NatsConnection implements LoggerAwareInterface { - private EncoderInterface $encoder; - - private NatsTransportInterface $transport; - - private NatsConnectionOptionInterface $connectionOptions; - private ?ServerInfo $serverInfo; private ?Server $currentServer; - private LoggerInterface $logger; - private bool $connected; + private bool $enableNoResponder; + + /** + * @deprecated + */ + private bool $isVerbose; + public function __construct( - NatsConnectionOptionInterface $connectionOptions, - NatsTransportInterface $transport = new StreamTransport(), - EncoderInterface $encoder = new JsonEncoder(), - LoggerInterface $logger = new NullLogger() + private readonly NatsConnectionOptionInterface $connectionOptions, + private readonly NatsTransportInterface $transport = new StreamTransport(), + private readonly EncoderInterface $encoder = new JsonEncoder(), + private LoggerInterface $logger = new NullLogger() ) { - $this->transport = $transport; - $this->encoder = $encoder; - $this->connectionOptions = $connectionOptions; $this->serverInfo = null; - $this->logger = $logger; $this->currentServer = null; $this->connected = false; + $this->isVerbose = false; + } + + public function setNoResponders(bool $enabled = true): void + { + // Headers must be enabled for no responders. + $this->enableNoResponder = $enabled; } /** @@ -61,14 +60,14 @@ public function __construct( public function connect(): void { if ($this->transport->isConnected()) { - throw new TransportAlreadyConnectedException('Transport is already connected'); + return; } foreach ($this->connectionOptions->getServerCollection()->getServers() as $server) { $transportOption = new TranssportOption( - $server->getHost(), - $server->getPort(), - $this->connectionOptions->getTimeout() + host: $server->getHost(), + port: $server->getPort(), + timeout: $this->connectionOptions->getTimeout() ); try { @@ -85,7 +84,7 @@ public function connect(): void } if ($this->currentServer === null) { - throw new NatsConnectionRefusedException('Could not connect to servers!'); + throw new NatsConnectionRefusedException('nats: no servers available for connection'); } $this->logger->debug(sprintf('Connected to %s', $this->currentServer->getHost())); @@ -116,31 +115,50 @@ public function close(): void $this->transport->close(); } + public function setLogger(LoggerInterface $logger): void + { + $this->logger = $logger; + } + /** + * The PUB message publishes the message payload to the given subject name, + * optionally supplying a reply subject. If a reply subject is supplied, it will be delivered to eligible + * subscribers along with the supplied payload. Note that the payload itself is optional. + * To omit the payload, set the payload size to 0, but the second CRLF is still required. + * + * @param string $subject the destination subject to publish to + * @param string $payload the message payload data + * @param string|null $replyTo the reply subject that subscribers can use to send a response back + * to the publisher/requester + * * @throws Exception */ - public function publish(string $subject, string $payload): void + public function publish(string $subject, string $payload, string $replyTo = null): void { if ($this->isConnected() === false) { throw new NatsConnectionRefusedException('Connection is closed'); } - $message = sprintf('%s %s', $subject, strlen($payload)); + $a = [ + $subject, + ]; - $this->doWrite(NatsProtocolOperation::Pub, $message . Nats::CR_LF . $payload); - } + if (!StringUtil::isEmpty($replyTo)) { + $a[] = $replyTo; + } - private function isErrorResponse(string $response): bool - { - return NatsProtocolOperation::Err->isOperation(substr($response, 0, 4)); + $a[] = strlen($payload) . Nats::CR_LF . $payload; + + $this->doWrite(NatsProtocolOperation::Pub, implode(' ', $a)); + $this->validateAcknowledgement(); } /** * @throws Exception */ - public function ping(): void + private function ping(): void { - $this->doWrite(NatsProtocolOperation::Ping, "ping"); + $this->doWrite(NatsProtocolOperation::Ping, 'ping'); } /** @@ -150,11 +168,38 @@ public function validatePing(): void { $this->ping(); - $pingResponse = $this->getResponse(); + $msg = $this->getMsg(); - if (!NatsProtocolOperation::Pong->isOperation($pingResponse)) { - throw new NatsInvalidResponseException('Did not receive a pong from the server'); + if ($msg instanceof Pong) { + return; } + + throw new NatsInvalidResponseException('Did not receive a pong from the server'); + } + + /** + * @throws Exception + */ + public function request(string $subject, string $reply = null): MessageInterface + { + $replySubject = StringUtil::isEmpty($reply) ? self::createSid() : $reply; + + $sub = $this->subscribe($replySubject, null); + + $this->publish($subject, '', $replySubject); + + // process msg + $msg = $this->getMsg(); + + $this->unsubscribe($sub->subscriptionId); + + if ($msg instanceof HMsg) { + if ($msg->getHeader('status') === Nats::HEADER_NO_RESPONDER) { + throw new InvalidArgumentException('No responders are available'); + } + } + + return $msg; } public function getServerInfo(): ?ServerInfo @@ -174,8 +219,17 @@ private function doConnect(): void $server = $this->currentServer; $connectionOptions = new ClientConnectionOptions(); - $connectionOptions->setPedantic(true); - $connectionOptions->setVerbose(true); + $connectionOptions->setPedantic(false); + $connectionOptions->setVerbose(false); + + if ($this->enableNoResponder) { + $connectionOptions->setNoResponders(true); + $connectionOptions->setHeaders(true); + } + + if ($connectionOptions->isVerbose() === true) { + $this->isVerbose = true; + } if (!StringUtil::isEmpty($server->getUser()) && !StringUtil::isEmpty($server->getPassword())) { $connectionOptions->setUser($server->getUser()); @@ -183,52 +237,148 @@ private function doConnect(): void } $this->doWrite(NatsProtocolOperation::Connect, $connectionOptions->toArray()); + $this->validateAcknowledgement(); + } - if ($connectionOptions->isVerbose() === true) { - $connectResponse = $this->getResponse(); + /** + * initiates a subscription to a subject, optionally joining a distributed queue group. + * + * @param string|null $queueGroup if specified, the subscriber will join this queue group + * @param string $subject the subject name to subscribe to + * + * @throws Exception + */ + private function subscribe(string $subject, ?string $queueGroup): Subscription + { + $subscriptionId = self::createSid(); + $sub = new Subscription($subject, $subscriptionId); - if (NatsProtocolOperation::Ack->isOperation($connectResponse) === false) { - throw new NatsInvalidResponseException('Nats did not send a normal response'); + $payload = [$subject]; + + if (!StringUtil::isEmpty($queueGroup)) { + $payload[] = $queueGroup; + } + + $payload[] = $subscriptionId; + $this->doWrite(NatsProtocolOperation::Sub, implode(' ', $payload)); + $this->validateAcknowledgement(); + + return $sub; + } + + /** + * @throws Exception + */ + private function unsubscribe(string $subscriptionId): void + { + $this->doWrite(NatsProtocolOperation::Unsub, $subscriptionId); + $this->validateAcknowledgement(); + } + + /** + * @throws Exception + */ + private function saveRead(int $maxBytes = 0, int $timeout = 100): string + { + $line = ''; + $timeoutTarget = microtime(true) + $timeout; + $receivedBytes = 0; + while ($receivedBytes < $maxBytes || $maxBytes === 0) { + $chunkSize = 1024; + $bytesLeft = ($maxBytes - $receivedBytes); + + if ($maxBytes !== 0 && $bytesLeft < $chunkSize) { + $chunkSize = $bytesLeft; + } + + $read = $this->transport->read($chunkSize, Nats::CR_LF); + + if ($read === false) { + throw new Exception('Could not read from stream'); + } + + $receivedBytes += strlen($read); + + $line .= $read; + + // End of string is reached + if (strlen($read) < 1024) { + break; + } + + if (microtime(true) >= $timeoutTarget) { + throw new InvalidArgumentException('Timeout reached'); } } + + return $line; } - private function createServerInfo(): ServerInfo + /** + * @deprecated + */ + private function validateAcknowledgement(): void { - $rawData = $this->getResponse(); + if ($this->isVerbose === false) { + return; + } - [$operation, $data] = explode(' ', $rawData); + $ack = $this->getMsg(); - if (!NatsProtocolOperation::Info->isOperation($operation)) { - throw new NatsInvalidResponseException('Server information is not correct'); + if (!$ack instanceof Acknowledgement) { + throw new NatsInvalidResponseException('Nats did not send a ack. response'); } + } - $data = $this->encoder->decode($data); + private function createServerInfo(): ServerInfo + { + $serverInfoMsg = $this->getMsg(); - $serverInfo = ServerInfo::fromData($data); + if (!$serverInfoMsg instanceof ServerInfo) { + throw new InvalidArgumentException('Invalid Response'); + } - $this->serverInfo = $serverInfo; + $this->serverInfo = $serverInfoMsg; - return $serverInfo; + return $serverInfoMsg; } - private function getResponse(): string + /** + * @throws Exception + */ + private function getMsg(): NatsResponseInterface { - $response = $this->transport->receive(); + $line = $this->saveRead(0, 300); + + $response = Response::parse($line); - if ($response === false) { - throw new NatsInvalidResponseException('Did not get any response from nats. Connection is not valid'); + if ($response instanceof Ping) { + $this->pong(); + // @TODO add check for infinite loop + return $this->getMsg(); } - if (StringUtil::isEmpty($response)) { - throw new NatsInvalidResponseException('Got an empty response from nats, try using tls instead'); + if ($response instanceof ServerInfo || $response instanceof Pong || $response instanceof Acknowledgement || $response instanceof Error) { + return $response; } - if ($this->isErrorResponse($response)) { - throw new NatsInvalidResponseException(sprintf('Receive an error response from nats: %s', $response)); + if ($response instanceof Msg) { + $payload = $this->saveRead($response->bytes); + $response->setPayload($payload); + + return $response; + } + + if ($response instanceof HMsg) { + $headers = $this->saveRead($response->headerBytes); + $response->setHeaders($headers); + $payload = $this->saveRead($response->totalBytes - $response->headerBytes); + $response->setPayload($payload); + + return $response; } - return trim($response); + throw new LogicException('Msg type is not yet implemented'); } /** @@ -247,8 +397,17 @@ private function doWrite(NatsProtocolOperation $operation, array|string $payload $this->transport->write($payload); } - public function setLogger(LoggerInterface $logger): void + /** + * @throws Exception + */ + private function pong(): void { - $this->logger = $logger; + $this->doWrite(NatsProtocolOperation::Pong, 'pong'); + } + + /** A unique alphanumeric subscription ID, generated by the client. */ + private static function createSid(): string + { + return bin2hex(random_bytes(4)); } } diff --git a/src/Connection/NatsConnectionOption.php b/src/Connection/NatsConnectionOption.php index d73a794..7a3dc94 100644 --- a/src/Connection/NatsConnectionOption.php +++ b/src/Connection/NatsConnectionOption.php @@ -29,7 +29,7 @@ final class NatsConnectionOption implements NatsConnectionOptionInterface */ public function __construct( array|Server|string|ServerCollection $servers = [], - ?string $name = null, + string $name = null, int $timeout = 5, bool $randomize = false ) { diff --git a/src/Connection/NatsResponseInterface.php b/src/Connection/NatsResponseInterface.php new file mode 100644 index 0000000..e65be42 --- /dev/null +++ b/src/Connection/NatsResponseInterface.php @@ -0,0 +1,9 @@ +isOperation($payload)) { + return new Pong(); + } + + if (NatsProtocolOperation::Ack->isOperation($payload)) { + return new Acknowledgement(); + } + + if (NatsProtocolOperation::Err->isOperation($payload)) { + return new Error(); + } + + if (!str_contains($payload, ' ')) { + throw new RuntimeException('Invalid response format'); + } + + [$responseType, $body] = explode(' ', $payload, 2); + + $responseType = strtoupper($responseType); + + $operation = NatsProtocolOperation::tryFrom($responseType); + + if ($operation === null) { + throw new RuntimeException('Operation is not supported'); + } + + dump($operation); + + return match ($operation) { + NatsProtocolOperation::HEADER_MSG => HMsg::create($body), + NatsProtocolOperation::Msg => Msg::create($body), + NatsProtocolOperation::Info => \EJTJ3\PhpNats\Connection\ServerInfo::fromData($body), + NatsProtocolOperation::Ping => new Ping(), + NatsProtocolOperation::Pong => new Pong(), + default => throw new LogicException('Not implemented yet') + }; + } +} diff --git a/src/Connection/Server.php b/src/Connection/Server.php index 15f3cf4..f2b394c 100644 --- a/src/Connection/Server.php +++ b/src/Connection/Server.php @@ -5,6 +5,7 @@ namespace EJTJ3\PhpNats\Connection; use EJTJ3\PhpNats\Constant\Nats; +use InvalidArgumentException; use Nyholm\Dsn\Configuration\Url; use Nyholm\Dsn\DsnParser; @@ -61,7 +62,7 @@ private function addDefaultScheme(string $url): string if (count($schemeParts) === 1) { $url = sprintf('nats://%s', $url); } elseif (!in_array($schemeParts[0], ['nats', 'tls'], true)) { - throw new \InvalidArgumentException('Scheme is not supported'); + throw new InvalidArgumentException('Scheme is not supported'); } return $url; diff --git a/src/ServerInfo/ServerInfo.php b/src/Connection/ServerInfo.php similarity index 70% rename from src/ServerInfo/ServerInfo.php rename to src/Connection/ServerInfo.php index f8c1990..bdd4ad9 100644 --- a/src/ServerInfo/ServerInfo.php +++ b/src/Connection/ServerInfo.php @@ -2,23 +2,23 @@ declare(strict_types=1); -namespace EJTJ3\PhpNats\ServerInfo; +namespace EJTJ3\PhpNats\Connection; /** * @see https://docs.nats.io/reference/reference-protocols/nats-protocol#info */ -final class ServerInfo +final class ServerInfo implements NatsResponseInterface { public function __construct( /** * The unique identifier of the NATS server. */ - private readonly string $serverId, + private readonly string $serverId, /** * The version of the NATS server. */ - private readonly string $version, + private readonly string $version, /** * The version of golang the NATS server was built with. @@ -29,80 +29,80 @@ public function __construct( * The IP address used to start the NATS server, * by default this will be 0.0.0.0 and can be configured with -client_advertise host:port. */ - private readonly string $host, + private readonly string $host, /** * The port number the NATS server is configured to listen on. */ - private readonly int $port, + private readonly int $port, /** * An integer indicating the protocol version of the server. * The server version 1.2.0 sets this to 1 to indicate that it supports the "Echo" feature. */ - private readonly int $proto, + private readonly int $proto, /** * Maximum payload size, in bytes, that the server will accept from the client. */ - private readonly int $maxPayload, + private readonly int $maxPayload, /** * If this is set, then the client must perform the TLS/1.2 handshake. * Note, this used to be ssl_required and has been updated along with the protocol from SSL to TLS. */ - private readonly bool $tlsRequired, + private readonly bool $tlsRequired, /** * If this is set, the client must provide a valid certificate during the TLS handshake. */ - private readonly bool $tlsVerify, + private readonly bool $tlsVerify, /** * If this is set, then the client should try to authenticate upon connect. */ - private readonly bool $authRequired, + private readonly bool $authRequired, /** * @var string[] $connectUrls * * An optional list of server urls that a client can connect to */ - private readonly array $connectUrls, + private readonly array $connectUrls, /** * An optional unsigned integer (64 bits) representing the internal client identifier in the server. * This can be used to filter client connections in monitoring, correlate with error logs, etc... */ - private readonly ?int $clientId, + private readonly ?int $clientId, /** * If the server supports Lame Duck Mode notifications, * and the current server has transitioned to lame duck, ldm will be set to true. */ - private readonly bool $lameDuckMode + private readonly bool $lameDuckMode ) { } - /** - * @param array $response - */ - public static function fromData(array $response): self + public static function fromData(string $response): self { + /** @var array $content */ + $content = json_decode($response, true); + return new self( - serverId: $response['server_id'], - version: $response['version'], - go: $response['go'] ?? null, - host: $response['host'], - port: $response['port'], - proto: $response['proto'], - maxPayload: $response['max_payload'], - tlsRequired: $response['tls_required'] ?? false, - tlsVerify: $response['tls_verify'] ?? false, - authRequired: $response['auth_required'] ?? false, - connectUrls: $response['connect_urls'] ?? [], - clientId: $response['client_id'] ?? null, - lameDuckMode: $response['ldm'] ?? false + serverId: $content['server_id'], + version: $content['version'], + go: $content['go'] ?? null, + host: $content['host'], + port: $content['port'], + proto: $content['proto'], + maxPayload: $content['max_payload'], + tlsRequired: $content['tls_required'] ?? false, + tlsVerify: $content['tls_verify'] ?? false, + authRequired: $content['auth_required'] ?? false, + connectUrls: $content['connect_urls'] ?? [], + clientId: $content['client_id'] ?? null, + lameDuckMode: $content['ldm'] ?? false ); } diff --git a/src/Connection/Subscription.php b/src/Connection/Subscription.php new file mode 100644 index 0000000..035f8e9 --- /dev/null +++ b/src/Connection/Subscription.php @@ -0,0 +1,14 @@ + - */ abstract class Nats { final public const DEFAULT_PORT = 4222; @@ -15,5 +12,11 @@ abstract class Nats final public const LANG = 'php'; - final public const VERSION = '0.0.1'; + final public const VERSION = '0.0.2'; + + final public const HEADER_LINE = 'NATS/1.0'; + + final public const HEADER_STATUS_LENGTH = 3; + + final public const HEADER_NO_RESPONDER = 503; } diff --git a/src/Constant/NatsProtocolOperation.php b/src/Constant/NatsProtocolOperation.php index 6a47ef5..386a093 100644 --- a/src/Constant/NatsProtocolOperation.php +++ b/src/Constant/NatsProtocolOperation.php @@ -5,8 +5,6 @@ namespace EJTJ3\PhpNats\Constant; /** - * @author Evert Jan Hakvoort - * * @see https://docs.nats.io/reference/reference-protocols/nats-protocol */ enum NatsProtocolOperation: string @@ -58,6 +56,12 @@ enum NatsProtocolOperation: string */ case Msg = 'MSG'; + /** + * Sent by server + * Delivers a message payload to a subscriber with NATS headers. + */ + case HEADER_MSG = 'HMSG'; + /** * PING keep-alive message. * diff --git a/src/Exception/NatsReadException.php b/src/Exception/NatsReadException.php new file mode 100644 index 0000000..0eb2d5b --- /dev/null +++ b/src/Exception/NatsReadException.php @@ -0,0 +1,11 @@ + $chunkSize */ - private readonly int $chunkSize = 1500, + private readonly int $chunkSize = 1024, ) { $this->stream = null; } - + /** * Close will close the connection to the server. */ @@ -55,7 +56,7 @@ public function connect(TransportOptionsInterface $option): void $errorCode = null; $errorMessage = null; - set_error_handler(static fn() => true); + set_error_handler(static fn () => true); $stream = stream_socket_client( $address, @@ -109,7 +110,7 @@ public function write(string $payload): void } do { - $written = @fwrite($this->getStream(), $payload); + $written = @fwrite($this->getStream(), $payload, $this->chunkSize); if ($written === false) { throw new NatsStreamWriteException('Error sending data'); @@ -122,34 +123,14 @@ public function write(string $payload): void $length -= $written; if ($length > 0) { - $payload = substr($payload, (0 - $length)); + $payload = substr($payload, 0 - $length); } } while ($length > 0); } - public function receive(int $length = 0): string + public function read(int $length, string $lineEnding = Nats::CR_LF): bool|string { - if ($length <= 0) { - return fgets($this->getStream()); - } - - $chunkSize = $this->chunkSize; - $line = ""; - $receivedBytes = 0; - - while ($receivedBytes < $length) { - $bytesLeft = ($length - $receivedBytes); - - if ($bytesLeft < $this->chunkSize) { - $chunkSize = $bytesLeft; - } - - $readChunk = fread($this->getStream(), $chunkSize); - $receivedBytes += strlen($readChunk); - $line .= $readChunk; - } - - return $line; + return stream_get_line($this->getStream(), $length, $lineEnding); } /** diff --git a/src/Util/StringUtil.php b/src/Util/StringUtil.php index a1edb4d..864fddc 100644 --- a/src/Util/StringUtil.php +++ b/src/Util/StringUtil.php @@ -8,6 +8,23 @@ final class StringUtil { public static function isEmpty(?string $value): bool { - return strlen(trim($value ?? '')) === 0; + return trim($value ?? '') === ''; + } + + /** + * @return array + */ + public static function explode(string $string, int $limit = null): array + { + if ($limit === null) { + $parts = explode(' ', $string); + } else { + $parts = explode(' ', $string, $limit); + } + + return array_values(array_filter( + $parts, + static fn (?string $part) => !StringUtil::isEmpty($part)) + ); } } diff --git a/tests/Connection/ClientConnectionOptionsTest.php b/tests/Connection/ClientConnectionOptionsTest.php index b276b44..d5f98e8 100644 --- a/tests/Connection/ClientConnectionOptionsTest.php +++ b/tests/Connection/ClientConnectionOptionsTest.php @@ -6,7 +6,6 @@ use EJTJ3\PhpNats\Connection\ClientConnectionOptions; use EJTJ3\PhpNats\Constant\Nats; -use Generator; use PHPUnit\Framework\TestCase; final class ClientConnectionOptionsTest extends TestCase @@ -139,6 +138,8 @@ public function testToArray(ClientConnectionOptions $options): void 'version' => Nats::VERSION, 'protocol' => 1, 'echo' => true, + 'no_responders' => true, + 'headers' => true, ]; $options->setEcho(true); @@ -150,11 +151,13 @@ public function testToArray(ClientConnectionOptions $options): void $options->setAuthToken('authToken'); $options->setTlsRequired(true); $options->setPedantic(true); + $options->setNoResponders(true); + $options->setHeaders(true); $this->assertSame($expected, $options->toArray()); } - public function createClientConnectionOptions(): Generator + public function createClientConnectionOptions(): \Generator { yield [new ClientConnectionOptions()]; } diff --git a/tests/Connection/NatsConnectionOptionTest.php b/tests/Connection/NatsConnectionOptionTest.php index aa11876..d3d7240 100644 --- a/tests/Connection/NatsConnectionOptionTest.php +++ b/tests/Connection/NatsConnectionOptionTest.php @@ -5,7 +5,6 @@ namespace Connection; use EJTJ3\PhpNats\Connection\NatsConnectionOption; -use InvalidArgumentException; use PHPUnit\Framework\TestCase; final class NatsConnectionOptionTest extends TestCase @@ -28,7 +27,7 @@ public function testSingleServer(): void public function testEmptyServer(): void { - $this->expectException(InvalidArgumentException::class); + $this->expectException(\InvalidArgumentException::class); new NatsConnectionOption(''); } diff --git a/tests/Connection/ServerCollectionTest.php b/tests/Connection/ServerCollectionTest.php index c964445..212e57e 100644 --- a/tests/Connection/ServerCollectionTest.php +++ b/tests/Connection/ServerCollectionTest.php @@ -6,7 +6,6 @@ use EJTJ3\PhpNats\Connection\Server; use EJTJ3\PhpNats\Connection\ServerCollection; -use InvalidArgumentException; use PHPUnit\Framework\TestCase; final class ServerCollectionTest extends TestCase @@ -26,7 +25,7 @@ public function testServerCollection() public function testEmptyServers(): void { - $this->expectException(InvalidArgumentException::class); + $this->expectException(\InvalidArgumentException::class); new ServerCollection([]); } } diff --git a/tests/Util/StringUtilTest.php b/tests/Util/StringUtilTest.php index a0e9e7a..bcad19b 100644 --- a/tests/Util/StringUtilTest.php +++ b/tests/Util/StringUtilTest.php @@ -24,4 +24,28 @@ public function testNonEmptyStrings(): void $this->assertFalse(StringUtil::isEmpty($case)); } } + + public function testExplode(): void + { + $string = 'MSG reply 2d4a3629 1649'; + $parts = StringUtil::explode($string); + + self::assertEquals([ + 'MSG', + 'reply', + '2d4a3629', + '1649', + ], $parts); + } + + public function testExplodeWithLimit(): void + { + $string = 'MSG reply 2d4a3629 1649'; + $parts = StringUtil::explode($string, 2); + + self::assertEquals([ + 'MSG', + 'reply 2d4a3629 1649', + ], $parts); + } }