Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix event stream error handling and reconnection #6749

Merged
merged 2 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions pkg/webui/console/store/middleware/logics/events.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2019 The Things Network Foundation, The Things Industries B.V.
// Copyright © 2023 The Things Network Foundation, The Things Industries B.V.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -145,6 +145,9 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
if (isUnauthenticatedError(error)) {
// The user is no longer authenticated; reinitiate the auth flow
// by refreshing the page.
// NOTE: As a result of the WebSocket refactor, the error shape is
// now very unspecific and authentication errors like before are
adriansmares marked this conversation as resolved.
Show resolved Hide resolved
// not thrown anymore. This should be addressed eventually.
window.location.reload()
} else {
dispatch(startEventsFailure(id, error))
Expand Down Expand Up @@ -174,21 +177,16 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
allow(action)
},
process: ({ action }, dispatch, done) => {
if (channel) {
try {
channel.close()
} catch (error) {
if (isNetworkError(error) || isTimeoutError(action.payload)) {
// Set the connection status to `checking` to trigger connection checks
// and detect possible offline state.
dispatch(setStatusChecking())
if (action.error) {
if (action.error?.message === 'timeout') {
// Set the connection status to `checking` to trigger connection checks
// and detect possible offline state.
dispatch(setStatusChecking())

// In case of a network error, the connection could not be closed
// since the network connection is disrupted. We can regard this
// as equivalent to a closed connection.
return done()
}
throw error
// In case of a network error, the connection could not be closed
// since the network connection is disrupted. We can regard this
// as equivalent to a closed connection.
return done()
}
}
done()
Expand Down Expand Up @@ -245,6 +243,7 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
type: SET_CONNECTION_STATUS,
process: ({ getState, action }, dispatch, done) => {
const isOnline = action.payload.onlineStatus === ONLINE_STATUS.ONLINE
const isOffline = action.payload.onlineStatus === ONLINE_STATUS.OFFLINE

if (isOnline) {
const state = getState()
Expand All @@ -268,6 +267,8 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
dispatch(dispatch(startEvents(ids)))
}
}
} else if (isOffline) {
// If the app went offline, close the event stream.
}

done()
Expand Down
171 changes: 101 additions & 70 deletions sdk/js/src/api/stream/subscribeToWebSocketStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc,
* @param {object} payload - - The body of the initial request.
* @param {string} baseUrl - The stream baseUrl.
* @param {string} endpoint - The stream endpoint.
* @param {number} timeout - The timeout for the stream.
*
* @example
* (async () => {
Expand All @@ -51,7 +52,12 @@ const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc,
* @returns {object} The stream subscription object with the `on` function for
* attaching listeners and the `close` function to close the stream.
*/
export default async (payload, baseUrl, endpoint = '/console/internal/events/') => {
export default async (
payload,
baseUrl,
endpoint = '/console/internal/events/',
timeout = 10000,
) => {
const subscriptionId = Date.now()
const subscriptionPayload = JSON.stringify({
type: MESSAGE_TYPES.SUBSCRIBE,
Expand All @@ -65,84 +71,109 @@ export default async (payload, baseUrl, endpoint = '/console/internal/events/')
let closeRequested = false
const url = baseUrl + endpoint

await new Promise(async resolve => {
// Add the new subscription to the subscriptions object.
// Also add the resolver function to the subscription object to be able
// to resolve the promise after the subscription confirmation message.
subscriptions = {
...subscriptions,
[subscriptionId]: { ...initialListeners, url, _resolver: resolve },
}

const token = new Token().get()
const tokenParsed = typeof token === 'function' ? (await token()).access_token : token
const baseUrlParsed = baseUrl.replace('http', 'ws')

// Open up the WebSocket connection if it doesn't exist.
if (!wsInstances[url]) {
wsInstances[url] = new WebSocket(`${baseUrlParsed}${endpoint}`, [
'ttn.lorawan.v3.console.internal.events.v1',
`ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`,
])

// Event listener for 'open'
wsInstances[url].addEventListener('open', () => {
wsInstances[url].send(subscriptionPayload)
})

// Broadcast connection errors to all listeners.
wsInstances[url].addEventListener('error', error => {
Object.values(subscriptions)
.filter(s => s.url === url)
.forEach(s => notify(s[EVENTS.ERROR], error))
resolve()
})
await Promise.race([
new Promise(async (resolve, reject) => {
// Add the new subscription to the subscriptions object.
// Also add the resolver function to the subscription object to be able
// to resolve the promise after the subscription confirmation message.
subscriptions = {
...subscriptions,
[subscriptionId]: { ...initialListeners, url, _resolver: resolve },
}

// Event listener for 'close'
wsInstances[url].addEventListener('close', () => {
delete wsInstances[url]
})
try {
const token = new Token().get()
const tokenParsed = typeof token === 'function' ? `${(await token()).access_token}` : token
const baseUrlParsed = baseUrl.replace('http', 'ws')

// Open up the WebSocket connection if it doesn't exist.
if (!wsInstances[url]) {
wsInstances[url] = new WebSocket(`${baseUrlParsed}${endpoint}`, [
'ttn.lorawan.v3.console.internal.events.v1',
`ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`,
])

// Event listener for 'open'
wsInstances[url].addEventListener('open', () => {
wsInstances[url].send(subscriptionPayload)
})

// After the WebSocket connection is open, add the event listeners.
// Wait for the subscription confirmation message before resolving.
wsInstances[url].addEventListener('message', ({ data }) => {
const dataParsed = JSON.parse(data)
const listeners = subscriptions[dataParsed.id]
// Broadcast connection errors to all listeners.
wsInstances[url].addEventListener('error', error => {
Object.values(subscriptions)
.filter(s => s.url === url)
.forEach(s => notify(s[EVENTS.ERROR], new Error(error)))
// The error is an error event, but we should only throw proper errors.
// It has an optional error code that we could use to map to a proper error.
// However, the error codes are optional and not always used.
reject(new Error('Error in WebSocket connection'))
})

if (!listeners) {
warn('Message received for closed or unknown subscription with ID', dataParsed.id)
// Event listener for 'close'
wsInstances[url].addEventListener('close', closeEvent => {
delete wsInstances[url]
Object.values(subscriptions)
.filter(s => s.url === url)
.forEach(s => notify(s[EVENTS.CLOSE], closeRequested))

return
}
if (closeRequested) {
resolve()
} else {
reject(
new Error(`WebSocket connection closed unexpectedly with code ${closeEvent.code}`),
)
}
})

if (dataParsed.type === MESSAGE_TYPES.SUBSCRIBE) {
notify(listeners[EVENTS.OPEN])
// Resolve the promise after the subscription confirmation message.
listeners._resolver()
}
// After the WebSocket connection is open, add the event listeners.
// Wait for the subscription confirmation message before resolving.
wsInstances[url].addEventListener('message', ({ data }) => {
const dataParsed = JSON.parse(data)
const listeners = subscriptions[dataParsed.id]

if (dataParsed.type === MESSAGE_TYPES.ERROR) {
notify(listeners[EVENTS.ERROR], dataParsed)
}
if (!listeners) {
warn('Message received for closed or unknown subscription with ID', dataParsed.id)

if (dataParsed.type === MESSAGE_TYPES.PUBLISH) {
notify(listeners[EVENTS.MESSAGE], dataParsed.event)
}
return
}

if (dataParsed.type === MESSAGE_TYPES.SUBSCRIBE) {
notify(listeners[EVENTS.OPEN])
// Resolve the promise after the subscription confirmation message.
listeners._resolver()
}

if (dataParsed.type === MESSAGE_TYPES.ERROR) {
notify(listeners[EVENTS.ERROR], dataParsed)
}

if (dataParsed.type === MESSAGE_TYPES.PUBLISH) {
notify(listeners[EVENTS.MESSAGE], dataParsed.event)
}

if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) {
notify(listeners[EVENTS.CLOSE], closeRequested)
// Remove the subscription.
delete subscriptions[dataParsed.id]
if (!Object.values(subscriptions).some(s => s.url === url)) {
wsInstances[url].close()
}
if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) {
notify(listeners[EVENTS.CLOSE], closeRequested)
// Remove the subscription.
delete subscriptions[dataParsed.id]
if (!Object.values(subscriptions).some(s => s.url === url)) {
wsInstances[url].close()
}
}
})
} else if (wsInstances[url] && wsInstances[url].readyState === WebSocket.OPEN) {
// If the WebSocket connection is already open, only add the subscription.
wsInstances[url].send(subscriptionPayload)
}
})
} else if (wsInstances[url] && wsInstances[url].readyState === WebSocket.OPEN) {
// If the WebSocket connection is already open, only add the subscription.
wsInstances[url].send(subscriptionPayload)
}
})
} catch (error) {
const err = error instanceof Error ? error : new Error(error)
Object.values(subscriptions)
.filter(s => s.url === url)
.forEach(s => notify(s[EVENTS.ERROR], err))
reject(err)
}
}),
new Promise((resolve, reject) => setTimeout(() => reject(new Error('timeout')), timeout)),
])

// Return an observer object with the `on` and `close` functions for
// the current subscription.
Expand Down
Loading