Skip to content

Commit

Permalink
Merge branch 'single-conn-multi-sub'
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestKz committed Jan 15, 2024
2 parents 521d1c9 + 3e24a7b commit c973053
Show file tree
Hide file tree
Showing 12 changed files with 1,648 additions and 246 deletions.
224 changes: 224 additions & 0 deletions docs/IcepeakProtocolMultiSub.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
# The Icepeak Multiple-Subcription-Single-Connection Protocol

This protocol was developed atop the existing protocol to allow the client to subscribe to multiple "paths"/"topics" using a single websocket connection.

Whereas the original existing protocol required a new websocket connection for each path the client wanted to subscribe to.

# Initial Connection

A client can connect to a multiple-subscription protocol connection with a URL such as:
`ws://yourdomain.com/?method=reusable`

The URL path:
- points to the root of your icepeak service
- has the query parameter `method` set to `reusable`

The authorisation mechanism is a "subscription deadline timout": Initial websocket connection does not need authorisation, but if the client does not subscribe to a client before the server-configured timeout, then the connection will be closed un-politely, i.e the server will not send a WS close control message.

Timeout is set by the `--first-subscription-deadline-timeout` flag, the input is in terms of microseconds, the default value is `1000000` (1 second).


# Subscribing, Unsubscribing & Updates

In summary:
- The client can request to subscribe to an array of paths.
- The client, on the single connection, can cumulatively keep subscribing and unsubscribing to paths by sending corresponding payloads.
- The server also sends back a response about the status, and some data of the corresponding subscription or unsubscription request.
- The server will send the client the new value/update at the subscribed path whenever there is a change at that path.
- Both, subscription and unsubscription requests are idempotent, in terms of the effect on the state of the server, actual response received from client varies depending on the state of the server.

## Update

`JSON Schema` declaration of the update the server will send to the client upon subscribed path change:
```javascript
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "wss://updates.channable.com",
"title": "Path changes",
"description": "Indicates changes occurred at the subscribed paths",
"type": "object",
"properties": {
"type": { "const": "update" },
"path": { "type": "string" },
"value": {}
}
}
```

Example:
```javascript
{
"type": "update",
"path": "path/to/value",
"value": <any-type>
}
```

## Subscribe

In summary:
- The client can send a payload that contains an array of paths to subscribe to.
- The client has to send a JWT that authorises all the paths.
- The client can expect a response from the server that contains the status/acknowledgement of the request.
- Upon a successful request, the client can expect the payload to also contain the current value of the paths requested.

### Client Subscribe Request
Each subscription is checked against a JWT to see if the user is authorised to access the path(s).

`JSON Schema` declaration of the subscribe client request:
```javascript
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "wss://updates.channable.com/",
"title": "Add subscription",
"description": "Adding an additional subscription",
"type": "object",
"properties": {
"type": { "const": "subscribe" },
"paths": { "type": "array", "items": { "type": "string" } },
"token": { "type": "string" }
}
}

```
Example:
```javascript
{
"type": "subscribe",
"paths": [ "path/one", "path/two", "otherpath" ],
"token": "eyJ..."
}
```


### Server Subscribe Response
The server sends back a payload to the client. The payload will always contain a status code:

| Status code | When |
| ---- | -------------------------------------------- |
| 200 | Subscription was successfully processed |
| 400 | Request payload was malformed |
| 401 | No authorization token provided |
| 403 | Authorization token was rejected / malformed |


If the status code is `200` and **whether or not the client is already subscribed**, the client can expect a payload from server that contains:
- The status code.
- Path(s) of the subscription requested.
- The current value(s) of that path(s).

`JSON Schema` declaration of the server response:
```javascript
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "wss://updates.channable.com",
"title": "Subscription status",
"description": "Indicates whether the subscription was successful",
"type": "object",
"properties": {
"type": { "const": "subscribe" },
"paths": {
"type": "array",
"path": {
"type": "object",
"properties": {
"path": { "type": "string" },
"value": {}
}
}
},
"code": { "type": "number" },
"message": { "type": "string" },
"extra": {}
}
}
```

Example success:
```javascript
{
"type": "subscribe",
"paths": [
{ "path": "path/one", "value": val },
{ "path": "path/two", "value": val2 },
{ "path": "otherpath", "value": val3 },
],
"code": 200,
"message": "You've been successfully subscribed to the paths",
"extra": {}
}
```

## Unsubscribe

In summary:
- The client can send a payload that contains paths to unsubscribe from.
- The client can expect a response from the server that contains the status/acknowledgement of the request, and paths unsubscribed from.

### Client Unsubscribe Request

`JSON Schema` declaration of the unsubscribe client request:
```javascript
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "wss://updates.channable.com",
"title": "Remove Subscription",
"description": "Remove an existing subscription",
"type": "object",
"properties": {
"type": { "const": "unsubscribe" },
"paths": { "type": "array", "items": { "type": "string" } }
}
}
```

