diff --git a/lib/model/query/entities.js b/lib/model/query/entities.js
index fad3e6762..2ce8b6832 100644
--- a/lib/model/query/entities.js
+++ b/lib/model/query/entities.js
@@ -201,7 +201,11 @@ const _holdSubmission = (run, submissionId, submissionDefId, branchId, branchBas
VALUES (${submissionId}, ${submissionDefId}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP())
`);
-const _checkHeldSubmission = (maybeOne, branchId, branchBaseVersion) => maybeOne(sql`
+const _checkHeldSubmission = (maybeOne, submissionId) => maybeOne(sql`
+ SELECT * FROM entity_submission_backlog
+ WHERE "submissionId"=${submissionId}`);
+
+const _checkAndDeleteHeldSubmission = (maybeOne, branchId, branchBaseVersion) => maybeOne(sql`
DELETE FROM entity_submission_backlog
WHERE "branchId"=${branchId} AND "branchBaseVersion" = ${branchBaseVersion}
RETURNING *`);
@@ -367,6 +371,12 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
if (existingEntity.isDefined())
return null;
+ const existingHeldSubmission = await _checkHeldSubmission(maybeOne, submissionId);
+ // If the submission is being held for ordering offline entity processing,
+ // don't try to process it now, it will be dequeued and reprocessed elsewhere.
+ if (existingHeldSubmission.isDefined())
+ return null;
+
const submission = await Submissions.getSubAndDefById(submissionDefId);
// If Submission is deleted/purged - Safety check, will not be true at this line
@@ -425,7 +435,7 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
const { branchId, branchBaseVersion } = maybeEntity.aux.currentVersion;
// branchBaseVersion could be undefined if handling an offline create
const currentBranchBaseVersion = branchBaseVersion ?? 0;
- const nextSub = await _checkHeldSubmission(maybeOne, branchId, currentBranchBaseVersion + 1);
+ const nextSub = await _checkAndDeleteHeldSubmission(maybeOne, branchId, currentBranchBaseVersion + 1);
if (nextSub.isDefined()) {
const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId } = nextSub.get();
await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId },
diff --git a/test/integration/api/offline-entities.js b/test/integration/api/offline-entities.js
index 337a1cb2c..df805c833 100644
--- a/test/integration/api/offline-entities.js
+++ b/test/integration/api/offline-entities.js
@@ -609,4 +609,173 @@ describe('Offline Entities', () => {
});
}));
});
+
+ describe('reprocessing submissions when toggling approvalRequired dataset flag', () => {
+ it('should not over-process a submission that is being held because it is later in a run', testOfflineEntities(async (service, container) => {
+ const asAlice = await service.login('alice');
+ const branchId = uuid();
+
+ // Configure the entity list to create entities on submission approval
+ await asAlice.patch('/v1/projects/1/datasets/people')
+ .send({ approvalRequired: true })
+ .expect(200);
+
+ // This submission updates an existing entity.
+ // Trunk version is 1, but base version is higher than trunk version indicating it is later in the branch.
+ await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
+ .send(testData.instances.offlineEntity.one
+ .replace('branchId=""', `branchId="${branchId}"`)
+ .replace('baseVersion="1"', 'baseVersion="2"')
+ )
+ .set('Content-Type', 'application/xml')
+ .expect(200);
+
+ await exhaust(container);
+
+ // Observe that there is one held submission.
+ let count = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
+ count.should.equal(1);
+
+ // Observe that the submission was initially processed without error
+ await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one/audits')
+ .expect(200)
+ .then(({ body }) => {
+ should.not.exist(body[0].details.problem);
+ });
+
+ // Observe this update was not applied (there is no second version) because earlier update in branch is missing.
+ await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc/versions')
+ .then(({ body: versions }) => {
+ versions.length.should.equal(1);
+ });
+
+ // Trigger the submission reprocessing by updating the entity list settings
+ await asAlice.patch('/v1/projects/1/datasets/people?convert=true')
+ .send({ approvalRequired: false })
+ .expect(200);
+
+ await exhaust(container);
+
+ // Observe that there is still just one held submission.
+ count = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
+ count.should.equal(1);
+
+ // Observe that the submission still has no processing errors
+ await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one/audits')
+ .expect(200)
+ .then(({ body }) => {
+ body.length.should.equal(1);
+ should.not.exist(body[0].details.problem);
+ });
+
+ // Observe that the update was still not applied.
+ await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc/versions')
+ .then(({ body: versions }) => {
+ versions.length.should.equal(1);
+ });
+
+ // Send in missing submission from earlier in the branch.
+ // Trunk version is 1, base version is 1
+ await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
+ .send(testData.instances.offlineEntity.one
+ .replace('branchId=""', `branchId="${branchId}"`)
+ .replace('one', 'one-update')
+ .replace('arrived', 'waiting')
+ )
+ .set('Content-Type', 'application/xml')
+ .expect(200);
+
+ await exhaust(container);
+
+ // Observe that both updates have now been applied.
+ await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc/versions')
+ .then(({ body: versions }) => {
+ versions.length.should.equal(3);
+ });
+
+ // Observe that there are no longer any held submissions.
+ count = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
+ count.should.equal(0);
+ }));
+
+ it('should wait for approval of create submission in offline branch', testOfflineEntities(async (service, container) => {
+ const asAlice = await service.login('alice');
+
+ // Configure the entity list to create entities on submission approval
+ await asAlice.patch('/v1/projects/1/datasets/people')
+ .send({ approvalRequired: true })
+ .expect(200);
+
+ const branchId = uuid();
+
+ // First submission creates the entity, offline version is now 1
+ // But this submission requires approval so it wont get processed at first
+ await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
+ .send(testData.instances.offlineEntity.two
+ .replace('branchId=""', `branchId="${branchId}"`)
+ )
+ .set('Content-Type', 'application/xml')
+ .expect(200);
+
+ // Second submission updates the entity
+ // but it should be waiting for first version to come through
+ await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
+ .send(testData.instances.offlineEntity.two
+ .replace('create="1"', 'update="1"')
+ .replace('branchId=""', `branchId="${branchId}"`)
+ .replace('two', 'two-update')
+ .replace('baseVersion=""', 'baseVersion="1"')
+ .replace('new', 'checked in')
+ )
+ .set('Content-Type', 'application/xml')
+ .expect(200);
+
+ await exhaust(container);
+
+ // Entity should not exist yet
+ await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789ddd')
+ .expect(404);
+
+ // Neither submission should have a processing error
+ await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two/audits')
+ .expect(200)
+ .then(({ body }) => {
+ body.length.should.equal(1);
+ should.not.exist(body[0].details.problem);
+ });
+ await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two-update/audits')
+ .expect(200)
+ .then(({ body }) => {
+ body.length.should.equal(1);
+ should.not.exist(body[0].details.problem);
+ });
+
+ // There should be one submission (the second one) in the held submissions queue
+ let count = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
+ count.should.equal(1);
+
+ // Approving the first submission should start a chain that includes the second submission
+ await asAlice.patch('/v1/projects/1/forms/offlineEntity/submissions/two')
+ .send({ reviewState: 'approved' })
+ .expect(200);
+
+ await exhaust(container);
+
+ await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789ddd')
+ .expect(200)
+ .then(({ body }) => {
+ body.currentVersion.version.should.equal(2);
+ body.currentVersion.baseVersion.should.equal(1);
+ body.currentVersion.data.should.eql({ age: '20', status: 'checked in', first_name: 'Megan' });
+
+ body.currentVersion.branchId.should.equal(branchId);
+ should.not.exist(body.currentVersion.trunkVersion);
+ body.currentVersion.branchBaseVersion.should.equal(1);
+ });
+
+ // Now there should be no submissions in the backlog.
+ count = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
+ count.should.equal(0);
+ }));
+ });
});