diff --git a/base-node/index.d.ts b/base-node/index.d.ts index c52cbbd..7e167c7 100644 --- a/base-node/index.d.ts +++ b/base-node/index.d.ts @@ -15,6 +15,14 @@ interface LogMapper { (action: Action, meta: Meta): Promise<[AnyAction, Meta]> } +export interface ReceiveCallback { + ( + processAction: (action: Action, meta: Meta) => Promise, + action: Action, + meta: Meta + ): void +} + interface EmptyHeaders { [key: string]: undefined } @@ -131,6 +139,23 @@ export interface NodeOptions { */ inMap?: LogMapper + /** + * Function that will be called instead of `inMap`, `inFilter` and `Log#add`. + * + * Use it if you want more control over when an action's `access` callback will be called + * and when an action will be added to the log. + * + * ```js + * onReceive(processAction, action, meta) { + * // Process an action later + * myActionQueue.schedule(async () => { + * await processAction(action, meta) // will call `inMap`, `inFilter` and `Log#add` + * }) + * } + * ``` + */ + onReceive?: ReceiveCallback + /** * Filter function to select actions to synchronization. */ diff --git a/index.d.ts b/index.d.ts index 7bf9878..bdde433 100644 --- a/index.d.ts +++ b/index.d.ts @@ -5,6 +5,7 @@ export { Message, NodeOptions, NodeState, + ReceiveCallback, TokenGenerator } from './base-node/index.js' export { ClientNode } from './client-node/index.js' diff --git a/sync/index.js b/sync/index.js index b05ae56..c91d4ac 100644 --- a/sync/index.js +++ b/sync/index.js @@ -56,42 +56,55 @@ export async function syncMessage(added, ...data) { meta.time = meta.time + this.baseTime if (this.timeFix) meta.time = meta.time + this.timeFix - let process = Promise.resolve([action, meta]) - - if (this.options.inMap) { - process = process - .then(([action2, meta2]) => { - return this.options.inMap(action2, meta2) - }) - .catch(e => { - this.error(e) - }) + let process = processAction.bind(this) + if (this.options.onReceive) { + try { + this.options.onReceive(process, action, meta) + } catch (e) { + this.error(e) + } + } else { + await process(action, meta) } + } - await process - .then(filtered => { - if (filtered && this.options.inFilter) { - return this.options - .inFilter(...filtered) - .then(res => { - return res ? filtered : false - }) - .catch(e => { - this.error(e) - }) - } else { - return filtered - } + this.setLastReceived(added) + this.sendSynced(added) +} + +async function processAction(action, meta) { + let process = Promise.resolve([action, meta]) + + if (this.options.inMap) { + process = process + .then(([action2, meta2]) => { + return this.options.inMap(action2, meta2) }) - .then(changed => { - if (!changed) return false - if (this.received) this.received[changed[1].id] = true - return this.log.add(changed[0], changed[1]) + .catch(e => { + this.error(e) }) } - this.setLastReceived(added) - this.sendSynced(added) + await process + .then(filtered => { + if (filtered && this.options.inFilter) { + return this.options + .inFilter(...filtered) + .then(res => { + return res ? filtered : false + }) + .catch(e => { + this.error(e) + }) + } else { + return filtered + } + }) + .then(changed => { + if (!changed) return false + if (this.received) this.received[changed[1].id] = true + return this.log.add(changed[0], changed[1]) + }) } export function syncedMessage(synced) { diff --git a/sync/index.test.ts b/sync/index.test.ts index a0338b3..ef06c03 100644 --- a/sync/index.test.ts +++ b/sync/index.test.ts @@ -2,6 +2,7 @@ import { delay } from 'nanodelay' import { test } from 'uvu' import { equal, is, type } from 'uvu/assert' +import type { Action } from '../index.js' import { ClientNode, ServerNode, TestPair, TestTime } from '../index.js' let destroyable: TestPair @@ -226,6 +227,79 @@ test('maps input actions', async () => { equal(pair.rightNode.log.actions(), [{ type: 'a1' }]) }) +test('handles error in onReceive', async () => { + let error = new Error('test') + let catched: Error[] = [] + + let pair = await createTest() + pair.rightNode.options.onReceive = () => { + throw error + } + pair.rightNode.catch(e => { + catched.push(e) + }) + pair.leftNode.log.add({ type: 'a' }) + + await delay(50) + equal(catched, [error]) +}) + +test('onReceive is called instead of `inMap`, `inFilter` and `Log#add`', async () => { + let actions: Action[] = [] + let pair = await createTest(created => { + created.rightNode.options.inFilter = async action => { + return action.type !== 'c1' + } + created.rightNode.options.inMap = async (action, meta) => { + return [{ type: action.type + '1' }, meta] + } + created.rightNode.options.onReceive = (_, action) => { + actions.push(action) + } + created.leftNode.log.add({ type: 'a' }) + created.leftNode.log.add({ type: 'b' }) + created.leftNode.log.add({ type: 'c' }) + }) + equal(pair.leftNode.log.actions(), [ + { type: 'a' }, + { type: 'b' }, + { type: 'c' } + ]) + equal(pair.rightNode.log.actions(), []) + equal(actions, [{ type: 'a' }, { type: 'b' }, { type: 'c' }]) +}) + +test('onReceive processAction calls `inMap`, `inFilter` and `Log#add`', async () => { + let actions: Action[] = [] + let finish: any = null + let promise = new Promise(resolve => { + finish = resolve + }) + let pair = await createTest(created => { + created.rightNode.options.inFilter = async action => { + return action.type !== 'c1' + } + created.rightNode.options.inMap = async (action, meta) => { + return [{ type: action.type + '1' }, meta] + } + created.rightNode.options.onReceive = (processAction, action, meta) => { + actions.push(action) + processAction(action, meta).then(finish) + } + created.leftNode.log.add({ type: 'a' }) + created.leftNode.log.add({ type: 'b' }) + created.leftNode.log.add({ type: 'c' }) + }) + await promise + equal(pair.leftNode.log.actions(), [ + { type: 'a' }, + { type: 'b' }, + { type: 'c' } + ]) + equal(actions, [{ type: 'a' }, { type: 'b' }, { type: 'c' }]) + equal(pair.rightNode.log.actions(), [{ type: 'a1' }, { type: 'b1' }]) +}) + test('uses input map before filter', async () => { let calls: string[] = [] let pair = await createTest()