Example unsubscribe request:
```javascript
{
"type": "unsubscribe",
"paths": [ "path/one", "path/two", "path/three" ],
}
```

### Server Unsubscribe Response
The server sends back a payload to the client. The payload will always contain a status code:

| Status code | When |
| ------------- | -------------------------------- |
| 200 | Unsubscription was successfully processed |
| 400 | Request payload was malformed |

If the status code is `200`, the client can expect a payload from server that contains:
- the list of paths that the client had been meaningfully unsubscribe from, i.e only the paths that the client had subscribed to.

`JSON Schema` declaration of the unsubscribe client request:
```javascript
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "wss://updates.channable.com",
"title": "Unsubscription status",
"description": "Indicates whether the unsubscription was successful",
"type": "object",
"properties": {
"type": { "const": "unsubscribe" },
"paths": { "type": "array", "items": { "type": "string" } },
"code": { "type": "number" },
"message": { "type": "string" },
"extra": {}
}
}
```

Example succesful unsubscribe response:
```javascript
{
"type": "unsubscribe",
"paths": [ "path/one", "path/two" ],
"code": 200,
"message": "You've been successfully unsubscribed from the paths",
"extra": {}
}
```

# Invalid Client Message
The server will close the websocket connection with an informative message if the client payload is not recognised.
6 changes: 6 additions & 0 deletions server/icepeak.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ library
Icepeak.Server.Store
Icepeak.Server.Subscription
Icepeak.Server.WebsocketServer
Icepeak.Server.WebsocketServer.SingleSubscription
Icepeak.Server.WebsocketServer.MultiSubscription
Icepeak.Server.WebsocketServer.Payload
Icepeak.Server.WebsocketServer.Utils

other-modules: Paths_icepeak
hs-source-dirs: src
Expand Down Expand Up @@ -177,6 +181,7 @@ test-suite spec
Icepeak.Server.SocketSpec
Icepeak.Server.StoreSpec
Icepeak.Server.SubscriptionTreeSpec
Icepeak.Server.MultiSubscriptionSpec
OrphanInstances
Paths_icepeak

Expand Down Expand Up @@ -223,5 +228,6 @@ test-suite spec
, wai-websockets
, warp
, websockets
, hspec-expectations-json

default-language: Haskell2010
7 changes: 4 additions & 3 deletions server/icepeak.nix
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

# Haskell packages
, aeson, async, base, bytestring, clock, containers, directory, hashable, hspec
, hspec-wai, http-types, jwt, monad-logger, mtl, network, optparse-applicative
, prometheus-client, prometheus-metrics-ghc, QuickCheck, quickcheck-instances
, hspec-wai, hspec-expectations-json, http-types, jwt, monad-logger, mtl, network
, optparse-applicative, prometheus-client
, prometheus-metrics-ghc, QuickCheck, quickcheck-instances
, random, raven-haskell, scotty, sqlite-simple, securemem, stm, text, time, unix
, unordered-containers, uuid, wai, wai-extra, wai-middleware-prometheus
, wai-websockets, warp, websockets }:
Expand Down Expand Up @@ -83,7 +84,7 @@ mkDerivation {
websockets
];

testHaskellDepends = [ hspec hspec-wai QuickCheck quickcheck-instances ];
testHaskellDepends = [ hspec hspec-wai QuickCheck quickcheck-instances hspec-expectations-json ];

license = lib.licenses.bsd3;
}
12 changes: 12 additions & 0 deletions server/src/Icepeak/Server/Config.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{-# LANGUAGE NumericUnderscores #-}

module Icepeak.Server.Config (
Config (..),
MetricsConfig (..),
Expand Down Expand Up @@ -51,6 +53,11 @@ data Config = Config
-- to respond with a pong. If no pong is sent within this timeframe then the
-- connection is considered to have timed out and it will be terminated.
, configWebSocketPongTimeout:: Int
-- | The amount of time in microseconds to wait for a subscription request before closing the connection.
-- This is used for the 'MultiSubscription.hs' protocol.
-- The initial connection to the server is not behind authorisation, this timeout mechanism
-- is used to prevent unwanted connections.
, configInitialSubscriptionTimeoutMicroSeconds:: Int
}

data MetricsConfig = MetricsConfig
Expand Down Expand Up @@ -121,6 +128,11 @@ configParser environment = Config
metavar "WS-PONG-TIMEOUT" <>
value 30 <>
help "The timespan in seconds after sending a ping during which the client has to respond with a pong. If no pong is sent within this timeframe then the connection is considered to have timed out and it will be terminated.")
<*> option auto
(long "first-subscription-deadline-timeout" <>
metavar "MICROSECONDS" <>
value 1_000_000 <> -- 1 second
help "The amount of time in microseconds to wait for a subscription request before closing the connection. This is used for the multiple subscription protocol. The initial connection to the server is not behind authorisation, and hence this timeout mechanism is used to disconnect unwanted connections.")

where
environ var = foldMap value (lookup var environment)
Expand Down
Loading

0 comments on commit c973053

Please sign in to comment.