From df72240c1e296e1837fbd2b46f539a5f9841f3a7 Mon Sep 17 00:00:00 2001 From: David Khourshid Date: Tue, 13 Aug 2024 22:16:36 -0400 Subject: [PATCH] Refactor example to task queue. Fixes #5032 --- examples/mongodb-persisted-state/TaskQueue.ts | 20 +++++ examples/mongodb-persisted-state/main.ts | 73 ++++++++++--------- 2 files changed, 60 insertions(+), 33 deletions(-) create mode 100644 examples/mongodb-persisted-state/TaskQueue.ts diff --git a/examples/mongodb-persisted-state/TaskQueue.ts b/examples/mongodb-persisted-state/TaskQueue.ts new file mode 100644 index 0000000000..2387a0d2f8 --- /dev/null +++ b/examples/mongodb-persisted-state/TaskQueue.ts @@ -0,0 +1,20 @@ +export class TaskQueue { + private taskQueue: (() => Promise)[] = []; + private status: 'idle' | 'processing' = 'idle'; + + private async processQueue(): Promise { + if (this.status === 'processing') return; + this.status = 'processing'; + + while (this.taskQueue.length > 0) { + const task = this.taskQueue.shift(); + if (task) await task(); + } + this.status = 'idle'; + } + + async addTask(task: () => Promise): Promise { + this.taskQueue.push(task); + await this.processQueue(); + } +} diff --git a/examples/mongodb-persisted-state/main.ts b/examples/mongodb-persisted-state/main.ts index 1d5cf8905c..8472f04d55 100755 --- a/examples/mongodb-persisted-state/main.ts +++ b/examples/mongodb-persisted-state/main.ts @@ -1,6 +1,7 @@ import { __unsafe_getAllOwnEventDescriptors, createActor } from 'xstate'; import { MongoClient, ServerApiVersion } from 'mongodb'; import { donutMachine } from './donutMachine'; +import { TaskQueue } from './TaskQueue'; const uri = ''; @@ -26,44 +27,50 @@ try { state: restoredState?.persistedState }); + const taskQueue = new TaskQueue(); + actor.subscribe({ - async next(snapshot) { - // save persisted state to mongodb - const persistedState = actor.getPersistedSnapshot(); - const updateDoc = { - $set: { - persistedState - } - }; + next(snapshot) { + taskQueue.addTask(async () => { + // save persisted state to mongodb + const persistedState = actor.getPersistedSnapshot(); + const updateDoc = { + $set: { + persistedState + } + }; - const result = await donutCollection.updateOne( - filter, - updateDoc, - options - ); + const result = await donutCollection.updateOne( + filter, + updateDoc, + options + ); - // only log if the upsert occurred - if (result.modifiedCount > 0 || result.upsertedCount > 0) { - console.log('persisted state saved to db. ', result); - } + // only log if the upsert occurred + if (result.modifiedCount > 0 || result.upsertedCount > 0) { + console.log('persisted state saved to db. ', result); + } - const nextEvents = __unsafe_getAllOwnEventDescriptors(snapshot); - console.log( - 'Current state:', - // the current state, bolded - `\x1b[1m${JSON.stringify(snapshot.value)}\x1b[0m\n`, - 'Next events:', - // the next events, each of them bolded - nextEvents - .filter((event) => !event.startsWith('done.')) - .map((event) => `\n \x1b[1m${event}\x1b[0m`) - .join(''), - '\nEnter the next event to send:' - ); + const nextEvents = __unsafe_getAllOwnEventDescriptors(snapshot); + console.log( + 'Current state:', + // the current state, bolded + `\x1b[1m${JSON.stringify(snapshot.value)}\x1b[0m\n`, + 'Next events:', + // the next events, each of them bolded + nextEvents + .filter((event) => !event.startsWith('done.')) + .map((event) => `\n \x1b[1m${event}\x1b[0m`) + .join(''), + '\nEnter the next event to send:' + ); + }); }, - async complete() { - console.log('workflow completed', actor.getSnapshot().output); - await client.close(); + complete() { + taskQueue.addTask(async () => { + console.log('workflow completed', actor.getSnapshot().output); + await client.close(); + }); } });