Skip to content

Commit

Permalink
Check approvalRequired reprocessing with offline entity queue (#1162)
Browse files Browse the repository at this point in the history
* Tests that check approvalRequired reprocessing with offline entity queue behavior

* improving tests and comments
  • Loading branch information
ktuite authored Jul 18, 2024
1 parent beea5c8 commit ca2df75
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 2 deletions.
14 changes: 12 additions & 2 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,11 @@ const _holdSubmission = (run, submissionId, submissionDefId, branchId, branchBas
VALUES (${submissionId}, ${submissionDefId}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP())
`);

const _checkHeldSubmission = (maybeOne, branchId, branchBaseVersion) => maybeOne(sql`
const _checkHeldSubmission = (maybeOne, submissionId) => maybeOne(sql`
SELECT * FROM entity_submission_backlog
WHERE "submissionId"=${submissionId}`);

const _checkAndDeleteHeldSubmission = (maybeOne, branchId, branchBaseVersion) => maybeOne(sql`
DELETE FROM entity_submission_backlog
WHERE "branchId"=${branchId} AND "branchBaseVersion" = ${branchBaseVersion}
RETURNING *`);
Expand Down Expand Up @@ -367,6 +371,12 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
if (existingEntity.isDefined())
return null;

const existingHeldSubmission = await _checkHeldSubmission(maybeOne, submissionId);
// If the submission is being held for ordering offline entity processing,
// don't try to process it now, it will be dequeued and reprocessed elsewhere.
if (existingHeldSubmission.isDefined())
return null;

const submission = await Submissions.getSubAndDefById(submissionDefId);

// If Submission is deleted/purged - Safety check, will not be true at this line
Expand Down Expand Up @@ -425,7 +435,7 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
const { branchId, branchBaseVersion } = maybeEntity.aux.currentVersion;
// branchBaseVersion could be undefined if handling an offline create
const currentBranchBaseVersion = branchBaseVersion ?? 0;
const nextSub = await _checkHeldSubmission(maybeOne, branchId, currentBranchBaseVersion + 1);
const nextSub = await _checkAndDeleteHeldSubmission(maybeOne, branchId, currentBranchBaseVersion + 1);
if (nextSub.isDefined()) {
const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId } = nextSub.get();
await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId },
Expand Down
169 changes: 169 additions & 0 deletions test/integration/api/offline-entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -609,4 +609,173 @@ describe('Offline Entities', () => {
});
}));
});

describe('reprocessing submissions when toggling approvalRequired dataset flag', () => {
it('should not over-process a submission that is being held because it is later in a run', testOfflineEntities(async (service, container) => {
const asAlice = await service.login('alice');
const branchId = uuid();

// Configure the entity list to create entities on submission approval
await asAlice.patch('/v1/projects/1/datasets/people')
.send({ approvalRequired: true })
.expect(200);

// This submission updates an existing entity.
// Trunk version is 1, but base version is higher than trunk version indicating it is later in the branch.
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.one
.replace('branchId=""', `branchId="${branchId}"`)
.replace('baseVersion="1"', 'baseVersion="2"')
)
.set('Content-Type', 'application/xml')
.expect(200);

await exhaust(container);

// Observe that there is one held submission.
let count = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
count.should.equal(1);

// Observe that the submission was initially processed without error
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one/audits')
.expect(200)
.then(({ body }) => {
should.not.exist(body[0].details.problem);
});

// Observe this update was not applied (there is no second version) because earlier update in branch is missing.
await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc/versions')
.then(({ body: versions }) => {
versions.length.should.equal(1);
});

// Trigger the submission reprocessing by updating the entity list settings
await asAlice.patch('/v1/projects/1/datasets/people?convert=true')
.send({ approvalRequired: false })
.expect(200);

await exhaust(container);

// Observe that there is still just one held submission.
count = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
count.should.equal(1);

// Observe that the submission still has no processing errors
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(1);
should.not.exist(body[0].details.problem);
});

// Observe that the update was still not applied.
await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc/versions')
.then(({ body: versions }) => {
versions.length.should.equal(1);
});

// Send in missing submission from earlier in the branch.
// Trunk version is 1, base version is 1
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.one
.replace('branchId=""', `branchId="${branchId}"`)
.replace('one', 'one-update')
.replace('<status>arrived</status>', '<status>waiting</status>')
)
.set('Content-Type', 'application/xml')
.expect(200);

await exhaust(container);

// Observe that both updates have now been applied.
await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc/versions')
.then(({ body: versions }) => {
versions.length.should.equal(3);
});

// Observe that there are no longer any held submissions.
count = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
count.should.equal(0);
}));

it('should wait for approval of create submission in offline branch', testOfflineEntities(async (service, container) => {
const asAlice = await service.login('alice');

// Configure the entity list to create entities on submission approval
await asAlice.patch('/v1/projects/1/datasets/people')
.send({ approvalRequired: true })
.expect(200);

const branchId = uuid();

// First submission creates the entity, offline version is now 1
// But this submission requires approval so it wont get processed at first
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.two
.replace('branchId=""', `branchId="${branchId}"`)
)
.set('Content-Type', 'application/xml')
.expect(200);

// Second submission updates the entity
// but it should be waiting for first version to come through
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.two
.replace('create="1"', 'update="1"')
.replace('branchId=""', `branchId="${branchId}"`)
.replace('two', 'two-update')
.replace('baseVersion=""', 'baseVersion="1"')
.replace('<status>new</status>', '<status>checked in</status>')
)
.set('Content-Type', 'application/xml')
.expect(200);

await exhaust(container);

// Entity should not exist yet
await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789ddd')
.expect(404);

// Neither submission should have a processing error
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(1);
should.not.exist(body[0].details.problem);
});
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two-update/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(1);
should.not.exist(body[0].details.problem);
});

// There should be one submission (the second one) in the held submissions queue
let count = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
count.should.equal(1);

// Approving the first submission should start a chain that includes the second submission
await asAlice.patch('/v1/projects/1/forms/offlineEntity/submissions/two')
.send({ reviewState: 'approved' })
.expect(200);

await exhaust(container);

await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789ddd')
.expect(200)
.then(({ body }) => {
body.currentVersion.version.should.equal(2);
body.currentVersion.baseVersion.should.equal(1);
body.currentVersion.data.should.eql({ age: '20', status: 'checked in', first_name: 'Megan' });

body.currentVersion.branchId.should.equal(branchId);
should.not.exist(body.currentVersion.trunkVersion);
body.currentVersion.branchBaseVersion.should.equal(1);
});

// Now there should be no submissions in the backlog.
count = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
count.should.equal(0);
}));
});
});

0 comments on commit ca2df75

Please sign in to comment.