diff --git a/patterns-use-cases/xstate/README.md b/patterns-use-cases/xstate/README.md index 7d535cf3..6ad345b0 100644 --- a/patterns-use-cases/xstate/README.md +++ b/patterns-use-cases/xstate/README.md @@ -1,6 +1,46 @@ -# Deploying a Restate Typescript greeter on AWS Lambda +# Deploying a XState state machine on Restate -Restate is a system for easily building resilient applications using **distributed durable RPC & async/await**. +This example shows how to integrate Restate deeply with +[XState](https://stately.ai/docs/xstate). The code in [lib.ts](./lib.ts) and +[promise.ts](./promise.ts) converts an XState machine into two Restate +services: -This example contains the greeter service which you can deploy on AWS Lambda. -Take a look at [how to deploy Restate services on AWS Lambda](https://docs.restate.dev/services/deployment/lambda#tutorial) for more information. +1. A keyed service, which stores the state of the state machine, keyed on an + identifier for this instance of the machine. This service is called with + every event that must be processed by the state machine. XState machines are + generally pure and are not async; side effects generally happen through + [Promise Actors](https://stately.ai/docs/promise-actors). As such, this + service should never block the machine, so other events can always be + processed. +2. An unkeyed service, which exists solely to execute Promise Actors and call + back to the state machine with their result. As this is an unkeyed service, + the Promise won't hold up any other events. This service doesn't need to be + called by you directly. + +Both services are set up and managed automatically by interpreting the state +machine definition, and should generally be deployed together, whether as a +Lambda or as a long-lived service. + +In `app.ts` you will see an example of an XState machine that uses cross-machine +communication, delays, and Promise actors, all running in Restate. However, +most XState machines should work out of the box; this is still experimental, so +we haven't tested everything yet! + +To try out this example: + +```bash +# start a local Restate instance +docker run -d -it --network=host --name restate_dev --rm restatedev/restate:0.8.1 +# start the service +npm run dev +# register the state machine service against restate +npx @restatedev/restate@0.8.1 dep register http://localhost:9080 + +# create a state machine +curl http://localhost:8080/auth/create --json '{"key": "myMachine"}' +# watch the state +watch -n1 'curl -s http://localhost:8080/auth/snapshot --json "{\"key\": \"myMachine\"}"' +# kick off the machine +curl http://localhost:8080/auth/send --json '{"key": "myMachine", "request": {"event": {"type": "AUTH"}}}' +# and watch the auth flow progress! +``` diff --git a/patterns-use-cases/xstate/package-lock.json b/patterns-use-cases/xstate/package-lock.json index bba47991..2b5c18e3 100644 --- a/patterns-use-cases/xstate/package-lock.json +++ b/patterns-use-cases/xstate/package-lock.json @@ -9,10 +9,11 @@ "version": "0.0.1", "license": "MIT", "dependencies": { - "@restatedev/restate-sdk": "^0.5.1", + "@restatedev/restate-sdk": "^0.8.1", "xstate": "^5.2.1" }, "devDependencies": { + "@types/node": "^20.12.10", "@typescript-eslint/eslint-plugin": "^6.3.0", "@typescript-eslint/parser": "^6.3.0", "esbuild": "^0.19.0", @@ -598,15 +599,15 @@ "integrity": "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw==" }, "node_modules/@restatedev/restate-sdk": { - "version": "0.5.2", - "resolved": "https://registry.npmjs.org/@restatedev/restate-sdk/-/restate-sdk-0.5.2.tgz", - "integrity": "sha512-WQ70cUNpe0allg+1Sa2kG116juJNpVfOcGL4po4jWw9Yw/3RTyBmo80ShszXbof+Zs4JxxOhrFBPTY+eSmqOcA==", + "version": "0.8.1", + "resolved": "https://registry.npmjs.org/@restatedev/restate-sdk/-/restate-sdk-0.8.1.tgz", + "integrity": "sha512-bMfXZVAyh64O1BP+u8xDpPxE1o4F0cu+Q+XwzLneq38sSRJ0Xa2HIoW1djYCbViVcpAqzsjo2HUnf14a42wK2g==", "dependencies": { "protobufjs": "^7.2.2", "ts-proto": "^1.140.0" }, "engines": { - "node": ">= 10" + "node": ">= 18" } }, "node_modules/@tsconfig/node10": { @@ -640,9 +641,9 @@ "dev": true }, "node_modules/@types/node": { - "version": "20.10.4", - "resolved": "https://registry.npmjs.org/@types/node/-/node-20.10.4.tgz", - "integrity": "sha512-D08YG6rr8X90YB56tSIuBaddy/UXAA9RKJoFvrsnogAum/0pmjkgi4+2nx96A330FmioegBWmEYQ+syqCFaveg==", + "version": "20.12.10", + "resolved": "https://registry.npmjs.org/@types/node/-/node-20.12.10.tgz", + "integrity": "sha512-Eem5pH9pmWBHoGAT8Dr5fdc5rYA+4NAovdM4EktRPVAAiJhmWWfQrA0cFhAbOsQdSfIHjAud6YdkbL69+zSKjw==", "dependencies": { "undici-types": "~5.26.4" } diff --git a/patterns-use-cases/xstate/package.json b/patterns-use-cases/xstate/package.json index d15f28e5..a873614a 100644 --- a/patterns-use-cases/xstate/package.json +++ b/patterns-use-cases/xstate/package.json @@ -18,10 +18,11 @@ "dev": "ts-node-dev --respawn --transpile-only ./src/app.ts" }, "dependencies": { - "@restatedev/restate-sdk": "^0.5.1", + "@restatedev/restate-sdk": "^0.8.1", "xstate": "^5.2.1" }, "devDependencies": { + "@types/node": "^20.12.10", "@typescript-eslint/eslint-plugin": "^6.3.0", "@typescript-eslint/parser": "^6.3.0", "esbuild": "^0.19.0", diff --git a/patterns-use-cases/xstate/src/app.ts b/patterns-use-cases/xstate/src/app.ts index f9d8217c..bbde7772 100644 --- a/patterns-use-cases/xstate/src/app.ts +++ b/patterns-use-cases/xstate/src/app.ts @@ -11,69 +11,72 @@ import * as restate from "@restatedev/restate-sdk"; -import {createMachine, sendTo} from 'xstate'; -import {bindXStateRouter} from "./lib"; -import {fromPromise} from "./promise"; +import { createMachine, sendTo } from "xstate"; +import { bindXStateRouter } from "./lib"; +import { fromPromise } from "./promise"; -const authServerMachine = createMachine({ - id: 'server', - initial: 'waitingForCode', - states: { - waitingForCode: { - on: { - CODE: { - target: "process" +const authServerMachine = createMachine( + { + id: "server", + initial: "waitingForCode", + states: { + waitingForCode: { + on: { + CODE: { + target: "process", + }, }, }, - }, - process: { - invoke: { - id: 'process', - src: 'authorise', - onDone: { - actions: sendTo( - ({self}) => self._parent!, - { type: 'TOKEN' }, - { delay: 1000 }, - ), + process: { + invoke: { + id: "process", + src: "authorise", + onDone: { + actions: sendTo( + ({ self }) => self._parent!, + { type: "TOKEN" }, + { delay: 1000 } + ), + }, }, - } + }, + }, + }, + { + actors: { + authorise: fromPromise( + () => new Promise((resolve) => setTimeout(resolve, 5000)) + ), }, } -}, { - actors: { - authorise: fromPromise(() => new Promise((resolve) => setTimeout(resolve, 5000))), - } -}); +); const authClientMachine = createMachine({ - id: 'client', - initial: 'idle', + id: "client", + initial: "idle", states: { idle: { on: { - AUTH: {target: 'authorizing'}, + AUTH: { target: "authorizing" }, }, }, authorizing: { invoke: { - id: 'auth-server', + id: "auth-server", src: authServerMachine, }, - entry: sendTo('auth-server', ({self}) => ({ - type: 'CODE', + entry: sendTo("auth-server", ({ self }) => ({ + type: "CODE", sender: self, })), on: { - TOKEN: {target: 'authorized'}, + TOKEN: { target: "authorized" }, }, }, authorized: { - type: 'final', + type: "final", }, }, }); - -bindXStateRouter(restate.createServer(), "foo", authClientMachine).listen() - +bindXStateRouter(restate.endpoint(), "auth", authClientMachine).listen(); diff --git a/patterns-use-cases/xstate/src/lib.ts b/patterns-use-cases/xstate/src/lib.ts index 344b9aae..a0992697 100644 --- a/patterns-use-cases/xstate/src/lib.ts +++ b/patterns-use-cases/xstate/src/lib.ts @@ -17,48 +17,54 @@ import { Observer, Snapshot, Subscription, - toObserver + toObserver, } from "xstate"; import * as restate from "@restatedev/restate-sdk"; -import {TerminalError} from "@restatedev/restate-sdk"; -import {promiseMethods} from "./promise"; +import { TerminalError } from "@restatedev/restate-sdk"; +import { promiseMethods } from "./promise"; import { AreAllImplementationsAssumedToBeProvided, - MissingImplementationsError + MissingImplementationsError, } from "xstate/dist/declarations/src/typegenTypes"; -export interface RestateActorSystem extends ActorSystem { +export interface RestateActorSystem + extends ActorSystem { _bookId: () => string; _register: (sessionId: string, actorRef: AnyActorRef) => string; _unregister: (actorRef: AnyActorRef) => void; _sendInspectionEvent: ( - event: HomomorphicOmit + event: HomomorphicOmit ) => void; - actor: (sessionId: string) => AnyActorRef | undefined, - _set: (key: K, actorRef: T['actors'][K]) => void; + actor: (sessionId: string) => AnyActorRef | undefined; + _set: (key: K, actorRef: T["actors"][K]) => void; _relay: ( source: AnyActorRef | SerialisableActorRef | undefined, target: AnyActorRef, event: AnyEventObject ) => void; - api: XStateApi> - ctx: restate.RpcContext, - systemName: string, + api: XStateApi>; + ctx: restate.KeyedContext; + systemName: string; } export type SerialisableActorRef = { id: string; sessionId: string; - _parent?: SerialisableActorRef, -} + _parent?: SerialisableActorRef; +}; -export const serialiseActorRef = (actorRef: AnyActorRef): SerialisableActorRef => { +export const serialiseActorRef = ( + actorRef: AnyActorRef +): SerialisableActorRef => { return { id: actorRef.id, sessionId: actorRef.sessionId, - _parent: actorRef._parent === undefined ? undefined : serialiseActorRef(actorRef._parent) - } -} + _parent: + actorRef._parent === undefined + ? undefined + : serialiseActorRef(actorRef._parent), + }; +}; type SerialisableScheduledEvent = { id: string; @@ -68,27 +74,51 @@ type SerialisableScheduledEvent = { source: SerialisableActorRef; target: SerialisableActorRef; uuid: string; -} +}; + +async function createSystem( + ctx: restate.KeyedContext, + api: XStateApi>, + systemName: string +): Promise> { + const events = + (await ctx.get<{ [key: string]: SerialisableScheduledEvent }>("events")) ?? + {}; + const childrenByID = + (await ctx.get<{ [key: string]: SerialisableActorRef }>("children")) ?? {}; -async function createSystem(ctx: restate.RpcContext, api: XStateApi>, systemName: string): Promise> { - const events = await ctx.get<{ [key: string]: SerialisableScheduledEvent }>("events") ?? {} - const childrenByID = await ctx.get<{ [key: string]: SerialisableActorRef }>("children") ?? {} - - let idCounter = 0; const children = new Map(); - const keyedActors = new Map(); - const reverseKeyedActors = new WeakMap(); + const keyedActors = new Map(); + const reverseKeyedActors = new WeakMap(); const observers = new Set>(); const scheduler = { - schedule(_source: AnyActorRef, _target: AnyActorRef, event: EventObject, delay: number, id: string | undefined): void { + schedule( + _source: AnyActorRef, + _target: AnyActorRef, + event: EventObject, + delay: number, + id: string | undefined + ): void { if (id === undefined) { - id = ctx.rand.random().toString(36).slice(2) + id = ctx.rand.random().toString(36).slice(2); } - const {source, target} = {source: serialiseActorRef(_source), target: serialiseActorRef(_target)} + const { source, target } = { + source: serialiseActorRef(_source), + target: serialiseActorRef(_target), + }; - console.log("schedule from", source.id, "to", target.id, "with id", id, "and delay", delay) + console.log( + "schedule from", + source.id, + "to", + target.id, + "with id", + id, + "and delay", + delay + ); const scheduledEvent: SerialisableScheduledEvent = { source, @@ -101,25 +131,32 @@ async function createSystem(ctx: restate.RpcContext, }; const scheduledEventId = createScheduledEventId(source, id); if (scheduledEventId in events) { - console.log("Ignoring duplicated schedule from", source.id, "to", target.id) - return + console.log( + "Ignoring duplicated schedule from", + source.id, + "to", + target.id + ); + return; } events[scheduledEventId] = scheduledEvent; - ctx.sendDelayed(api.actor, delay).send(systemName, {scheduledEvent, source, target, event}) - ctx.set("events", events) + ctx + .sendDelayed(api.actor, delay) + .send(systemName, { scheduledEvent, source, target, event }); + ctx.set("events", events); }, cancel(source: AnyActorRef, id: string): void { - console.log("cancel schedule from", source.id, "with id", id) + console.log("cancel schedule from", source.id, "with id", id); const scheduledEventId = createScheduledEventId(source, id); delete events[scheduledEventId]; - ctx.set("events", events) + ctx.set("events", events); }, cancelAll(actorRef: AnyActorRef): void { - console.log("cancel all for", actorRef.id) + console.log("cancel all for", actorRef.id); for (const scheduledEventId in events) { const scheduledEvent = events[scheduledEventId]; @@ -127,9 +164,9 @@ async function createSystem(ctx: restate.RpcContext, delete events[scheduledEventId]; } } - ctx.set("events", events) + ctx.set("events", events); }, - } + }; const system: RestateActorSystem = { ctx, @@ -140,26 +177,26 @@ async function createSystem(ctx: restate.RpcContext, _register: (sessionId, actorRef) => { if (actorRef.id in childrenByID) { // rehydration case; ensure session ID maintains continuity - sessionId = childrenByID[actorRef.id].sessionId - actorRef.sessionId = sessionId + sessionId = childrenByID[actorRef.id].sessionId; + actorRef.sessionId = sessionId; } else { // new actor case - childrenByID[actorRef.id] = serialiseActorRef(actorRef) - ctx.set("children", childrenByID) + childrenByID[actorRef.id] = serialiseActorRef(actorRef); + ctx.set("children", childrenByID); } - console.log("register", sessionId, actorRef.id) + console.log("register", sessionId, actorRef.id); children.set(sessionId, actorRef); return sessionId; }, _unregister: (actorRef) => { if (actorRef.id in childrenByID) { // rehydration case; ensure session ID maintains continuity - actorRef.sessionId = childrenByID[actorRef.id].sessionId + actorRef.sessionId = childrenByID[actorRef.id].sessionId; } children.delete(actorRef.sessionId); - delete childrenByID[actorRef.id] - ctx.set("children", childrenByID) + delete childrenByID[actorRef.id]; + ctx.set("children", childrenByID); const systemId = reverseKeyedActors.get(actorRef); if (systemId !== undefined) { @@ -175,10 +212,10 @@ async function createSystem(ctx: restate.RpcContext, observers.forEach((observer) => observer.next?.(resolvedInspectionEvent)); }, actor: (sessionId) => { - return children.get(sessionId) + return children.get(sessionId); }, get: (systemId) => { - return keyedActors.get(systemId) as T['actors'][any]; + return keyedActors.get(systemId) as T["actors"][any]; }, _set: (systemId, actorRef) => { const existing = keyedActors.get(systemId); @@ -195,17 +232,23 @@ async function createSystem(ctx: restate.RpcContext, observers.add(observer); }, _relay: (source, target, event) => { - console.log("Relaying message from", source?.id, "to", target.id, ":", event.type); + console.log( + "Relaying message from", + source?.id, + "to", + target.id, + ":", + event.type + ); (target as any)._send(event); }, scheduler, getSnapshot: () => { return { - _scheduledEvents: {} // unused + _scheduledEvents: {}, // unused }; }, - start: () => { - } + start: () => {}, }; return system; @@ -215,141 +258,212 @@ interface FakeParent extends AnyActorRef { _send: (event: EventFromLogic) => void; } -export async function createActor(ctx: restate.RpcContext, api: XStateApi, systemName: string, logic: TLogic extends AnyStateMachine ? AreAllImplementationsAssumedToBeProvided extends true ? TLogic : MissingImplementationsError : TLogic, options?: ActorOptions): Promise> { - const system = await createSystem(ctx, api, systemName) - const snapshot = await ctx.get>("snapshot") ?? undefined; +export async function createActor( + ctx: restate.KeyedContext, + api: XStateApi, + systemName: string, + logic: TLogic extends AnyStateMachine + ? AreAllImplementationsAssumedToBeProvided< + TLogic["__TResolvedTypesMeta"] + > extends true + ? TLogic + : MissingImplementationsError + : TLogic, + options?: ActorOptions +): Promise> { + const system = await createSystem(ctx, api, systemName); + const snapshot = (await ctx.get>("snapshot")) ?? undefined; const parent: FakeParent = { id: "fakeRoot", sessionId: "fakeRoot", - send: () => { - }, - _send: () => { - }, - start: () => { - }, + send: () => {}, + _send: () => {}, + start: () => {}, getSnapshot: (): null => { - return null + return null; }, // TODO getPersistedSnapshot: (): Snapshot => { return { - status: 'active', + status: "active", output: undefined, error: undefined, - } - }, // TODO - stop: () => { + }; }, // TODO + stop: () => {}, // TODO system, src: "fakeRoot", subscribe: (): Subscription => { return { - unsubscribe() { - } - } + unsubscribe() {}, + }; }, [Symbol.observable]: (): InteropSubscribable => { return { subscribe(): Subscription { return { - unsubscribe() { - } - } - } - } + unsubscribe() {}, + }; + }, + }; }, - } + }; if (options?.inspect) { // Always inspect at the system-level system.inspect(toObserver(options.inspect)); } - return createXActor(logic, {id: "root", ...options, parent, snapshot} as any); + return createXActor(logic, { + id: "root", + ...options, + parent, + snapshot, + } as any); } -const actorMethods = (path: string, logic: TLogic) => { - const api = xStateApi(path) +const actorMethods = ( + path: string, + logic: TLogic +) => { + const api = xStateApi(path); return { - create: async (ctx: restate.RpcContext, systemName: string, request?: { input?: InputFrom }): Promise> => { - ctx.clear("snapshot") - ctx.clear("events") - ctx.clear("children") - - const root = (await createActor(ctx, api, systemName, logic, { - input: request?.input, - })).start(); - - ctx.set("snapshot", root.getPersistedSnapshot()) - - return root.getPersistedSnapshot() + create: async ( + ctx: restate.KeyedContext, + systemName: string, + request?: { input?: InputFrom } + ): Promise> => { + ctx.clear("snapshot"); + ctx.clear("events"); + ctx.clear("children"); + + const root = ( + await createActor(ctx, api, systemName, logic, { + input: request?.input, + }) + ).start(); + + ctx.set("snapshot", root.getPersistedSnapshot()); + + return root.getPersistedSnapshot(); }, - send: async (ctx: restate.RpcContext, systemName: string, request?: { scheduledEvent?: SerialisableScheduledEvent, source?: SerialisableActorRef, target?: SerialisableActorRef, event: AnyEventObject }): Promise | undefined> => { + send: async ( + ctx: restate.KeyedContext, + systemName: string, + request?: { + scheduledEvent?: SerialisableScheduledEvent; + source?: SerialisableActorRef; + target?: SerialisableActorRef; + event: AnyEventObject; + } + ): Promise | undefined> => { if (!request) { - throw new TerminalError("Must provide a request") + throw new TerminalError("Must provide a request"); } if (request.scheduledEvent) { - const events = await ctx.get<{ [key: string]: SerialisableScheduledEvent }>("events") ?? {} - const scheduledEventId = createScheduledEventId(request.scheduledEvent.source, request.scheduledEvent.id) + const events = + (await ctx.get<{ [key: string]: SerialisableScheduledEvent }>( + "events" + )) ?? {}; + const scheduledEventId = createScheduledEventId( + request.scheduledEvent.source, + request.scheduledEvent.id + ); if (!(scheduledEventId in events)) { - console.log("Received now cancelled event", scheduledEventId, "for target", request.target) - return + console.log( + "Received now cancelled event", + scheduledEventId, + "for target", + request.target + ); + return; } if (events[scheduledEventId].uuid !== request.scheduledEvent.uuid) { - console.log("Received now replaced event", scheduledEventId, "for target", request.target) - return + console.log( + "Received now replaced event", + scheduledEventId, + "for target", + request.target + ); + return; } - delete events[scheduledEventId] - ctx.set("events", events) + delete events[scheduledEventId]; + ctx.set("events", events); } const root = (await createActor(ctx, api, systemName, logic)).start(); - let actor; if (request.target) { - actor = (root.system as RestateActorSystem).actor(request.target.sessionId) + actor = (root.system as RestateActorSystem).actor( + request.target.sessionId + ); if (!actor) { - throw new TerminalError(`Actor ${request.target.id} not found; it may have since stopped`) + throw new TerminalError( + `Actor ${request.target.id} not found; it may have since stopped` + ); } } else { - actor = root + actor = root; } - (root.system as RestateActorSystem)._relay(request.source, actor, request.event) + (root.system as RestateActorSystem)._relay( + request.source, + actor, + request.event + ); - const nextSnapshot = root.getPersistedSnapshot() - ctx.set("snapshot", nextSnapshot) + const nextSnapshot = root.getPersistedSnapshot(); + ctx.set("snapshot", nextSnapshot); - return nextSnapshot + return nextSnapshot; }, - snapshot: async (ctx: restate.RpcContext, systemName: string): Promise> => { - const root = (await createActor(ctx, api, systemName, logic)); + snapshot: async ( + ctx: restate.KeyedContext, + systemName: string + ): Promise> => { + const root = await createActor(ctx, api, systemName, logic); - return root.getPersistedSnapshot() + return root.getPersistedSnapshot(); }, - } -} + }; +}; -export const bindXStateRouter = < - TLogic extends AnyStateMachine, ->(server: restate.RestateServer, path: string, logic: TLogic): restate.RestateServer => { +export const bindXStateRouter = ( + server: restate.RestateEndpoint, + path: string, + logic: TLogic +): restate.RestateEndpoint => { return server .bindKeyedRouter(path, restate.keyedRouter(actorMethods(path, logic))) - .bindRouter(`${path}.promises`, restate.router(promiseMethods(path, logic))) -} - -export const xStateApi = (path: string): XStateApi => { - const actor: restate.ServiceApi> = {path} - const promise: restate.ServiceApi> = {path: `${path}.promises`} - return {actor, promise} -} - -type ActorRouter = restate.KeyedRouter>> -type PromiseRouter = restate.UnKeyedRouter>> -type XStateApi = { actor: restate.ServiceApi>, promise: restate.ServiceApi> } + .bindRouter( + `${path}.promises`, + restate.router(promiseMethods(path, logic)) + ); +}; + +export const xStateApi = ( + path: string +): XStateApi => { + const actor: restate.ServiceApi> = { path }; + const promise: restate.ServiceApi> = { + path: `${path}.promises`, + }; + return { actor, promise }; +}; + +type ActorRouter = restate.KeyedRouter< + ReturnType> +>; +type PromiseRouter = restate.UnKeyedRouter< + ReturnType> +>; +type XStateApi = { + actor: restate.ServiceApi>; + promise: restate.ServiceApi>; +}; function createScheduledEventId( actorRef: SerialisableActorRef, @@ -357,4 +471,3 @@ function createScheduledEventId( ): string { return `${actorRef.sessionId}.${id}`; } - diff --git a/patterns-use-cases/xstate/src/promise.ts b/patterns-use-cases/xstate/src/promise.ts index 2c43db33..c45b95b2 100644 --- a/patterns-use-cases/xstate/src/promise.ts +++ b/patterns-use-cases/xstate/src/promise.ts @@ -8,45 +8,56 @@ import { NonReducibleUnknown, Snapshot, } from "xstate"; -import {AnyActorSystem} from "xstate/dist/declarations/src/system"; -import {RestateActorSystem, SerialisableActorRef, serialiseActorRef, xStateApi} from "./lib"; +import { AnyActorSystem } from "xstate/dist/declarations/src/system"; +import { + RestateActorSystem, + SerialisableActorRef, + serialiseActorRef, + xStateApi, +} from "./lib"; import * as restate from "@restatedev/restate-sdk"; -import {TerminalError} from "@restatedev/restate-sdk"; +import { TerminalError } from "@restatedev/restate-sdk"; export type PromiseSnapshot = Snapshot & { input: TInput | undefined; sent: boolean; }; -const RESTATE_PROMISE_SENT = 'restate.promise.sent'; -const RESTATE_PROMISE_RESOLVE = 'restate.promise.resolve'; -const RESTATE_PROMISE_REJECT = 'restate.promise.reject'; -const XSTATE_STOP = 'xstate.stop'; +const RESTATE_PROMISE_SENT = "restate.promise.sent"; +const RESTATE_PROMISE_RESOLVE = "restate.promise.resolve"; +const RESTATE_PROMISE_REJECT = "restate.promise.reject"; +const XSTATE_STOP = "xstate.stop"; -type PromiseCreator = ({input, ctx}: { +type PromiseCreator = ({ + input, + ctx, +}: { input: TInput; - ctx: restate.RpcContext, -}) => PromiseLike + ctx: restate.Context; +}) => PromiseLike; export type PromiseActorLogic = ActorLogic< PromiseSnapshot, { type: string; [k: string]: unknown }, TInput, // input AnyActorSystem -> & { sentinel: "restate.promise.actor", config: PromiseCreator }; +> & { + sentinel: "restate.promise.actor"; + config: PromiseCreator; +}; export type PromiseActorRef = ActorRefFrom< PromiseActorLogic >; export function fromPromise( - promiseCreator: PromiseCreator, + promiseCreator: PromiseCreator ): PromiseActorLogic { const logic: PromiseActorLogic = { sentinel: "restate.promise.actor", config: promiseCreator, transition: (state, event) => { - if (state.status !== 'active') { + if (state.status !== "active") { return state; } @@ -61,30 +72,30 @@ export function fromPromise( const resolvedValue = (event as any).data; return { ...state, - status: 'done', + status: "done", output: resolvedValue, - input: undefined + input: undefined, }; } case RESTATE_PROMISE_REJECT: return { ...state, - status: 'error', + status: "error", error: (event as any).data, - input: undefined + input: undefined, }; case XSTATE_STOP: return { ...state, - status: 'stopped', - input: undefined + status: "stopped", + input: undefined, }; default: return state; } }, - start: (state, {self, system}) => { - if (state.status !== 'active') { + start: (state, { self, system }) => { + if (state.status !== "active") { return; } @@ -92,13 +103,13 @@ export function fromPromise( return; } - const rs = (system as RestateActorSystem) + const rs = system as RestateActorSystem; rs.ctx.send(rs.api.promise).invoke({ systemName: rs.systemName, self: serialiseActorRef(self), srcs: actorSrc(self), - input: state.input + input: state.input, }); // note that we sent off the promise so we don't do it again @@ -108,7 +119,7 @@ export function fromPromise( }, getInitialSnapshot: (_, input) => { return { - status: 'active', + status: "active", output: undefined, error: undefined, input, @@ -116,7 +127,7 @@ export function fromPromise( }; }, getPersistedSnapshot: (snapshot) => snapshot, - restoreSnapshot: (snapshot: any) => snapshot + restoreSnapshot: (snapshot: any) => snapshot, }; return logic; @@ -124,64 +135,97 @@ export function fromPromise( function actorSrc(actor?: AnyActorRef): string[] { if (actor === undefined) { - return [] + return []; } if (typeof actor.src !== "string") { - return [] + return []; } - return [actor.src, ...actorSrc(actor._parent)] + return [actor.src, ...actorSrc(actor._parent)]; } -export const promiseMethods = (path: string, logic: TLogic) => { - const api = xStateApi(path) +export const promiseMethods = ( + path: string, + logic: TLogic +) => { + const api = xStateApi(path); return { - invoke: async (ctx: restate.RpcContext, { - systemName, - self, - srcs, - input - }: { systemName: string, self: SerialisableActorRef, srcs: string[], input: NonReducibleUnknown }) => { - console.log("run promise with srcs", srcs, "in system", systemName, "with input", input) + invoke: async ( + ctx: restate.Context, + { + systemName, + self, + srcs, + input, + }: { + systemName: string; + self: SerialisableActorRef; + srcs: string[]; + input: NonReducibleUnknown; + } + ) => { + console.log( + "run promise with srcs", + srcs, + "in system", + systemName, + "with input", + input + ); - const [promiseSrc, ...machineSrcs] = srcs + const [promiseSrc, ...machineSrcs] = srcs; let stateMachine: AnyStateMachine = logic; for (const src of machineSrcs) { - let maybeSM + let maybeSM; try { - maybeSM = resolveReferencedActor(stateMachine, src) + maybeSM = resolveReferencedActor(stateMachine, src); } catch (e) { - throw new TerminalError(`Failed to resolve promise actor ${src}: ${e}`) + throw new TerminalError( + `Failed to resolve promise actor ${src}: ${e}` + ); } if (maybeSM === undefined) { - throw new TerminalError(`Couldn't find state machine actor with src ${src}`) + throw new TerminalError( + `Couldn't find state machine actor with src ${src}` + ); } if ("implementations" in maybeSM) { - stateMachine = maybeSM as AnyStateMachine + stateMachine = maybeSM as AnyStateMachine; } else { - throw new TerminalError(`Couldn't recognise machine actor with src ${src}`) + throw new TerminalError( + `Couldn't recognise machine actor with src ${src}` + ); } } - let promiseActor: PromiseActorLogic | undefined - let maybePA + let promiseActor: PromiseActorLogic | undefined; + let maybePA; try { - maybePA = resolveReferencedActor(stateMachine, promiseSrc) + maybePA = resolveReferencedActor(stateMachine, promiseSrc); } catch (e) { - throw new TerminalError(`Failed to resolve promise actor ${promiseSrc}: ${e}`) + throw new TerminalError( + `Failed to resolve promise actor ${promiseSrc}: ${e}` + ); } if (maybePA === undefined) { - throw new TerminalError(`Couldn't find promise actor with src ${promiseSrc}`) + throw new TerminalError( + `Couldn't find promise actor with src ${promiseSrc}` + ); } - if ("sentinel" in maybePA && maybePA.sentinel === "restate.promise.actor") { - promiseActor = maybePA as PromiseActorLogic + if ( + "sentinel" in maybePA && + maybePA.sentinel === "restate.promise.actor" + ) { + promiseActor = maybePA as PromiseActorLogic; } else { - throw new TerminalError(`Couldn't recognise promise actor with src ${promiseSrc}`) + throw new TerminalError( + `Couldn't recognise promise actor with src ${promiseSrc}` + ); } const resolvedPromise = Promise.resolve( - promiseActor.config({input, ctx}) + promiseActor.config({ input, ctx }) ); await resolvedPromise.then( @@ -191,8 +235,8 @@ export const promiseMethods = (path: string, log target: self, event: { type: RESTATE_PROMISE_RESOLVE, - data: response - } + data: response, + }, }); }, (errorData) => { @@ -201,17 +245,19 @@ export const promiseMethods = (path: string, log target: self, event: { type: RESTATE_PROMISE_REJECT, - data: errorData - } + data: errorData, + }, }); } ); - } - } -} - + }, + }; +}; -export function resolveReferencedActor(machine: AnyStateMachine, src: string): AnyActorLogic | undefined { +export function resolveReferencedActor( + machine: AnyStateMachine, + src: string +): AnyActorLogic | undefined { const match = src.match(/^xstate\.invoke\.(\d+)\.(.*)/)!; if (!match) { return machine.implementations.actors[src] as AnyActorLogic;