Skip to content

Commit

Permalink
Use a single Fracture in Sheaf.
Browse files Browse the repository at this point in the history
  • Loading branch information
flatheadmill committed Jan 10, 2021
1 parent 6425628 commit 4cb4a63
Showing 1 changed file with 39 additions and 54 deletions.
93 changes: 39 additions & 54 deletions sheaf.js
Original file line number Diff line number Diff line change
Expand Up @@ -168,28 +168,25 @@ class Sheaf {
this._operationId = 0xffffffff
// Concurrency and work queues. One keyed queue for page writes, the
// other queue will only use a single key for all housekeeping.
this._fracture = {
appender: new Fracture(destructible.durable($ => $(), 'appender'), {
turnstile: options.turnstile,
entry: id => ({
id: this._operationId = (this._operationId + 1 & 0xffffffff) >>> 0,
writes: [],
cartridge: this.pages.hold(id),
future: new Future
}),
worker: this._append.bind(this)
}),
housekeeper: new Fracture(destructible.durable($ => $(), 'housekeeper'), {
turnstile: options.turnstile,
entry: () => ({ candidates: [] }),
worker: this._keephouse.bind(this)
})
}
this._fracture = new Fracture(destructible.durable($ => $(), 'appender'), {
turnstile: options.turnstile,
entry: key => {
switch (key) {
case 'keephouse':
return { candidates: [] }
default:
return {
id: this._operationId = (this._operationId + 1 & 0xffffffff) >>> 0,
writes: [],
cartridge: this.pages.hold(key),
future: new Future
}
}
},
worker: this._fractured.bind(this)
})

this._fracture.housekeeper.deferrable.increment()
this._fracture.appender.deferrable.increment()
this._fracture.appender.name = 'appender'
this._fracture.housekeeper.name = 'housekeeper'
this._fracture.deferrable.increment()

// **TODO** Not yet used, would `mkdir` any pages that need to be
// inspected for balance.
Expand All @@ -211,8 +208,7 @@ class Sheaf {
// **TODO** Really want to just push keys into a file for
// inspection when we reopen for housekeeping.
await this.drain()
this._fracture.housekeeper.deferrable.decrement()
this._fracture.appender.deferrable.decrement()
this._fracture.deferrable.decrement()
if (this._root != null) {
this._root.cartridge.remove()
this._root = null
Expand Down Expand Up @@ -470,7 +466,7 @@ class Sheaf {
// they are written before the split? Must be.

//
async _append ({ canceled, key, value: { writes, cartridge, future } }) {
async _append (canceled, key, { writes, cartridge, future }) {
await this.deferrable.copacetic($ => $(), 'append', null, async () => {
try {
this.deferrable.progress()
Expand All @@ -486,7 +482,7 @@ class Sheaf {
page.items.length <= this.leaf.merge
)
) {
this._fracture.housekeeper.enqueue('housekeeping').candidates.push(page.key || page.items[0].key)
this._fracture.enqueue('keephouse').candidates.push(page.key || page.items[0].key)
}
await this.storage.writeLeaf(page, writes)
} finally {
Expand All @@ -498,35 +494,15 @@ class Sheaf {

append (id, buffer, writes) {
this.deferrable.operational()
const append = this._fracture.appender.enqueue(id)
const append = this._fracture.enqueue(id)
append.writes.push(buffer)
if (writes[append.id] == null) {
writes[append.id] = append.future
}
}

_drain () {
return [
this._fracture.housekeeper.drain(),
this._fracture.appender.drain()
].filter(drain => drain != null)
}

_iteration = 0

drain () {
let drains = this._drain()
if (drains.length != 0) {
return (async () => {
do {
for (const promise of drains) {
await promise
}
drains = this._drain()
} while (drains.length != 0)
}) ()
}
return null
return this._fracture.drain()
}

recordify (header, parts = []) {
Expand Down Expand Up @@ -720,8 +696,8 @@ class Sheaf {
// Create our journaled tree alterations.
const pauses = []
try {
pauses.push(await this._fracture.appender.pause(left.page.id))
pauses.push(await this._fracture.appender.pause(right.page.id))
pauses.push(await this._fracture.pause(left.page.id))
pauses.push(await this._fracture.pause(right.page.id))
// Race is the wrong word, it's our synchronous time. We have to split
// the page and then write them out. Anyone writing to this leaf has to
// to be able to see the split so that they surrender their cursor if
Expand Down Expand Up @@ -784,7 +760,7 @@ class Sheaf {
page.items.length >= this.leaf.split &&
this.comparator.branch(page.items[0].key, page.items[page.items.length - 1].key) != 0
) {
this._fracture.housekeeper.enqueue('housekeeping').candidates.push(page.key || page.items[0].key)
this._fracture.enqueue('keephouse').candidates.push(page.key || page.items[0].key)
}
}
//
Expand Down Expand Up @@ -1054,8 +1030,8 @@ class Sheaf {

const pauses = []
try {
pauses.push(await this._fracture.appender.pause(left.page.id))
pauses.push(await this._fracture.appender.pause(right.page.id))
pauses.push(await this._fracture.pause(left.page.id))
pauses.push(await this._fracture.pause(right.page.id))

// Add the items in the right page to the end of the left page.
const items = left.page.items
Expand All @@ -1079,7 +1055,7 @@ class Sheaf {

// See if the merged page needs to split or merge further.
if (this._isDirty(left.page, this.leaf)) {
this._fracture.housekeeper.enqueue('housekeeping').candidates.push(left.entry.value.items[0].key)
this._fracture.enqueue('keephouse').candidates.push(left.entry.value.items[0].key)
}

// Replace the key of the pivot if necessary.
Expand Down Expand Up @@ -1144,7 +1120,7 @@ class Sheaf {
// of the `merged` files. If not we can delete the merged files.

//
async _keephouse ({ canceled, value: { candidates } }) {
async _keephouse (canceled, { candidates }) {
await this.deferrable.copacetic($ => $(), 'append', null, async () => {
this.deferrable.progress()
if (canceled) {
Expand Down Expand Up @@ -1173,6 +1149,15 @@ class Sheaf {
}
})
}

_fractured ({ canceled, key, value }) {
switch (key) {
case 'keephouse':
return this._keephouse(canceled, value)
default:
return this._append(canceled, key, value)
}
}
}

module.exports = Sheaf

0 comments on commit 4cb4a63

Please sign in to comment.