Skip to content

Commit

Permalink
Bugfixes after testing:
Browse files Browse the repository at this point in the history
* Downloading of y documents from dexie-cloud didn't work properly
* Didn't create dexie-cloud-syncer on updates table until after a long time
* Garbage collecting yDocs didn't work properly when dexie-cloud addon had been syncing.
  • Loading branch information
David Fahlander committed Oct 9, 2024
1 parent 76b4dc9 commit f38451e
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 46 deletions.
30 changes: 18 additions & 12 deletions addons/dexie-cloud/src/yjs/downloadYDocsFromServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ export async function downloadYDocsFromServer(
databaseUrl: string,
{ yDownloadedRealms, realms }: PersistedSyncState
) {
if (yDownloadedRealms && realms && realms.every(realmId => yDownloadedRealms[realmId] === '*')) {
if (
yDownloadedRealms &&
realms &&
realms.every((realmId) => yDownloadedRealms[realmId] === '*')
) {
return; // Already done!
}
console.debug('Downloading Y.Docs from added realms');
Expand Down Expand Up @@ -72,19 +76,21 @@ export async function downloadYDocsFromServer(
await yTable.bulkAdd(docsToInsert);
docsToInsert = [];
}
if (currentRealmId && currentTable && currentProp && (lastDoc || completedRealm)) {
await db.$syncState.update(
'syncState',
completedRealm
if (
currentRealmId &&
((currentTable && currentProp && lastDoc) || completedRealm)
) {
await db.$syncState.update('syncState', (syncState: PersistedSyncState) => {
const yDownloadedRealms = syncState.yDownloadedRealms || {};
yDownloadedRealms[currentRealmId!] = completedRealm
? '*'
: {
[`yDownloadedRealms.${currentRealmId}`]: {
tbl: currentTable,
prop: currentProp,
key: lastDoc.k,
},
}
);
tbl: currentTable!,
prop: currentProp!,
key: lastDoc.k!,
};
syncState.yDownloadedRealms = yDownloadedRealms;
});
}
}

Expand Down
56 changes: 34 additions & 22 deletions addons/dexie-cloud/src/yjs/updateYSyncStates.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { DexieCloudDB } from '../db/DexieCloudDB';
import { DEXIE_CLOUD_SYNCER_ID } from '../sync/DEXIE_CLOUD_SYNCER_ID';
import { YSyncState } from 'dexie';
import { YDexieCloudSyncState } from './YDexieCloudSyncState';

export async function updateYSyncStates(
Expand All @@ -15,7 +14,7 @@ export async function updateYSyncStates(
// we can safely store unsentFrom to a value of the last update + 1 here.
// We also want to update receivedUntil for each yTable to the value specified in the second argument,
// because that contains the highest resulted id of each update from server after storing it.
// We could do these two tasks separately, but that would require two update calls on the same YSyncState, so
// We could do these two tasks separately, but that would require two update calls on the same YSyncState, so
// to optimize the dexie calls, we merge these two maps into a single one so we can do a single update request
// per yTable.
const mergedSpec: {
Expand All @@ -27,39 +26,52 @@ export async function updateYSyncStates(
mergedSpec[yTable] ??= {};
mergedSpec[yTable].unsentFrom = lastUpdateId + 1;
}
for (const [yTable, lastUpdateId] of Object.entries(receivedUntilsAfterSync)) {
for (const [yTable, lastUpdateId] of Object.entries(
receivedUntilsAfterSync
)) {
mergedSpec[yTable] ??= {};
mergedSpec[yTable].receivedUntil = lastUpdateId;
}

// Now go through the merged map and update YSyncStates accordingly:
for (const [yTable, { unsentFrom, receivedUntil }] of Object.entries(
mergedSpec
)) {
// Now go through all yTables and update their YSyncStates:
const allYTables = Object.values(db.dx._dbSchema)
.filter((tblSchema) => tblSchema.yProps)
.map((tblSchema) => tblSchema.yProps!.map((yProp) => yProp.updatesTable))
.flat();
for (const yTable of allYTables) {
const mergedEntry = mergedSpec[yTable];
const unsentFrom = mergedEntry?.unsentFrom ?? 1;
const receivedUntil =
mergedEntry?.receivedUntil ?? // If not received anything on this table, pick the current last update id
// from local because we are in the same parent transaction (in sync.ts) that
// applied all updates from the server
((
await db
.table(yTable)
.where('i')
.between(1, Infinity) // Because i might be string DEXIE_CLOUD_SYNCER_ID if not a number.
.reverse()
.limit(1)
.primaryKeys()
)[0] as number) ??
0;
// We're already in a transaction, but for the sake of
// code readability and correctness, let's launch an atomic sub transaction:
await db.transaction('rw', yTable, async () => {
const state: YDexieCloudSyncState | undefined = await db.table(yTable).get(
DEXIE_CLOUD_SYNCER_ID
);
const state: YDexieCloudSyncState | undefined = await db
.table(yTable)
.get(DEXIE_CLOUD_SYNCER_ID);
if (!state) {
await db.table<YDexieCloudSyncState>(yTable).add({
i: DEXIE_CLOUD_SYNCER_ID,
unsentFrom: unsentFrom || 1,
receivedUntil: receivedUntil || 0,
unsentFrom,
receivedUntil,
serverRev: serverRevision,
});
} else {
if (unsentFrom) {
state.unsentFrom = Math.max(unsentFrom, state.unsentFrom || 1);
}
if (receivedUntil) {
state.receivedUntil = Math.max(
receivedUntil,
state.receivedUntil || 0
);
state.serverRev = serverRevision;
}
state.unsentFrom = Math.max(unsentFrom, state.unsentFrom || 1);
state.receivedUntil = Math.max(receivedUntil, state.receivedUntil || 0);
state.serverRev = serverRevision;
await db.table(yTable).put(state);
}
});
Expand Down
32 changes: 20 additions & 12 deletions src/yjs/compressYDocs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ function compressYDocsTable(
if (!stamp) return; // Skip. Already running.
const lastCompressedUpdate = stamp.lastCompressed;
const unsyncedFrom = Math.min(
...syncers.map((s) => Math.min(s.unsentFrom || Infinity, s.receivedUntil != null ? s.receivedUntil + 1 : Infinity))
...syncers.map((s) =>
Math.min(
s.unsentFrom || Infinity,
s.receivedUntil != null ? s.receivedUntil + 1 : Infinity
)
)
);
// Per updates-table:
// 1. Find all updates after lastCompressedId. Run toArray() on them.
Expand All @@ -74,23 +79,26 @@ function compressYDocsTable(
.where('i')
.between(lastCompressedUpdate, Infinity, false)
.toArray((addedUpdates: YUpdateRow[]) => {
if (addedUpdates.length <= 1) return; // For sure no updates to compress if there would be only 1.
if (addedUpdates.length === 0) return; // No more updates where added
const docsToCompress: { docId: any; updates: YUpdateRow[] }[] = [];
let lastUpdateToCompress = lastCompressedUpdate + 1;
let lastUpdateToCompress = lastCompressedUpdate;
for (let j = 0; j < addedUpdates.length; ++j) {
const updateRow = addedUpdates[j];
const { i, f, k } = updateRow;
if (i >= unsyncedFrom && (f & 0x01)) break; // An update that need to be synced was found. Stop here and let dontCompressFrom stay.
if (i >= unsyncedFrom && f & 0x01) break; // An update that need to be synced was found. Stop here and let dontCompressFrom stay.
const entry = docsToCompress.find(
(entry) => cmp(entry.docId, k) === 0
);
if (entry) entry.updates.push(updateRow);
else docsToCompress.push({ docId: k, updates: [updateRow] });
lastUpdateToCompress = i;
}
if (lastUpdateToCompress === lastCompressedUpdate) return; // No updates to compress
let p = Promise.resolve();
for (const { docId, updates } of docsToCompress) {
p = p.then(() => compressUpdatesForDoc(db, updatesTable, docId, updates));
p = p.then(() =>
compressUpdatesForDoc(db, updatesTable, docId, updates)
);
}
return p.then(() => {
// Update lastCompressed atomically to the value we computed.
Expand Down Expand Up @@ -123,16 +131,16 @@ export function compressUpdatesForDoc(
return db.transaction('rw', updatesTable, (tx) => {
const updTbl = tx.table(updatesTable);
return updTbl.where({ k: parentId }).first((mainUpdate: YUpdateRow) => {
const updates = [mainUpdate].concat(addedUpdatesToCompress); // in some situations, mainUpdate will be included twice here. But Y.js doesn't care!
const updates = [mainUpdate].concat(addedUpdatesToCompress.filter(u => u.i !== mainUpdate.i)); // avoid duplicating the main update (can happen sometimes)
const Y = getYLibrary(db);
const doc = new Y.Doc({ gc: true });
//Y.transact(doc, ()=>{
updates.forEach((update) => {
//if (cmp(update.k, docRowId) !== 0) {
// throw new Error('Invalid update');
//}
Y.applyUpdateV2(doc, update.u);
});
updates.forEach((update) => {
//if (cmp(update.k, docRowId) !== 0) {
// throw new Error('Invalid update');
//}
Y.applyUpdateV2(doc, update.u);
});
//}, "compressYDocs"); // Don't think anyone could be listening to this local doc.
const compressedUpdate = Y.encodeStateAsUpdateV2(doc);
const lastUpdate = updates.pop();
Expand Down

0 comments on commit f38451e

Please sign in to comment.