Skip to content

Commit

Permalink
Rejecting awakeable on failed query
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Jan 19, 2024
1 parent d31ac1c commit 062ac42
Showing 1 changed file with 25 additions and 12 deletions.
37 changes: 25 additions & 12 deletions typescript/promisify-anything/src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ export const internalApi: restate.ServiceApi<typeof internalAthenaApiRouter> = {

// Public API implementation

const query = async (ctx: restate.RpcContext, param: string) => {
const query = async (ctx: restate.RpcContext, externalRequest: string) => {
const uniqueId = ctx.rand.uuidv4();
const awakeable = ctx.awakeable();

ctx.send(internalApi).query(uniqueId, {
awakeableId: awakeable.id,
query: param,
query: externalRequest,
});

return await awakeable.promise;
Expand All @@ -43,7 +43,7 @@ const client = new athena.AthenaClient({});

type QueryRequest = {
awakeableId: string;
query: string;
query?: string;
};

const queryInternal = async (ctx: restate.RpcContext, requestId: string, request: QueryRequest) => {
Expand All @@ -59,26 +59,39 @@ const queryInternal = async (ctx: restate.RpcContext, requestId: string, request
executionId = (await ctx.sideEffect(async () => {
const startQueryResult = await client.send(
new athena.StartQueryExecutionCommand({
QueryString: 'SELECT * FROM "demo_db"."table" limit 10;',
QueryString: request.query ?? 'SELECT * FROM "demo_db"."table" limit 10;',
WorkGroup: "demo-workgroup",
ClientRequestToken: requestId,
}),
);
return startQueryResult.QueryExecutionId;
})) as string;
} catch (err) {
throw new restate.TerminalError("Unable to start query", { cause: err });
//throw new restate.TerminalError("Unable to start query: " + err);
ctx.rejectAwakeable(request.awakeableId, "Unable to start query: " + err);
return;
}

const results = await ctx.sideEffect(async () => {
return await client.send(
new athena.GetQueryResultsCommand({
QueryExecutionId: executionId,
}),
);
const result = await ctx.sideEffect(async () => {
try {
return await client.send(
new athena.GetQueryResultsCommand({
QueryExecutionId: executionId,
}),
);
} catch (err) {
if (err instanceof athena.InvalidRequestException && err.message.match(/state: FAILED/)) {
return err; // side effect completes with an error result
}
throw err; // side effect will be retried
}
});

ctx.resolveAwakeable(request.awakeableId, { result: results.ResultSet, _id: results.$metadata.requestId });
if (result instanceof athena.InvalidRequestException) {
ctx.rejectAwakeable(request.awakeableId, "Query execution failed: " + result.message);
} else {
ctx.resolveAwakeable(request.awakeableId, { result: result.ResultSet, _id: result.$metadata.requestId });
}
};

export const internalAthenaApiRouter = restate.keyedRouter({
Expand Down

0 comments on commit 062ac42

Please sign in to comment.