From a39603f90c1edd7c6357ac5f301263d7ccc0d642 Mon Sep 17 00:00:00 2001 From: The Things Bot Date: Tue, 12 Dec 2023 13:17:46 +0000 Subject: [PATCH 01/15] all: Bump to version 3.28.2 --- data/lorawan-devices | 2 +- data/lorawan-frequency-plans | 2 +- data/lorawan-webhook-templates | 2 +- package.json | 2 +- pkg/version/ttn.go | 2 +- sdk/js/package.json | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/data/lorawan-devices b/data/lorawan-devices index 4c8ea2f54c..4413bd8a98 160000 --- a/data/lorawan-devices +++ b/data/lorawan-devices @@ -1 +1 @@ -Subproject commit 4c8ea2f54cc0575a1c0f508e7a2ac9f272be4b4c +Subproject commit 4413bd8a98e752dad4fd906e4c49387a79a09f61 diff --git a/data/lorawan-frequency-plans b/data/lorawan-frequency-plans index 6d3428a337..54d10835c5 160000 --- a/data/lorawan-frequency-plans +++ b/data/lorawan-frequency-plans @@ -1 +1 @@ -Subproject commit 6d3428a33761b2b4a5c15143d817cbf204929d84 +Subproject commit 54d10835c50a893b34cadd992df752e9eba6061e diff --git a/data/lorawan-webhook-templates b/data/lorawan-webhook-templates index 2c03558427..1142a2a668 160000 --- a/data/lorawan-webhook-templates +++ b/data/lorawan-webhook-templates @@ -1 +1 @@ -Subproject commit 2c0355842769995e3f1d304d004303d9671febe0 +Subproject commit 1142a2a66863c477ee027b147c67a0ca923f485a diff --git a/package.json b/package.json index cfc6aaeabc..96ea55b742 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ttn-stack", - "version": "3.28.1", + "version": "3.28.2", "description": "The Things Stack", "main": "index.js", "repository": "https://github.com/TheThingsNetwork/lorawan-stack.git", diff --git a/pkg/version/ttn.go b/pkg/version/ttn.go index 87ae965713..4bfb83bcb1 100644 --- a/pkg/version/ttn.go +++ b/pkg/version/ttn.go @@ -3,4 +3,4 @@ package version // TTN Version -var TTN = "3.28.1-dev" +var TTN = "3.28.2-dev" diff --git a/sdk/js/package.json b/sdk/js/package.json index 19405384b3..e6c63fa752 100644 --- a/sdk/js/package.json +++ b/sdk/js/package.json @@ -1,6 +1,6 @@ { "name": "ttn-lw", - "version": "3.28.1", + "version": "3.28.2", "description": "The Things Stack for LoRaWAN JavaScript SDK", "url": "https://github.com/TheThingsNetwork/lorawan-stack/tree/default/sdk/js", "main": "dist/index.js", From fe5aeb789d467c3c4ad63d11959f1d5f2192cdcb Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Wed, 13 Dec 2023 18:11:45 +0100 Subject: [PATCH 02/15] console: Fix WebSocket request detection --- .../internal/events/middleware/auth.go | 43 +++++++++++++------ 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/pkg/console/internal/events/middleware/auth.go b/pkg/console/internal/events/middleware/auth.go index 66f2b71e20..6f7f18409e 100644 --- a/pkg/console/internal/events/middleware/auth.go +++ b/pkg/console/internal/events/middleware/auth.go @@ -30,17 +30,37 @@ var ( upgradeHeader = textproto.CanonicalMIMEHeaderKey("Upgrade") ) +func headerTokens(h http.Header, key string) []string { + var tokens []string + for _, value := range h[key] { + value := strings.Split(strings.TrimSpace(value), ",") + for _, token := range value { + token := strings.TrimSpace(token) + tokens = append(tokens, token) + } + } + return tokens +} + +func containsHeaderToken(h http.Header, key, token string) bool { + for _, t := range headerTokens(h, key) { + if strings.EqualFold(t, token) { + return true + } + } + return false +} + func isWebSocketRequest(r *http.Request) bool { h := r.Header - return strings.EqualFold(h.Get(connectionHeader), "upgrade") && - strings.EqualFold(h.Get(upgradeHeader), "websocket") + return containsHeaderToken(h, connectionHeader, "upgrade") && containsHeaderToken(h, upgradeHeader, "websocket") } // ProtocolAuthentication returns a middleware that authenticates WebSocket requests using the subprotocol. // The subprotocol must be prefixed with the given prefix. // The token is extracted from the subprotocol and used to authenticate the request. -// If the token is valid, the subprotocol is removed from the request. -// If the token is invalid, the request is not authenticated. +// If the token is valid, the subprotocol is removed from the request, and the original authorization header is removed. +// If the token is invalid, the request is unchanged. func ProtocolAuthentication(prefix string) func(http.Handler) http.Handler { prefixLen := len(prefix) return func(next http.Handler) http.Handler { @@ -49,24 +69,19 @@ func ProtocolAuthentication(prefix string) func(http.Handler) http.Handler { next.ServeHTTP(w, r) return } - if r.Header.Get(authorizationHeader) != "" { - next.ServeHTTP(w, r) - return - } - protocols := strings.Split(strings.TrimSpace(r.Header.Get(protocolHeader)), ",") - newProtocols := make([]string, 0, len(protocols)) + var protocols []string token := "" - for _, protocol := range protocols { + for _, protocol := range headerTokens(r.Header, protocolHeader) { p := strings.TrimSpace(protocol) if len(p) >= prefixLen && strings.EqualFold(prefix, p[:prefixLen]) { token = p[prefixLen:] continue } - newProtocols = append(newProtocols, p) + protocols = append(protocols, p) } if _, _, _, err := auth.SplitToken(token); err == nil { - if len(newProtocols) > 0 { - r.Header.Set(protocolHeader, strings.Join(newProtocols, ",")) + if len(protocols) > 0 { + r.Header.Set(protocolHeader, strings.Join(protocols, ",")) } else { r.Header.Del(protocolHeader) } From 90be2048ed72eee87187eda13035368a3bc0d5ce Mon Sep 17 00:00:00 2001 From: Kevin Schiffer Date: Thu, 14 Dec 2023 19:47:40 +0900 Subject: [PATCH 03/15] dev,console: Fix event stream concurrency and state machine --- .../console/store/middleware/logics/events.js | 10 +- .../api/stream/subscribeToWebSocketStream.js | 214 ++++++++++++------ 2 files changed, 151 insertions(+), 73 deletions(-) diff --git a/pkg/webui/console/store/middleware/logics/events.js b/pkg/webui/console/store/middleware/logics/events.js index e03e82d85e..513fb8ef0b 100644 --- a/pkg/webui/console/store/middleware/logics/events.js +++ b/pkg/webui/console/store/middleware/logics/events.js @@ -175,9 +175,9 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { allow(action) }, - process: ({ action }, dispatch, done) => { - if (action.error) { - if (action.error?.message === 'timeout') { + process: async ({ action }, dispatch, done) => { + if (action.type === START_EVENTS_FAILURE) { + if (action?.error?.message === 'timeout') { // Set the connection status to `checking` to trigger connection checks // and detect possible offline state. dispatch(setStatusChecking()) @@ -188,6 +188,10 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { return done() } } + if (action.type === STOP_EVENTS && Boolean(channel)) { + // Close the connection if it wasn't closed already. + await channel.close() + } done() }, }), diff --git a/sdk/js/src/api/stream/subscribeToWebSocketStream.js b/sdk/js/src/api/stream/subscribeToWebSocketStream.js index 6cb6232e69..ca38abb5a0 100644 --- a/sdk/js/src/api/stream/subscribeToWebSocketStream.js +++ b/sdk/js/src/api/stream/subscribeToWebSocketStream.js @@ -12,15 +12,56 @@ // See the License for the specific language governing permissions and // limitations under the License. +import traverse from 'traverse' + import Token from '../../util/token' import { warn } from '../../../../../pkg/webui/lib/log' import { notify, EVENTS, MESSAGE_TYPES } from './shared' -const wsInstances = {} -let subscriptions = {} const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc, [curr]: {} }), {}) -let closeRequested = false + +const store = () => { + const connections = {} + + return { + getInstance: url => traverse(connections).get([url, 'instance']), + setInstance: (url, instance) => { + traverse(connections).set([url, 'instance'], instance) + return instance + }, + deleteInstance: url => { + if (url in connections) { + delete connections[url] + } + }, + getSubscriptions: url => Object.values(traverse(connections).get([url, 'subscriptions'] || {})), + getSubscription: (url, sid) => traverse(connections).get([url, 'subscriptions', sid]) || null, + setSubscription: (url, sid, subscription) => { + const subs = traverse(connections).get([url, 'subscriptions']) || {} + subs[sid] = subscription + traverse(connections).set([url, 'subscriptions'], subs) + return subs[sid] + }, + markSubscriptionClosing: (url, sid) => { + if (traverse(connections).has([url, 'subscriptions', sid])) { + traverse(connections).set([url, 'subscriptions', sid, 'closeRequested'], true) + } + }, + getSubscriptionCount: url => { + const subs = traverse(connections).get([url, 'subscriptions']) + return subs ? Object.keys(subs).length : 0 + }, + deleteSubscription: (url, sid) => { + const subscriptions = traverse(connections).get([url, 'subscriptions']) + if (subscriptions && subscriptions[sid]) { + delete subscriptions[sid] + } + }, + } +} + +const state = store() /** * Opens a new stream. @@ -59,7 +100,7 @@ export default async ( endpoint = '/console/internal/events/', timeout = 10000, ) => { - const subscriptionId = Date.now() + const subscriptionId = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER) const subscriptionPayload = JSON.stringify({ type: MESSAGE_TYPES.SUBSCRIBE, id: subscriptionId, @@ -70,16 +111,23 @@ export default async ( id: subscriptionId, }) const url = baseUrl + endpoint + let wsInstance = state.getInstance(url) await Promise.race([ new Promise(async (resolve, reject) => { // Add the new subscription to the subscriptions object. - // Also add the resolver function to the subscription object to be able + // Also add the resolver functions to the subscription object to be able // to resolve the promise after the subscription confirmation message. - subscriptions = { - ...subscriptions, - [subscriptionId]: { ...initialListeners, url, _resolver: resolve }, + if (state.getSubscription(url, subscriptionId) !== null) { + reject(new Error('Subscription with the same ID already exists')) } + state.setSubscription(url, subscriptionId, { + ...initialListeners, + url, + resolve, + reject, + closeRequested: false, + }) try { const token = new Token().get() @@ -87,94 +135,110 @@ export default async ( const baseUrlParsed = baseUrl.replace('http', 'ws') // Open up the WebSocket connection if it doesn't exist. - if (!wsInstances[url]) { - wsInstances[url] = new WebSocket(`${baseUrlParsed}${endpoint}`, [ - 'ttn.lorawan.v3.console.internal.events.v1', - `ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`, - ]) + if (!wsInstance) { + wsInstance = state.setInstance( + url, + new WebSocket(`${baseUrlParsed}${endpoint}`, [ + 'ttn.lorawan.v3.console.internal.events.v1', + `ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`, + ]), + ) // Broadcast connection errors to all listeners. - wsInstances[url].addEventListener('error', error => { - Object.values(subscriptions) - .filter(s => s.url === url) - .forEach(s => notify(s[EVENTS.ERROR], new Error(error))) - // The error is an error event, but we should only throw proper errors. - // It has an optional error code that we could use to map to a proper error. - // However, the error codes are optional and not always used. - reject(new Error('Error in WebSocket connection')) + wsInstance.addEventListener('error', () => { + const err = new Error('Error in WebSocket connection') + const subscriptions = state.getSubscriptions(url) + for (const s of subscriptions) { + notify(s[EVENTS.ERROR], err) + // The error is an error event, but we should only throw proper errors. + // It has an optional error code that we could use to map to a proper error. + // However, the error codes are optional and not always used. + s.reject(err) + } }) // Event listener for 'close' - wsInstances[url].addEventListener('close', closeEvent => { + wsInstance.addEventListener('close', closeEvent => { // TODO: Handle close event codes. // https://github.com/TheThingsNetwork/lorawan-stack/issues/6752 + const subscriptions = state.getSubscriptions(url) + const wasClean = closeEvent?.wasClean ?? false - delete wsInstances[url] - Object.values(subscriptions) - .filter(s => s.url === url) - .forEach(s => notify(s[EVENTS.CLOSE], closeRequested)) - - if (closeRequested) { - resolve() - } else { - reject( - new Error(`WebSocket connection closed unexpectedly with code ${closeEvent.code}`), - ) + for (const s of subscriptions) { + notify(s[EVENTS.CLOSE], wasClean) + if (wasClean) { + s.resolve() + } else { + s.reject( + new Error( + `WebSocket connection closed unexpectedly with code ${closeEvent.code}`, + ), + ) + } } + + state.deleteInstance(url) }) // After the WebSocket connection is open, add the event listeners. // Wait for the subscription confirmation message before resolving. - wsInstances[url].addEventListener('message', ({ data }) => { + wsInstance.addEventListener('message', ({ data }) => { const dataParsed = JSON.parse(data) - const listeners = subscriptions[dataParsed.id] + const sid = dataParsed.id + const subscription = state.getSubscription(url, sid) - if (!listeners) { - warn('Message received for closed or unknown subscription with ID', dataParsed.id) + if (!subscription) { + warn('Message received for closed or unknown subscription with ID', sid) return } if (dataParsed.type === MESSAGE_TYPES.SUBSCRIBE) { - notify(listeners[EVENTS.OPEN]) + notify(subscription[EVENTS.OPEN]) // Resolve the promise after the subscription confirmation message. - listeners._resolver() + subscription.resolve() } if (dataParsed.type === MESSAGE_TYPES.ERROR) { - notify(listeners[EVENTS.ERROR], dataParsed) + notify(subscription[EVENTS.ERROR], dataParsed) } if (dataParsed.type === MESSAGE_TYPES.PUBLISH) { - notify(listeners[EVENTS.MESSAGE], dataParsed.event) + notify(subscription[EVENTS.MESSAGE], dataParsed.event) } if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) { - notify(listeners[EVENTS.CLOSE], closeRequested) - // Remove the subscription. - delete subscriptions[dataParsed.id] - if (!Object.values(subscriptions).some(s => s.url === url)) { - wsInstances[url].close() + notify(subscription[EVENTS.CLOSE], subscription.closeRequested) + // Remove the subscription + state.deleteSubscription(url, sid) + if (state.getSubscriptionCount(url) === 0) { + wsInstance.close() + state.deleteInstance(url) } } }) } - if (wsInstances[url] && wsInstances[url].readyState === WebSocket.OPEN) { + if (wsInstance.readyState === WebSocket.OPEN) { // If the WebSocket connection is already open, only add the subscription. - wsInstances[url].send(subscriptionPayload) - } else if (wsInstances[url] && wsInstances[url].readyState === WebSocket.CONNECTING) { + wsInstance.send(subscriptionPayload) + } else if (wsInstance.readyState === WebSocket.CONNECTING) { // Otherwise wait for the connection to open and then add the subscription. - wsInstances[url].addEventListener('open', () => { - wsInstances[url].send(subscriptionPayload) - }) + const onOpen = () => { + wsInstance.send(subscriptionPayload) + wsInstance.removeEventListener('open', onOpen) + } + wsInstance.addEventListener('open', onOpen) + } else { + reject(new Error('WebSocket connection is closed')) } } catch (error) { const err = error instanceof Error ? error : new Error(error) - Object.values(subscriptions) - .filter(s => s.url === url) - .forEach(s => notify(s[EVENTS.ERROR], err)) - reject(err) + const subscriptions = state.getSubscriptions(url) + for (const s of subscriptions) { + notify(s[EVENTS.ERROR], err) + s.reject(err) + } } }), new Promise((resolve, reject) => setTimeout(() => reject(new Error('timeout')), timeout)), @@ -189,25 +253,35 @@ export default async ( `${eventName} event is not supported. Should be one of: open, message, error or close`, ) } - subscriptions[subscriptionId][eventName] = callback + const subscription = state.getSubscription(url, subscriptionId) + subscription[eventName] = callback return this }, close: () => { - if (wsInstances[url]) { - closeRequested = true - wsInstances[url].send(unsubscribePayload) - - // Wait for the server to confirm the unsubscribe. - return new Promise(resolve => { - wsInstances[url].addEventListener('message', ({ data }) => { - const { type, id } = JSON.parse(data) - if (id === subscriptionId && type === MESSAGE_TYPES.UNSUBSCRIBE) { - resolve() - } - }) - }) + if ( + !wsInstance || + wsInstance.readyState === WebSocket.CLOSED || + wsInstance.readyState === WebSocket.CLOSING + ) { + warn('WebSocket was already closed') + return Promise.resolve() } + + state.markSubscriptionClosing(url, subscriptionId) + wsInstance.send(unsubscribePayload) + + // Wait for the server to confirm the unsubscribe. + return new Promise(resolve => { + const onMessage = ({ data }) => { + const { type, id } = JSON.parse(data) + if (id === subscriptionId && type === MESSAGE_TYPES.UNSUBSCRIBE) { + resolve() + } + wsInstance.removeEventListener('message', onMessage) + } + wsInstance.addEventListener('message', onMessage) + }) }, } } From b785ee440ccb59d26ce34ceace12d1f5ebcc1926 Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Thu, 14 Dec 2023 14:49:12 +0100 Subject: [PATCH 04/15] console: Refactor subscription state --- .../api/stream/subscribeToWebSocketStream.js | 383 +++++++++--------- 1 file changed, 186 insertions(+), 197 deletions(-) diff --git a/sdk/js/src/api/stream/subscribeToWebSocketStream.js b/sdk/js/src/api/stream/subscribeToWebSocketStream.js index ca38abb5a0..40ea4862c0 100644 --- a/sdk/js/src/api/stream/subscribeToWebSocketStream.js +++ b/sdk/js/src/api/stream/subscribeToWebSocketStream.js @@ -21,47 +21,184 @@ import { notify, EVENTS, MESSAGE_TYPES } from './shared' const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc, [curr]: {} }), {}) -const store = () => { - const connections = {} - - return { - getInstance: url => traverse(connections).get([url, 'instance']), - setInstance: (url, instance) => { - traverse(connections).set([url, 'instance'], instance) - return instance - }, - deleteInstance: url => { - if (url in connections) { - delete connections[url] +const newSubscription = (resolve, reject, unsubscribe) => { + const listeners = { ...initialListeners } + let closeRequested = false + const externalSubscription = { + on: (eventName, callback) => { + if (!Object.values(EVENTS).includes(eventName)) { + throw new Error( + `${eventName} event is not supported. Should be one of: open, message, error or close`, + ) } + listeners[eventName] = callback + return externalSubscription }, - getSubscriptions: url => Object.values(traverse(connections).get([url, 'subscriptions'] || {})), - getSubscription: (url, sid) => traverse(connections).get([url, 'subscriptions', sid]) || null, - setSubscription: (url, sid, subscription) => { - const subs = traverse(connections).get([url, 'subscriptions']) || {} - subs[sid] = subscription - traverse(connections).set([url, 'subscriptions'], subs) - return subs[sid] + close: () => { + closeRequested = true + return unsubscribe() }, - markSubscriptionClosing: (url, sid) => { - if (traverse(connections).has([url, 'subscriptions', sid])) { - traverse(connections).set([url, 'subscriptions', sid, 'closeRequested'], true) - } + } + return { + onError: err => { + notify(listeners[EVENTS.ERROR], err) + // If an error occurs while we are trying to subscribe, we should reject + // the promise in order to propagate the implicit subscription failure. + reject(err) + }, + onClose: closeEvent => { + notify(listeners[EVENTS.CLOSE], closeRequested) + // If the connection has been closed while we are trying subscribe, we should + // reject the promise in order to propagate the implicit subscription failure. + reject(new Error(`WebSocket connection closed unexpectedly with code ${closeEvent.code}`)) }, - getSubscriptionCount: url => { - const subs = traverse(connections).get([url, 'subscriptions']) - return subs ? Object.keys(subs).length : 0 + onMessage: dataParsed => { + if (dataParsed.type === MESSAGE_TYPES.SUBSCRIBE) { + notify(listeners[EVENTS.OPEN]) + // Resolve the promise after the subscription confirmation message. + resolve(externalSubscription) + } + + if (dataParsed.type === MESSAGE_TYPES.ERROR) { + notify(listeners[EVENTS.ERROR], dataParsed.error) + } + + if (dataParsed.type === MESSAGE_TYPES.PUBLISH) { + notify(listeners[EVENTS.MESSAGE], dataParsed.event) + } + + if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) { + notify(listeners[EVENTS.CLOSE], closeRequested) + } }, - deleteSubscription: (url, sid) => { - const subscriptions = traverse(connections).get([url, 'subscriptions']) - if (subscriptions && subscriptions[sid]) { + } +} + +const newInstance = (wsInstance, onClose) => { + const subscriptions = {} + let closeRequested = false + + // Broadcast connection errors to all subscriptions. + wsInstance.addEventListener('error', () => { + const err = new Error('Error in WebSocket connection') + for (const subscription of Object.values(subscriptions)) { + subscription.onError(err) + } + }) + + // Broadcast connection closure to all subscriptions. + wsInstance.addEventListener('close', closeEvent => { + if (closeRequested) { + // If the close has been requested already, the instance has been + // deregistered and there are no subscriptions left. + return + } + // TODO: Handle close event codes. + // https://github.com/TheThingsNetwork/lorawan-stack/issues/6752 + for (const subscription of Object.values(subscriptions)) { + subscription.onClose(closeEvent) + } + onClose() + }) + + // Broadcast messages to the correct subscription. + wsInstance.addEventListener('message', ({ data }) => { + const dataParsed = JSON.parse(data) + const sid = dataParsed.id + const subscription = traverse(subscriptions).get([sid]) || null + + if (!subscription) { + warn('Message received for closed or unknown subscription with ID', sid) + return + } + + subscription.onMessage(dataParsed) + + if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) { + delete subscriptions[sid] + if (Object.keys(subscriptions).length === 0) { + closeRequested = true + wsInstance.close() + onClose() + } + } + }) + + return { + subscribe: (sid, resolve, reject, subscribePayload, unsubscribePayload) => { + if (sid in subscriptions) { + throw new Error(`Subscription with ID ${sid} already exists`) + } + + let unsubscribed = null + const unsubscribe = () => { + if (unsubscribed) { + return unsubscribed + } + + if ( + wsInstance.readyState === WebSocket.CLOSED || + wsInstance.readyState === WebSocket.CLOSING + ) { + warn('WebSocket was already closed') + return Promise.resolve() + } + + wsInstance.send(unsubscribePayload) + + // Wait for the server to confirm the unsubscribe. + unsubscribed = new Promise(resolve => { + const onMessage = ({ data }) => { + const { type, id } = JSON.parse(data) + if (id === sid && type === MESSAGE_TYPES.UNSUBSCRIBE) { + resolve() + wsInstance.removeEventListener('message', onMessage) + } + } + wsInstance.addEventListener('message', onMessage) + const onClose = () => { + resolve() + wsInstance.removeEventListener('close', onClose) + } + wsInstance.addEventListener('close', onClose) + }) + return unsubscribed + } + + const subscription = newSubscription(resolve, reject, unsubscribe) + subscriptions[sid] = subscription + + if (wsInstance.readyState === WebSocket.OPEN) { + // If the WebSocket connection is already open, only add the subscription. + wsInstance.send(subscribePayload) + } else if (wsInstance.readyState === WebSocket.CONNECTING) { + // Otherwise wait for the connection to open and then add the subscription. + const onOpen = () => { + wsInstance.send(subscribePayload) + wsInstance.removeEventListener('open', onOpen) + } + wsInstance.addEventListener('open', onOpen) + } else { delete subscriptions[sid] + throw new Error('WebSocket connection is closed') } }, } } -const state = store() +const newStore = () => { + const connections = {} + return { + getInstance: url => traverse(connections).get([url]), + setInstance: (url, wsInstance) => + traverse(connections).set( + [url], + newInstance(wsInstance, () => delete connections[url]), + ), + } +} + +const state = newStore() /** * Opens a new stream. @@ -111,177 +248,29 @@ export default async ( id: subscriptionId, }) const url = baseUrl + endpoint - let wsInstance = state.getInstance(url) + const token = new Token().get() + const tokenParsed = typeof token === 'function' ? `${(await token()).access_token}` : token + const baseUrlParsed = baseUrl.replace('http', 'ws') + + return await Promise.race([ + new Promise((resolve, reject) => { + let instance = state.getInstance(url) + // Open up the WebSocket connection if it doesn't exist. + if (!instance) { + instance = state.setInstance( + url, + new WebSocket(`${baseUrlParsed}${endpoint}`, [ + 'ttn.lorawan.v3.console.internal.events.v1', + `ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`, + ]), + ) + } - await Promise.race([ - new Promise(async (resolve, reject) => { // Add the new subscription to the subscriptions object. // Also add the resolver functions to the subscription object to be able // to resolve the promise after the subscription confirmation message. - if (state.getSubscription(url, subscriptionId) !== null) { - reject(new Error('Subscription with the same ID already exists')) - } - state.setSubscription(url, subscriptionId, { - ...initialListeners, - url, - resolve, - reject, - closeRequested: false, - }) - - try { - const token = new Token().get() - const tokenParsed = typeof token === 'function' ? `${(await token()).access_token}` : token - const baseUrlParsed = baseUrl.replace('http', 'ws') - - // Open up the WebSocket connection if it doesn't exist. - if (!wsInstance) { - wsInstance = state.setInstance( - url, - new WebSocket(`${baseUrlParsed}${endpoint}`, [ - 'ttn.lorawan.v3.console.internal.events.v1', - `ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`, - ]), - ) - - // Broadcast connection errors to all listeners. - wsInstance.addEventListener('error', () => { - const err = new Error('Error in WebSocket connection') - const subscriptions = state.getSubscriptions(url) - for (const s of subscriptions) { - notify(s[EVENTS.ERROR], err) - // The error is an error event, but we should only throw proper errors. - // It has an optional error code that we could use to map to a proper error. - // However, the error codes are optional and not always used. - s.reject(err) - } - }) - - // Event listener for 'close' - wsInstance.addEventListener('close', closeEvent => { - // TODO: Handle close event codes. - // https://github.com/TheThingsNetwork/lorawan-stack/issues/6752 - const subscriptions = state.getSubscriptions(url) - const wasClean = closeEvent?.wasClean ?? false - - for (const s of subscriptions) { - notify(s[EVENTS.CLOSE], wasClean) - if (wasClean) { - s.resolve() - } else { - s.reject( - new Error( - `WebSocket connection closed unexpectedly with code ${closeEvent.code}`, - ), - ) - } - } - - state.deleteInstance(url) - }) - - // After the WebSocket connection is open, add the event listeners. - // Wait for the subscription confirmation message before resolving. - wsInstance.addEventListener('message', ({ data }) => { - const dataParsed = JSON.parse(data) - const sid = dataParsed.id - const subscription = state.getSubscription(url, sid) - - if (!subscription) { - warn('Message received for closed or unknown subscription with ID', sid) - - return - } - - if (dataParsed.type === MESSAGE_TYPES.SUBSCRIBE) { - notify(subscription[EVENTS.OPEN]) - // Resolve the promise after the subscription confirmation message. - subscription.resolve() - } - - if (dataParsed.type === MESSAGE_TYPES.ERROR) { - notify(subscription[EVENTS.ERROR], dataParsed) - } - - if (dataParsed.type === MESSAGE_TYPES.PUBLISH) { - notify(subscription[EVENTS.MESSAGE], dataParsed.event) - } - - if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) { - notify(subscription[EVENTS.CLOSE], subscription.closeRequested) - // Remove the subscription - state.deleteSubscription(url, sid) - if (state.getSubscriptionCount(url) === 0) { - wsInstance.close() - state.deleteInstance(url) - } - } - }) - } - - if (wsInstance.readyState === WebSocket.OPEN) { - // If the WebSocket connection is already open, only add the subscription. - wsInstance.send(subscriptionPayload) - } else if (wsInstance.readyState === WebSocket.CONNECTING) { - // Otherwise wait for the connection to open and then add the subscription. - const onOpen = () => { - wsInstance.send(subscriptionPayload) - wsInstance.removeEventListener('open', onOpen) - } - wsInstance.addEventListener('open', onOpen) - } else { - reject(new Error('WebSocket connection is closed')) - } - } catch (error) { - const err = error instanceof Error ? error : new Error(error) - const subscriptions = state.getSubscriptions(url) - for (const s of subscriptions) { - notify(s[EVENTS.ERROR], err) - s.reject(err) - } - } + instance.subscribe(subscriptionId, resolve, reject, subscriptionPayload, unsubscribePayload) }), - new Promise((resolve, reject) => setTimeout(() => reject(new Error('timeout')), timeout)), + new Promise((_resolve, reject) => setTimeout(() => reject(new Error('timeout')), timeout)), ]) - - // Return an observer object with the `on` and `close` functions for - // the current subscription. - return { - on(eventName, callback) { - if (!Object.values(EVENTS).includes(eventName)) { - throw new Error( - `${eventName} event is not supported. Should be one of: open, message, error or close`, - ) - } - const subscription = state.getSubscription(url, subscriptionId) - subscription[eventName] = callback - - return this - }, - close: () => { - if ( - !wsInstance || - wsInstance.readyState === WebSocket.CLOSED || - wsInstance.readyState === WebSocket.CLOSING - ) { - warn('WebSocket was already closed') - return Promise.resolve() - } - - state.markSubscriptionClosing(url, subscriptionId) - wsInstance.send(unsubscribePayload) - - // Wait for the server to confirm the unsubscribe. - return new Promise(resolve => { - const onMessage = ({ data }) => { - const { type, id } = JSON.parse(data) - if (id === subscriptionId && type === MESSAGE_TYPES.UNSUBSCRIBE) { - resolve() - } - wsInstance.removeEventListener('message', onMessage) - } - wsInstance.addEventListener('message', onMessage) - }) - }, - } } From 3e5829f1dbdda893c79dc849159f3cf4c08ddd9d Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Thu, 14 Dec 2023 23:56:09 +0100 Subject: [PATCH 05/15] console: Use static listeners --- .../console/store/middleware/logics/events.js | 39 +++++++----- .../api/stream/subscribeToWebSocketStream.js | 59 ++++++++++--------- sdk/js/src/service/applications.js | 6 +- sdk/js/src/service/devices/index.js | 6 +- sdk/js/src/service/gateways.js | 6 +- sdk/js/src/service/organizations.js | 4 +- sdk/js/src/util/combine-streams.js | 33 ++++++----- 7 files changed, 86 insertions(+), 67 deletions(-) diff --git a/pkg/webui/console/store/middleware/logics/events.js b/pkg/webui/console/store/middleware/logics/events.js index 513fb8ef0b..69957039d2 100644 --- a/pkg/webui/console/store/middleware/logics/events.js +++ b/pkg/webui/console/store/middleware/logics/events.js @@ -19,7 +19,13 @@ import CONNECTION_STATUS from '@console/constants/connection-status' import EVENT_TAIL from '@console/constants/event-tail' import { getCombinedDeviceId } from '@ttn-lw/lib/selectors/id' -import { isUnauthenticatedError, isNetworkError, isTimeoutError } from '@ttn-lw/lib/errors/utils' +import { + isUnauthenticatedError, + isPermissionDeniedError, + isNetworkError, + isTimeoutError, +} from '@ttn-lw/lib/errors/utils' +import { TokenError } from '@ttn-lw/lib/errors/custom-errors' import { SET_CONNECTION_STATUS, setStatusChecking } from '@ttn-lw/lib/store/actions/status' import { selectIsOnlineStatus } from '@ttn-lw/lib/store/selectors/status' @@ -131,22 +137,24 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { const filterRegExp = Boolean(filter) ? filter.filterRegExp : undefined try { - channel = await onEventsStart([id], filterRegExp, EVENT_TAIL, after) + const listeners = { + message: message => dispatch(getEventSuccess(id, message)), + error: error => dispatch(getEventFailure(id, error)), + close: wasClientRequest => { + dispatch(closeEvents(id, { silent: wasClientRequest })) + channel?.close() + channel = null + }, + } + channel = await onEventsStart([id], filterRegExp, EVENT_TAIL, after, listeners) dispatch(startEventsSuccess(id, { silent })) - - channel.on('message', message => dispatch(getEventSuccess(id, message))) - channel.on('error', error => dispatch(getEventFailure(id, error))) - channel.on('close', wasClientRequest => { - dispatch(closeEvents(id, { silent: wasClientRequest })) - channel = null - }) } catch (error) { - if (isUnauthenticatedError(error)) { + if ( + error instanceof TokenError && + (isUnauthenticatedError(error?.cause) || isPermissionDeniedError(error?.cause)) + ) { // The user is no longer authenticated; reinitiate the auth flow // by refreshing the page. - // NOTE: As a result of the WebSocket refactor, the error shape is - // now very unspecific and authentication errors like before are - // not thrown anymore. This should be addressed eventually. window.location.reload() } else { dispatch(startEventsFailure(id, error)) @@ -177,7 +185,7 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { }, process: async ({ action }, dispatch, done) => { if (action.type === START_EVENTS_FAILURE) { - if (action?.error?.message === 'timeout') { + if (action.error?.message === 'timeout') { // Set the connection status to `checking` to trigger connection checks // and detect possible offline state. dispatch(setStatusChecking()) @@ -211,8 +219,9 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { const status = selectEntityEventsStatus(getState(), id) const connected = status === CONNECTION_STATUS.CONNECTED const interrupted = selectEntityEventsInterrupted(getState(), id) - if (!connected && interrupted) { + if (!connected || interrupted) { reject() + return } allow(action) diff --git a/sdk/js/src/api/stream/subscribeToWebSocketStream.js b/sdk/js/src/api/stream/subscribeToWebSocketStream.js index 40ea4862c0..c6d86876cf 100644 --- a/sdk/js/src/api/stream/subscribeToWebSocketStream.js +++ b/sdk/js/src/api/stream/subscribeToWebSocketStream.js @@ -15,25 +15,15 @@ import traverse from 'traverse' import Token from '../../util/token' -import { warn } from '../../../../../pkg/webui/lib/log' import { notify, EVENTS, MESSAGE_TYPES } from './shared' const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc, [curr]: {} }), {}) -const newSubscription = (resolve, reject, unsubscribe) => { - const listeners = { ...initialListeners } +const newSubscription = (unsubscribe, newListeners, resolve, reject) => { + const listeners = { ...initialListeners, ...newListeners } let closeRequested = false const externalSubscription = { - on: (eventName, callback) => { - if (!Object.values(EVENTS).includes(eventName)) { - throw new Error( - `${eventName} event is not supported. Should be one of: open, message, error or close`, - ) - } - listeners[eventName] = callback - return externalSubscription - }, close: () => { closeRequested = true return unsubscribe() @@ -108,7 +98,6 @@ const newInstance = (wsInstance, onClose) => { const subscription = traverse(subscriptions).get([sid]) || null if (!subscription) { - warn('Message received for closed or unknown subscription with ID', sid) return } @@ -125,7 +114,7 @@ const newInstance = (wsInstance, onClose) => { }) return { - subscribe: (sid, resolve, reject, subscribePayload, unsubscribePayload) => { + subscribe: (sid, subscribePayload, unsubscribePayload, listeners, resolve, reject) => { if (sid in subscriptions) { throw new Error(`Subscription with ID ${sid} already exists`) } @@ -140,7 +129,6 @@ const newInstance = (wsInstance, onClose) => { wsInstance.readyState === WebSocket.CLOSED || wsInstance.readyState === WebSocket.CLOSING ) { - warn('WebSocket was already closed') return Promise.resolve() } @@ -165,7 +153,7 @@ const newInstance = (wsInstance, onClose) => { return unsubscribed } - const subscription = newSubscription(resolve, reject, unsubscribe) + const subscription = newSubscription(unsubscribe, listeners, resolve, reject) subscriptions[sid] = subscription if (wsInstance.readyState === WebSocket.OPEN) { @@ -206,37 +194,43 @@ const state = newStore() * @async * @param {object} payload - - The body of the initial request. * @param {string} baseUrl - The stream baseUrl. + * @param {object} listeners - The listeners object. * @param {string} endpoint - The stream endpoint. - * @param {number} timeout - The timeout for the stream. + * @param {number} timeout - The connection timeout for the stream. * * @example * (async () => { * const stream = await stream( * { identifiers: [{ application_ids: { application_id: 'my-app' }}]}, * 'http://localhost:8080', - * '/api/v3', + * { + * message: ({ data }) => console.log('received data', JSON.parse(data)), + * error: error => console.log(error), + * close: wasClientRequest => console.log(wasClientRequest ? 'conn closed by client' : 'conn closed by server'), + * }, * ) * - * // Add listeners to the stream. - * stream - * .on('open', () => console.log('conn opened')) - * .on('message', ({ data }) => console.log('received data', JSON.parse(data))) - * .on('error', error => console.log(error)) - * .on('close', wasClientRequest => console.log(wasClientRequest ? 'conn closed by client' : 'conn closed by server')) - * - * // Close the stream after 20 s. + * // Close the stream after 20 s. * setTimeout(() => stream.close(), 20000) * })() * - * @returns {object} The stream subscription object with the `on` function for - * attaching listeners and the `close` function to close the stream. + * @returns {object} The stream subscription object the `close` function to close the stream. */ export default async ( payload, baseUrl, + listeners, endpoint = '/console/internal/events/', timeout = 10000, ) => { + for (const eventName of Object.keys(listeners)) { + if (!Object.values(EVENTS).includes(eventName)) { + throw new Error( + `${eventName} event is not supported. Should be one of: open, message, error or close`, + ) + } + } + const subscriptionId = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER) const subscriptionPayload = JSON.stringify({ type: MESSAGE_TYPES.SUBSCRIBE, @@ -269,7 +263,14 @@ export default async ( // Add the new subscription to the subscriptions object. // Also add the resolver functions to the subscription object to be able // to resolve the promise after the subscription confirmation message. - instance.subscribe(subscriptionId, resolve, reject, subscriptionPayload, unsubscribePayload) + instance.subscribe( + subscriptionId, + subscriptionPayload, + unsubscribePayload, + listeners, + resolve, + reject, + ) }), new Promise((_resolve, reject) => setTimeout(() => reject(new Error('timeout')), timeout)), ]) diff --git a/sdk/js/src/service/applications.js b/sdk/js/src/service/applications.js index 42cde4598e..18d91ef420 100644 --- a/sdk/js/src/service/applications.js +++ b/sdk/js/src/service/applications.js @@ -217,7 +217,7 @@ class Applications { // Events Stream - async openStream(identifiers, names, tail, after) { + async openStream(identifiers, names, tail, after, listeners) { const payload = { identifiers: identifiers.map(id => ({ application_ids: { application_id: id }, @@ -236,7 +236,9 @@ class Applications { distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)), ) - const streams = [...baseUrls].map(baseUrl => subscribeToWebSocketStream(payload, baseUrl)) + const streams = [...baseUrls].map(baseUrl => + subscribeToWebSocketStream(payload, baseUrl, listeners), + ) // Combine all stream sources to one subscription generator. return combineStreams(streams) diff --git a/sdk/js/src/service/devices/index.js b/sdk/js/src/service/devices/index.js index 5b8e0021d4..cacc66dcb9 100644 --- a/sdk/js/src/service/devices/index.js +++ b/sdk/js/src/service/devices/index.js @@ -680,7 +680,7 @@ class Devices { // Events Stream - async openStream(identifiers, names, tail, after) { + async openStream(identifiers, names, tail, after, listeners) { const payload = { identifiers: identifiers.map(ids => ({ device_ids: ids, @@ -699,7 +699,9 @@ class Devices { distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)), ) - const streams = [...baseUrls].map(baseUrl => subscribeToWebSocketStream(payload, baseUrl)) + const streams = [...baseUrls].map(baseUrl => + subscribeToWebSocketStream(payload, baseUrl, listeners), + ) // Combine all stream sources to one subscription generator. return combineStreams(streams) diff --git a/sdk/js/src/service/gateways.js b/sdk/js/src/service/gateways.js index 41f5d820a7..19bb818196 100644 --- a/sdk/js/src/service/gateways.js +++ b/sdk/js/src/service/gateways.js @@ -240,7 +240,7 @@ class Gateways { // Events Stream - async openStream(identifiers, names, tail, after) { + async openStream(identifiers, names, tail, after, listeners) { const payload = { identifiers: identifiers.map(id => ({ gateway_ids: { gateway_id: id }, @@ -262,7 +262,9 @@ class Gateways { distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)), ) - const streams = [...baseUrls].map(baseUrl => subscribeToWebSocketStream(payload, baseUrl)) + const streams = [...baseUrls].map(baseUrl => + subscribeToWebSocketStream(payload, baseUrl, listeners), + ) // Combine all stream sources to one subscription generator. return combineStreams(streams) diff --git a/sdk/js/src/service/organizations.js b/sdk/js/src/service/organizations.js index 5469acd031..e63022185e 100644 --- a/sdk/js/src/service/organizations.js +++ b/sdk/js/src/service/organizations.js @@ -156,7 +156,7 @@ class Organizations { // Events stream. - async openStream(identifiers, names, tail, after) { + async openStream(identifiers, names, tail, after, listeners) { const payload = { identifiers: identifiers.map(id => ({ organization_ids: { organization_id: id }, @@ -168,7 +168,7 @@ class Organizations { const baseUrl = this._stackConfig.getComponentUrlByName(STACK_COMPONENTS_MAP.is) - return subscribeToWebSocketStream(payload, baseUrl) + return subscribeToWebSocketStream(payload, baseUrl, listeners) } } diff --git a/sdk/js/src/util/combine-streams.js b/sdk/js/src/util/combine-streams.js index 75e20855b5..d542e22ac1 100644 --- a/sdk/js/src/util/combine-streams.js +++ b/sdk/js/src/util/combine-streams.js @@ -16,8 +16,8 @@ * Combines multiple streams into a single subscription provider. * * @param {Array} streams - An array of (async) stream functions. - * @returns {object} The stream subscription object with the `on` function for - * attaching listeners and the `close` function to close the stream. + * @returns {object} The stream subscription object with the `close` function + * to close the stream. */ const combinedStream = async streams => { if (!(streams instanceof Array) || streams.length === 0) { @@ -26,19 +26,22 @@ const combinedStream = async streams => { return streams[0] } - const subscribers = await Promise.all(streams) - - return { - on: (eventName, callback) => { - for (const subscriber of subscribers) { - subscriber.on(eventName, callback) - } - }, - close: () => { - for (const subscriber of subscribers) { - subscriber.close() - } - }, + try { + const subscribers = await Promise.all(streams) + return { + close: () => Promise.all(subscribers.map(subscriber => subscriber.close())), + } + } catch (error) { + // Ensure that if only some streams fail, the successful ones are closed. + await Promise.all( + streams.map(async stream => { + try { + const subscriber = await stream + await subscriber.close() + } catch {} + }), + ) + throw error } } From c60497de339cca101d9ab108d8f5c6d1a5397758 Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Fri, 15 Dec 2023 14:41:29 +0100 Subject: [PATCH 06/15] console: Make combined and individual streams semantically equivalent --- .../console/store/middleware/logics/events.js | 3 +- sdk/js/src/api/stream/shared.js | 32 +++++- .../api/stream/subscribeToWebSocketStream.js | 21 ++-- .../api/stream/subscribeToWebSocketStreams.js | 101 ++++++++++++++++++ sdk/js/src/service/applications.js | 10 +- sdk/js/src/service/devices/index.js | 10 +- sdk/js/src/service/gateways.js | 10 +- sdk/js/src/util/combine-streams.js | 48 --------- 8 files changed, 151 insertions(+), 84 deletions(-) create mode 100644 sdk/js/src/api/stream/subscribeToWebSocketStreams.js delete mode 100644 sdk/js/src/util/combine-streams.js diff --git a/pkg/webui/console/store/middleware/logics/events.js b/pkg/webui/console/store/middleware/logics/events.js index 69957039d2..9ffe0d0539 100644 --- a/pkg/webui/console/store/middleware/logics/events.js +++ b/pkg/webui/console/store/middleware/logics/events.js @@ -142,12 +142,12 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { error: error => dispatch(getEventFailure(id, error)), close: wasClientRequest => { dispatch(closeEvents(id, { silent: wasClientRequest })) - channel?.close() channel = null }, } channel = await onEventsStart([id], filterRegExp, EVENT_TAIL, after, listeners) dispatch(startEventsSuccess(id, { silent })) + channel.open() } catch (error) { if ( error instanceof TokenError && @@ -288,7 +288,6 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { }), createLogic({ type: SET_EVENT_FILTER, - debounce: 250, process: async ({ action }, dispatch, done) => { if (channel) { try { diff --git a/sdk/js/src/api/stream/shared.js b/sdk/js/src/api/stream/shared.js index 8d1b8b18ff..f2b79d2bb0 100644 --- a/sdk/js/src/api/stream/shared.js +++ b/sdk/js/src/api/stream/shared.js @@ -19,7 +19,6 @@ export const notify = (listener, ...args) => { } export const EVENTS = Object.freeze({ - OPEN: 'open', MESSAGE: 'message', ERROR: 'error', CLOSE: 'close', @@ -31,3 +30,34 @@ export const MESSAGE_TYPES = Object.freeze({ PUBLISH: 'publish', ERROR: 'error', }) + +export const INITIAL_LISTENERS = Object.freeze( + Object.values(EVENTS).reduce((acc, curr) => ({ ...acc, [curr]: {} }), {}), +) + +export const newQueuedListeners = listeners => { + const queue = [] + let open = false + const queuedListeners = Object.values(EVENTS).reduce( + (acc, curr) => ({ + ...acc, + [curr]: (...args) => { + if (open) { + notify(listeners[curr], ...args) + } else { + queue.push([curr, args]) + } + }, + }), + {}, + ) + return [ + () => { + open = true + for (const [event, args] of queue) { + notify(listeners[event], ...args) + } + }, + queuedListeners, + ] +} diff --git a/sdk/js/src/api/stream/subscribeToWebSocketStream.js b/sdk/js/src/api/stream/subscribeToWebSocketStream.js index c6d86876cf..6b9133c605 100644 --- a/sdk/js/src/api/stream/subscribeToWebSocketStream.js +++ b/sdk/js/src/api/stream/subscribeToWebSocketStream.js @@ -16,14 +16,13 @@ import traverse from 'traverse' import Token from '../../util/token' -import { notify, EVENTS, MESSAGE_TYPES } from './shared' +import { notify, newQueuedListeners, EVENTS, MESSAGE_TYPES, INITIAL_LISTENERS } from './shared' -const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc, [curr]: {} }), {}) - -const newSubscription = (unsubscribe, newListeners, resolve, reject) => { - const listeners = { ...initialListeners, ...newListeners } +const newSubscription = (unsubscribe, originalListeners, resolve, reject) => { let closeRequested = false + const [open, listeners] = newQueuedListeners(originalListeners) const externalSubscription = { + open, close: () => { closeRequested = true return unsubscribe() @@ -44,7 +43,6 @@ const newSubscription = (unsubscribe, newListeners, resolve, reject) => { }, onMessage: dataParsed => { if (dataParsed.type === MESSAGE_TYPES.SUBSCRIBE) { - notify(listeners[EVENTS.OPEN]) // Resolve the promise after the subscription confirmation message. resolve(externalSubscription) } @@ -210,11 +208,15 @@ const state = newStore() * }, * ) * + * // Start the stream in order to start dispatching events. + * stream.open() + * * // Close the stream after 20 s. * setTimeout(() => stream.close(), 20000) * })() * - * @returns {object} The stream subscription object the `close` function to close the stream. + * @returns {object} The stream subscription object the `open` function to start sending events to the listeners and + * the `close` function to close the stream. */ export default async ( payload, @@ -226,10 +228,11 @@ export default async ( for (const eventName of Object.keys(listeners)) { if (!Object.values(EVENTS).includes(eventName)) { throw new Error( - `${eventName} event is not supported. Should be one of: open, message, error or close`, + `${eventName} event is not supported. Should be one of: message, error or close`, ) } } + const filledListeners = { ...INITIAL_LISTENERS, ...listeners } const subscriptionId = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER) const subscriptionPayload = JSON.stringify({ @@ -267,7 +270,7 @@ export default async ( subscriptionId, subscriptionPayload, unsubscribePayload, - listeners, + filledListeners, resolve, reject, ) diff --git a/sdk/js/src/api/stream/subscribeToWebSocketStreams.js b/sdk/js/src/api/stream/subscribeToWebSocketStreams.js new file mode 100644 index 0000000000..2ac1b2a05f --- /dev/null +++ b/sdk/js/src/api/stream/subscribeToWebSocketStreams.js @@ -0,0 +1,101 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { EVENTS, INITIAL_LISTENERS, notify } from './shared' +import subscribeToWebSocketStream from './subscribeToWebSocketStream' + +/* + * Subscribe to an event stream with multiple base URLs. + * Semantically equivalent to subscribeToWebSocketStream, but guarantees uniform stream closure and unaffected + * event emission in case of failure of one or more streams. + */ +export default async ( + payload, + baseUrls, + listeners, + endpoint = '/console/internal/events/', + timeout = 10000, +) => { + if (!(baseUrls instanceof Array) || baseUrls.length === 0) { + throw new Error('Cannot subscribe to events without base URLs') + } + if (baseUrls.length === 1) { + return subscribeToWebSocketStream(payload, baseUrls[0], listeners, endpoint, timeout) + } + + for (const eventName of Object.keys(listeners)) { + if (!Object.values(EVENTS).includes(eventName)) { + throw new Error( + `${eventName} event is not supported. Should be one of: message, error or close`, + ) + } + } + const filledListeners = { ...INITIAL_LISTENERS, ...listeners } + + // Interweaving multiple streams has the side effect of making the standard event listener + // state machine look erratic externally - only certain streams may fail, while others may continue + // to work. Upper layers which use the stream must not be exposed to this detail - once one + // stream fails, we need to ensure that all streams are closed and no stray events are emitted. + // + // The standard state machine guarantees that once an error or close event is emitted, no further + // message events will be emitted. It is also guaranteed that once an error event is emitted, a close + // event will follow. + let [closeAll, hadError, hadClose] = [() => {}, false, false] + const uniformListeners = { + [EVENTS.MESSAGE]: (...params) => { + if (hadClose || hadError) { + return + } + notify(filledListeners[EVENTS.MESSAGE], ...params) + }, + [EVENTS.ERROR]: (...params) => { + if (hadClose || hadError) { + return + } + hadError = true + notify(filledListeners[EVENTS.ERROR], ...params) + }, + [EVENTS.CLOSE]: (...params) => { + if (hadClose) { + return + } + hadClose = true + notify(filledListeners[EVENTS.CLOSE], ...params) + closeAll() + }, + } + + const pendingStreams = baseUrls.map(baseUrl => + subscribeToWebSocketStream(payload, baseUrl, uniformListeners, endpoint, timeout), + ) + try { + const streams = await Promise.all(pendingStreams) + closeAll = () => Promise.all(streams.map(stream => stream.close())) + return { + open: () => streams.forEach(stream => stream.open()), + close: closeAll, + } + } catch (error) { + // Ensure that if only some streams fail, the successful ones are closed. + await Promise.all( + pendingStreams.map(async pendingStream => { + try { + const stream = await pendingStream + await stream.close() + } catch {} + }), + ) + throw error + } +} diff --git a/sdk/js/src/service/applications.js b/sdk/js/src/service/applications.js index 18d91ef420..7ad00cfc22 100644 --- a/sdk/js/src/service/applications.js +++ b/sdk/js/src/service/applications.js @@ -15,8 +15,7 @@ import autoBind from 'auto-bind' import Marshaler from '../util/marshaler' -import subscribeToWebSocketStream from '../api/stream/subscribeToWebSocketStream' -import combineStreams from '../util/combine-streams' +import subscribeToWebSocketStreams from '../api/stream/subscribeToWebSocketStreams' import { STACK_COMPONENTS_MAP } from '../util/constants' import Devices from './devices' @@ -235,13 +234,8 @@ class Applications { const baseUrls = new Set( distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)), ) - - const streams = [...baseUrls].map(baseUrl => - subscribeToWebSocketStream(payload, baseUrl, listeners), - ) - // Combine all stream sources to one subscription generator. - return combineStreams(streams) + return subscribeToWebSocketStreams(payload, [...baseUrls], listeners) } } diff --git a/sdk/js/src/service/devices/index.js b/sdk/js/src/service/devices/index.js index cacc66dcb9..a3a975a1d2 100644 --- a/sdk/js/src/service/devices/index.js +++ b/sdk/js/src/service/devices/index.js @@ -19,12 +19,11 @@ import traverse from 'traverse' import { notify, EVENTS } from '../../api/stream/shared' import Marshaler from '../../util/marshaler' -import subscribeToWebSocketStream from '../../api/stream/subscribeToWebSocketStream' +import subscribeToWebSocketStreams from '../../api/stream/subscribeToWebSocketStreams' import deviceEntityMap from '../../../generated/device-entity-map.json' import DownlinkQueue from '../downlink-queue' import { STACK_COMPONENTS_MAP } from '../../util/constants' import DeviceClaim from '../claim' -import combineStreams from '../../util/combine-streams' import Repository from './repository' import { splitSetPaths, splitGetPaths, makeRequests } from './split' @@ -698,13 +697,8 @@ class Devices { const baseUrls = new Set( distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)), ) - - const streams = [...baseUrls].map(baseUrl => - subscribeToWebSocketStream(payload, baseUrl, listeners), - ) - // Combine all stream sources to one subscription generator. - return combineStreams(streams) + return subscribeToWebSocketStreams(payload, [...baseUrls], listeners) } async simulateUplink(applicationId, deviceId, uplink) { diff --git a/sdk/js/src/service/gateways.js b/sdk/js/src/service/gateways.js index 19bb818196..2f16e0fd03 100644 --- a/sdk/js/src/service/gateways.js +++ b/sdk/js/src/service/gateways.js @@ -15,9 +15,8 @@ import autoBind from 'auto-bind' import Marshaler from '../util/marshaler' -import subscribeToWebSocketStream from '../api/stream/subscribeToWebSocketStream' +import subscribeToWebSocketStreams from '../api/stream/subscribeToWebSocketStreams' import { STACK_COMPONENTS_MAP } from '../util/constants' -import combineStreams from '../util/combine-streams' import ApiKeys from './api-keys' import Collaborators from './collaborators' @@ -261,13 +260,8 @@ class Gateways { const baseUrls = new Set( distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)), ) - - const streams = [...baseUrls].map(baseUrl => - subscribeToWebSocketStream(payload, baseUrl, listeners), - ) - // Combine all stream sources to one subscription generator. - return combineStreams(streams) + return subscribeToWebSocketStreams(payload, [...baseUrls], listeners) } // Gateway Configuration Server. diff --git a/sdk/js/src/util/combine-streams.js b/sdk/js/src/util/combine-streams.js deleted file mode 100644 index d542e22ac1..0000000000 --- a/sdk/js/src/util/combine-streams.js +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * Combines multiple streams into a single subscription provider. - * - * @param {Array} streams - An array of (async) stream functions. - * @returns {object} The stream subscription object with the `close` function - * to close the stream. - */ -const combinedStream = async streams => { - if (!(streams instanceof Array) || streams.length === 0) { - throw new Error('Cannot combine streams with invalid stream array.') - } else if (streams.length === 1) { - return streams[0] - } - - try { - const subscribers = await Promise.all(streams) - return { - close: () => Promise.all(subscribers.map(subscriber => subscriber.close())), - } - } catch (error) { - // Ensure that if only some streams fail, the successful ones are closed. - await Promise.all( - streams.map(async stream => { - try { - const subscriber = await stream - await subscriber.close() - } catch {} - }), - ) - throw error - } -} - -export default combinedStream From 0b2461558c2e4cea082a848da8a0dcbfb8fc102f Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Fri, 15 Dec 2023 15:59:22 +0100 Subject: [PATCH 07/15] console: Unify subscription closure promise --- .../api/stream/subscribeToWebSocketStream.js | 47 +++++++------------ 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/sdk/js/src/api/stream/subscribeToWebSocketStream.js b/sdk/js/src/api/stream/subscribeToWebSocketStream.js index 6b9133c605..4c7a5db8ac 100644 --- a/sdk/js/src/api/stream/subscribeToWebSocketStream.js +++ b/sdk/js/src/api/stream/subscribeToWebSocketStream.js @@ -18,7 +18,7 @@ import Token from '../../util/token' import { notify, newQueuedListeners, EVENTS, MESSAGE_TYPES, INITIAL_LISTENERS } from './shared' -const newSubscription = (unsubscribe, originalListeners, resolve, reject) => { +const newSubscription = (unsubscribe, originalListeners, resolve, reject, resolveClose) => { let closeRequested = false const [open, listeners] = newQueuedListeners(originalListeners) const externalSubscription = { @@ -37,6 +37,7 @@ const newSubscription = (unsubscribe, originalListeners, resolve, reject) => { }, onClose: closeEvent => { notify(listeners[EVENTS.CLOSE], closeRequested) + resolveClose() // If the connection has been closed while we are trying subscribe, we should // reject the promise in order to propagate the implicit subscription failure. reject(new Error(`WebSocket connection closed unexpectedly with code ${closeEvent.code}`)) @@ -57,6 +58,7 @@ const newSubscription = (unsubscribe, originalListeners, resolve, reject) => { if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) { notify(listeners[EVENTS.CLOSE], closeRequested) + resolveClose() } }, } @@ -117,41 +119,26 @@ const newInstance = (wsInstance, onClose) => { throw new Error(`Subscription with ID ${sid} already exists`) } - let unsubscribed = null + // The `unsubscribed` promise is used in order to guarantee that calls to the `close` method + // of the subscription finish _after_ the closure events have been emitted. Callers can expect + // that after `close` resolves, no further events will be emitted. + let resolveClose = null + const unsubscribed = new Promise(resolve => { + resolveClose = resolve + }) + let unsubscribeCalled = false const unsubscribe = () => { - if (unsubscribed) { - return unsubscribed - } - - if ( - wsInstance.readyState === WebSocket.CLOSED || - wsInstance.readyState === WebSocket.CLOSING - ) { - return Promise.resolve() - } + if (!unsubscribeCalled) { + unsubscribeCalled = true - wsInstance.send(unsubscribePayload) - - // Wait for the server to confirm the unsubscribe. - unsubscribed = new Promise(resolve => { - const onMessage = ({ data }) => { - const { type, id } = JSON.parse(data) - if (id === sid && type === MESSAGE_TYPES.UNSUBSCRIBE) { - resolve() - wsInstance.removeEventListener('message', onMessage) - } - } - wsInstance.addEventListener('message', onMessage) - const onClose = () => { - resolve() - wsInstance.removeEventListener('close', onClose) + if (wsInstance.state === WebSocket.open) { + wsInstance.send(unsubscribePayload) } - wsInstance.addEventListener('close', onClose) - }) + } return unsubscribed } - const subscription = newSubscription(unsubscribe, listeners, resolve, reject) + const subscription = newSubscription(unsubscribe, listeners, resolve, reject, resolveClose) subscriptions[sid] = subscription if (wsInstance.readyState === WebSocket.OPEN) { From 1d7d63350c9c58be55a5460042f0e699e5fe5c60 Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Fri, 15 Dec 2023 17:11:48 +0100 Subject: [PATCH 08/15] console: Make event errors informational --- pkg/webui/console/store/middleware/logics/events.js | 9 ++------- pkg/webui/console/store/reducers/events.js | 2 -- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/pkg/webui/console/store/middleware/logics/events.js b/pkg/webui/console/store/middleware/logics/events.js index 9ffe0d0539..e461a3a930 100644 --- a/pkg/webui/console/store/middleware/logics/events.js +++ b/pkg/webui/console/store/middleware/logics/events.js @@ -35,7 +35,6 @@ import { createStartEventsStreamFailureActionType, createStartEventsStreamSuccessActionType, createEventStreamClosedActionType, - createGetEventMessageFailureActionType, createGetEventMessageSuccessActionType, createSetEventsFilterActionType, getEventMessageSuccess, @@ -71,7 +70,6 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { const START_EVENTS_FAILURE = createStartEventsStreamFailureActionType(reducerName) const STOP_EVENTS = createStopEventsStreamActionType(reducerName) const EVENT_STREAM_CLOSED = createEventStreamClosedActionType(reducerName) - const GET_EVENT_MESSAGE_FAILURE = createGetEventMessageFailureActionType(reducerName) const GET_EVENT_MESSAGE_SUCCESS = createGetEventMessageSuccessActionType(reducerName) const SET_EVENT_FILTER = createSetEventsFilterActionType(reducerName) const startEventsSuccess = startEventsStreamSuccess(reducerName) @@ -140,10 +138,7 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { const listeners = { message: message => dispatch(getEventSuccess(id, message)), error: error => dispatch(getEventFailure(id, error)), - close: wasClientRequest => { - dispatch(closeEvents(id, { silent: wasClientRequest })) - channel = null - }, + close: wasClientRequest => dispatch(closeEvents(id, { silent: wasClientRequest })), } channel = await onEventsStart([id], filterRegExp, EVENT_TAIL, after, listeners) dispatch(startEventsSuccess(id, { silent })) @@ -204,7 +199,7 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { }, }), createLogic({ - type: [GET_EVENT_MESSAGE_FAILURE, EVENT_STREAM_CLOSED], + type: EVENT_STREAM_CLOSED, cancelType: [START_EVENTS_SUCCESS, GET_EVENT_MESSAGE_SUCCESS, STOP_EVENTS], warnTimeout: 0, validate: ({ getState, action = {} }, allow, reject) => { diff --git a/pkg/webui/console/store/reducers/events.js b/pkg/webui/console/store/reducers/events.js index e16d23bfc0..c00c14b8ff 100644 --- a/pkg/webui/console/store/reducers/events.js +++ b/pkg/webui/console/store/reducers/events.js @@ -155,8 +155,6 @@ const createNamedEventReducer = (reducerName = '') => { return { ...state, ...addEvent(state, createSyntheticEventFromError(action.error)), - status: CONNECTION_STATUS.DISCONNECTED, - interrupted: true, } case PAUSE_EVENTS: return { From 19279acc80036da324f4db695d9a24a0525f7b20 Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Fri, 15 Dec 2023 19:14:51 +0100 Subject: [PATCH 09/15] console: Periodically ping event clients --- pkg/console/internal/events/events.go | 9 ++++++++- pkg/console/internal/events/eventsmux/mux.go | 3 ++- .../events/subscriptions/subscriptions.go | 3 ++- .../subscriptions/subscriptions_test.go | 3 ++- pkg/console/internal/events/tasks.go | 19 +++++++++++++++++++ 5 files changed, 33 insertions(+), 4 deletions(-) diff --git a/pkg/console/internal/events/events.go b/pkg/console/internal/events/events.go index 3422b160dc..e1f66c33db 100644 --- a/pkg/console/internal/events/events.go +++ b/pkg/console/internal/events/events.go @@ -19,6 +19,7 @@ import ( "context" "net/http" "sync" + "time" "github.com/gorilla/mux" "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" @@ -26,8 +27,10 @@ import ( "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux" "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/middleware" "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/errorcontext" "go.thethings.network/lorawan-stack/v3/pkg/events" "go.thethings.network/lorawan-stack/v3/pkg/log" + "go.thethings.network/lorawan-stack/v3/pkg/random" "go.thethings.network/lorawan-stack/v3/pkg/ratelimit" "go.thethings.network/lorawan-stack/v3/pkg/task" "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" @@ -40,6 +43,9 @@ import ( const ( authorizationProtocolPrefix = "ttn.lorawan.v3.header.authorization.bearer." protocolV1 = "ttn.lorawan.v3.console.internal.events.v1" + + pingPeriod = time.Minute + pingJitter = 0.1 ) // Component is the interface of the component to the events API handler. @@ -95,7 +101,7 @@ func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) { } defer conn.Close(websocket.StatusNormalClosure, "main task closed") - ctx, cancel := context.WithCancelCause(ctx) + ctx, cancel := errorcontext.New(ctx) defer cancel(nil) var wg sync.WaitGroup @@ -108,6 +114,7 @@ func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) { "console_events_mux": makeMuxTask(m, cancel), "console_events_read": makeReadTask(conn, m, rateLimit, cancel), "console_events_write": makeWriteTask(conn, m, cancel), + "console_events_ping": makePingTask(conn, cancel, random.Jitter(pingPeriod, pingJitter)), } { wg.Add(1) h.component.StartTask(&task.Config{ diff --git a/pkg/console/internal/events/eventsmux/mux.go b/pkg/console/internal/events/eventsmux/mux.go index e0874f9c51..b7bcf062bc 100644 --- a/pkg/console/internal/events/eventsmux/mux.go +++ b/pkg/console/internal/events/eventsmux/mux.go @@ -20,6 +20,7 @@ import ( "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol" "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/errorcontext" "go.thethings.network/lorawan-stack/v3/pkg/events" "go.thethings.network/lorawan-stack/v3/pkg/log" ) @@ -54,7 +55,7 @@ func (m *mux) Responses() <-chan protocol.Response { // Run implements Interface. func (m *mux) Run(ctx context.Context) (err error) { - ctx, cancel := context.WithCancelCause(ctx) + ctx, cancel := errorcontext.New(ctx) defer func() { cancel(err) }() subs := m.createSubs(ctx, cancel) defer subs.Close() diff --git a/pkg/console/internal/events/subscriptions/subscriptions.go b/pkg/console/internal/events/subscriptions/subscriptions.go index 0a099fffdd..faea7a4b73 100644 --- a/pkg/console/internal/events/subscriptions/subscriptions.go +++ b/pkg/console/internal/events/subscriptions/subscriptions.go @@ -22,6 +22,7 @@ import ( "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" "go.thethings.network/lorawan-stack/v3/pkg/auth/rights/rightsutil" + "go.thethings.network/lorawan-stack/v3/pkg/errorcontext" "go.thethings.network/lorawan-stack/v3/pkg/errors" "go.thethings.network/lorawan-stack/v3/pkg/events" "go.thethings.network/lorawan-stack/v3/pkg/log" @@ -141,7 +142,7 @@ func (s *subscriptions) Subscribe( return err } ch := make(chan events.Event, channelSize(tail)) - ctx, cancel := context.WithCancelCause(s.ctx) + ctx, cancel := errorcontext.New(s.ctx) defer func() { if err != nil { cancel(err) diff --git a/pkg/console/internal/events/subscriptions/subscriptions_test.go b/pkg/console/internal/events/subscriptions/subscriptions_test.go index 10aaf738bb..a397789e1c 100644 --- a/pkg/console/internal/events/subscriptions/subscriptions_test.go +++ b/pkg/console/internal/events/subscriptions/subscriptions_test.go @@ -22,6 +22,7 @@ import ( "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" + "go.thethings.network/lorawan-stack/v3/pkg/errorcontext" "go.thethings.network/lorawan-stack/v3/pkg/events" "go.thethings.network/lorawan-stack/v3/pkg/task" "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" @@ -143,7 +144,7 @@ func runTestSubscriptions( _, historical := subscriber.(interface{ historical() }) a, ctx := test.New(t) - ctx, cancel := context.WithCancelCause(ctx) + ctx, cancel := errorcontext.New(ctx) defer cancel(nil) timeout := test.Delay << 3 diff --git a/pkg/console/internal/events/tasks.go b/pkg/console/internal/events/tasks.go index af7024a5b6..298cb1f3f6 100644 --- a/pkg/console/internal/events/tasks.go +++ b/pkg/console/internal/events/tasks.go @@ -18,6 +18,7 @@ import ( "context" "errors" "io" + "time" "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux" "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol" @@ -79,3 +80,21 @@ func makeWriteTask(conn *websocket.Conn, m eventsmux.Interface, cancel func(erro } } } + +func makePingTask(conn *websocket.Conn, cancel func(error), period time.Duration) func(context.Context) error { + return func(ctx context.Context) (err error) { + ticker := time.NewTicker(period) + defer ticker.Stop() + defer func() { cancel(err) }() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + if err := conn.Ping(ctx); err != nil { + return err + } + } + } + } +} From 390b33ae231427094b29e9c6940b3eaae00fe5c0 Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Fri, 15 Dec 2023 19:29:40 +0100 Subject: [PATCH 10/15] console: Keep connections alive with no subscriptions --- sdk/js/src/api/stream/subscribeToWebSocketStream.js | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/sdk/js/src/api/stream/subscribeToWebSocketStream.js b/sdk/js/src/api/stream/subscribeToWebSocketStream.js index 4c7a5db8ac..ec513d16f3 100644 --- a/sdk/js/src/api/stream/subscribeToWebSocketStream.js +++ b/sdk/js/src/api/stream/subscribeToWebSocketStream.js @@ -66,7 +66,6 @@ const newSubscription = (unsubscribe, originalListeners, resolve, reject, resolv const newInstance = (wsInstance, onClose) => { const subscriptions = {} - let closeRequested = false // Broadcast connection errors to all subscriptions. wsInstance.addEventListener('error', () => { @@ -78,11 +77,6 @@ const newInstance = (wsInstance, onClose) => { // Broadcast connection closure to all subscriptions. wsInstance.addEventListener('close', closeEvent => { - if (closeRequested) { - // If the close has been requested already, the instance has been - // deregistered and there are no subscriptions left. - return - } // TODO: Handle close event codes. // https://github.com/TheThingsNetwork/lorawan-stack/issues/6752 for (const subscription of Object.values(subscriptions)) { @@ -105,11 +99,6 @@ const newInstance = (wsInstance, onClose) => { if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) { delete subscriptions[sid] - if (Object.keys(subscriptions).length === 0) { - closeRequested = true - wsInstance.close() - onClose() - } } }) From 5d125b611592320369fb0c23d896dabb458bd6b9 Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Fri, 15 Dec 2023 22:34:28 +0100 Subject: [PATCH 11/15] console: Close unauthenticated connections --- pkg/console/internal/events/eventsmux/mux.go | 4 ++++ pkg/console/internal/events/eventsmux/mux_test.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/pkg/console/internal/events/eventsmux/mux.go b/pkg/console/internal/events/eventsmux/mux.go index b7bcf062bc..8c5c2af3ba 100644 --- a/pkg/console/internal/events/eventsmux/mux.go +++ b/pkg/console/internal/events/eventsmux/mux.go @@ -18,6 +18,7 @@ package eventsmux import ( "context" + "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/protocol" "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions" "go.thethings.network/lorawan-stack/v3/pkg/errorcontext" @@ -64,6 +65,9 @@ func (m *mux) Run(ctx context.Context) (err error) { case <-ctx.Done(): return ctx.Err() case req := <-m.requestCh: + if err := rights.RequireAuthenticated(ctx); err != nil { + return err + } var resp protocol.Response switch req := req.(type) { case *protocol.SubscribeRequest: diff --git a/pkg/console/internal/events/eventsmux/mux_test.go b/pkg/console/internal/events/eventsmux/mux_test.go index de220f52fa..895da2db1d 100644 --- a/pkg/console/internal/events/eventsmux/mux_test.go +++ b/pkg/console/internal/events/eventsmux/mux_test.go @@ -125,6 +125,10 @@ func TestMux(t *testing.T) { // nolint:gocyclo unique.ID(ctx, appIDs): ttnpb.RightsFrom(ttnpb.Right_RIGHT_ALL), }), }) + ctx = rights.NewContextWithAuthInfo(ctx, &ttnpb.AuthInfoResponse{ + UniversalRights: ttnpb.RightsFrom(ttnpb.Right_RIGHT_ALL), + IsAdmin: true, + }) subs := &mockSubscriptions{ ctx: ctx, From 71390d0daa2807247dad0ea11d216292ce9e230e Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Sat, 16 Dec 2023 20:40:17 +0100 Subject: [PATCH 12/15] console: Clarify reconnection logic --- pkg/webui/console/lib/events/utils.js | 26 +++- .../console/store/middleware/logics/events.js | 42 ++---- pkg/webui/console/store/reducers/events.js | 1 + sdk/js/src/api/stream/shared.js | 1 + .../api/stream/subscribeToWebSocketStream.js | 140 +++++++++++++----- 5 files changed, 136 insertions(+), 74 deletions(-) diff --git a/pkg/webui/console/lib/events/utils.js b/pkg/webui/console/lib/events/utils.js index fa1d34436b..16d2c9c66d 100644 --- a/pkg/webui/console/lib/events/utils.js +++ b/pkg/webui/console/lib/events/utils.js @@ -23,13 +23,29 @@ export const defineSyntheticEvent = name => data => ({ data, }) -export const createSyntheticEventFromError = error => { +const convertError = error => { if (error instanceof Error) { - const errorString = error.toString() - if (error.message === 'network error' || error.message === 'Error in body stream') { - return createNetworkErrorEvent({ error: errorString }) + return { + ...error, + message: error.message, + name: error.name, + // The stack is omitted intentionally, as it is not relevant for a user. } + } + return error +} - return createUnknownErrorEvent({ error: errorString }) +export const createSyntheticEventFromError = error => { + if (error instanceof Error) { + if ( + error.name === 'ConnectionError' || + error.name === 'ConnectionClosedError' || + error.name === 'ConnectionTimeoutError' + ) { + return createNetworkErrorEvent({ error: convertError(error) }) + } else if (error.name === 'ProtocolError') { + return createUnknownErrorEvent({ error: convertError(error) }) + } + return createUnknownErrorEvent({ error: convertError(error) }) } } diff --git a/pkg/webui/console/store/middleware/logics/events.js b/pkg/webui/console/store/middleware/logics/events.js index e461a3a930..661689aa4c 100644 --- a/pkg/webui/console/store/middleware/logics/events.js +++ b/pkg/webui/console/store/middleware/logics/events.js @@ -19,12 +19,6 @@ import CONNECTION_STATUS from '@console/constants/connection-status' import EVENT_TAIL from '@console/constants/event-tail' import { getCombinedDeviceId } from '@ttn-lw/lib/selectors/id' -import { - isUnauthenticatedError, - isPermissionDeniedError, - isNetworkError, - isTimeoutError, -} from '@ttn-lw/lib/errors/utils' import { TokenError } from '@ttn-lw/lib/errors/custom-errors' import { SET_CONNECTION_STATUS, setStatusChecking } from '@ttn-lw/lib/store/actions/status' import { selectIsOnlineStatus } from '@ttn-lw/lib/store/selectors/status' @@ -144,10 +138,7 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { dispatch(startEventsSuccess(id, { silent })) channel.open() } catch (error) { - if ( - error instanceof TokenError && - (isUnauthenticatedError(error?.cause) || isPermissionDeniedError(error?.cause)) - ) { + if (error instanceof TokenError) { // The user is no longer authenticated; reinitiate the auth flow // by refreshing the page. window.location.reload() @@ -180,16 +171,14 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { }, process: async ({ action }, dispatch, done) => { if (action.type === START_EVENTS_FAILURE) { - if (action.error?.message === 'timeout') { - // Set the connection status to `checking` to trigger connection checks - // and detect possible offline state. - dispatch(setStatusChecking()) + // Set the connection status to `checking` to trigger connection checks + // and detect possible offline state. + dispatch(setStatusChecking()) - // In case of a network error, the connection could not be closed - // since the network connection is disrupted. We can regard this - // as equivalent to a closed connection. - return done() - } + // In case of a network error, the connection could not be closed + // since the network connection is disrupted. We can regard this + // as equivalent to a closed connection. + return done() } if (action.type === STOP_EVENTS && Boolean(channel)) { // Close the connection if it wasn't closed already. @@ -284,18 +273,9 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { createLogic({ type: SET_EVENT_FILTER, process: async ({ action }, dispatch, done) => { - if (channel) { - try { - await channel.close() - } catch (error) { - if (isNetworkError(error) || isTimeoutError(action.payload)) { - dispatch(setStatusChecking()) - } else { - throw error - } - } finally { - dispatch(startEvents(action.id, { silent: true })) - } + if (Boolean(channel)) { + await channel.close() + dispatch(startEvents(action.id, { silent: true })) } done() }, diff --git a/pkg/webui/console/store/reducers/events.js b/pkg/webui/console/store/reducers/events.js index c00c14b8ff..dc0e6b6022 100644 --- a/pkg/webui/console/store/reducers/events.js +++ b/pkg/webui/console/store/reducers/events.js @@ -150,6 +150,7 @@ const createNamedEventReducer = (reducerName = '') => { : state.events), error: action.error, status: CONNECTION_STATUS.DISCONNECTED, + interrupted: true, } case GET_EVENT_FAILURE: return { diff --git a/sdk/js/src/api/stream/shared.js b/sdk/js/src/api/stream/shared.js index f2b79d2bb0..e5765f20f3 100644 --- a/sdk/js/src/api/stream/shared.js +++ b/sdk/js/src/api/stream/shared.js @@ -57,6 +57,7 @@ export const newQueuedListeners = listeners => { for (const [event, args] of queue) { notify(listeners[event], ...args) } + queue.splice(0, queue.length) }, queuedListeners, ] diff --git a/sdk/js/src/api/stream/subscribeToWebSocketStream.js b/sdk/js/src/api/stream/subscribeToWebSocketStream.js index ec513d16f3..64dc6811c2 100644 --- a/sdk/js/src/api/stream/subscribeToWebSocketStream.js +++ b/sdk/js/src/api/stream/subscribeToWebSocketStream.js @@ -18,7 +18,44 @@ import Token from '../../util/token' import { notify, newQueuedListeners, EVENTS, MESSAGE_TYPES, INITIAL_LISTENERS } from './shared' -const newSubscription = (unsubscribe, originalListeners, resolve, reject, resolveClose) => { +export class ConnectionError extends Error { + constructor(message) { + super(message) + this.name = 'ConnectionError' + } +} + +export class ConnectionClosedError extends ConnectionError { + constructor(message, code) { + super(message) + this.name = 'ConnectionClosedError' + this.code = code + } +} + +export class ConnectionTimeoutError extends ConnectionError { + constructor(message) { + super(message) + this.name = 'ConnectionTimeoutError' + } +} + +export class ProtocolError extends Error { + constructor(error) { + super(error.message) + this.name = 'ProtocolError' + this.code = error.code + this.details = error.details + } +} + +const newSubscription = ( + unsubscribe, + originalListeners, + resolveSubscribe, + rejectSubscribe, + resolveClose, +) => { let closeRequested = false const [open, listeners] = newQueuedListeners(originalListeners) const externalSubscription = { @@ -31,25 +68,25 @@ const newSubscription = (unsubscribe, originalListeners, resolve, reject, resolv return { onError: err => { notify(listeners[EVENTS.ERROR], err) - // If an error occurs while we are trying to subscribe, we should reject - // the promise in order to propagate the implicit subscription failure. - reject(err) + + rejectSubscribe(err) }, onClose: closeEvent => { notify(listeners[EVENTS.CLOSE], closeRequested) + + rejectSubscribe(new ConnectionClosedError('WebSocket connection closed', closeEvent.code)) resolveClose() - // If the connection has been closed while we are trying subscribe, we should - // reject the promise in order to propagate the implicit subscription failure. - reject(new Error(`WebSocket connection closed unexpectedly with code ${closeEvent.code}`)) }, onMessage: dataParsed => { if (dataParsed.type === MESSAGE_TYPES.SUBSCRIBE) { - // Resolve the promise after the subscription confirmation message. - resolve(externalSubscription) + resolveSubscribe(externalSubscription) } if (dataParsed.type === MESSAGE_TYPES.ERROR) { - notify(listeners[EVENTS.ERROR], dataParsed.error) + const err = new ProtocolError(dataParsed.error) + notify(listeners[EVENTS.ERROR], err) + + rejectSubscribe(err) } if (dataParsed.type === MESSAGE_TYPES.PUBLISH) { @@ -58,6 +95,7 @@ const newSubscription = (unsubscribe, originalListeners, resolve, reject, resolv if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) { notify(listeners[EVENTS.CLOSE], closeRequested) + resolveClose() } }, @@ -69,7 +107,7 @@ const newInstance = (wsInstance, onClose) => { // Broadcast connection errors to all subscriptions. wsInstance.addEventListener('error', () => { - const err = new Error('Error in WebSocket connection') + const err = new ConnectionError('WebSocket connection error') for (const subscription of Object.values(subscriptions)) { subscription.onError(err) } @@ -165,6 +203,20 @@ const state = newStore() /** * Opens a new stream. * + * Implementation guarantees: + * - No events will be sent to the listeners before the `open` function is called. + * - No events will be sent to the listeners after the `close` function is called. + * - The `close` function will resolve after all events have been sent to the listeners. + * - The `close` function does not throw any errors. + * - The `close` function can be called multiple times. + * - The `open` function can be called multiple times. + * - The `open` function does not throw any errors, as long as the event listeners do not throw. + * - No `message` event will follow an `error` event. + * - No `message` event will follow a `close` event. + * - No `error` event will follow a `close` event. + * - No `error` event will follow another `error` event. + * - No `close` event will follow another `close` event. + * * @async * @param {object} payload - - The body of the initial request. * @param {string} baseUrl - The stream baseUrl. @@ -225,32 +277,44 @@ export default async ( const tokenParsed = typeof token === 'function' ? `${(await token()).access_token}` : token const baseUrlParsed = baseUrl.replace('http', 'ws') - return await Promise.race([ - new Promise((resolve, reject) => { - let instance = state.getInstance(url) - // Open up the WebSocket connection if it doesn't exist. - if (!instance) { - instance = state.setInstance( - url, - new WebSocket(`${baseUrlParsed}${endpoint}`, [ - 'ttn.lorawan.v3.console.internal.events.v1', - `ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`, - ]), - ) - } - - // Add the new subscription to the subscriptions object. - // Also add the resolver functions to the subscription object to be able - // to resolve the promise after the subscription confirmation message. - instance.subscribe( - subscriptionId, - subscriptionPayload, - unsubscribePayload, - filledListeners, - resolve, - reject, + const subscribe = new Promise((resolve, reject) => { + let instance = state.getInstance(url) + // Open up the WebSocket connection if it doesn't exist. + if (!instance) { + instance = state.setInstance( + url, + new WebSocket(`${baseUrlParsed}${endpoint}`, [ + 'ttn.lorawan.v3.console.internal.events.v1', + `ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`, + ]), ) - }), - new Promise((_resolve, reject) => setTimeout(() => reject(new Error('timeout')), timeout)), - ]) + } + + // Add the new subscription to the subscriptions object. + // Also add the resolver functions to the subscription object to be able + // to resolve the promise after the subscription confirmation message. + instance.subscribe( + subscriptionId, + subscriptionPayload, + unsubscribePayload, + filledListeners, + resolve, + reject, + ) + }) + + try { + return await Promise.race([ + subscribe, + new Promise((_resolve, reject) => + setTimeout(() => reject(new ConnectionTimeoutError('Timed out')), timeout), + ), + ]) + } catch (err) { + subscribe.then( + subscription => subscription.close(), + () => {}, + ) + throw err + } } From cc7ca65365d3a0b3737e279fef17dbc2b0dede08 Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Mon, 18 Dec 2023 11:28:22 +0100 Subject: [PATCH 13/15] console: Address review comments --- pkg/webui/console/lib/events/utils.js | 3 ++ .../console/store/middleware/logics/events.js | 35 ++++++++----------- .../api/stream/subscribeToWebSocketStream.js | 3 +- .../api/stream/subscribeToWebSocketStreams.js | 6 +++- 4 files changed, 24 insertions(+), 23 deletions(-) diff --git a/pkg/webui/console/lib/events/utils.js b/pkg/webui/console/lib/events/utils.js index 16d2c9c66d..57ed39e7d7 100644 --- a/pkg/webui/console/lib/events/utils.js +++ b/pkg/webui/console/lib/events/utils.js @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +import { ingestError } from '@ttn-lw/lib/errors/utils' + import { createNetworkErrorEvent, createUnknownErrorEvent } from './definitions' export const defineSyntheticEvent = name => data => ({ @@ -44,6 +46,7 @@ export const createSyntheticEventFromError = error => { ) { return createNetworkErrorEvent({ error: convertError(error) }) } else if (error.name === 'ProtocolError') { + ingestError(error.error) return createUnknownErrorEvent({ error: convertError(error) }) } return createUnknownErrorEvent({ error: convertError(error) }) diff --git a/pkg/webui/console/store/middleware/logics/events.js b/pkg/webui/console/store/middleware/logics/events.js index 661689aa4c..635602bf77 100644 --- a/pkg/webui/console/store/middleware/logics/events.js +++ b/pkg/webui/console/store/middleware/logics/events.js @@ -91,8 +91,7 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { }, validate: ({ getState, action = {} }, allow, reject) => { if (!action.id) { - reject() - return + return reject() } const id = typeof action.id === 'object' ? getCombinedDeviceId(action.id) : action.id @@ -104,11 +103,10 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { const connected = status === CONNECTION_STATUS.CONNECTED const connecting = status === CONNECTION_STATUS.CONNECTING if (connected || connecting || !isOnline) { - reject() - return + return reject() } - allow(action) + return allow(action) }, process: async ({ getState, action }, dispatch) => { const { id, silent } = action @@ -152,8 +150,7 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { type: [STOP_EVENTS, START_EVENTS_FAILURE], validate: ({ getState, action = {} }, allow, reject) => { if (!action.id) { - reject() - return + return reject() } const id = typeof action.id === 'object' ? getCombinedDeviceId(action.id) : action.id @@ -163,11 +160,10 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { const connected = status === CONNECTION_STATUS.CONNECTED const connecting = status === CONNECTION_STATUS.CONNECTING if (!connected && !connecting) { - reject() - return + return reject() } - allow(action) + return allow(action) }, process: async ({ action }, dispatch, done) => { if (action.type === START_EVENTS_FAILURE) { @@ -184,7 +180,7 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { // Close the connection if it wasn't closed already. await channel.close() } - done() + return done() }, }), createLogic({ @@ -193,8 +189,7 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { warnTimeout: 0, validate: ({ getState, action = {} }, allow, reject) => { if (!action.id) { - reject() - return + return reject() } const id = typeof action.id === 'object' ? getCombinedDeviceId(action.id) : action.id @@ -204,11 +199,10 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { const connected = status === CONNECTION_STATUS.CONNECTED const interrupted = selectEntityEventsInterrupted(getState(), id) if (!connected || interrupted) { - reject() - return + return reject() } - allow(action) + return allow(action) }, process: ({ getState, action }, dispatch, done) => { const isOnline = selectIsOnlineStatus(getState()) @@ -227,11 +221,11 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { dispatch(startEvents(action.id)) } else { clearInterval(reconnector) - done() + return done() } }, 5000) } else { - done() + return done() } }, }), @@ -267,17 +261,18 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { // If the app went offline, close the event stream. } - done() + return done() }, }), createLogic({ type: SET_EVENT_FILTER, + debounce: 250, process: async ({ action }, dispatch, done) => { if (Boolean(channel)) { await channel.close() dispatch(startEvents(action.id, { silent: true })) } - done() + return done() }, }), ] diff --git a/sdk/js/src/api/stream/subscribeToWebSocketStream.js b/sdk/js/src/api/stream/subscribeToWebSocketStream.js index 64dc6811c2..bca8a83b18 100644 --- a/sdk/js/src/api/stream/subscribeToWebSocketStream.js +++ b/sdk/js/src/api/stream/subscribeToWebSocketStream.js @@ -44,8 +44,7 @@ export class ProtocolError extends Error { constructor(error) { super(error.message) this.name = 'ProtocolError' - this.code = error.code - this.details = error.details + this.error = error } } diff --git a/sdk/js/src/api/stream/subscribeToWebSocketStreams.js b/sdk/js/src/api/stream/subscribeToWebSocketStreams.js index 2ac1b2a05f..87a73dd53f 100644 --- a/sdk/js/src/api/stream/subscribeToWebSocketStreams.js +++ b/sdk/js/src/api/stream/subscribeToWebSocketStreams.js @@ -93,7 +93,11 @@ export default async ( try { const stream = await pendingStream await stream.close() - } catch {} + } catch { + // Only the pending stream promise may throw, as `close` does not throw. + // Although multiple streams may fail, we will rethrow only the first error + // and ignore the rest, as they are not really actionable. + } }), ) throw error From f758fe52c4ec2a5ff5ebf2b1a784ca2fe9ff839b Mon Sep 17 00:00:00 2001 From: Adrian-Stefan Mares Date: Tue, 19 Dec 2023 18:49:15 +0200 Subject: [PATCH 14/15] console: Disable WebSocket compression for Safari --- go.mod | 1 + go.sum | 2 ++ pkg/console/internal/events/events.go | 14 +++++++++++++- tools/go.mod | 1 + tools/go.sum | 2 ++ 5 files changed, 19 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index be8e476f53..1b7361d68f 100644 --- a/go.mod +++ b/go.mod @@ -55,6 +55,7 @@ require ( github.com/klauspost/compress v1.17.2 github.com/kr/pretty v0.3.1 github.com/lib/pq v1.10.9 + github.com/mileusna/useragent v1.3.4 github.com/mitchellh/mapstructure v1.5.0 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 github.com/nats-io/nats-server/v2 v2.10.4 diff --git a/go.sum b/go.sum index f2db5ff523..3b61876c02 100644 --- a/go.sum +++ b/go.sum @@ -548,6 +548,8 @@ github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvls github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/maxatome/go-testdeep v1.12.0 h1:Ql7Go8Tg0C1D/uMMX59LAoYK7LffeJQ6X2T04nTH68g= github.com/maxatome/go-testdeep v1.12.0/go.mod h1:lPZc/HAcJMP92l7yI6TRz1aZN5URwUBUAfUNvrclaNM= +github.com/mileusna/useragent v1.3.4 h1:MiuRRuvGjEie1+yZHO88UBYg8YBC/ddF6T7F56i3PCk= +github.com/mileusna/useragent v1.3.4/go.mod h1:3d8TOmwL/5I8pJjyVDteHtgDGcefrFUX4ccGOMKNYYc= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mitchellh/copystructure v1.0.0 h1:Laisrj+bAB6b/yJwB5Bt3ITZhGJdqmxquMKeZ+mmkFQ= diff --git a/pkg/console/internal/events/events.go b/pkg/console/internal/events/events.go index e1f66c33db..3d8a106d8c 100644 --- a/pkg/console/internal/events/events.go +++ b/pkg/console/internal/events/events.go @@ -22,6 +22,7 @@ import ( "time" "github.com/gorilla/mux" + "github.com/mileusna/useragent" "go.thethings.network/lorawan-stack/v3/pkg/auth/rights" "go.thethings.network/lorawan-stack/v3/pkg/config" "go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux" @@ -90,10 +91,21 @@ func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) { return } + // Safari versions above 15 cannot handle compression correctly when the + // `NSURLSession Websocket` experimental feature is enabled (it is enabled by default). + // Versions above 17 still show the same issues, but the experimental feature is baseline. + // As such, we disable compression for Safari for all versions in order to ensure the best + // user experience. + // https://github.com/TheThingsNetwork/lorawan-stack/issues/6782 + compressionMode := websocket.CompressionContextTakeover + if ua := useragent.Parse(r.UserAgent()); ua.Name == useragent.Safari { + compressionMode = websocket.CompressionDisabled + } + conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ Subprotocols: []string{protocolV1}, InsecureSkipVerify: true, // CORS is not enabled for APIs. - CompressionMode: websocket.CompressionContextTakeover, + CompressionMode: compressionMode, }) if err != nil { logger.WithError(err).Debug("Failed to accept WebSocket") diff --git a/tools/go.mod b/tools/go.mod index e767a14246..b615441896 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -150,6 +150,7 @@ require ( github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect + github.com/mileusna/useragent v1.3.4 // indirect github.com/mitchellh/copystructure v1.0.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.1 // indirect diff --git a/tools/go.sum b/tools/go.sum index d5263f63d2..187d51fe46 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -562,6 +562,8 @@ github.com/mattn/goveralls v0.0.12/go.mod h1:44ImGEUfmqH8bBtaMrYKsM65LXfNLWmwaxF github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= +github.com/mileusna/useragent v1.3.4 h1:MiuRRuvGjEie1+yZHO88UBYg8YBC/ddF6T7F56i3PCk= +github.com/mileusna/useragent v1.3.4/go.mod h1:3d8TOmwL/5I8pJjyVDteHtgDGcefrFUX4ccGOMKNYYc= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mitchellh/copystructure v1.0.0 h1:Laisrj+bAB6b/yJwB5Bt3ITZhGJdqmxquMKeZ+mmkFQ= From 4a7d6714f585762339a941e6e50b12814c4456b3 Mon Sep 17 00:00:00 2001 From: The Things Bot Date: Wed, 20 Dec 2023 13:34:38 +0000 Subject: [PATCH 15/15] all: Enter release date of version 3.28.2 into the changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 15ed3760f0..4c35ed6efe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,7 @@ For details about compatibility between different releases, see the **Commitment ### Security -## [3.28.2] - unreleased +## [3.28.2] - 2023-12-20 ### Added