Skip to content

Commit

Permalink
console: Clarify reconnection logic
Browse files Browse the repository at this point in the history
  • Loading branch information
adriansmares committed Dec 16, 2023
1 parent f0eb6e7 commit d7acadc
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 74 deletions.
26 changes: 21 additions & 5 deletions pkg/webui/console/lib/events/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,29 @@ export const defineSyntheticEvent = name => data => ({
data,
})

export const createSyntheticEventFromError = error => {
const convertError = error => {
if (error instanceof Error) {
const errorString = error.toString()
if (error.message === 'network error' || error.message === 'Error in body stream') {
return createNetworkErrorEvent({ error: errorString })
return {
...error,
message: error.message,
name: error.name,
// The stack is omitted intentionally, as it is not relevant for a user.
}
}
return error
}

return createUnknownErrorEvent({ error: errorString })
export const createSyntheticEventFromError = error => {
if (error instanceof Error) {
if (
error.name === 'ConnectionError' ||
error.name === 'ConnectionClosedError' ||
error.name === 'ConnectionTimeoutError'
) {
return createNetworkErrorEvent({ error: convertError(error) })
} else if (error.name === 'ProtocolError') {
return createUnknownErrorEvent({ error: convertError(error) })
}
return createUnknownErrorEvent({ error: convertError(error) })
}
}
42 changes: 11 additions & 31 deletions pkg/webui/console/store/middleware/logics/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@ import CONNECTION_STATUS from '@console/constants/connection-status'
import EVENT_TAIL from '@console/constants/event-tail'

import { getCombinedDeviceId } from '@ttn-lw/lib/selectors/id'
import {
isUnauthenticatedError,
isPermissionDeniedError,
isNetworkError,
isTimeoutError,
} from '@ttn-lw/lib/errors/utils'
import { TokenError } from '@ttn-lw/lib/errors/custom-errors'
import { SET_CONNECTION_STATUS, setStatusChecking } from '@ttn-lw/lib/store/actions/status'
import { selectIsOnlineStatus } from '@ttn-lw/lib/store/selectors/status'
Expand Down Expand Up @@ -144,10 +138,7 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
dispatch(startEventsSuccess(id, { silent }))
channel.open()
} catch (error) {
if (
error instanceof TokenError &&
(isUnauthenticatedError(error?.cause) || isPermissionDeniedError(error?.cause))
) {
if (error instanceof TokenError) {
// The user is no longer authenticated; reinitiate the auth flow
// by refreshing the page.
window.location.reload()
Expand Down Expand Up @@ -180,16 +171,14 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
},
process: async ({ action }, dispatch, done) => {
if (action.type === START_EVENTS_FAILURE) {
if (action.error?.message === 'timeout') {
// Set the connection status to `checking` to trigger connection checks
// and detect possible offline state.
dispatch(setStatusChecking())
// Set the connection status to `checking` to trigger connection checks
// and detect possible offline state.
dispatch(setStatusChecking())

// In case of a network error, the connection could not be closed
// since the network connection is disrupted. We can regard this
// as equivalent to a closed connection.
return done()
}
// In case of a network error, the connection could not be closed
// since the network connection is disrupted. We can regard this
// as equivalent to a closed connection.
return done()
}
if (action.type === STOP_EVENTS && Boolean(channel)) {
// Close the connection if it wasn't closed already.
Expand Down Expand Up @@ -284,18 +273,9 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => {
createLogic({
type: SET_EVENT_FILTER,
process: async ({ action }, dispatch, done) => {
if (channel) {
try {
await channel.close()
} catch (error) {
if (isNetworkError(error) || isTimeoutError(action.payload)) {
dispatch(setStatusChecking())
} else {
throw error
}
} finally {
dispatch(startEvents(action.id, { silent: true }))
}
if (Boolean(channel)) {
await channel.close()
dispatch(startEvents(action.id, { silent: true }))
}
done()
},
Expand Down
1 change: 1 addition & 0 deletions pkg/webui/console/store/reducers/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ const createNamedEventReducer = (reducerName = '') => {
: state.events),
error: action.error,
status: CONNECTION_STATUS.DISCONNECTED,
interrupted: true,
}
case GET_EVENT_FAILURE:
return {
Expand Down
1 change: 1 addition & 0 deletions sdk/js/src/api/stream/shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export const newQueuedListeners = listeners => {
for (const [event, args] of queue) {
notify(listeners[event], ...args)
}
queue.splice(0, queue.length)
},
queuedListeners,
]
Expand Down
140 changes: 102 additions & 38 deletions sdk/js/src/api/stream/subscribeToWebSocketStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,44 @@ import Token from '../../util/token'

import { notify, newQueuedListeners, EVENTS, MESSAGE_TYPES, INITIAL_LISTENERS } from './shared'

const newSubscription = (unsubscribe, originalListeners, resolve, reject, resolveClose) => {
export class ConnectionError extends Error {
constructor(message) {
super(message)
this.name = 'ConnectionError'
}
}

export class ConnectionClosedError extends ConnectionError {
constructor(message, code) {
super(message)
this.name = 'ConnectionClosedError'
this.code = code
}
}

export class ConnectionTimeoutError extends ConnectionError {
constructor(message) {
super(message)
this.name = 'ConnectionTimeoutError'
}
}

export class ProtocolError extends Error {
constructor(error) {
super(error.message)
this.name = 'ProtocolError'
this.code = error.code
this.details = error.details
}
}

const newSubscription = (
unsubscribe,
originalListeners,
resolveSubscribe,
rejectSubscribe,
resolveClose,
) => {
let closeRequested = false
const [open, listeners] = newQueuedListeners(originalListeners)
const externalSubscription = {
Expand All @@ -31,25 +68,25 @@ const newSubscription = (unsubscribe, originalListeners, resolve, reject, resolv
return {
onError: err => {
notify(listeners[EVENTS.ERROR], err)
// If an error occurs while we are trying to subscribe, we should reject
// the promise in order to propagate the implicit subscription failure.
reject(err)

rejectSubscribe(err)
},
onClose: closeEvent => {
notify(listeners[EVENTS.CLOSE], closeRequested)

rejectSubscribe(new ConnectionClosedError('WebSocket connection closed', closeEvent.code))
resolveClose()
// If the connection has been closed while we are trying subscribe, we should
// reject the promise in order to propagate the implicit subscription failure.
reject(new Error(`WebSocket connection closed unexpectedly with code ${closeEvent.code}`))
},
onMessage: dataParsed => {
if (dataParsed.type === MESSAGE_TYPES.SUBSCRIBE) {
// Resolve the promise after the subscription confirmation message.
resolve(externalSubscription)
resolveSubscribe(externalSubscription)
}

if (dataParsed.type === MESSAGE_TYPES.ERROR) {
notify(listeners[EVENTS.ERROR], dataParsed.error)
const err = new ProtocolError(dataParsed.error)
notify(listeners[EVENTS.ERROR], err)

rejectSubscribe(err)
}

if (dataParsed.type === MESSAGE_TYPES.PUBLISH) {
Expand All @@ -58,6 +95,7 @@ const newSubscription = (unsubscribe, originalListeners, resolve, reject, resolv

if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) {
notify(listeners[EVENTS.CLOSE], closeRequested)

resolveClose()
}
},
Expand All @@ -69,7 +107,7 @@ const newInstance = (wsInstance, onClose) => {

// Broadcast connection errors to all subscriptions.
wsInstance.addEventListener('error', () => {
const err = new Error('Error in WebSocket connection')
const err = new ConnectionError('WebSocket connection error')
for (const subscription of Object.values(subscriptions)) {
subscription.onError(err)
}
Expand Down Expand Up @@ -165,6 +203,20 @@ const state = newStore()
/**
* Opens a new stream.
*
* Implementation guarantees:
* - No events will be sent to the listeners before the `open` function is called.
* - No events will be sent to the listeners after the `close` function is called.
* - The `close` function will resolve after all events have been sent to the listeners.
* - The `close` function does not throw any errors.
* - The `close` function can be called multiple times.
* - The `open` function can be called multiple times.
* - The `open` function does not throw any errors, as long as the event listeners do not throw.
* - No `message` event will follow an `error` event.
* - No `message` event will follow a `close` event.
* - No `error` event will follow a `close` event.
* - No `error` event will follow another `error` event.
* - No `close` event will follow another `close` event.
*
* @async
* @param {object} payload - - The body of the initial request.
* @param {string} baseUrl - The stream baseUrl.
Expand Down Expand Up @@ -225,32 +277,44 @@ export default async (
const tokenParsed = typeof token === 'function' ? `${(await token()).access_token}` : token
const baseUrlParsed = baseUrl.replace('http', 'ws')

return await Promise.race([
new Promise((resolve, reject) => {
let instance = state.getInstance(url)
// Open up the WebSocket connection if it doesn't exist.
if (!instance) {
instance = state.setInstance(
url,
new WebSocket(`${baseUrlParsed}${endpoint}`, [
'ttn.lorawan.v3.console.internal.events.v1',
`ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`,
]),
)
}

// Add the new subscription to the subscriptions object.
// Also add the resolver functions to the subscription object to be able
// to resolve the promise after the subscription confirmation message.
instance.subscribe(
subscriptionId,
subscriptionPayload,
unsubscribePayload,
filledListeners,
resolve,
reject,
const subscribe = new Promise((resolve, reject) => {
let instance = state.getInstance(url)
// Open up the WebSocket connection if it doesn't exist.
if (!instance) {
instance = state.setInstance(
url,
new WebSocket(`${baseUrlParsed}${endpoint}`, [
'ttn.lorawan.v3.console.internal.events.v1',
`ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`,
]),
)
}),
new Promise((_resolve, reject) => setTimeout(() => reject(new Error('timeout')), timeout)),
])
}

// Add the new subscription to the subscriptions object.
// Also add the resolver functions to the subscription object to be able
// to resolve the promise after the subscription confirmation message.
instance.subscribe(
subscriptionId,
subscriptionPayload,
unsubscribePayload,
filledListeners,
resolve,
reject,
)
})

try {
return await Promise.race([
subscribe,
new Promise((_resolve, reject) =>
setTimeout(() => reject(new ConnectionTimeoutError('Timed out')), timeout),
),
])
} catch (err) {
subscribe.then(
subscription => subscription.close(),
() => {},
)
throw err
}
}

0 comments on commit d7acadc

Please sign in to comment.