From 6f5eb96b04270d450700a6c8526c71978f9ecae6 Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Tue, 2 Jan 2024 09:08:04 +0000 Subject: [PATCH] Improve event cancellation --- typescript/xstate/src/app.ts | 101 +++++++------- typescript/xstate/src/lib.ts | 218 +++++++++++++++---------------- typescript/xstate/src/promise.ts | 62 +++++++-- 3 files changed, 209 insertions(+), 172 deletions(-) diff --git a/typescript/xstate/src/app.ts b/typescript/xstate/src/app.ts index 32713ab1..f9d8217c 100644 --- a/typescript/xstate/src/app.ts +++ b/typescript/xstate/src/app.ts @@ -11,56 +11,69 @@ import * as restate from "@restatedev/restate-sdk"; -import {createMachine} from 'xstate'; +import {createMachine, sendTo} from 'xstate'; import {bindXStateRouter} from "./lib"; import {fromPromise} from "./promise"; -export const workflow = createMachine( - { - id: 'async-function-invocation', - initial: 'Send email', - types: {} as { - context: { - customer: string; - }; - input: { - customer: string; - }; - }, - context: ({input}) => ({ - customer: input.customer - }), - states: { - 'Send email': { - invoke: { - src: 'sendEmail', - input: ({context}) => ({ - customer: context.customer - }), - onDone: 'Email sent' - } +const authServerMachine = createMachine({ + id: 'server', + initial: 'waitingForCode', + states: { + waitingForCode: { + on: { + CODE: { + target: "process" + }, }, - 'Email sent': { - type: 'final' + }, + process: { + invoke: { + id: 'process', + src: 'authorise', + onDone: { + actions: sendTo( + ({self}) => self._parent!, + { type: 'TOKEN' }, + { delay: 1000 }, + ), + }, } - } - }, - { - actors: { - sendEmail: fromPromise(async ({ input }) => { - console.log('Sending email to', input.customer); - - await new Promise((resolve) => - setTimeout(() => { - console.log('Email sent to', input.customer); - resolve(); - }, 1000) - ); - }) - } + }, + } +}, { + actors: { + authorise: fromPromise(() => new Promise((resolve) => setTimeout(resolve, 5000))), } -); +}); + +const authClientMachine = createMachine({ + id: 'client', + initial: 'idle', + states: { + idle: { + on: { + AUTH: {target: 'authorizing'}, + }, + }, + authorizing: { + invoke: { + id: 'auth-server', + src: authServerMachine, + }, + entry: sendTo('auth-server', ({self}) => ({ + type: 'CODE', + sender: self, + })), + on: { + TOKEN: {target: 'authorized'}, + }, + }, + authorized: { + type: 'final', + }, + }, +}); -bindXStateRouter(restate.createServer(), "foo", workflow).listen() +bindXStateRouter(restate.createServer(), "foo", authClientMachine).listen() diff --git a/typescript/xstate/src/lib.ts b/typescript/xstate/src/lib.ts index f36b0d2c..344b9aae 100644 --- a/typescript/xstate/src/lib.ts +++ b/typescript/xstate/src/lib.ts @@ -1,11 +1,12 @@ import { - Actor, ActorLogicFrom, + Actor, + ActorLogicFrom, ActorOptions, ActorSystem, ActorSystemInfo, - AnyActorLogic, AnyActorRef, - AnyEventObject, AnyStateMachine, + AnyEventObject, + AnyStateMachine, createActor as createXActor, EventFromLogic, EventObject, @@ -21,6 +22,10 @@ import { import * as restate from "@restatedev/restate-sdk"; import {TerminalError} from "@restatedev/restate-sdk"; import {promiseMethods} from "./promise"; +import { + AreAllImplementationsAssumedToBeProvided, + MissingImplementationsError +} from "xstate/dist/declarations/src/typegenTypes"; export interface RestateActorSystem extends ActorSystem { _bookId: () => string; @@ -29,7 +34,7 @@ export interface RestateActorSystem extends ActorSyst _sendInspectionEvent: ( event: HomomorphicOmit ) => void; - actorByID: (id: string) => AnyActorRef | undefined, + actor: (sessionId: string) => AnyActorRef | undefined, _set: (key: K, actorRef: T['actors'][K]) => void; _relay: ( source: AnyActorRef | SerialisableActorRef | undefined, @@ -62,36 +67,67 @@ type SerialisableScheduledEvent = { delay: number; source: SerialisableActorRef; target: SerialisableActorRef; + uuid: string; } -function createSystem(ctx: restate.RpcContext, api: XStateApi>, systemName: string): RestateActorSystem { +async function createSystem(ctx: restate.RpcContext, api: XStateApi>, systemName: string): Promise> { + const events = await ctx.get<{ [key: string]: SerialisableScheduledEvent }>("events") ?? {} + const childrenByID = await ctx.get<{ [key: string]: SerialisableActorRef }>("children") ?? {} + let idCounter = 0; const children = new Map(); - const childrenByID = new Map(); const keyedActors = new Map(); const reverseKeyedActors = new WeakMap(); const observers = new Set>(); const scheduler = { - schedule(source: AnyActorRef, target: AnyActorRef, event: EventObject, delay: number, id: string | undefined): void { - ctx.send(api.actor).schedule(systemName, { - source: serialiseActorRef(source), - target: serialiseActorRef(target), + schedule(_source: AnyActorRef, _target: AnyActorRef, event: EventObject, delay: number, id: string | undefined): void { + if (id === undefined) { + id = ctx.rand.random().toString(36).slice(2) + } + + const {source, target} = {source: serialiseActorRef(_source), target: serialiseActorRef(_target)} + + console.log("schedule from", source.id, "to", target.id, "with id", id, "and delay", delay) + + const scheduledEvent: SerialisableScheduledEvent = { + source, + target, event, delay, - id - }) + id, + startedAt: Date.now(), + uuid: ctx.rand.uuidv4(), + }; + const scheduledEventId = createScheduledEventId(source, id); + if (scheduledEventId in events) { + console.log("Ignoring duplicated schedule from", source.id, "to", target.id) + return + } + + events[scheduledEventId] = scheduledEvent; + + ctx.sendDelayed(api.actor, delay).send(systemName, {scheduledEvent, source, target, event}) + ctx.set("events", events) }, cancel(source: AnyActorRef, id: string): void { - ctx.send(api.actor).cancel(systemName, { - source: serialiseActorRef(source), - id - }) + console.log("cancel schedule from", source.id, "with id", id) + + const scheduledEventId = createScheduledEventId(source, id); + + delete events[scheduledEventId]; + ctx.set("events", events) }, cancelAll(actorRef: AnyActorRef): void { - ctx.send(api.actor).cancelAll(systemName, { - actorRef: serialiseActorRef(actorRef), - }) + console.log("cancel all for", actorRef.id) + + for (const scheduledEventId in events) { + const scheduledEvent = events[scheduledEventId]; + if (scheduledEvent.source.sessionId === actorRef.sessionId) { + delete events[scheduledEventId]; + } + } + ctx.set("events", events) }, } @@ -100,15 +136,30 @@ function createSystem(ctx: restate.RpcContext, api: X api, systemName, - _bookId: () => `x:${idCounter++}`, + _bookId: () => ctx.rand.uuidv4(), _register: (sessionId, actorRef) => { + if (actorRef.id in childrenByID) { + // rehydration case; ensure session ID maintains continuity + sessionId = childrenByID[actorRef.id].sessionId + actorRef.sessionId = sessionId + } else { + // new actor case + childrenByID[actorRef.id] = serialiseActorRef(actorRef) + ctx.set("children", childrenByID) + } + console.log("register", sessionId, actorRef.id) children.set(sessionId, actorRef); - childrenByID.set(actorRef.id, actorRef); return sessionId; }, _unregister: (actorRef) => { + if (actorRef.id in childrenByID) { + // rehydration case; ensure session ID maintains continuity + actorRef.sessionId = childrenByID[actorRef.id].sessionId + } + children.delete(actorRef.sessionId); - childrenByID.delete(actorRef.id); + delete childrenByID[actorRef.id] + ctx.set("children", childrenByID) const systemId = reverseKeyedActors.get(actorRef); if (systemId !== undefined) { @@ -119,12 +170,12 @@ function createSystem(ctx: restate.RpcContext, api: X _sendInspectionEvent: (event) => { const resolvedInspectionEvent: InspectionEvent = { ...event, - rootId: "x:0", + rootId: "root", }; observers.forEach((observer) => observer.next?.(resolvedInspectionEvent)); }, - actorByID: (id) => { - return childrenByID.get(id) + actor: (sessionId) => { + return children.get(sessionId) }, get: (systemId) => { return keyedActors.get(systemId) as T['actors'][any]; @@ -144,7 +195,7 @@ function createSystem(ctx: restate.RpcContext, api: X observers.add(observer); }, _relay: (source, target, event) => { - console.log("Relaying message from", source?.id, "to", target.id, ":", event); + console.log("Relaying message from", source?.id, "to", target.id, ":", event.type); (target as any)._send(event); }, scheduler, @@ -164,12 +215,13 @@ interface FakeParent extends AnyActorRef { _send: (event: EventFromLogic) => void; } -export async function createActor(ctx: restate.RpcContext, api: XStateApi, systemName: string, logic: TLogic, options?: ActorOptions): Promise> { - const system = createSystem(ctx, api, systemName) +export async function createActor(ctx: restate.RpcContext, api: XStateApi, systemName: string, logic: TLogic extends AnyStateMachine ? AreAllImplementationsAssumedToBeProvided extends true ? TLogic : MissingImplementationsError : TLogic, options?: ActorOptions): Promise> { + const system = await createSystem(ctx, api, systemName) + const snapshot = await ctx.get>("snapshot") ?? undefined; const parent: FakeParent = { id: "fakeRoot", - sessionId: "", + sessionId: "fakeRoot", send: () => { }, _send: () => { @@ -189,7 +241,7 @@ export async function createActor(ctx: restate.R stop: () => { }, // TODO system, - src: "root", + src: "fakeRoot", subscribe: (): Subscription => { return { unsubscribe() { @@ -213,7 +265,7 @@ export async function createActor(ctx: restate.R system.inspect(toObserver(options.inspect)); } - return createXActor(logic as any, {...options, parent} as any); + return createXActor(logic, {id: "root", ...options, parent, snapshot} as any); } const actorMethods = (path: string, logic: TLogic) => { @@ -221,6 +273,9 @@ const actorMethods = (path: string, logic: TLogi return { create: async (ctx: restate.RpcContext, systemName: string, request?: { input?: InputFrom }): Promise> => { + ctx.clear("snapshot") + ctx.clear("events") + ctx.clear("children") const root = (await createActor(ctx, api, systemName, logic, { input: request?.input, @@ -230,33 +285,34 @@ const actorMethods = (path: string, logic: TLogi return root.getPersistedSnapshot() }, - send: async (ctx: restate.RpcContext, systemName: string, request?: { scheduledEventId?: ScheduledEventId, source?: SerialisableActorRef, target?: SerialisableActorRef, event: EventFromLogic }): Promise | undefined> => { + send: async (ctx: restate.RpcContext, systemName: string, request?: { scheduledEvent?: SerialisableScheduledEvent, source?: SerialisableActorRef, target?: SerialisableActorRef, event: AnyEventObject }): Promise | undefined> => { if (!request) { throw new TerminalError("Must provide a request") } - const snapshot = await ctx.get>("snapshot") ?? undefined; - - if (request.scheduledEventId) { - const events = await ctx.get<{ [key: ScheduledEventId]: SerialisableScheduledEvent }>("events") ?? {} - if (!(request.scheduledEventId in events)) { - console.log("Received now cancelled event", request.scheduledEventId, "for target", request.target) + if (request.scheduledEvent) { + const events = await ctx.get<{ [key: string]: SerialisableScheduledEvent }>("events") ?? {} + const scheduledEventId = createScheduledEventId(request.scheduledEvent.source, request.scheduledEvent.id) + if (!(scheduledEventId in events)) { + console.log("Received now cancelled event", scheduledEventId, "for target", request.target) return } - delete events[request.scheduledEventId] + if (events[scheduledEventId].uuid !== request.scheduledEvent.uuid) { + console.log("Received now replaced event", scheduledEventId, "for target", request.target) + return + } + delete events[scheduledEventId] ctx.set("events", events) } - const root = (await createActor(ctx, api, systemName, logic, { - snapshot, - })).start(); + const root = (await createActor(ctx, api, systemName, logic)).start(); let actor; if (request.target) { - actor = (root.system as RestateActorSystem).actorByID(request.target.id) + actor = (root.system as RestateActorSystem).actor(request.target.sessionId) if (!actor) { - throw new TerminalError(`Actor ${request.target.id} not found`) + throw new TerminalError(`Actor ${request.target.id} not found; it may have since stopped`) } } else { actor = root @@ -270,74 +326,10 @@ const actorMethods = (path: string, logic: TLogi return nextSnapshot }, snapshot: async (ctx: restate.RpcContext, systemName: string): Promise> => { - const snapshot = await ctx.get>("snapshot") ?? undefined; - - const root = (await createActor(ctx, api, systemName, logic, { - snapshot, - })); + const root = (await createActor(ctx, api, systemName, logic)); return root.getPersistedSnapshot() }, - schedule: async (ctx: restate.RpcContext, system: string, {source, target, event, delay, id}: { - source: SerialisableActorRef, - target: SerialisableActorRef, - event: EventObject, - delay: number, - id?: string, - } - ) => { - if (id === undefined) { - id = ctx.rand.random().toString(36).slice(2) - } - - // TODO check for existing id here? - - console.log("schedule from", source.id, "to", target.id, "with id", id, "and delay", delay) - - const events = await ctx.get<{ [key: ScheduledEventId]: SerialisableScheduledEvent }>("events") ?? {} - - const scheduledEvent: SerialisableScheduledEvent = { - source, - target, - event, - delay, - id, - startedAt: Date.now() - }; - const scheduledEventId = createScheduledEventId(source, id); - events[scheduledEventId] = scheduledEvent; - - ctx.sendDelayed(api.actor, delay).send(system, {scheduledEventId, target, event}) - ctx.set("events", events) - }, - cancel: async (ctx: restate.RpcContext, system: string, { - source, - id - }: { source: SerialisableActorRef, id: string }) => { - console.log("cancel schedule from", source.id, "with id", id) - - const events = await ctx.get<{ [key: ScheduledEventId]: SerialisableScheduledEvent }>("events") ?? {} - - const scheduledEventId = createScheduledEventId(source, id); - - delete events[scheduledEventId]; - ctx.set("events", events) - }, - cancelAll: async (ctx: restate.RpcContext, system: string, {actorRef}: { actorRef: SerialisableActorRef }) => { - console.log("cancel all for", actorRef.id) - - const events = await ctx.get<{ [key: ScheduledEventId]: SerialisableScheduledEvent }>("events") ?? {} - for (const scheduledEventId in events) { - const scheduledEvent = - events[ - scheduledEventId as ScheduledEventId - ]; - if (scheduledEvent.source === actorRef) { - delete events[scheduledEventId as ScheduledEventId]; - } - } - ctx.set("events", events) - } } } @@ -357,14 +349,12 @@ export const xStateApi = (path: string): XStateA type ActorRouter = restate.KeyedRouter>> type PromiseRouter = restate.UnKeyedRouter>> -type XStateApi = {actor: restate.ServiceApi>, promise: restate.ServiceApi>} - -type ScheduledEventId = string & { __scheduledEventId: never }; +type XStateApi = { actor: restate.ServiceApi>, promise: restate.ServiceApi> } function createScheduledEventId( actorRef: SerialisableActorRef, id: string -): ScheduledEventId { - return `${actorRef.sessionId}.${id}` as ScheduledEventId; +): string { + return `${actorRef.sessionId}.${id}`; } diff --git a/typescript/xstate/src/promise.ts b/typescript/xstate/src/promise.ts index 80f7384b..2c43db33 100644 --- a/typescript/xstate/src/promise.ts +++ b/typescript/xstate/src/promise.ts @@ -2,6 +2,7 @@ import { ActorLogic, ActorRefFrom, AnyActorLogic, + AnyActorRef, AnyStateMachine, InvokeConfig, NonReducibleUnknown, @@ -96,7 +97,7 @@ export function fromPromise( rs.ctx.send(rs.api.promise).invoke({ systemName: rs.systemName, self: serialiseActorRef(self), - src: self.src as string, + srcs: actorSrc(self), input: state.input }); @@ -110,7 +111,8 @@ export function fromPromise( status: 'active', output: undefined, error: undefined, - input + input, + sent: false, }; }, getPersistedSnapshot: (snapshot) => snapshot, @@ -120,6 +122,16 @@ export function fromPromise( return logic; } +function actorSrc(actor?: AnyActorRef): string[] { + if (actor === undefined) { + return [] + } + if (typeof actor.src !== "string") { + return [] + } + return [actor.src, ...actorSrc(actor._parent)] +} + export const promiseMethods = (path: string, logic: TLogic) => { const api = xStateApi(path) @@ -127,27 +139,49 @@ export const promiseMethods = (path: string, log invoke: async (ctx: restate.RpcContext, { systemName, self, - src, + srcs, input - }: { systemName: string, self: SerialisableActorRef, src: string, input: NonReducibleUnknown }) => { - console.log("run promise with src", src, "in system", systemName, "with input", input) + }: { systemName: string, self: SerialisableActorRef, srcs: string[], input: NonReducibleUnknown }) => { + console.log("run promise with srcs", srcs, "in system", systemName, "with input", input) + + const [promiseSrc, ...machineSrcs] = srcs + + let stateMachine: AnyStateMachine = logic; + for (const src of machineSrcs) { + let maybeSM + try { + maybeSM = resolveReferencedActor(stateMachine, src) + } catch (e) { + throw new TerminalError(`Failed to resolve promise actor ${src}: ${e}`) + } + if (maybeSM === undefined) { + throw new TerminalError(`Couldn't find state machine actor with src ${src}`) + } + if ("implementations" in maybeSM) { + stateMachine = maybeSM as AnyStateMachine + } else { + throw new TerminalError(`Couldn't recognise machine actor with src ${src}`) + } + } - let actor: PromiseActorLogic | undefined + let promiseActor: PromiseActorLogic | undefined + let maybePA try { - actor = resolveReferencedActor(logic, src) as PromiseActorLogic + maybePA = resolveReferencedActor(stateMachine, promiseSrc) } catch (e) { - throw new TerminalError(`Failed to resolve promise actor ${src}: ${e}`) + throw new TerminalError(`Failed to resolve promise actor ${promiseSrc}: ${e}`) } - if (actor === undefined) { - throw new TerminalError(`Couldn't find promise actor ${src}`) + if (maybePA === undefined) { + throw new TerminalError(`Couldn't find promise actor with src ${promiseSrc}`) } - - if (actor.sentinel !== "restate.promise.actor") { - throw new TerminalError("Found an actor we don't recognise") + if ("sentinel" in maybePA && maybePA.sentinel === "restate.promise.actor") { + promiseActor = maybePA as PromiseActorLogic + } else { + throw new TerminalError(`Couldn't recognise promise actor with src ${promiseSrc}`) } const resolvedPromise = Promise.resolve( - actor.config({input, ctx: ctx}) + promiseActor.config({input, ctx}) ); await resolvedPromise.then(