From 98a009befb79fd7c6ddf5cfcfb9bc6cf625c5eda Mon Sep 17 00:00:00 2001 From: Kathleen Tuite Date: Mon, 19 Aug 2024 13:44:27 -0700 Subject: [PATCH] Tests of backlog processing time range --- lib/model/query/entities.js | 12 ++--- test/integration/api/offline-entities.js | 65 ++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 6 deletions(-) diff --git a/lib/model/query/entities.js b/lib/model/query/entities.js index 2e0203694..7351386b7 100644 --- a/lib/model/query/entities.js +++ b/lib/model/query/entities.js @@ -201,10 +201,9 @@ const _checkHeldSubmission = (maybeOne, submissionId) => maybeOne(sql` SELECT * FROM entity_submission_backlog WHERE "submissionId"=${submissionId}`); -const _checkAndDeleteHeldSubmission = (maybeOne, branchId, branchBaseVersion) => maybeOne(sql` - DELETE FROM entity_submission_backlog - WHERE "branchId"=${branchId} AND "branchBaseVersion" = ${branchBaseVersion} - RETURNING *`); +const _getNextHeldSubmissionInBranch = (maybeOne, branchId, branchBaseVersion) => maybeOne(sql` + SELECT * FROM entity_submission_backlog + WHERE "branchId"=${branchId} AND "branchBaseVersion" = ${branchBaseVersion}`); const DAY_RANGE = config.has('default.taskSchedule.forceProcess') ? config.get('default.taskSchedule.forceProcess') @@ -478,9 +477,10 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset const { branchId, branchBaseVersion } = maybeEntity.aux.currentVersion; // branchBaseVersion could be undefined if handling an offline create const currentBranchBaseVersion = branchBaseVersion ?? 0; - const nextSub = await _checkAndDeleteHeldSubmission(maybeOne, branchId, currentBranchBaseVersion + 1); + const nextSub = await _getNextHeldSubmissionInBranch(maybeOne, branchId, currentBranchBaseVersion + 1); if (nextSub.isDefined() && !forceOutOfOrderProcessing) { - const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId } = nextSub.get(); + const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId, auditId } = nextSub.get(); + await Entities._deleteHeldSubmissionByEventId(auditId); await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId }, { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId }); } diff --git a/test/integration/api/offline-entities.js b/test/integration/api/offline-entities.js index 141f8bc4c..3fb2f563a 100644 --- a/test/integration/api/offline-entities.js +++ b/test/integration/api/offline-entities.js @@ -1088,5 +1088,70 @@ describe('Offline Entities', () => { body.currentVersion.version.should.equal(1); }); })); + + describe('only force-process submissions held in backlog for a certain amount of time', () => { + it('should process a submission from over 7 days ago', testOfflineEntities(async (service, container) => { + const asAlice = await service.login('alice'); + const branchId = uuid(); + + // Neither update below will be applied at first because the first + // update in the branch is missing. + + // Send the first submission, which will be held in the backlog + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('branchId=""', `branchId="${branchId}"`) + .replace('baseVersion="1"', 'baseVersion="2"') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + let backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(1); + + // Update the timestamp on this backlog + await container.run(sql`UPDATE entity_submission_backlog SET "loggedAt" = "loggedAt" - interval '8 days'`); + + // Send the next submission, which will also be held in the backlog. + // This submission immediately follows the previous one, but force-processing + // the first submission does not cause this one to be processed. + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('branchId=""', `branchId="${branchId}"`) + .replace('one', 'one-update') + .replace('baseVersion="1"', 'baseVersion="3"') + .replace('arrived', 'departed') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + // Both submissions should be in the backlog now + backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(2); + + // Process submissions that have been in the backlog for a long time + await container.Entities.processHeldSubmissions(); + + await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc') + .expect(200) + .then(({ body }) => { + body.currentVersion.version.should.equal(2); + body.currentVersion.baseVersion.should.equal(1); + body.currentVersion.data.should.eql({ age: '22', status: 'arrived', first_name: 'Johnny' }); + + body.currentVersion.branchId.should.equal(branchId); + body.currentVersion.trunkVersion.should.equal(1); + body.currentVersion.branchBaseVersion.should.equal(2); + }); + + // One submission should still be in the backlog + backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(1); + })); + }); }); });