diff --git a/sheaf.js b/sheaf.js index f7c2a08a..8ebec759 100644 --- a/sheaf.js +++ b/sheaf.js @@ -168,28 +168,25 @@ class Sheaf { this._operationId = 0xffffffff // Concurrency and work queues. One keyed queue for page writes, the // other queue will only use a single key for all housekeeping. - this._fracture = { - appender: new Fracture(destructible.durable($ => $(), 'appender'), { - turnstile: options.turnstile, - entry: id => ({ - id: this._operationId = (this._operationId + 1 & 0xffffffff) >>> 0, - writes: [], - cartridge: this.pages.hold(id), - future: new Future - }), - worker: this._append.bind(this) - }), - housekeeper: new Fracture(destructible.durable($ => $(), 'housekeeper'), { - turnstile: options.turnstile, - entry: () => ({ candidates: [] }), - worker: this._keephouse.bind(this) - }) - } + this._fracture = new Fracture(destructible.durable($ => $(), 'appender'), { + turnstile: options.turnstile, + entry: key => { + switch (key) { + case 'keephouse': + return { candidates: [] } + default: + return { + id: this._operationId = (this._operationId + 1 & 0xffffffff) >>> 0, + writes: [], + cartridge: this.pages.hold(key), + future: new Future + } + } + }, + worker: this._fractured.bind(this) + }) - this._fracture.housekeeper.deferrable.increment() - this._fracture.appender.deferrable.increment() - this._fracture.appender.name = 'appender' - this._fracture.housekeeper.name = 'housekeeper' + this._fracture.deferrable.increment() // **TODO** Not yet used, would `mkdir` any pages that need to be // inspected for balance. @@ -211,8 +208,7 @@ class Sheaf { // **TODO** Really want to just push keys into a file for // inspection when we reopen for housekeeping. await this.drain() - this._fracture.housekeeper.deferrable.decrement() - this._fracture.appender.deferrable.decrement() + this._fracture.deferrable.decrement() if (this._root != null) { this._root.cartridge.remove() this._root = null @@ -470,7 +466,7 @@ class Sheaf { // they are written before the split? Must be. // - async _append ({ canceled, key, value: { writes, cartridge, future } }) { + async _append (canceled, key, { writes, cartridge, future }) { await this.deferrable.copacetic($ => $(), 'append', null, async () => { try { this.deferrable.progress() @@ -486,7 +482,7 @@ class Sheaf { page.items.length <= this.leaf.merge ) ) { - this._fracture.housekeeper.enqueue('housekeeping').candidates.push(page.key || page.items[0].key) + this._fracture.enqueue('keephouse').candidates.push(page.key || page.items[0].key) } await this.storage.writeLeaf(page, writes) } finally { @@ -498,35 +494,15 @@ class Sheaf { append (id, buffer, writes) { this.deferrable.operational() - const append = this._fracture.appender.enqueue(id) + const append = this._fracture.enqueue(id) append.writes.push(buffer) if (writes[append.id] == null) { writes[append.id] = append.future } } - _drain () { - return [ - this._fracture.housekeeper.drain(), - this._fracture.appender.drain() - ].filter(drain => drain != null) - } - - _iteration = 0 - drain () { - let drains = this._drain() - if (drains.length != 0) { - return (async () => { - do { - for (const promise of drains) { - await promise - } - drains = this._drain() - } while (drains.length != 0) - }) () - } - return null + return this._fracture.drain() } recordify (header, parts = []) { @@ -720,8 +696,8 @@ class Sheaf { // Create our journaled tree alterations. const pauses = [] try { - pauses.push(await this._fracture.appender.pause(left.page.id)) - pauses.push(await this._fracture.appender.pause(right.page.id)) + pauses.push(await this._fracture.pause(left.page.id)) + pauses.push(await this._fracture.pause(right.page.id)) // Race is the wrong word, it's our synchronous time. We have to split // the page and then write them out. Anyone writing to this leaf has to // to be able to see the split so that they surrender their cursor if @@ -784,7 +760,7 @@ class Sheaf { page.items.length >= this.leaf.split && this.comparator.branch(page.items[0].key, page.items[page.items.length - 1].key) != 0 ) { - this._fracture.housekeeper.enqueue('housekeeping').candidates.push(page.key || page.items[0].key) + this._fracture.enqueue('keephouse').candidates.push(page.key || page.items[0].key) } } // @@ -1054,8 +1030,8 @@ class Sheaf { const pauses = [] try { - pauses.push(await this._fracture.appender.pause(left.page.id)) - pauses.push(await this._fracture.appender.pause(right.page.id)) + pauses.push(await this._fracture.pause(left.page.id)) + pauses.push(await this._fracture.pause(right.page.id)) // Add the items in the right page to the end of the left page. const items = left.page.items @@ -1079,7 +1055,7 @@ class Sheaf { // See if the merged page needs to split or merge further. if (this._isDirty(left.page, this.leaf)) { - this._fracture.housekeeper.enqueue('housekeeping').candidates.push(left.entry.value.items[0].key) + this._fracture.enqueue('keephouse').candidates.push(left.entry.value.items[0].key) } // Replace the key of the pivot if necessary. @@ -1144,7 +1120,7 @@ class Sheaf { // of the `merged` files. If not we can delete the merged files. // - async _keephouse ({ canceled, value: { candidates } }) { + async _keephouse (canceled, { candidates }) { await this.deferrable.copacetic($ => $(), 'append', null, async () => { this.deferrable.progress() if (canceled) { @@ -1173,6 +1149,15 @@ class Sheaf { } }) } + + _fractured ({ canceled, key, value }) { + switch (key) { + case 'keephouse': + return this._keephouse(canceled, value) + default: + return this._append(canceled, key, value) + } + } } module.exports = Sheaf