Skip to content

Commit

Permalink
chore: work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
EJTJ3 committed Jul 27, 2023
1 parent 994ba2a commit 9566c1d
Show file tree
Hide file tree
Showing 27 changed files with 643 additions and 147 deletions.
3 changes: 1 addition & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
},
"authors": [
{
"name": "Evert Jan Hakvoort",
"email": "[email protected]"
"name": "EJTJ3"
}
],
"require": {
Expand Down
9 changes: 9 additions & 0 deletions src/Connection/Acknowledgement.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace EJTJ3\PhpNats\Connection;

final class Acknowledgement implements NatsResponseInterface
{
}
45 changes: 38 additions & 7 deletions src/Connection/ClientConnectionOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ public function __construct(
/**
* Turns on +OK protocol acknowledgements.
*/
private bool $verbose = true,
private bool $verbose = true,

/**
* Turns on additional strict format checking, e.g. for properly formed subjects.
*/
private bool $pedantic = true,
private bool $pedantic = true,

/**
* Indicates whether the client requires an SSL connection.
*/
private bool $tlsRequired = false,
private bool $tlsRequired = false,

/**
* Client authorization token (if auth_required is set).
Expand All @@ -52,16 +52,25 @@ public function __construct(
* Sending 1 indicates that the client supports dynamic reconfiguration of
* cluster topology changes by asynchronously receiving INFO messages with known servers it can reconnect to.
*/
private ?int $protocol = 0,
private ?int $protocol = 0,

/**
* If set to true, the server (version 1.2.0+) will not send originating messages from this
* connection to its own subscriptions. Clients should set this to true only for server supporting
* this feature, which is when proto in the INFO protocol is set to at least 1.
*/
private bool $echo = false,
)
{
private bool $echo = false,

/**
* Enable quick replies for cases where a request is sent to a topic with no responders.
*/
private bool $noResponders = false,

/**
* Whether the client supports headers.
*/
private bool $headers = false,
) {
}

public function isVerbose(): bool
Expand Down Expand Up @@ -154,6 +163,26 @@ public function setEcho(bool $echo): void
$this->echo = $echo;
}

public function isNoResponders(): bool
{
return $this->noResponders;
}

public function setNoResponders(bool $noResponders): void
{
$this->noResponders = $noResponders;
}

public function isHeaders(): bool
{
return $this->headers;
}

public function setHeaders(bool $headers): void
{
$this->headers = $headers;
}

/**
* @return array<string, mixed>
*/
Expand All @@ -171,6 +200,8 @@ public function toArray(): array
'version' => Nats::VERSION,
'protocol' => $this->protocol,
'echo' => $this->echo,
'no_responders' => $this->noResponders,
'headers' => $this->headers,
];
}
}
9 changes: 9 additions & 0 deletions src/Connection/Error.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace EJTJ3\PhpNats\Connection;

final class Error implements NatsResponseInterface
{
}
76 changes: 76 additions & 0 deletions src/Connection/HMsg.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?php

declare(strict_types=1);

namespace EJTJ3\PhpNats\Connection;

use EJTJ3\PhpNats\Util\StringUtil;
use Exception;
use InvalidArgumentException;

final class HMsg implements MessageInterface
{
private array $headers;

public ?string $payload;

public ?string $protocol;

public function __construct(
// Subject name this message was received on.
public string $subject,
// The unique alphanumeric subscription ID of the subject.
public string $subscriptionId,
// The subject on which the publisher is listening for responses.
public ?string $replyTo,
// The size of the headers section in bytes including the ␍␊␍␊ delimiter before the payload.
public int $headerBytes,
// The total size of headers and payload sections in bytes.
public int $totalBytes
) {
$this->headers = [];
$this->payload = null;
}

public function getPayload(): string
{
return $this->payload ?? '';
}

public function getHeader(string $key)
{
return $this->headers[$key] ?? null;
}

public function setPayload(string $payload): void
{
$this->payload = $payload;
}

public function setHeaders(string $headers): void
{
// Check if we have an inlined status.
$parts = explode(' ', $headers);

if (isset($parts[1]) && strlen($parts[1]) === 3) {
$this->headers['status'] = (int) $parts[1];
}
}

// HMSG <subject> <sid> [reply-to] <#header bytes> <#total bytes>␍␊[headers]␍␊␍␊[payload]␍␊
public static function create(string $protocolMessage): HMsg
{
$parts = StringUtil::explode($protocolMessage, 5);

try {
return match (count($parts)) {
4 => new self($parts[0], $parts[1], null, (int) $parts[2], (int) $parts[3]),
5 => new self($parts[0], $parts[1], $parts[2], (int) $parts[3], (int) $parts[4]),
default => throw new InvalidArgumentException('Invalid msg')
};
} catch (Exception $e) {
// Add own exception
throw $e;
}
}
}
10 changes: 10 additions & 0 deletions src/Connection/MessageInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace EJTJ3\PhpNats\Connection;

interface MessageInterface extends NatsResponseInterface
{
public function getPayload(): string;
}
55 changes: 55 additions & 0 deletions src/Connection/Msg.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?php

declare(strict_types=1);

namespace EJTJ3\PhpNats\Connection;

use EJTJ3\PhpNats\Util\StringUtil;
use InvalidArgumentException;

final class Msg implements MessageInterface
{
/**
* @var string|null the message payload data
*/
private ?string $payload;

private function __construct(
// subject name this message was received on - always
public readonly string $subject,
// The unique alphanumeric subscription ID of the subject. - always
public readonly string $sid,
// The subject on which the publisher is listening for responses. - optional
public readonly ?string $replyTo,
// Size of the payload in bytes.
public readonly int $bytes,
) {
$this->payload = '';
}

/**
* @param string $protocolMessage MSG <subject> <sid> [reply-to] <#bytes>␍␊[payload]␍␊
*
* @see https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-6
*/
public static function create(string $protocolMessage): self
{
$parts = StringUtil::explode($protocolMessage, 4);

return match (count($parts)) {
3 => new self($parts[0], $parts[1], null, (int) $parts[2]),
4 => new self($parts[0], $parts[1], $parts[2], (int) $parts[3]),
default => throw new InvalidArgumentException('Invalid format')
};
}

public function getPayload(): string
{
return $this->payload ?? '';
}

public function setPayload(string $payload): void
{
$this->payload = $payload;
}
}
Loading

0 comments on commit 9566c1d

Please sign in to comment.