Skip to content

Commit

Permalink
Handle offline entities with baseVersion, trunkVersion, and branchId (#…
Browse files Browse the repository at this point in the history
…1154)

* Parse trunkVersion and branchId from entity submissions

* Adding submission backlog, check for held submission following this one

* Improving out of order processing for complex branches

* Problem case in a test

* Improve logic to process following submission

* Make branchId, branchBaseVersion, trunkVersion readable through the API

* Small changes to backlog schema based on feedback

* Fixing more edge cases

* Unit tests for parsing branch and trunk values from submissions

* More tests

* Refactor offline entity tests to use more api

* Factor out baseVersion computation

* Updates from final code review
  • Loading branch information
ktuite authored Jun 24, 2024
1 parent 56c2ca9 commit cc0ac3d
Show file tree
Hide file tree
Showing 9 changed files with 937 additions and 25 deletions.
35 changes: 33 additions & 2 deletions lib/data/entity.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@ const odataToColumnMap = new Map([
['__system/conflict', 'entities.conflict']
]);

// Matches a version 4 UUID with an optional "uuid:" prefix, case-insensitively.
// Capture group 1 is the prefix (when present); capture group 2 is the bare UUID.
const _uuidPattern = /^(uuid:)?([0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$/i;

////////////////////////////////////////////////////////////////////////////
// ENTITY PARSING

// Validates an entity id and returns it in canonical form: the bare version 4
// UUID, lowercased, with any leading "uuid:" prefix removed.
//
// Throws Problem.user.missingParameter when the id is missing, empty, or only
// whitespace, and Problem.user.invalidDataTypeOfParameter when it is not a
// valid version 4 UUID.
const normalizeUuid = (id) => {
  if (!id || id.trim() === '')
    throw Problem.user.missingParameter({ field: 'uuid' });

  // Use the shared module-level pattern rather than a second local copy of the
  // same regex (which also caused `matches` to be declared twice).
  const matches = _uuidPattern.exec(id);
  if (matches == null) throw Problem.user.invalidDataTypeOfParameter({ field: 'uuid', expected: 'valid version 4 UUID' });

  // Group 2 is the UUID itself, without the optional "uuid:" prefix.
  return matches[2].toLowerCase();
};
Expand Down Expand Up @@ -66,6 +67,32 @@ const extractBaseVersionFromSubmission = (entity) => {
}
};

// Reads the optional branchId from the parsed entity's system block.
// Returns null when branchId is null, undefined, or the empty string; otherwise
// returns the bare UUID (prefix stripped) in lowercase.
// Throws Problem.user.invalidDataTypeOfParameter when a non-empty branchId does
// not match the shared UUID pattern.
const extractBranchIdFromSubmission = (entity) => {
  const rawBranchId = entity.system.branchId;
  const isAbsent = rawBranchId == null || rawBranchId === '';
  if (isAbsent) return null;

  const parsed = _uuidPattern.exec(rawBranchId);
  if (parsed == null) {
    throw Problem.user.invalidDataTypeOfParameter({ field: 'branchId', expected: 'valid version 4 UUID' });
  }

  const bareUuid = parsed[2];
  return bareUuid.toLowerCase();
};

// Reads the optional trunkVersion from the parsed entity's system block.
// Returns null when trunkVersion is absent or empty; otherwise returns it as a
// base-10 integer.
// A trunkVersion is only accepted together with a branchId, and the branchId
// is checked first: a missing branchId raises Problem.user.missingParameter
// before the trunkVersion format is examined. A trunkVersion that is not made
// up solely of digits raises Problem.user.invalidDataTypeOfParameter.
const extractTrunkVersionFromSubmission = (entity) => {
  const rawTrunkVersion = entity.system.trunkVersion;
  if (!rawTrunkVersion) return null;

  // branchId must accompany a trunk version
  if (!extractBranchIdFromSubmission(entity)) {
    throw Problem.user.missingParameter({ field: 'branchId' });
  }

  const digitsOnly = /^\d+$/;
  if (!digitsOnly.test(rawTrunkVersion)) {
    throw Problem.user.invalidDataTypeOfParameter({ field: 'trunkVersion', expected: 'integer' });
  }

  return parseInt(rawTrunkVersion, 10);
};

// This works similarly to processing submissions for export, but also note:
// 1. this is expecting the entityFields to be filled in with propertyName attributes
// 2. the "meta/entity" structural field should be included to get necessary
Expand All @@ -81,6 +108,8 @@ const parseSubmissionXml = (entityFields, xml) => new Promise((resolve, reject)
entity.system.create = field.attrs.create;
entity.system.update = field.attrs.update;
entity.system.baseVersion = field.attrs.baseVersion;
entity.system.trunkVersion = field.attrs.trunkVersion;
entity.system.branchId = field.attrs.branchId;
} else if (field.path.indexOf('/meta/entity') === 0)
entity.system[field.name] = text;
else if (field.propertyName != null)
Expand Down Expand Up @@ -451,6 +480,8 @@ module.exports = {
normalizeUuid,
extractLabelFromSubmission,
extractBaseVersionFromSubmission,
extractTrunkVersionFromSubmission,
extractBranchIdFromSubmission,
extractBulkSource,
streamEntityCsv, streamEntityCsvAttachment,
streamEntityOdata, odataToColumnMap,
Expand Down
16 changes: 13 additions & 3 deletions lib/model/frames/entity.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
/* eslint-disable no-multi-spaces */

const { embedded, fieldTypes, Frame, readable, table } = require('../frame');
const { extractEntity, normalizeUuid, extractLabelFromSubmission, extractBaseVersionFromSubmission } = require('../../data/entity');
const { extractEntity, normalizeUuid,
extractLabelFromSubmission, extractBaseVersionFromSubmission,
extractTrunkVersionFromSubmission, extractBranchIdFromSubmission } = require('../../data/entity');

class Entity extends Frame.define(
table('entities', 'entity'),
Expand Down Expand Up @@ -38,13 +40,17 @@ class Entity extends Frame.define(
const uuid = normalizeUuid(entityData.system.id);
const label = extractLabelFromSubmission(entityData, options);
const baseVersion = extractBaseVersionFromSubmission(entityData);
const branchId = extractBranchIdFromSubmission(entityData);
const trunkVersion = extractTrunkVersionFromSubmission(entityData);
const dataReceived = { ...data, ...(label && { label }) };
return new Entity.Partial({ uuid }, {
def: new Entity.Def({
data,
dataReceived,
...(label && { label }), // add label only if it's there
...(baseVersion && { baseVersion }), // add baseVersion only if it's there
...(label && { label }) // add label only if it's there
...(trunkVersion && { trunkVersion }), // add trunkVersion only if it's there
...(branchId && { branchId }), // add branchId only if it's there
}),
dataset
});
Expand Down Expand Up @@ -88,6 +94,9 @@ Entity.Def = Frame.define(
'version', readable, 'baseVersion', readable,
'dataReceived', readable, 'conflictingProperties', readable,
'createdAt', readable,
'branchId', readable,
'trunkVersion', readable,
'branchBaseVersion', readable,
embedded('creator'),
embedded('source'),
fieldTypes([
Expand All @@ -98,7 +107,8 @@ Entity.Def = Frame.define(
'jsonb', 'bool',
'int4', 'int4',
'jsonb', 'jsonb',
'timestamptz'
'timestamptz',
'uuid', 'int4', 'int4',
])
);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2024 ODK Central Developers
// See the NOTICE file at the top-level directory of this distribution and at
// https://github.com/getodk/central-backend/blob/master/NOTICE.
// This file is part of ODK Central. It is subject to the license terms in
// the LICENSE file found in the top-level directory of this distribution and at
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

// Forward migration: adds the nullable branchId, trunkVersion and
// branchBaseVersion columns to entity_defs in a single ALTER TABLE statement.
const up = async (db) => {
  const addBranchColumns = `ALTER TABLE entity_defs
ADD COLUMN "branchId" UUID,
ADD COLUMN "trunkVersion" INT4,
ADD COLUMN "branchBaseVersion" INT4`;

  await db.raw(addBranchColumns);
};

// Reverse migration: drops the three branch-tracking columns from entity_defs
// and returns whatever db.raw returns.
const down = (db) => {
  const dropBranchColumns = `ALTER TABLE entity_defs
DROP COLUMN "branchId",
DROP COLUMN "trunkVersion",
DROP COLUMN "branchBaseVersion"
`;

  return db.raw(dropBranchColumns);
};

module.exports = { up, down };
31 changes: 31 additions & 0 deletions lib/model/migrations/20240607-02-add-submission-backlog.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2024 ODK Central Developers
// See the NOTICE file at the top-level directory of this distribution and at
// https://github.com/getodk/central-backend/blob/master/NOTICE.
// This file is part of ODK Central. It is subject to the license terms in
// the LICENSE file found in the top-level directory of this distribution and at
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

// Forward migration: creates the entity_submission_backlog table.
// Each row references a submission and a submission def (both cascade on
// delete) and is unique per ("branchId", "branchBaseVersion").
const up = async (db) => {
  const createBacklogTable = `CREATE TABLE entity_submission_backlog (
"submissionId" INT4 NOT NULL,
"submissionDefId" INT4 NOT NULL,
"branchId" UUID NOT NULL,
"branchBaseVersion" INT4 NOT NULL,
"loggedAt" TIMESTAMPTZ(3) NOT NULL,
CONSTRAINT fk_submission_defs
FOREIGN KEY("submissionDefId")
REFERENCES submission_defs(id)
ON DELETE CASCADE,
CONSTRAINT fk_submissions
FOREIGN KEY("submissionId")
REFERENCES submissions(id)
ON DELETE CASCADE,
UNIQUE ("branchId", "branchBaseVersion")
)`;

  await db.raw(createBacklogTable);
};

// Reverse migration: drops the entity_submission_backlog table and returns
// whatever db.raw returns.
const down = (db) => {
  const dropBacklogTable = 'DROP TABLE entity_submission_backlog';
  return db.raw(dropBacklogTable);
};

module.exports = { up, down };
117 changes: 101 additions & 16 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ const _defInsert = (id, root, creatorId, userAgent, partial, version, sourceId =
"sourceId", "creatorId", "userAgent",
"label", "data", "dataReceived",
"version", "baseVersion",
"trunkVersion", "branchId", "branchBaseVersion",
"conflictingProperties")
values (${id}, clock_timestamp(),
${root}, true,
${sourceId}, ${creatorId}, ${userAgent},
${partial.def.label}, ${json}, ${dataReceived},
${version}, ${baseVersion},
${partial.def.trunkVersion ?? null}, ${partial.def.branchId ?? null}, ${partial.def.branchBaseVersion ?? null},
${conflictingProperties})
returning *`;
};
Expand Down Expand Up @@ -80,11 +82,12 @@ ins as (insert into entities (id, "datasetId", "uuid", "createdAt", "creatorId")
select def."entityId", ${dataset.id}, ${partial.uuid}, def."createdAt", ${creatorId} from def
returning entities.*)
select ins.*, def.id as "entityDefId" from ins, def;`)
.then(({ entityDefId, ...entityData }) => // TODO/HACK: reassemble just enough to log audit event
.then(({ entityDefId, ...entityData }) => // TODO/HACK: starting to need more reassembling
new Entity(entityData, {
currentVersion: new Entity.Def({
id: entityDefId,
entityId: entityData.id
entityId: entityData.id,
branchId: partial.def.branchId
})
}));
};
Expand Down Expand Up @@ -193,6 +196,39 @@ SELECT actions
FROM dataset_form_defs
WHERE "datasetId" = ${datasetId} AND "formDefId" = ${formDefId}`);

// Inserts a row into entity_submission_backlog for a submission that cannot be
// processed yet, keyed by its branchId and branchBaseVersion and stamped with
// the database's CLOCK_TIMESTAMP(). Returns whatever `run` returns.
const _holdSubmission = (run, submissionId, submissionDefId, branchId, branchBaseVersion) => {
  const insertHeld = sql`
INSERT INTO entity_submission_backlog ("submissionId", "submissionDefId", "branchId", "branchBaseVersion", "loggedAt")
VALUES (${submissionId}, ${submissionDefId}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP())
`;
  return run(insertHeld);
};

// Deletes the backlog row matching the given branchId and branchBaseVersion
// and returns the deleted row through `maybeOne` (callers treat the result as
// an optional value exposing isDefined()/get()).
const _checkHeldSubmission = (maybeOne, branchId, branchBaseVersion) => {
  const removeHeld = sql`
DELETE FROM entity_submission_backlog
WHERE "branchId"=${branchId} AND "branchBaseVersion" = ${branchBaseVersion}
RETURNING *`;
  return maybeOne(removeHeld);
};

// Works out which version in Central an incoming update should be based on,
// using the branchId, trunkVersion and baseVersion carried by the submission.
//
// Resolves to:
//  - the client's baseVersion, when there is no trunkVersion or when
//    baseVersion equals trunkVersion (the start of a batch);
//  - the `version` of the entity def previously stored for this branch with
//    branchBaseVersion = baseVersion - 1, when such a def exists;
//  - null, after placing the submission in the backlog, when that previous
//    def has not been stored yet.
const _computeBaseVersion = async (maybeOne, run, dataset, clientEntity, submissionDef) => {
  const { branchId, trunkVersion, baseVersion: clientBaseVersion } = clientEntity.def;

  // trunk and client baseVersion agree (or no trunk at all): nothing to translate
  const startsBatch = !trunkVersion || clientBaseVersion === trunkVersion;
  if (startsBatch) return clientBaseVersion;

  // Look for the update that immediately precedes this one within the branch.
  const condition = {
    datasetId: dataset.id,
    uuid: clientEntity.uuid,
    branchId,
    branchBaseVersion: clientBaseVersion - 1
  };
  // eslint-disable-next-line no-use-before-define
  const priorDef = await _getDef(maybeOne, new QueryOptions({ condition }));
  if (priorDef.isDefined()) return priorDef.get().version;

  // The preceding update has not arrived yet: hold this submission for later.
  await _holdSubmission(run, submissionDef.submissionId, submissionDef.id, branchId, clientBaseVersion);
  return null;
};

const _createEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent) => async ({ Audits, Entities }) => {
// If dataset requires approval on submission to create an entity and this event is not
// an approval event, then don't create an entity
Expand All @@ -206,36 +242,64 @@ const _createEntity = (dataset, entityData, submissionId, submissionDef, submiss
const sourceId = await Entities.createSource(sourceDetails, submissionDefId, event.id);
const entity = await Entities.createNew(dataset, partial, submissionDef, sourceId);

return Audits.log({ id: event.actorId }, 'entity.create', { acteeId: dataset.acteeId },
await Audits.log({ id: event.actorId }, 'entity.create', { acteeId: dataset.acteeId },
{
entityId: entity.id, // Added in v2023.3 and backfilled
entityDefId: entity.aux.currentVersion.id, // Added in v2023.3 and backfilled
entity: { uuid: entity.uuid, dataset: dataset.name },
submissionId,
submissionDefId
});
return entity;
};

const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event) => async ({ Audits, Entities, maybeOne }) => {
if (!(event.action === 'submission.create' || event.action === 'submission.update.version'))
const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event) => async ({ Audits, Entities, maybeOne, run }) => {
if (!(event.action === 'submission.create'
|| event.action === 'submission.update.version'
|| event.action === 'submission.reprocess'))
return null;

// Get client version of entity
const clientEntity = await Entity.fromParseEntityData(entityData, { update: true }); // validation happens here

// Get version of entity on the server
const serverEntity = (await Entities.getById(dataset.id, clientEntity.uuid, QueryOptions.forUpdate))
.orThrow(Problem.user.entityNotFound({ entityUuid: clientEntity.uuid, datasetName: dataset.name }));
// If the entity doesn't exist, check branchId - maybe this is an update for an entity created offline
let serverEntity = await Entities.getById(dataset.id, clientEntity.uuid, QueryOptions.forUpdate);
if (!serverEntity.isDefined()) {
if (clientEntity.def.branchId == null) {
throw Problem.user.entityNotFound({ entityUuid: clientEntity.uuid, datasetName: dataset.name });
} else {
await _holdSubmission(run, submissionDef.submissionId, submissionDef.id, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
}
} else {
serverEntity = serverEntity.get();
}

// If the trunk version exists but is higher than current server version,
// that is a weird case that should not be processed OR held, and should log an error.
if (clientEntity.def.trunkVersion && clientEntity.def.trunkVersion > serverEntity.aux.currentVersion.version) {
throw Problem.user.entityVersionNotFound({ baseVersion: `trunkVersion=${clientEntity.def.trunkVersion}`, entityUuid: clientEntity.uuid, datasetName: dataset.name });
}

let { conflict } = serverEntity;
let conflictingProperties; // Maybe we don't need to persist this??? just compute at the read time

if (clientEntity.def.baseVersion !== serverEntity.aux.currentVersion.version) {
// Figure out the intended baseVersion
// If this is an offline update with a branchId, the baseVersion value is local to that
// offline context and we need to translate it to the correct base version within Central.
const baseVersion = await _computeBaseVersion(maybeOne, run, dataset, clientEntity, submissionDef);

// If baseVersion is null, we held a submission and will stop processing now.
if (baseVersion == null)
return null;

const condition = { datasetId: dataset.id, uuid: clientEntity.uuid, version: clientEntity.def.baseVersion };
if (baseVersion !== serverEntity.aux.currentVersion.version) {

const condition = { datasetId: dataset.id, uuid: clientEntity.uuid, version: baseVersion };
// eslint-disable-next-line no-use-before-define
const baseEntityVersion = (await _getDef(maybeOne, new QueryOptions({ condition })))
.orThrow(Problem.user.entityVersionNotFound({ baseVersion: clientEntity.def.baseVersion, entityUuid: clientEntity.uuid, datasetName: dataset.name }));
.orThrow(Problem.user.entityVersionNotFound({ baseVersion, entityUuid: clientEntity.uuid, datasetName: dataset.name }));

// we need to find what changed between baseVersion and lastVersion
// it is not the data we received in lastVersion
Expand Down Expand Up @@ -265,23 +329,30 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss
data: mergedData,
label: mergedLabel,
dataReceived: clientEntity.def.dataReceived,
branchId: clientEntity.def.branchId,
trunkVersion: clientEntity.def.trunkVersion,
branchBaseVersion: (clientEntity.def.branchId != null) ? clientEntity.def.baseVersion : null,
conflictingProperties
})
});

const entity = await Entities.createVersion(dataset, partial, submissionDef, serverEntity.aux.currentVersion.version + 1, sourceId, clientEntity.def.baseVersion);
return Audits.log({ id: event.actorId }, 'entity.update.version', { acteeId: dataset.acteeId },
// Assign new version (increment latest server version)
const version = serverEntity.aux.currentVersion.version + 1;

const entity = await Entities.createVersion(dataset, partial, submissionDef, version, sourceId, baseVersion);
await Audits.log({ id: event.actorId }, 'entity.update.version', { acteeId: dataset.acteeId },
{
entityId: entity.id,
entityDefId: entity.aux.currentVersion.id,
entity: { uuid: entity.uuid, dataset: dataset.name },
submissionId,
submissionDefId
});
return entity;
};

// Entrypoint to where submissions (a specific version) become entities
const _processSubmissionEvent = (event, parentEvent) => async ({ Datasets, Entities, Submissions, Forms, oneFirst }) => {
const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Datasets, Entities, Submissions, Forms, maybeOne, oneFirst }) => {
const { submissionId, submissionDefId } = event.details;

const form = await Forms.getByActeeId(event.acteeId);
Expand Down Expand Up @@ -334,19 +405,33 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Datasets, Entit
throw Problem.user.entityActionNotPermitted({ action, permitted: permittedActions });
}

let maybeEntity = null;
// Try update before create (if both are specified)
if (entityData.system.update === '1' || entityData.system.update === 'true')
try {
await Entities._updateEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event);
maybeEntity = await Entities._updateEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event);
} catch (err) {
if ((err.problemCode === 404.8) && (entityData.system.create === '1' || entityData.system.create === 'true')) {
await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);
} else {
throw (err);
}
}
else if (entityData.system.create === '1' || entityData.system.create === 'true')
return Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);

// Check for held submissions that follow this one in the same branch
if (maybeEntity != null && maybeEntity.aux.currentVersion.branchId != null) {
const { branchId, branchBaseVersion } = maybeEntity.aux.currentVersion;
// branchBaseVersion could be undefined if handling an offline create
const currentBranchBaseVersion = branchBaseVersion ?? 0;
const nextSub = await _checkHeldSubmission(maybeOne, branchId, currentBranchBaseVersion + 1);
if (nextSub.isDefined()) {
const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId } = nextSub.get();
await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId },
{ submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId });
}
}

return null;
};
Expand Down
1 change: 1 addition & 0 deletions lib/worker/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const jobs = {
'submission.update.version': [ require('./submission').submissionUpdateVersion, require('./entity').createOrUpdateEntityFromSubmission ],

'submission.update': [ require('./entity').createOrUpdateEntityFromSubmission ],
'submission.reprocess': [ require('./entity').createOrUpdateEntityFromSubmission ],

'form.create': [ require('./form').create ],
'form.update.draft.set': [ require('./form').updateDraftSet ],
Expand Down
Loading

0 comments on commit cc0ac3d

Please sign in to comment.