Skip to content

Commit

Permalink
Merge pull request #6749 from TheThingsNetwork/fix/event-stream-recon…
Browse files Browse the repository at this point in the history
…nect

Fix event stream error handling and reconnection
  • Loading branch information
kschiffer committed Dec 8, 2023
2 parents 47f3303 + 907f52f commit 8925c04
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 85 deletions.
31 changes: 16 additions & 15 deletions pkg/webui/console/store/middleware/logics/events.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2019 The Things Network Foundation, The Things Industries B.V.
// Copyright © 2023 The Things Network Foundation, The Things Industries B.V.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -145,6 +145,9 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
if (isUnauthenticatedError(error)) {
// The user is no longer authenticated; reinitiate the auth flow
// by refreshing the page.
// NOTE: As a result of the WebSocket refactor, the error shape is
// now very unspecific and authentication errors like before are
// not thrown anymore. This should be addressed eventually.
window.location.reload()
} else {
dispatch(startEventsFailure(id, error))
Expand Down Expand Up @@ -174,21 +177,16 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
allow(action)
},
process: ({ action }, dispatch, done) => {
if (channel) {
try {
channel.close()
} catch (error) {
if (isNetworkError(error) || isTimeoutError(action.payload)) {
// Set the connection status to `checking` to trigger connection checks
// and detect possible offline state.
dispatch(setStatusChecking())
if (action.error) {
if (action.error?.message === 'timeout') {
// Set the connection status to `checking` to trigger connection checks
// and detect possible offline state.
dispatch(setStatusChecking())

// In case of a network error, the connection could not be closed
// since the network connection is disrupted. We can regard this
// as equivalent to a closed connection.
return done()
}
throw error
// In case of a network error, the connection could not be closed
// since the network connection is disrupted. We can regard this
// as equivalent to a closed connection.
return done()
}
}
done()
Expand Down Expand Up @@ -245,6 +243,7 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
type: SET_CONNECTION_STATUS,
process: ({ getState, action }, dispatch, done) => {
const isOnline = action.payload.onlineStatus === ONLINE_STATUS.ONLINE
const isOffline = action.payload.onlineStatus === ONLINE_STATUS.OFFLINE

if (isOnline) {
const state = getState()
Expand All @@ -268,6 +267,8 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
dispatch(dispatch(startEvents(ids)))
}
}
} else if (isOffline) {
// If the app went offline, close the event stream.
}

done()
Expand Down
171 changes: 101 additions & 70 deletions sdk/js/src/api/stream/subscribeToWebSocketStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc,
* @param {object} payload - - The body of the initial request.
* @param {string} baseUrl - The stream baseUrl.
* @param {string} endpoint - The stream endpoint.
* @param {number} timeout - The timeout for the stream.
*
* @example
* (async () => {
Expand All @@ -51,7 +52,12 @@ const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc,
* @returns {object} The stream subscription object with the `on` function for
* attaching listeners and the `close` function to close the stream.
*/
export default async (payload, baseUrl, endpoint = '/console/internal/events/') => {
export default async (
payload,
baseUrl,
endpoint = '/console/internal/events/',
timeout = 10000,
) => {
const subscriptionId = Date.now()
const subscriptionPayload = JSON.stringify({
type: MESSAGE_TYPES.SUBSCRIBE,
Expand All @@ -65,84 +71,109 @@ export default async (payload, baseUrl, endpoint = '/console/internal/events/')
let closeRequested = false
const url = baseUrl + endpoint

await new Promise(async resolve => {
// Add the new subscription to the subscriptions object.
// Also add the resolver function to the subscription object to be able
// to resolve the promise after the subscription confirmation message.
subscriptions = {
...subscriptions,
[subscriptionId]: { ...initialListeners, url, _resolver: resolve },
}

const token = new Token().get()
const tokenParsed = typeof token === 'function' ? (await token()).access_token : token
const baseUrlParsed = baseUrl.replace('http', 'ws')

// Open up the WebSocket connection if it doesn't exist.
if (!wsInstances[url]) {
wsInstances[url] = new WebSocket(`${baseUrlParsed}${endpoint}`, [
'ttn.lorawan.v3.console.internal.events.v1',
`ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`,
])

// Event listener for 'open'
wsInstances[url].addEventListener('open', () => {
wsInstances[url].send(subscriptionPayload)
})

// Broadcast connection errors to all listeners.
wsInstances[url].addEventListener('error', error => {
Object.values(subscriptions)
.filter(s => s.url === url)
.forEach(s => notify(s[EVENTS.ERROR], error))
resolve()
})
await Promise.race([
new Promise(async (resolve, reject) => {
// Add the new subscription to the subscriptions object.
// Also add the resolver function to the subscription object to be able
// to resolve the promise after the subscription confirmation message.
subscriptions = {
...subscriptions,
[subscriptionId]: { ...initialListeners, url, _resolver: resolve },
}

// Event listener for 'close'
wsInstances[url].addEventListener('close', () => {
delete wsInstances[url]
})
try {
const token = new Token().get()
const tokenParsed = typeof token === 'function' ? `${(await token()).access_token}` : token
const baseUrlParsed = baseUrl.replace('http', 'ws')

// Open up the WebSocket connection if it doesn't exist.
if (!wsInstances[url]) {
wsInstances[url] = new WebSocket(`${baseUrlParsed}${endpoint}`, [
'ttn.lorawan.v3.console.internal.events.v1',
`ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`,
])

// Event listener for 'open'
wsInstances[url].addEventListener('open', () => {
wsInstances[url].send(subscriptionPayload)
})

// After the WebSocket connection is open, add the event listeners.
// Wait for the subscription confirmation message before resolving.
wsInstances[url].addEventListener('message', ({ data }) => {
const dataParsed = JSON.parse(data)
const listeners = subscriptions[dataParsed.id]
// Broadcast connection errors to all listeners.
wsInstances[url].addEventListener('error', error => {
Object.values(subscriptions)
.filter(s => s.url === url)
.forEach(s => notify(s[EVENTS.ERROR], new Error(error)))
// The error is an error event, but we should only throw proper errors.
// It has an optional error code that we could use to map to a proper error.
// However, the error codes are optional and not always used.
reject(new Error('Error in WebSocket connection'))
})

if (!listeners) {
warn('Message received for closed or unknown subscription with ID', dataParsed.id)
// Event listener for 'close'
wsInstances[url].addEventListener('close', closeEvent => {
delete wsInstances[url]
Object.values(subscriptions)
.filter(s => s.url === url)
.forEach(s => notify(s[EVENTS.CLOSE], closeRequested))

return
}
if (closeRequested) {
resolve()
} else {
reject(
new Error(`WebSocket connection closed unexpectedly with code ${closeEvent.code}`),
)
}
})

if (dataParsed.type === MESSAGE_TYPES.SUBSCRIBE) {
notify(listeners[EVENTS.OPEN])
// Resolve the promise after the subscription confirmation message.
listeners._resolver()
}
// After the WebSocket connection is open, add the event listeners.
// Wait for the subscription confirmation message before resolving.
wsInstances[url].addEventListener('message', ({ data }) => {
const dataParsed = JSON.parse(data)
const listeners = subscriptions[dataParsed.id]

if (dataParsed.type === MESSAGE_TYPES.ERROR) {
notify(listeners[EVENTS.ERROR], dataParsed)
}
if (!listeners) {
warn('Message received for closed or unknown subscription with ID', dataParsed.id)

if (dataParsed.type === MESSAGE_TYPES.PUBLISH) {
notify(listeners[EVENTS.MESSAGE], dataParsed.event)
}
return
}

if (dataParsed.type === MESSAGE_TYPES.SUBSCRIBE) {
notify(listeners[EVENTS.OPEN])
// Resolve the promise after the subscription confirmation message.
listeners._resolver()
}

if (dataParsed.type === MESSAGE_TYPES.ERROR) {
notify(listeners[EVENTS.ERROR], dataParsed)
}

if (dataParsed.type === MESSAGE_TYPES.PUBLISH) {
notify(listeners[EVENTS.MESSAGE], dataParsed.event)
}

if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) {
notify(listeners[EVENTS.CLOSE], closeRequested)
// Remove the subscription.
delete subscriptions[dataParsed.id]
if (!Object.values(subscriptions).some(s => s.url === url)) {
wsInstances[url].close()
}
if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) {
notify(listeners[EVENTS.CLOSE], closeRequested)
// Remove the subscription.
delete subscriptions[dataParsed.id]
if (!Object.values(subscriptions).some(s => s.url === url)) {
wsInstances[url].close()
}
}
})
} else if (wsInstances[url] && wsInstances[url].readyState === WebSocket.OPEN) {
// If the WebSocket connection is already open, only add the subscription.
wsInstances[url].send(subscriptionPayload)
}
})
} else if (wsInstances[url] && wsInstances[url].readyState === WebSocket.OPEN) {
// If the WebSocket connection is already open, only add the subscription.
wsInstances[url].send(subscriptionPayload)
}
})
} catch (error) {
const err = error instanceof Error ? error : new Error(error)
Object.values(subscriptions)
.filter(s => s.url === url)
.forEach(s => notify(s[EVENTS.ERROR], err))
reject(err)
}
}),
new Promise((resolve, reject) => setTimeout(() => reject(new Error('timeout')), timeout)),
])

// Return an observer object with the `on` and `close` functions for
// the current subscription.
Expand Down

0 comments on commit 8925c04

Please sign in to comment.