diff --git a/addons/dexie-cloud/src/yjs/downloadYDocsFromServer.ts b/addons/dexie-cloud/src/yjs/downloadYDocsFromServer.ts index cc50ef219..5e430f04a 100644 --- a/addons/dexie-cloud/src/yjs/downloadYDocsFromServer.ts +++ b/addons/dexie-cloud/src/yjs/downloadYDocsFromServer.ts @@ -27,7 +27,11 @@ export async function downloadYDocsFromServer( databaseUrl: string, { yDownloadedRealms, realms }: PersistedSyncState ) { - if (yDownloadedRealms && realms && realms.every(realmId => yDownloadedRealms[realmId] === '*')) { + if ( + yDownloadedRealms && + realms && + realms.every((realmId) => yDownloadedRealms[realmId] === '*') + ) { return; // Already done! } console.debug('Downloading Y.Docs from added realms'); @@ -72,19 +76,21 @@ export async function downloadYDocsFromServer( await yTable.bulkAdd(docsToInsert); docsToInsert = []; } - if (currentRealmId && currentTable && currentProp && (lastDoc || completedRealm)) { - await db.$syncState.update( - 'syncState', - completedRealm + if ( + currentRealmId && + ((currentTable && currentProp && lastDoc) || completedRealm) + ) { + await db.$syncState.update('syncState', (syncState: PersistedSyncState) => { + const yDownloadedRealms = syncState.yDownloadedRealms || {}; + yDownloadedRealms[currentRealmId!] = completedRealm ? '*' : { - [`yDownloadedRealms.${currentRealmId}`]: { - tbl: currentTable, - prop: currentProp, - key: lastDoc.k, - }, - } - ); + tbl: currentTable!, + prop: currentProp!, + key: lastDoc.k!, + }; + syncState.yDownloadedRealms = yDownloadedRealms; + }); } } diff --git a/addons/dexie-cloud/src/yjs/updateYSyncStates.ts b/addons/dexie-cloud/src/yjs/updateYSyncStates.ts index 1c7f5fc54..80c660543 100644 --- a/addons/dexie-cloud/src/yjs/updateYSyncStates.ts +++ b/addons/dexie-cloud/src/yjs/updateYSyncStates.ts @@ -1,6 +1,5 @@ import { DexieCloudDB } from '../db/DexieCloudDB'; import { DEXIE_CLOUD_SYNCER_ID } from '../sync/DEXIE_CLOUD_SYNCER_ID'; -import { YSyncState } from 'dexie'; import { YDexieCloudSyncState } from './YDexieCloudSyncState'; export async function updateYSyncStates( @@ -15,7 +14,7 @@ export async function updateYSyncStates( // we can safely store unsentFrom to a value of the last update + 1 here. // We also want to update receivedUntil for each yTable to the value specified in the second argument, // because that contains the highest resulted id of each update from server after storing it. - // We could do these two tasks separately, but that would require two update calls on the same YSyncState, so + // We could do these two tasks separately, but that would require two update calls on the same YSyncState, so // to optimize the dexie calls, we merge these two maps into a single one so we can do a single update request // per yTable. const mergedSpec: { @@ -27,39 +26,52 @@ export async function updateYSyncStates( mergedSpec[yTable] ??= {}; mergedSpec[yTable].unsentFrom = lastUpdateId + 1; } - for (const [yTable, lastUpdateId] of Object.entries(receivedUntilsAfterSync)) { + for (const [yTable, lastUpdateId] of Object.entries( + receivedUntilsAfterSync + )) { mergedSpec[yTable] ??= {}; mergedSpec[yTable].receivedUntil = lastUpdateId; } - // Now go through the merged map and update YSyncStates accordingly: - for (const [yTable, { unsentFrom, receivedUntil }] of Object.entries( - mergedSpec - )) { + // Now go through all yTables and update their YSyncStates: + const allYTables = Object.values(db.dx._dbSchema) + .filter((tblSchema) => tblSchema.yProps) + .map((tblSchema) => tblSchema.yProps!.map((yProp) => yProp.updatesTable)) + .flat(); + for (const yTable of allYTables) { + const mergedEntry = mergedSpec[yTable]; + const unsentFrom = mergedEntry?.unsentFrom ?? 1; + const receivedUntil = + mergedEntry?.receivedUntil ?? // If not received anything on this table, pick the current last update id + // from local because we are in the same parent transaction (in sync.ts) that + // applied all updates from the server + (( + await db + .table(yTable) + .where('i') + .between(1, Infinity) // Because i might be string DEXIE_CLOUD_SYNCER_ID if not a number. + .reverse() + .limit(1) + .primaryKeys() + )[0] as number) ?? + 0; // We're already in a transaction, but for the sake of // code readability and correctness, let's launch an atomic sub transaction: await db.transaction('rw', yTable, async () => { - const state: YDexieCloudSyncState | undefined = await db.table(yTable).get( - DEXIE_CLOUD_SYNCER_ID - ); + const state: YDexieCloudSyncState | undefined = await db + .table(yTable) + .get(DEXIE_CLOUD_SYNCER_ID); if (!state) { await db.table(yTable).add({ i: DEXIE_CLOUD_SYNCER_ID, - unsentFrom: unsentFrom || 1, - receivedUntil: receivedUntil || 0, + unsentFrom, + receivedUntil, serverRev: serverRevision, }); } else { - if (unsentFrom) { - state.unsentFrom = Math.max(unsentFrom, state.unsentFrom || 1); - } - if (receivedUntil) { - state.receivedUntil = Math.max( - receivedUntil, - state.receivedUntil || 0 - ); - state.serverRev = serverRevision; - } + state.unsentFrom = Math.max(unsentFrom, state.unsentFrom || 1); + state.receivedUntil = Math.max(receivedUntil, state.receivedUntil || 0); + state.serverRev = serverRevision; await db.table(yTable).put(state); } }); diff --git a/src/yjs/compressYDocs.ts b/src/yjs/compressYDocs.ts index 16753000d..b1d354a23 100644 --- a/src/yjs/compressYDocs.ts +++ b/src/yjs/compressYDocs.ts @@ -61,7 +61,12 @@ function compressYDocsTable( if (!stamp) return; // Skip. Already running. const lastCompressedUpdate = stamp.lastCompressed; const unsyncedFrom = Math.min( - ...syncers.map((s) => Math.min(s.unsentFrom || Infinity, s.receivedUntil != null ? s.receivedUntil + 1 : Infinity)) + ...syncers.map((s) => + Math.min( + s.unsentFrom || Infinity, + s.receivedUntil != null ? s.receivedUntil + 1 : Infinity + ) + ) ); // Per updates-table: // 1. Find all updates after lastCompressedId. Run toArray() on them. @@ -74,13 +79,13 @@ function compressYDocsTable( .where('i') .between(lastCompressedUpdate, Infinity, false) .toArray((addedUpdates: YUpdateRow[]) => { - if (addedUpdates.length <= 1) return; // For sure no updates to compress if there would be only 1. + if (addedUpdates.length === 0) return; // No more updates where added const docsToCompress: { docId: any; updates: YUpdateRow[] }[] = []; - let lastUpdateToCompress = lastCompressedUpdate + 1; + let lastUpdateToCompress = lastCompressedUpdate; for (let j = 0; j < addedUpdates.length; ++j) { const updateRow = addedUpdates[j]; const { i, f, k } = updateRow; - if (i >= unsyncedFrom && (f & 0x01)) break; // An update that need to be synced was found. Stop here and let dontCompressFrom stay. + if (i >= unsyncedFrom && f & 0x01) break; // An update that need to be synced was found. Stop here and let dontCompressFrom stay. const entry = docsToCompress.find( (entry) => cmp(entry.docId, k) === 0 ); @@ -88,9 +93,12 @@ function compressYDocsTable( else docsToCompress.push({ docId: k, updates: [updateRow] }); lastUpdateToCompress = i; } + if (lastUpdateToCompress === lastCompressedUpdate) return; // No updates to compress let p = Promise.resolve(); for (const { docId, updates } of docsToCompress) { - p = p.then(() => compressUpdatesForDoc(db, updatesTable, docId, updates)); + p = p.then(() => + compressUpdatesForDoc(db, updatesTable, docId, updates) + ); } return p.then(() => { // Update lastCompressed atomically to the value we computed. @@ -123,16 +131,16 @@ export function compressUpdatesForDoc( return db.transaction('rw', updatesTable, (tx) => { const updTbl = tx.table(updatesTable); return updTbl.where({ k: parentId }).first((mainUpdate: YUpdateRow) => { - const updates = [mainUpdate].concat(addedUpdatesToCompress); // in some situations, mainUpdate will be included twice here. But Y.js doesn't care! + const updates = [mainUpdate].concat(addedUpdatesToCompress.filter(u => u.i !== mainUpdate.i)); // avoid duplicating the main update (can happen sometimes) const Y = getYLibrary(db); const doc = new Y.Doc({ gc: true }); //Y.transact(doc, ()=>{ - updates.forEach((update) => { - //if (cmp(update.k, docRowId) !== 0) { - // throw new Error('Invalid update'); - //} - Y.applyUpdateV2(doc, update.u); - }); + updates.forEach((update) => { + //if (cmp(update.k, docRowId) !== 0) { + // throw new Error('Invalid update'); + //} + Y.applyUpdateV2(doc, update.u); + }); //}, "compressYDocs"); // Don't think anyone could be listening to this local doc. const compressedUpdate = Y.encodeStateAsUpdateV2(doc); const lastUpdate = updates.pop();