Skip to content

Commit

Permalink
Refactor example to task queue. Fixes #5032
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkpiano committed Aug 14, 2024
1 parent 758a787 commit df72240
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 33 deletions.
20 changes: 20 additions & 0 deletions examples/mongodb-persisted-state/TaskQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
export class TaskQueue {
private taskQueue: (() => Promise<void>)[] = [];
private status: 'idle' | 'processing' = 'idle';

private async processQueue(): Promise<void> {
if (this.status === 'processing') return;
this.status = 'processing';

while (this.taskQueue.length > 0) {
const task = this.taskQueue.shift();
if (task) await task();
}
this.status = 'idle';
}

async addTask(task: () => Promise<void>): Promise<void> {
this.taskQueue.push(task);
await this.processQueue();
}
}
73 changes: 40 additions & 33 deletions examples/mongodb-persisted-state/main.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { __unsafe_getAllOwnEventDescriptors, createActor } from 'xstate';
import { MongoClient, ServerApiVersion } from 'mongodb';
import { donutMachine } from './donutMachine';
import { TaskQueue } from './TaskQueue';

const uri = '<your mongodb connection string>';

Expand All @@ -26,44 +27,50 @@ try {
state: restoredState?.persistedState
});

const taskQueue = new TaskQueue();

actor.subscribe({
async next(snapshot) {
// save persisted state to mongodb
const persistedState = actor.getPersistedSnapshot();
const updateDoc = {
$set: {
persistedState
}
};
next(snapshot) {
taskQueue.addTask(async () => {
// save persisted state to mongodb
const persistedState = actor.getPersistedSnapshot();
const updateDoc = {
$set: {
persistedState
}
};

const result = await donutCollection.updateOne(
filter,
updateDoc,
options
);
const result = await donutCollection.updateOne(
filter,
updateDoc,
options
);

// only log if the upsert occurred
if (result.modifiedCount > 0 || result.upsertedCount > 0) {
console.log('persisted state saved to db. ', result);
}
// only log if the upsert occurred
if (result.modifiedCount > 0 || result.upsertedCount > 0) {
console.log('persisted state saved to db. ', result);
}

const nextEvents = __unsafe_getAllOwnEventDescriptors(snapshot);
console.log(
'Current state:',
// the current state, bolded
`\x1b[1m${JSON.stringify(snapshot.value)}\x1b[0m\n`,
'Next events:',
// the next events, each of them bolded
nextEvents
.filter((event) => !event.startsWith('done.'))
.map((event) => `\n \x1b[1m${event}\x1b[0m`)
.join(''),
'\nEnter the next event to send:'
);
const nextEvents = __unsafe_getAllOwnEventDescriptors(snapshot);
console.log(
'Current state:',
// the current state, bolded
`\x1b[1m${JSON.stringify(snapshot.value)}\x1b[0m\n`,
'Next events:',
// the next events, each of them bolded
nextEvents
.filter((event) => !event.startsWith('done.'))
.map((event) => `\n \x1b[1m${event}\x1b[0m`)
.join(''),
'\nEnter the next event to send:'
);
});
},
async complete() {
console.log('workflow completed', actor.getSnapshot().output);
await client.close();
complete() {
taskQueue.addTask(async () => {
console.log('workflow completed', actor.getSnapshot().output);
await client.close();
});
}
});

Expand Down

0 comments on commit df72240

Please sign in to comment.