Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api, worker, application-generic): Add exhaustive error handling for bridge requests #6715

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions apps/api/src/app/bridge/usecases/sync/sync.usecase.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BadRequestException, Injectable } from '@nestjs/common';
import { BadRequestException, HttpException, Injectable } from '@nestjs/common';

import {
EnvironmentRepository,
Expand Down Expand Up @@ -56,8 +56,12 @@ export class Sync {
retriesLimit: 1,
workflowOrigin: WorkflowOriginEnum.EXTERNAL,
})) as DiscoverOutput;
} catch (error: any) {
throw new BadRequestException(`Bridge URL is not valid. ${error.message}`);
} catch (error) {
if (error instanceof HttpException) {
throw new BadRequestException(error.message);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplifying the error thrown here.

Let non-HTTP exceptions bubble up to the API exception handler.

throw error;
}

if (!discover) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,23 @@ export class ExecuteBridgeJob {
let raw: { retryCount?: number; statusCode?: number; message: string; url?: string };

if (error.response) {
let rawMessage: Record<string, unknown>;
const errorResponseBody = error?.response?.body;
try {
rawMessage = JSON.parse(errorResponseBody);
} catch {
Logger.error(`Unexpected body received from Bridge: ${errorResponseBody}`, LOG_CONTEXT);
rawMessage = {
error: `Unexpected body received from Bridge: ${errorResponseBody}`,
};
}

raw = {
url: statelessBridgeUrl,
statusCode: error.response?.statusCode,
message: error.response?.statusMessage,
...(error.response?.retryCount ? { retryCount: error.response?.retryCount } : {}),
...(error?.response?.body?.length > 0 ? { raw: JSON.parse(error?.response?.body) } : {}),
...(error?.response?.body?.length > 0 ? { raw: rawMessage } : {}),
Copy link
Contributor Author

@rifont rifont Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this lack of error handling during body parsing was contributing to execution detail logs not surfacing due to an inability to parse the error body. This was observed with errors such as "BSON circular value cannot be converted..."

};
} else if (error.message) {
raw = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,19 @@ import {
NotFoundException,
BadRequestException,
HttpException,
GatewayTimeoutException,
} from '@nestjs/common';
import got, { OptionsOfTextResponseBody, RequestError } from 'got';
import got, {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the era of fetch, do we need to rely on got? Got by default gives us retries which I am not very sure if it's a great idea.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will tackle this offline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Elaborating a little, is the thinking that we should handle retries elsewhere? At the queue layer, for example?

I tend to agree, the value we're getting from retries in practice is minimal, especially considering the total retry duration after incorporating backoff is less than 1 minute. It's unlikely an external bridge endpoint would recover from a total network fault within that timespan.

CacheError,
HTTPError,
MaxRedirectsError,
OptionsOfTextResponseBody,
ReadError,
RequestError,
TimeoutError,
UnsupportedProtocolError,
UploadError,
} from 'got';
import { createHmac } from 'node:crypto';

import {
Expand Down Expand Up @@ -33,14 +44,15 @@ export const RETRYABLE_HTTP_CODES: number[] = [
408, 413, 429, 500, 502, 503, 504, 521, 522, 524,
];
const RETRYABLE_ERROR_CODES: string[] = [
'ETIMEDOUT',
'ECONNRESET',
'EADDRINUSE',
'ECONNREFUSED',
'EPIPE',
'ENOTFOUND',
'ENETUNREACH',
'EAI_AGAIN',
'EAI_AGAIN', // DNS resolution failed, retry
'ECONNREFUSED', // Connection refused by the server
'ECONNRESET', // Connection was forcibly closed by a peer
'EADDRINUSE', // Address already in use
'EPIPE', // Broken pipe
'ETIMEDOUT', // Operation timed out
'ENOTFOUND', // DNS lookup failed
'EHOSTUNREACH', // No route to host
'ENETUNREACH', // Network is unreachable
];

const LOG_CONTEXT = 'ExecuteBridgeRequest';
Expand Down Expand Up @@ -76,11 +88,6 @@ export class ExecuteBridgeRequest {
);
}

const secretKey = await this.getDecryptedSecretKey.execute(
GetDecryptedSecretKeyCommand.create({
environmentId: command.environmentId,
}),
);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactoring many of these to discrete methods to make the execute method easier to read.

const bridgeUrl = this.getBridgeUrl(
environment.bridge?.url || environment.echo?.url,
command.environmentId,
Expand Down Expand Up @@ -125,25 +132,13 @@ export class ExecuteBridgeRequest {
},
};

const timestamp = Date.now();
const novuSignatureHeader = `t=${timestamp},v1=${this.createHmacBySecretKey(
secretKey,
timestamp,
command.event || {},
)}`;

const request = [PostActionEnum.EXECUTE, PostActionEnum.PREVIEW].includes(
command.action as PostActionEnum,
)
? got.post
: got.get;

const headers = {
[HttpRequestHeaderKeysEnum.BYPASS_TUNNEL_REMINDER]: 'true',
[HttpRequestHeaderKeysEnum.CONTENT_TYPE]: 'application/json',
[HttpHeaderKeysEnum.NOVU_SIGNATURE_DEPRECATED]: novuSignatureHeader,
[HttpHeaderKeysEnum.NOVU_SIGNATURE]: novuSignatureHeader,
};
const headers = await this.buildRequestHeaders(command);

Logger.log(`Making bridge request to \`${url}\``, LOG_CONTEXT);
try {
Expand All @@ -152,75 +147,38 @@ export class ExecuteBridgeRequest {
headers,
}).json();
} catch (error) {
if (error instanceof RequestError) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to a discrete handler for errors.

let body: Record<string, unknown>;
try {
body = JSON.parse(error.response.body as string);
} catch (e) {
// If the body is not valid JSON, we'll just use an empty object.
body = {};
}

if (Object.values(ErrorCodeEnum).includes(body.code as ErrorCodeEnum)) {
// Handle known Bridge errors. Propagate the error code and message.
throw new HttpException(body, error.response.statusCode);
} else if (body.code === TUNNEL_ERROR_CODE) {
// Handle known tunnel errors
const tunnelBody = body as TunnelResponseError;
Logger.error(
`Could not establish tunnel connection for \`${url}\`. Error: \`${tunnelBody.message}\``,
LOG_CONTEXT,
);
throw new NotFoundException(BRIDGE_EXECUTION_ERROR.TUNNEL_NOT_FOUND);
} else if (error.response?.statusCode === 502) {
/*
* Tunnel was live, but the Bridge endpoint was down.
* 502 is thrown by the tunnel service when the Bridge endpoint is not reachable.
*/
Logger.error(
`Bridge endpoint unavailable for \`${url}\``,
LOG_CONTEXT,
);
throw new BadRequestException(
BRIDGE_EXECUTION_ERROR.BRIDGE_ENDPOINT_UNAVAILABLE,
);
} else if (error.response?.statusCode === 404) {
// Bridge endpoint wasn't found.
Logger.error(`Bridge endpoint not found for \`${url}\``, LOG_CONTEXT);
throw new NotFoundException(
BRIDGE_EXECUTION_ERROR.BRIDGE_ENDPOINT_NOT_FOUND,
);
} else if (error.response?.statusCode === 405) {
// The Bridge endpoint didn't expose the required methods.
Logger.error(
`Bridge endpoint method not configured for \`${url}\``,
LOG_CONTEXT,
);
throw new BadRequestException(
BRIDGE_EXECUTION_ERROR.BRIDGE_METHOD_NOT_CONFIGURED,
);
} else {
// Unknown errors when calling the Bridge endpoint.
Logger.error(
`Unknown bridge request error calling \`${url}\`: \`${JSON.stringify(
body,
)}\``,
LOG_CONTEXT,
);
throw error;
}
} else {
// Handle unknown, non-request errors.
Logger.error(
`Unknown bridge error calling \`${url}\``,
error,
LOG_CONTEXT,
);
throw error;
}
this.handleResponseError(error, url);
}
}

