Skip to content

Commit

Permalink
Add task to process held submissions
Browse files Browse the repository at this point in the history
  • Loading branch information
ktuite committed Aug 16, 2024
1 parent 387549a commit 9dd0c9a
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 10 deletions.
16 changes: 16 additions & 0 deletions lib/bin/process-held-submissions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2024 ODK Central Developers
// See the NOTICE file at the top-level directory of this distribution and at
// https://github.com/getodk/central-backend/blob/master/NOTICE.
// This file is part of ODK Central. It is subject to the license terms in
// the LICENSE file found in the top-level directory of this distribution and at
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.
//
// This script re-processes submissions containing offline entity actions that
// were previously held in a backlog due to submissions coming in out of order.

const { run } = require('../task/task');
const { processHeldSubmissions } = require('../task/process-held-submissions');
run(processHeldSubmissions());

13 changes: 9 additions & 4 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

const config = require('config');
const { sql } = require('slonik');
const { Actor, Audit, Entity, Submission, Form } = require('../frames');
const { equals, extender, unjoiner, page, markDeleted, insertMany } = require('../../util/db');
Expand Down Expand Up @@ -196,7 +197,6 @@ SELECT actions
FROM dataset_form_defs
WHERE "datasetId" = ${datasetId} AND "formDefId" = ${formDefId}`);


const _checkHeldSubmission = (maybeOne, submissionId) => maybeOne(sql`
SELECT * FROM entity_submission_backlog
WHERE "submissionId"=${submissionId}`);
Expand All @@ -206,9 +206,14 @@ const _checkAndDeleteHeldSubmission = (maybeOne, branchId, branchBaseVersion) =>
WHERE "branchId"=${branchId} AND "branchBaseVersion" = ${branchBaseVersion}
RETURNING *`);

const _getHeldSubmissionsAsEvents = () => ({ all }) => all(sql`
const DAY_RANGE = config.has('default.taskSchedule.forceProcess')
? config.get('default.taskSchedule.forceProcess')
: 7; // Default is 7 days

const _getHeldSubmissionsAsEvents = (force) => ({ all }) => all(sql`
SELECT audits.* FROM entity_submission_backlog
JOIN audits on entity_submission_backlog."auditId" = audits.id
${force ? sql`` : sql`WHERE entity_submission_backlog."loggedAt" < current_date - cast(${DAY_RANGE} as int)`}
ORDER BY "branchId", "branchBaseVersion"`)
.then(map(construct(Audit)));

Expand Down Expand Up @@ -509,8 +514,8 @@ const createEntitiesFromPendingSubmissions = (submissionEvents, parentEvent) =>
() => processSubmissionEvent(event, parentEvent)(container)));


const processHeldSubmissions = () => async (container) => {
const events = await container.Entities._getHeldSubmissionsAsEvents();
const processHeldSubmissions = (force = false) => async (container) => {
const events = await container.Entities._getHeldSubmissionsAsEvents(force);
return runSequentially(events.map(event =>
async () => {
await container.Entities._deleteHeldSubmissionByEventId(event.id);
Expand Down
16 changes: 16 additions & 0 deletions lib/task/process-held-submissions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2019 ODK Central Developers
// See the NOTICE file at the top-level directory of this distribution and at
// https://github.com/getodk/central-backend/blob/master/NOTICE.
// This file is part of ODK Central. It is subject to the license terms in
// the LICENSE file found in the top-level directory of this distribution and at
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.
//
// This task deletes expired sessions from the table so it does not become
// overladen and bogged down over time.

const { task } = require('./task');
const processHeldSubmissions = task.withContainer(({ Entities }) => Entities.processHeldSubmissions);
module.exports = { processHeldSubmissions };

12 changes: 6 additions & 6 deletions test/integration/api/offline-entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ describe('Offline Entities', () => {
let backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
backlogCount.should.equal(1);

await container.Entities.processHeldSubmissions();
await container.Entities.processHeldSubmissions(true);

await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc')
.expect(200)
Expand Down Expand Up @@ -874,7 +874,7 @@ describe('Offline Entities', () => {
let backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
backlogCount.should.equal(2);

await container.Entities.processHeldSubmissions();
await container.Entities.processHeldSubmissions(true);

await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc')
.expect(200)
Expand Down Expand Up @@ -913,7 +913,7 @@ describe('Offline Entities', () => {
let backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
backlogCount.should.equal(1);

await container.Entities.processHeldSubmissions();
await container.Entities.processHeldSubmissions(true);

await asAlice.get(`/v1/projects/1/datasets/people/entities/${newUuid}`)
.expect(200)
Expand Down Expand Up @@ -968,7 +968,7 @@ describe('Offline Entities', () => {
let backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
backlogCount.should.equal(2);

await container.Entities.processHeldSubmissions();
await container.Entities.processHeldSubmissions(true);

await asAlice.get(`/v1/projects/1/datasets/people/entities/${newUuid}`)
.expect(200)
Expand Down Expand Up @@ -1004,7 +1004,7 @@ describe('Offline Entities', () => {
backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`);
backlogCount.should.equal(1);

await container.Entities.processHeldSubmissions();
await container.Entities.processHeldSubmissions(true);

await asAlice.get(`/v1/projects/1/datasets/people/entities/${newUuid}`)
.expect(200)
Expand Down Expand Up @@ -1048,7 +1048,7 @@ describe('Offline Entities', () => {
backlogCount.should.equal(1);

// Force the update submission to be processed as a create
await container.Entities.processHeldSubmissions();
await container.Entities.processHeldSubmissions(true);

await asAlice.get(`/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789ddd`)
.expect(200)
Expand Down

0 comments on commit 9dd0c9a

Please sign in to comment.