Skip to content

Commit

Permalink
Add onSync callback for implementing action queues (#52)
Browse files Browse the repository at this point in the history
* add onActions callback for implementing action queues

* add ignoreDestroying to meta in order to support waiting for actions to finish in queue before destroy

* add test for ignoreDestroying, export ActionsCallback type

* handle error in onActions

* improve onActions description

* add 1 more test for onActions

* remove ignoreDestroying, move related logic to logux server

* use different code example for onActions

* fix typo

* call inMap and inFilter outside of onActions

* review docs issue

* Update base-node/index.d.ts

Co-authored-by: Andrey Sitnik <[email protected]>

* Update base-node/index.d.ts

Co-authored-by: Andrey Sitnik <[email protected]>

* rename onActions to onSync, improve onSync tests

* add inMap and inFilter to the onSync test

* change onSync api to support calling access inside of queue

* rename onSync to onReceive

---------

Co-authored-by: Andrey Sitnik <[email protected]>
  • Loading branch information
VladBrok and ai authored Sep 10, 2023
1 parent 8346e06 commit 8cb9d03
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 30 deletions.
25 changes: 25 additions & 0 deletions base-node/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ interface LogMapper {
(action: Action, meta: Meta): Promise<[AnyAction, Meta]>
}

export interface ReceiveCallback {
(
processAction: (action: Action, meta: Meta) => Promise<void>,
action: Action,
meta: Meta
): void
}

interface EmptyHeaders {
[key: string]: undefined
}
Expand Down Expand Up @@ -131,6 +139,23 @@ export interface NodeOptions<Headers extends object = {}> {
*/
inMap?: LogMapper

/**
* Function that will be called instead of `inMap`, `inFilter` and `Log#add`.
*
* Use it if you want more control over when an action's `access` callback will be called
* and when an action will be added to the log.
*
* ```js
* onReceive(processAction, action, meta) {
* // Process an action later
* myActionQueue.schedule(async () => {
* await processAction(action, meta) // will call `inMap`, `inFilter` and `Log#add`
* })
* }
* ```
*/
onReceive?: ReceiveCallback

/**
* Filter function to select actions to synchronization.
*/
Expand Down
1 change: 1 addition & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export {
Message,
NodeOptions,
NodeState,
ReceiveCallback,
TokenGenerator
} from './base-node/index.js'
export { ClientNode } from './client-node/index.js'
Expand Down
73 changes: 43 additions & 30 deletions sync/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,42 +56,55 @@ export async function syncMessage(added, ...data) {
meta.time = meta.time + this.baseTime
if (this.timeFix) meta.time = meta.time + this.timeFix

let process = Promise.resolve([action, meta])

if (this.options.inMap) {
process = process
.then(([action2, meta2]) => {
return this.options.inMap(action2, meta2)
})
.catch(e => {
this.error(e)
})
let process = processAction.bind(this)
if (this.options.onReceive) {
try {
this.options.onReceive(process, action, meta)
} catch (e) {
this.error(e)
}
} else {
await process(action, meta)
}
}

await process
.then(filtered => {
if (filtered && this.options.inFilter) {
return this.options
.inFilter(...filtered)
.then(res => {
return res ? filtered : false
})
.catch(e => {
this.error(e)
})
} else {
return filtered
}
this.setLastReceived(added)
this.sendSynced(added)
}

async function processAction(action, meta) {
let process = Promise.resolve([action, meta])

if (this.options.inMap) {
process = process
.then(([action2, meta2]) => {
return this.options.inMap(action2, meta2)
})
.then(changed => {
if (!changed) return false
if (this.received) this.received[changed[1].id] = true
return this.log.add(changed[0], changed[1])
.catch(e => {
this.error(e)
})
}

this.setLastReceived(added)
this.sendSynced(added)
await process
.then(filtered => {
if (filtered && this.options.inFilter) {
return this.options
.inFilter(...filtered)
.then(res => {
return res ? filtered : false
})
.catch(e => {
this.error(e)
})
} else {
return filtered
}
})
.then(changed => {
if (!changed) return false
if (this.received) this.received[changed[1].id] = true
return this.log.add(changed[0], changed[1])
})
}

export function syncedMessage(synced) {
Expand Down
74 changes: 74 additions & 0 deletions sync/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { delay } from 'nanodelay'
import { test } from 'uvu'
import { equal, is, type } from 'uvu/assert'

import type { Action } from '../index.js'
import { ClientNode, ServerNode, TestPair, TestTime } from '../index.js'

let destroyable: TestPair
Expand Down Expand Up @@ -226,6 +227,79 @@ test('maps input actions', async () => {
equal(pair.rightNode.log.actions(), [{ type: 'a1' }])
})

test('handles error in onReceive', async () => {
let error = new Error('test')
let catched: Error[] = []

let pair = await createTest()
pair.rightNode.options.onReceive = () => {
throw error
}
pair.rightNode.catch(e => {
catched.push(e)
})
pair.leftNode.log.add({ type: 'a' })

await delay(50)
equal(catched, [error])
})

test('onReceive is called instead of `inMap`, `inFilter` and `Log#add`', async () => {
let actions: Action[] = []
let pair = await createTest(created => {
created.rightNode.options.inFilter = async action => {
return action.type !== 'c1'
}
created.rightNode.options.inMap = async (action, meta) => {
return [{ type: action.type + '1' }, meta]
}
created.rightNode.options.onReceive = (_, action) => {
actions.push(action)
}
created.leftNode.log.add({ type: 'a' })
created.leftNode.log.add({ type: 'b' })
created.leftNode.log.add({ type: 'c' })
})
equal(pair.leftNode.log.actions(), [
{ type: 'a' },
{ type: 'b' },
{ type: 'c' }
])
equal(pair.rightNode.log.actions(), [])
equal(actions, [{ type: 'a' }, { type: 'b' }, { type: 'c' }])
})

test('onReceive processAction calls `inMap`, `inFilter` and `Log#add`', async () => {
let actions: Action[] = []
let finish: any = null
let promise = new Promise(resolve => {
finish = resolve
})
let pair = await createTest(created => {
created.rightNode.options.inFilter = async action => {
return action.type !== 'c1'
}
created.rightNode.options.inMap = async (action, meta) => {
return [{ type: action.type + '1' }, meta]
}
created.rightNode.options.onReceive = (processAction, action, meta) => {
actions.push(action)
processAction(action, meta).then(finish)
}
created.leftNode.log.add({ type: 'a' })
created.leftNode.log.add({ type: 'b' })
created.leftNode.log.add({ type: 'c' })
})
await promise
equal(pair.leftNode.log.actions(), [
{ type: 'a' },
{ type: 'b' },
{ type: 'c' }
])
equal(actions, [{ type: 'a' }, { type: 'b' }, { type: 'c' }])
equal(pair.rightNode.log.actions(), [{ type: 'a1' }, { type: 'b1' }])
})

test('uses input map before filter', async () => {
let calls: string[] = []
let pair = await createTest()
Expand Down

0 comments on commit 8cb9d03

Please sign in to comment.