private async buildRequestHeaders(command: ExecuteBridgeRequestCommand) {
const novuSignatureHeader = await this.buildRequestSignature(command);

return {
[HttpRequestHeaderKeysEnum.BYPASS_TUNNEL_REMINDER]: 'true',
[HttpRequestHeaderKeysEnum.CONTENT_TYPE]: 'application/json',
[HttpHeaderKeysEnum.NOVU_SIGNATURE_DEPRECATED]: novuSignatureHeader,
[HttpHeaderKeysEnum.NOVU_SIGNATURE]: novuSignatureHeader,
};
}

private async buildRequestSignature(command: ExecuteBridgeRequestCommand) {
const secretKey = await this.getDecryptedSecretKey.execute(
GetDecryptedSecretKeyCommand.create({
environmentId: command.environmentId,
}),
);

const timestamp = Date.now();
const novuSignatureHeader = `t=${timestamp},v1=${this.createHmacBySecretKey(
secretKey,
timestamp,
command.event || {},
)}`;

return novuSignatureHeader;
}

private createHmacBySecretKey(
secretKey: string,
timestamp: number,
Expand Down Expand Up @@ -256,8 +214,19 @@ export class ExecuteBridgeRequest {
switch (workflowOrigin) {
case WorkflowOriginEnum.NOVU_CLOUD:
return `${this.getApiUrl()}/v1/environments/${environmentId}/bridge`;
case WorkflowOriginEnum.EXTERNAL:
case WorkflowOriginEnum.EXTERNAL: {
if (!environmentBridgeUrl) {
throw new BadRequestException({
code: BRIDGE_EXECUTION_ERROR.INVALID_BRIDGE_URL.code,
message:
BRIDGE_EXECUTION_ERROR.INVALID_BRIDGE_URL.message(
environmentBridgeUrl,
),
});
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throw an exception here explicitly if trying to request to an external Bridge URL when it doesn't exist on the environment.


return environmentBridgeUrl;
}
default:
throw new Error(`Unsupported workflow origin: ${workflowOrigin}`);
}
Expand All @@ -272,4 +241,135 @@ export class ExecuteBridgeRequest {

return apiUrl;
}

private handleResponseError(error: unknown, url: string) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exhaustive error handling added for all of:

  • got package error codes (it does a good job of handling these with explicit classes per error type
  • All Node.js error code (ref)

if (error instanceof RequestError) {
let body: Record<string, unknown>;
try {
body = JSON.parse(error.response.body as string);
} catch (e) {
// If the body is not valid JSON, we'll just use an empty object.
body = {};
}

if (
error instanceof HTTPError &&
Object.values(ErrorCodeEnum).includes(body.code as ErrorCodeEnum)
) {
// Handle known Bridge errors. Propagate the error code and message.
throw new HttpException(body, error.response.statusCode);
} else if (body.code === TUNNEL_ERROR_CODE) {
rifont marked this conversation as resolved.
Show resolved Hide resolved
// Handle known tunnel errors
const tunnelBody = body as TunnelResponseError;
Logger.error(
`Could not establish tunnel connection for \`${url}\`. Error: \`${tunnelBody.message}\``,
LOG_CONTEXT,
);
throw new NotFoundException({
message: BRIDGE_EXECUTION_ERROR.TUNNEL_NOT_FOUND.message(url),
code: BRIDGE_EXECUTION_ERROR.TUNNEL_NOT_FOUND.code,
});
} else if (error instanceof TimeoutError) {
Logger.error(`Bridge request timeout for \`${url}\``, LOG_CONTEXT);
throw new GatewayTimeoutException({
message: BRIDGE_EXECUTION_ERROR.BRIDGE_REQUEST_TIMEOUT.message(url),
code: BRIDGE_EXECUTION_ERROR.BRIDGE_REQUEST_TIMEOUT.code,
});
} else if (error instanceof UnsupportedProtocolError) {
Logger.error(`Unsupported protocol for \`${url}\``, LOG_CONTEXT);
throw new BadRequestException({
message: BRIDGE_EXECUTION_ERROR.UNSUPPORTED_PROTOCOL.message(url),
code: BRIDGE_EXECUTION_ERROR.UNSUPPORTED_PROTOCOL.code,
});
} else if (error instanceof ReadError) {
Logger.error(
`Response body could not be read for \`${url}\``,
LOG_CONTEXT,
);
throw new BadRequestException({
message: BRIDGE_EXECUTION_ERROR.RESPONSE_READ_ERROR.message(url),
code: BRIDGE_EXECUTION_ERROR.RESPONSE_READ_ERROR.code,
});
} else if (error instanceof UploadError) {
Logger.error(
`Error uploading request body for \`${url}\``,
LOG_CONTEXT,
);
throw new BadRequestException({
message: BRIDGE_EXECUTION_ERROR.REQUEST_UPLOAD_ERROR.message(url),
code: BRIDGE_EXECUTION_ERROR.REQUEST_UPLOAD_ERROR.code,
});
} else if (error instanceof CacheError) {
Logger.error(`Error caching request for \`${url}\``, LOG_CONTEXT);
throw new BadRequestException({
message: BRIDGE_EXECUTION_ERROR.REQUEST_CACHE_ERROR.message(url),
code: BRIDGE_EXECUTION_ERROR.REQUEST_CACHE_ERROR.code,
});
} else if (error instanceof MaxRedirectsError) {
Logger.error(`Maximum redirects exceeded for \`${url}\``, LOG_CONTEXT);
throw new BadRequestException({
message:
BRIDGE_EXECUTION_ERROR.MAXIMUM_REDIRECTS_EXCEEDED.message(url),
code: BRIDGE_EXECUTION_ERROR.MAXIMUM_REDIRECTS_EXCEEDED.code,
});
} else if (error.response?.statusCode === 502) {
/*
* Tunnel was live, but the Bridge endpoint was down.
* 502 is thrown by the tunnel service when the Bridge endpoint is not reachable.
*/
Logger.error(
`Local Bridge endpoint not found for \`${url}\``,
LOG_CONTEXT,
);
throw new BadRequestException({
message:
BRIDGE_EXECUTION_ERROR.BRIDGE_ENDPOINT_NOT_FOUND.message(url),
code: BRIDGE_EXECUTION_ERROR.BRIDGE_ENDPOINT_NOT_FOUND.code,
});
} else if (
error.response?.statusCode === 404 ||
RETRYABLE_ERROR_CODES.includes(error.code)
) {
Logger.error(`Bridge endpoint unavailable for \`${url}\``, LOG_CONTEXT);

let codeToThrow: string;
if (RETRYABLE_ERROR_CODES.includes(error.code)) {
codeToThrow = error.code;
} else {
codeToThrow = BRIDGE_EXECUTION_ERROR.BRIDGE_ENDPOINT_UNAVAILABLE.code;
}
throw new NotFoundException({
message:
BRIDGE_EXECUTION_ERROR.BRIDGE_ENDPOINT_UNAVAILABLE.message(url),
code: codeToThrow,
});
} else if (error.response?.statusCode === 405) {
Logger.error(
`Bridge endpoint method not configured for \`${url}\``,
LOG_CONTEXT,
);
throw new BadRequestException({
message:
BRIDGE_EXECUTION_ERROR.BRIDGE_METHOD_NOT_CONFIGURED.message(url),
code: BRIDGE_EXECUTION_ERROR.BRIDGE_METHOD_NOT_CONFIGURED.code,
});
} else {
Logger.error(
`Unknown bridge request error calling \`${url}\`: \`${JSON.stringify(
body,
)}\``,
error,
LOG_CONTEXT,
);
throw error;
}
} else {
Logger.error(
`Unknown bridge non-request error calling \`${url}\``,
error,
LOG_CONTEXT,
);
throw error;
}
}
}
Loading
Loading