Skip to content

Commit

Permalink
fix(sync): prevent race condition by relying on autoincrement
Browse files Browse the repository at this point in the history
Prevent a possible race condition when two clients add steps at the same time.

See #4600.

Rely on the autoincrementing id in order to provide a canonical order
that steps can be retrieved in.

When two clients push steps at the same time
the entries receive destinct ids that increment.
So if another client fetches steps in between
it will see the smaller id as the version of the fetched step
and fetch the other step later on.

Transition:
In the future we can drop the version column entirely
but currently there are still steps stored in the database
that make use of the old column.
So we need to transition away from that.

In order to find entries that are newer than version x
we select those that have both a version and an id larger than x.

Entries of the new format are newer than any entry of the old format.
So we set their version to the largest possible value.
This way they will always fulfill the version condition
and the condition on the id is more strict and therefore effective.

For the old format the version will be smaller than the id
as it's incremented per document while the id is unique accross documents.
Therefore the version condition is the more strict one and effective.

The only scenario where the version might be larger than the id
would be if there's very few documents in the database
and they have had a lot of steps stored in single database entries.

Signed-off-by: Max <[email protected]>
Signed-off-by: Jonas <[email protected]>
  • Loading branch information
max-nextcloud authored and mejo- committed Nov 9, 2023
1 parent 888cbf7 commit f671311
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 24 deletions.
14 changes: 7 additions & 7 deletions cypress/e2e/SessionApi.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ describe('The session Api', function() {
const version = 0
cy.pushSteps({ connection, steps, version })
.its('version')
.should('eql', 1)
.should('be.at.least', 1)
cy.syncSteps(connection)
.its('steps[0].data')
.should('eql', steps)
Expand Down Expand Up @@ -170,7 +170,7 @@ describe('The session Api', function() {
it('saves', function() {
cy.pushSteps({ connection, steps: [messages.update], version })
.its('version')
.should('eql', 1)
.should('be.at.least', 1)
cy.syncSteps(connection, { version: 1, autosaveContent: '# Heading 1', manualSave: true })
cy.downloadFile(filePath)
.its('data')
Expand All @@ -181,7 +181,7 @@ describe('The session Api', function() {
const documentState = 'Base64 encoded string'
cy.pushSteps({ connection, steps: [messages.update], version })
.its('version')
.should('eql', 1)
.should('be.at.least', 1)
cy.syncSteps(connection, {
version: 1,
autosaveContent: '# Heading 1',
Expand Down Expand Up @@ -244,7 +244,7 @@ describe('The session Api', function() {
it('saves public', function() {
cy.pushSteps({ connection, steps: [messages.update], version })
.its('version')
.should('eql', 1)
.should('be.at.least', 1)
cy.syncSteps(connection, { version: 1, autosaveContent: '# Heading 1', manualSave: true })
cy.login(user)
cy.prepareSessionApi()
Expand All @@ -257,7 +257,7 @@ describe('The session Api', function() {
const documentState = 'Base64 encoded string'
cy.pushSteps({ connection, steps: [messages.update], version })
.its('version')
.should('eql', 1)
.should('be.at.least', 1)
cy.syncSteps(connection, {
version: 1,
autosaveContent: '# Heading 1',
Expand Down Expand Up @@ -315,7 +315,7 @@ describe('The session Api', function() {
let joining
cy.pushSteps({ connection, steps: [messages.update], version })
.its('version')
.should('eql', 1)
.should('be.at.least', 1)
cy.createTextSession(undefined, { filePath: '', shareToken })
.then(con => {
joining = con
Expand All @@ -332,7 +332,7 @@ describe('The session Api', function() {
cy.log('Initial user pushes steps')
cy.pushSteps({ connection, steps: [messages.update], version })
.its('version')
.should('eql', 1)
.should('be.at.least', 1)
cy.log('Other user creates session')
cy.createTextSession(undefined, { filePath: '', shareToken })
.then(con => {
Expand Down
16 changes: 14 additions & 2 deletions lib/Db/Step.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,17 @@
* @method setDocumentId(int $documentId): void
*/
class Step extends Entity implements JsonSerializable {

/*
* Transition: We now use the auto-incrementing id as the version.
* To ensure that new steps always have a larger version than those that
* used the version field, use the largest possible value for BIGINT.
*/
public const VERSION_STORED_IN_ID = 18446744073709551615;

public $id = null;
protected string $data = '';
protected int $version = 0;
protected int $version = self::VERSION_STORED_IN_ID;
protected int $sessionId = 0;
protected int $documentId = 0;

Expand All @@ -54,10 +63,13 @@ public function jsonSerialize(): array {
if (\json_last_error() !== JSON_ERROR_NONE) {
throw new \InvalidArgumentException('Failed to parse step data');
}
$version = $this->version === self::VERSION_STORED_IN_ID
? $this->id
: $this->getVersion();
return [
'id' => $this->id,
'data' => $jsonData,
'version' => $this->version,
'version' => $version,
'sessionId' => $this->sessionId
];
}
Expand Down
19 changes: 9 additions & 10 deletions lib/Db/StepMapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,16 @@ public function __construct(IDBConnection $db) {
parent::__construct($db, 'text_steps', Step::class);
}

public function find($documentId, $fromVersion, $lastAckedVersion = null) {
public function find($documentId, $fromVersion) {
/* @var $qb IQueryBuilder */
$qb = $this->db->getQueryBuilder();
$qb->select('*')
->from($this->getTableName())
->where($qb->expr()->eq('document_id', $qb->createNamedParameter($documentId)))
->andWhere($qb->expr()->gt('version', $qb->createNamedParameter($fromVersion)));
if ($lastAckedVersion) {
$qb->andWhere($qb->expr()->lte('version', $qb->createNamedParameter($lastAckedVersion)));
}
->andWhere($qb->expr()->gt('version', $qb->createNamedParameter($fromVersion)))
->andWhere($qb->expr()->gt('id', $qb->createNamedParameter($fromVersion)));
$qb
->setMaxResults(1000)
->orderBy('version')
->orderBy('id');

return $this->findEntities($qb);
Expand All @@ -54,19 +51,19 @@ public function find($documentId, $fromVersion, $lastAckedVersion = null) {
public function getLatestVersion($documentId): ?int {
/* @var $qb IQueryBuilder */
$qb = $this->db->getQueryBuilder();
$result = $qb->select('version')
$result = $qb->select('id')
->from($this->getTableName())
->where($qb->expr()->eq('document_id', $qb->createNamedParameter($documentId)))
->setMaxResults(1)
->orderBy('version', 'DESC')
->orderBy('id', 'DESC')
->execute();

$data = $result->fetch();
if ($data === false) {
return null;
}

return $data['version'];
return $data['id'];
}

public function deleteAll($documentId): void {
Expand All @@ -76,11 +73,12 @@ public function deleteAll($documentId): void {
->executeStatement();
}

// not in use right now
public function deleteBeforeVersion($documentId, $version): void {
$qb = $this->db->getQueryBuilder();
$qb->delete($this->getTableName())
->where($qb->expr()->eq('document_id', $qb->createNamedParameter($documentId)))
->andWhere($qb->expr()->lte('version', $qb->createNamedParameter($version)))
->andWhere($qb->expr()->lte('id', $qb->createNamedParameter($version)))
->executeStatement();
}

Expand All @@ -89,6 +87,7 @@ public function deleteAfterVersion($documentId, $version): int {
return $qb->delete($this->getTableName())
->where($qb->expr()->eq('document_id', $qb->createNamedParameter($documentId)))
->andWhere($qb->expr()->gt('version', $qb->createNamedParameter($version)))
->andWhere($qb->expr()->gt('id', $qb->createNamedParameter($version)))
->executeStatement();
}
}
9 changes: 4 additions & 5 deletions lib/Service/DocumentService.php
Original file line number Diff line number Diff line change
Expand Up @@ -274,15 +274,14 @@ private function insertSteps($documentId, $sessionId, $steps, $version): int {
throw new InvalidArgumentException('Failed to encode steps');
}
$stepsVersion = $this->stepMapper->getLatestVersion($document->getId());
$newVersion = $stepsVersion + count($steps);
$this->logger->debug("Adding steps to $documentId: bumping version from $stepsVersion to $newVersion");
$this->cache->set('document-version-' . $document->getId(), $newVersion);
$step = new Step();
$step->setData($stepsJson);
$step->setSessionId($sessionId);
$step->setDocumentId($documentId);
$step->setVersion($newVersion);
$this->stepMapper->insert($step);
$step = $this->stepMapper->insert($step);
$newVersion = $step->getId();
$this->logger->debug("Adding steps to " . $documentId . ": bumping version from $stepsVersion to $newVersion");
$this->cache->set('document-version-' . $documentId, $newVersion);
// TODO write steps to cache for quicker reading
return $newVersion;
} catch (DoesNotExistException $e) {
Expand Down

0 comments on commit f671311

Please sign in to comment.