Skip to content

Commit

Permalink
Improvements to offline entity backlog processing
Browse files Browse the repository at this point in the history
  • Loading branch information
ktuite committed Aug 14, 2024
1 parent 748054d commit 231fe93
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 59 deletions.
131 changes: 76 additions & 55 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const { odataFilter, odataOrderBy } = require('../../data/odata-filter');
const { odataToColumnMap, parseSubmissionXml, getDiffProp, ConflictType } = require('../../data/entity');
const { isTrue } = require('../../util/http');
const Problem = require('../../util/problem');
const { runSequentially } = require('../../util/promise');
const { runSequentially, getOrReject } = require('../../util/promise');


/////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -196,10 +196,6 @@ SELECT actions
FROM dataset_form_defs
WHERE "datasetId" = ${datasetId} AND "formDefId" = ${formDefId}`);

const _holdSubmission = (run, eventId, submissionId, submissionDefId, branchId, branchBaseVersion) => run(sql`
INSERT INTO entity_submission_backlog ("auditId", "submissionId", "submissionDefId", "branchId", "branchBaseVersion", "loggedAt")
VALUES (${eventId}, ${submissionId}, ${submissionDefId}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP())
`);

const _checkHeldSubmission = (maybeOne, submissionId) => maybeOne(sql`
SELECT * FROM entity_submission_backlog
Expand All @@ -216,42 +212,71 @@ const _getHeldSubmissionsAsEvents = () => ({ all }) => all(sql`
ORDER BY "branchId", "branchBaseVersion"`)
.then(map(construct(Audit)));

const _deleteHeldSubmissionByEventId = (eventId) => ({ run }) => run(sql`
DELETE FROM entity_submission_backlog
WHERE "auditId"=${eventId}`);

// Used by _computeBaseVersion below to hold submissions that are not yet ready to be processed
const _holdSubmission = (eventId, submissionId, submissionDefId, branchId, branchBaseVersion) => async ({ run }) => run(sql`
INSERT INTO entity_submission_backlog ("auditId", "submissionId", "submissionDefId", "branchId", "branchBaseVersion", "loggedAt")
VALUES (${eventId}, ${submissionId}, ${submissionDefId}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP())
`);

