Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check approvalRequired reprocessing with offline entity queue #1162

Merged
merged 2 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,11 @@ const _holdSubmission = (run, submissionId, submissionDefId, branchId, branchBas
VALUES (${submissionId}, ${submissionDefId}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP())
`);

const _checkHeldSubmission = (maybeOne, branchId, branchBaseVersion) => maybeOne(sql`
const _checkHeldSubmission = (maybeOne, submissionId) => maybeOne(sql`
SELECT * FROM entity_submission_backlog
WHERE "submissionId"=${submissionId}`);

const _checkAndDeleteHeldSubmission = (maybeOne, branchId, branchBaseVersion) => maybeOne(sql`
DELETE FROM entity_submission_backlog
WHERE "branchId"=${branchId} AND "branchBaseVersion" = ${branchBaseVersion}
RETURNING *`);
Expand Down Expand Up @@ -367,6 +371,12 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
if (existingEntity.isDefined())
return null;

const existingHeldSubmission = await _checkHeldSubmission(maybeOne, submissionId);
// If the submission is being held for ordering offline entity processing,
// don't try to process it now, it will be dequeued and reprocessed elsewhere.
if (existingHeldSubmission.isDefined())
return null;

const submission = await Submissions.getSubAndDefById(submissionDefId);

// If Submission is deleted/purged - Safety check, will not be true at this line
Expand Down Expand Up @@ -425,7 +435,7 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
const { branchId, branchBaseVersion } = maybeEntity.aux.currentVersion;
// branchBaseVersion could be undefined if handling an offline create
const currentBranchBaseVersion = branchBaseVersion ?? 0;
const nextSub = await _checkHeldSubmission(maybeOne, branchId, currentBranchBaseVersion + 1);
const nextSub = await _checkAndDeleteHeldSubmission(maybeOne, branchId, currentBranchBaseVersion + 1);
if (nextSub.isDefined()) {
const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId } = nextSub.get();
await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId },
Expand Down
169 changes: 169 additions & 0 deletions test/integration/api/offline-entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -609,4 +609,173 @@ describe('Offline Entities', () => {
});
}));
});

describe('reprocessing submissions when toggling approvalRequired dataset flag', () => {
it('should not over-process a submission that is being held because it is later in a run', testOfflineEntities(async (service, container) => {
const asAlice = await service.login('alice');
const branchId = uuid();

// Configure the entity list to create entities on submission approval
await asAlice.patch('/v1/projects/1/datasets/people')
.send({ approvalRequired: true })
.expect(200);

// This submission updates an existing entity.
// Trunk version is 1, but base version is higher than trunk version indicating it is later in the branch.
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.one
.replace('branchId=""', `branchId="${branchId}"`)
.replace('baseVersion="1"', 'baseVersion="2"')
)
.set('Content-Type', 'application/xml')
.expect(200);

await exhaust(container);

// Observe that there is one held submission.
let count = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
count.should.equal(1);

// Observe that the submission was initially processed without error
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one/audits')
.expect(200)
.then(({ body }) => {
should.not.exist(body[0].details.problem);
});

// Observe this update was not applied (there is no second version) because earlier update in branch is missing.
await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc/versions')
.then(({ body: versions }) => {
versions.length.should.equal(1);
});

// Trigger the submission reprocessing by updating the entity list settings
await asAlice.patch('/v1/projects/1/datasets/people?convert=true')
.send({ approvalRequired: false })
.expect(200);

await exhaust(container);

// Observe that there is still just one held submission.
count = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
count.should.equal(1);

// Observe that the submission still has no processing errors
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(1);
should.not.exist(body[0].details.problem);
});
ktuite marked this conversation as resolved.
Show resolved Hide resolved

// Observe that the update was still not applied.
await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc/versions')
.then(({ body: versions }) => {
versions.length.should.equal(1);
});

// Send in missing submission from earlier in the branch.
// Trunk version is 1, base version is 1
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.one
.replace('branchId=""', `branchId="${branchId}"`)
.replace('one', 'one-update')
.replace('<status>arrived</status>', '<status>waiting</status>')
)
.set('Content-Type', 'application/xml')
.expect(200);

await exhaust(container);

// Observe that both updates have now been applied.
await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc/versions')
.then(({ body: versions }) => {
versions.length.should.equal(3);
});

// Observe that there are no longer any held submissions.
count = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
count.should.equal(0);
}));

it('should wait for approval of create submission in offline branch', testOfflineEntities(async (service, container) => {
const asAlice = await service.login('alice');

// Configure the entity list to create entities on submission approval
await asAlice.patch('/v1/projects/1/datasets/people')
.send({ approvalRequired: true })
.expect(200);

const branchId = uuid();

// First submission creates the entity, offline version is now 1
// But this submission requires approval so it wont get processed at first
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.two
.replace('branchId=""', `branchId="${branchId}"`)
)
.set('Content-Type', 'application/xml')
.expect(200);

// Second submission updates the entity
// but it should be waiting for first version to come through
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.two
.replace('create="1"', 'update="1"')
.replace('branchId=""', `branchId="${branchId}"`)
.replace('two', 'two-update')
.replace('baseVersion=""', 'baseVersion="1"')
.replace('<status>new</status>', '<status>checked in</status>')
)
.set('Content-Type', 'application/xml')
.expect(200);

await exhaust(container);

// Entity should not exist yet
await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789ddd')
.expect(404);

// Neither submission should have a processing error
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(1);
should.not.exist(body[0].details.problem);
});
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two-update/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(1);
should.not.exist(body[0].details.problem);
});

// There should be one submission (the second one) in the held submissions queue
let count = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
count.should.equal(1);

// Approving the first submission should start a chain that includes the second submission
await asAlice.patch('/v1/projects/1/forms/offlineEntity/submissions/two')
.send({ reviewState: 'approved' })
.expect(200);

await exhaust(container);

await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789ddd')
.expect(200)
.then(({ body }) => {
body.currentVersion.version.should.equal(2);
body.currentVersion.baseVersion.should.equal(1);
body.currentVersion.data.should.eql({ age: '20', status: 'checked in', first_name: 'Megan' });

body.currentVersion.branchId.should.equal(branchId);
should.not.exist(body.currentVersion.trunkVersion);
body.currentVersion.branchBaseVersion.should.equal(1);
});

// Now there should be no submissions in the backlog.
count = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
count.should.equal(0);
}));
});
});