// Used by _updateVerison below to figure out the intended base version in Central
// based on the branchId, trunkVersion, and baseVersion in the submission
const _computeBaseVersion = async (maybeOne, run, eventId, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing = false) => {
if (forceOutOfOrderProcessing) {
// we are forcefully applying an update of this submission
// and the correct base version does not exist in the database.
// we will be using trunk version instead
// but also removing the held submission from the backlog
await _checkAndDeleteHeldSubmission(maybeOne, clientEntity.def.branchId, clientEntity.def.baseVersion);
}
const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing) => async ({ Entities }) => {
if (!clientEntity.def.branchId) {

// no offline branching to deal with, use baseVersion as is
const condition = { version: clientEntity.def.baseVersion };
const maybeDef = await Entities.getDef(dataset.id, clientEntity.uuid, new QueryOptions({ condition }));

if (!maybeDef.isDefined()) {
// If the def doesn't exist, check if the version doesnt exist or the whole entity doesnt exist
// There are different problems for each case
const maybeEntity = await Entities.getById(dataset.id, clientEntity.uuid);
if (maybeEntity.isDefined())
throw Problem.user.entityVersionNotFound({ baseVersion: clientEntity.def.baseVersion, entityUuid: clientEntity.uuid, datasetName: dataset.name });
else
throw Problem.user.entityNotFound({ entityUuid: clientEntity.uuid, datasetName: dataset.name });
}

return maybeDef.get();

if (!clientEntity.def.trunkVersion || clientEntity.def.baseVersion === clientEntity.def.trunkVersion) {
// trunk and client baseVersion are the same, indicating the start of a batch
return clientEntity.def.baseVersion;
} else {
const condition = { datasetId: dataset.id, uuid: clientEntity.uuid,
branchId: clientEntity.def.branchId,
branchBaseVersion: clientEntity.def.baseVersion - 1 };
// there is a branchId, look up the appropriate base def

let condition;
if (clientEntity.def.baseVersion === 1) // Special case
condition = { version: 1 };
else if (clientEntity.def.baseVersion === clientEntity.def.trunkVersion) // Start of an offline branch
condition = { version: clientEntity.def.baseVersion };
else // middle of an offline branch
condition = { branchBaseVersion: clientEntity.def.baseVersion - 1 };

// eslint-disable-next-line no-use-before-define
const previousInBranch = (await _getDef(maybeOne, new QueryOptions({ condition })));
const baseEntityVersion = await Entities.getDef(dataset.id, clientEntity.uuid, new QueryOptions({ condition }));

if (!previousInBranch.isDefined()) {
if (!baseEntityVersion.isDefined()) {
if (forceOutOfOrderProcessing) {
return clientEntity.def.trunkVersion;
// if the base version doesn't exist but we forcing the update anyway,
// use the latest version on the server as the base.
// TODO: if that can't be found, maybe we're in the magic apply update as create case.
const latestEntity = await Entities.getById(dataset.id, clientEntity.uuid)
.then(getOrReject(Problem.user.entityNotFound({ entityUuid: clientEntity.uuid, datasetName: dataset.name })));
return latestEntity.aux.currentVersion;
} else {
// not ready to process this submission. eventually hold it for later.
await _holdSubmission(run, eventId, submissionDef.submissionId, submissionDef.id, clientEntity.def.branchId, clientEntity.def.baseVersion);
// In this case, we are not ready to process this submission and it will be held in the backlog.
await Entities._holdSubmission(eventId, submissionDef.submissionId, submissionDef.id, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
}
}

return previousInBranch.get().version;
// Return the base entity version
return baseEntityVersion.get();
}
};


const _createEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent, forceOutOfOrderProcessing = false) => async ({ Audits, Entities }) => {
// If dataset requires approval on submission to create an entity and this event is not
// an approval event, then don't create an entity
Expand Down Expand Up @@ -280,7 +305,7 @@ const _createEntity = (dataset, entityData, submissionId, submissionDef, submiss
return entity;
};

const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, forceOutOfOrderProcessing = false) => async ({ Audits, Entities, maybeOne, run }) => {
const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, forceOutOfOrderProcessing = false) => async ({ Audits, Entities }) => {
if (!(event.action === 'submission.create'
|| event.action === 'submission.update.version'
|| event.action === 'submission.reprocess'))
Expand All @@ -289,16 +314,20 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss
// Get client version of entity
const clientEntity = await Entity.fromParseEntityData(entityData, { update: true }); // validation happens here

// Figure out the intended baseVersion
// If this is an offline update with a branchId, the baseVersion value is local to that offline context.
const baseEntityDef = await Entities._computeBaseVersion(event.id, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing);

// If baseEntityVersion is null, we held a submission and will stop processing now.
if (baseEntityDef == null)
return null;

// Get version of entity on the server
// If the entity doesn't exist, check branchId - maybe this is an update for an entity created offline
let serverEntity = await Entities.getById(dataset.id, clientEntity.uuid, QueryOptions.forUpdate);
if (!serverEntity.isDefined()) {
if (clientEntity.def.branchId == null || forceOutOfOrderProcessing) {
throw Problem.user.entityNotFound({ entityUuid: clientEntity.uuid, datasetName: dataset.name });
} else {
await _holdSubmission(run, event.id, submissionDef.submissionId, submissionDef.id, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
}
// We probably never get into this case because computeBaseVersion also checks for the existence of the entity
// and returns an entity def associated with a top level entity.
throw Problem.user.entityNotFound({ entityUuid: clientEntity.uuid, datasetName: dataset.name });
} else {
serverEntity = serverEntity.get();
}
Expand All @@ -312,27 +341,12 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss
let { conflict } = serverEntity;
let conflictingProperties; // Maybe we don't need to persist this??? just compute at the read time

// Figure out the intended baseVersion
// If this is an offline update with a branchId, the baseVersion value is local to that
// offline context and we need to translate it to the correct base version within Central.
const baseVersion = await _computeBaseVersion(maybeOne, run, event.id, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing);

// If baseVersion is null, we held a submission and will stop processing now.
if (baseVersion == null)
return null;

if (baseVersion !== serverEntity.aux.currentVersion.version && !forceOutOfOrderProcessing) {

const condition = { datasetId: dataset.id, uuid: clientEntity.uuid, version: baseVersion };
// eslint-disable-next-line no-use-before-define
const baseEntityVersion = (await _getDef(maybeOne, new QueryOptions({ condition })))
.orThrow(Problem.user.entityVersionNotFound({ baseVersion, entityUuid: clientEntity.uuid, datasetName: dataset.name }));

if (baseEntityDef.version !== serverEntity.aux.currentVersion.version) {
// we need to find what changed between baseVersion and lastVersion
// it is not the data we received in lastVersion
const serverVersionDiff = getDiffProp(serverEntity.aux.currentVersion.data, baseEntityVersion.data);
const serverVersionDiff = getDiffProp(serverEntity.aux.currentVersion.data, baseEntityDef.data);
const serverDiffData = pickAll(serverVersionDiff, serverEntity.aux.currentVersion.data);
if (serverEntity.aux.currentVersion.label !== baseEntityVersion.label)
if (serverEntity.aux.currentVersion.label !== baseEntityDef.label)
serverDiffData.label = serverEntity.aux.currentVersion.label;

conflictingProperties = Object.keys(clientEntity.def.dataReceived).filter(key => key in serverDiffData && clientEntity.def.dataReceived[key] !== serverDiffData[key]);
Expand Down Expand Up @@ -366,7 +380,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss
// Assign new version (increment latest server version)
const version = serverEntity.aux.currentVersion.version + 1;

const entity = await Entities.createVersion(dataset, partial, submissionDef, version, sourceId, baseVersion);
const entity = await Entities.createVersion(dataset, partial, submissionDef, version, sourceId, baseEntityDef.version);
await Audits.log({ id: event.actorId }, 'entity.update.version', { acteeId: dataset.acteeId },
{
entityId: entity.id,
Expand Down Expand Up @@ -498,7 +512,10 @@ const createEntitiesFromPendingSubmissions = (submissionEvents, parentEvent) =>
const processHeldSubmissions = () => async (container) => {
const events = await container.Entities._getHeldSubmissionsAsEvents();
return runSequentially(events.map(event =>
() => processSubmissionEvent(event, { details: { force: true } })(container)));
async () => {
await container.Entities._deleteHeldSubmissionByEventId(event.id);
return processSubmissionEvent(event, { details: { force: true } })(container);
}));
};


Expand Down Expand Up @@ -575,6 +592,9 @@ const _getDef = extender(Entity.Def, Entity.Def.Source)(Actor.into('creator'))((
order by entity_defs."createdAt", entity_defs.id
`);

const getDef = (datasetId, uuid, options = QueryOptions.none) => ({ maybeOne }) =>
_getDef(maybeOne, options.withCondition({ datasetId, uuid }));

const getAllDefs = (datasetId, uuid, options = QueryOptions.none) => ({ all }) =>
_getDef(all, options.withCondition({ datasetId, uuid }))
.then(map((v) => new Entity.Def(v, { creator: v.aux.creator, source: v.aux.source })));
Expand Down Expand Up @@ -658,10 +678,11 @@ module.exports = {
createSource,
createMany,
_createEntity, _updateEntity,
_computeBaseVersion, _holdSubmission, _deleteHeldSubmissionByEventId,
processSubmissionEvent, streamForExport,
getDefBySubmissionId,
createVersion,
countByDatasetId, getById,
countByDatasetId, getById, getDef,
getAll, getAllDefs, del,
createEntitiesFromPendingSubmissions,
resolveConflict,
Expand Down
47 changes: 43 additions & 4 deletions test/integration/api/offline-entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ describe('Offline Entities', () => {
const asAlice = await service.login('alice');
const branchId = uuid();

// Second submission updates the entity but entity hasn't been created yet
// Send the second submission that updates an entity (before the entity has been created)
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.two
.replace('create="1"', 'update="1"')
Expand All @@ -520,7 +520,7 @@ describe('Offline Entities', () => {
const backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
backlogCount.should.equal(1);

// First submission creating the entity comes in later
// Send the second submission to create the entity
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.two
.replace('branchId=""', `branchId="${branchId}"`)
Expand Down Expand Up @@ -825,6 +825,9 @@ describe('Offline Entities', () => {

await exhaust(container);

let backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
backlogCount.should.equal(1);

await container.Entities.processHeldSubmissions();

await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc')
Expand All @@ -839,7 +842,7 @@ describe('Offline Entities', () => {
body.currentVersion.branchBaseVersion.should.equal(2);
});

const backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
backlogCount.should.equal(0);
}));

Expand Down Expand Up @@ -974,13 +977,49 @@ describe('Offline Entities', () => {
body.currentVersion.data.should.eql({ status: 'checked in', first_name: 'Dana' });
body.currentVersion.label.should.eql('auto generated');
body.currentVersion.branchId.should.equal(branchId);
body.currentVersion.baseVersion.should.equal(2); // TODO: fix, this doesnt really make sense
body.currentVersion.baseVersion.should.equal(1);
body.currentVersion.branchBaseVersion.should.equal(2);
should.not.exist(body.currentVersion.trunkVersion);
});

backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
backlogCount.should.equal(0);

// send in another update much later in the same branch
// base version is 10 now (many missing intermediate updates)
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.one
.replace('one', 'one-update10')
.replace('id="12345678-1234-4123-8234-123456789abc"', `id="${newUuid}"`)
.replace('branchId=""', `branchId="${branchId}"`)
.replace('baseVersion="1"', 'baseVersion="10"')
.replace('trunkVersion="1"', 'trunkVersion=""')
.replace('<status>arrived</status>', '<name>Dana</name><status>registered</status>')
)
.set('Content-Type', 'application/xml')
.expect(200);

await exhaust(container);

backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
backlogCount.should.equal(1);

await container.Entities.processHeldSubmissions();

await asAlice.get(`/v1/projects/1/datasets/people/entities/${newUuid}`)
.expect(200)
.then(({ body }) => {
body.currentVersion.version.should.equal(3);
body.currentVersion.data.should.eql({ status: 'registered', first_name: 'Dana' });
body.currentVersion.label.should.eql('auto generated');
body.currentVersion.branchId.should.equal(branchId);
body.currentVersion.baseVersion.should.equal(2);
body.currentVersion.branchBaseVersion.should.equal(10);
should.not.exist(body.currentVersion.trunkVersion);
});

backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
backlogCount.should.equal(0);
}));
});
});

0 comments on commit 231fe93

Please sign in to comment.