Skip to content

Commit

Permalink
Add transaction to backlog processor task
Browse files Browse the repository at this point in the history
  • Loading branch information
ktuite committed Aug 23, 2024
1 parent 9724f46 commit d1ac135
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
20 changes: 13 additions & 7 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -534,14 +534,19 @@ const _getHeldSubmissionsAsEvents = (force) => ({ all }) => all(sql`
ORDER BY "branchId", "branchBaseVersion"`)
.then(map(construct(Audit)));

const _processSingleBacklogEvent = (event) => (container) =>
container.db.transaction(async (trxn) => {
const { Entities } = container.with({ db: trxn });
await Entities._deleteHeldSubmissionByEventId(event.id);
await Entities.processSubmissionEvent(event, { details: { force: true } });
return true;
});

const processBacklog = (force = false) => async (container) => {
const events = await container.Entities._getHeldSubmissionsAsEvents(force);
return runSequentially(events.map(event =>
async () => {
await container.Entities._deleteHeldSubmissionByEventId(event.id);
return processSubmissionEvent(event, { details: { force: true } })(container);
}))
.then(() => events.length);
await runSequentially(events.map(event =>
() => container.Entities._processSingleBacklogEvent(event)));
return events.length;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -728,7 +733,8 @@ module.exports = {
_computeBaseVersion,
_holdSubmission, _checkHeldSubmission,
_getNextHeldSubmissionInBranch, _deleteHeldSubmissionByEventId,
_getHeldSubmissionsAsEvents, processBacklog,
_getHeldSubmissionsAsEvents,
processBacklog, _processSingleBacklogEvent,
processSubmissionEvent, streamForExport,
getDefBySubmissionId,
createVersion,
Expand Down
3 changes: 0 additions & 3 deletions lib/task/process-backlog.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.
//
// This task deletes expired sessions from the table so it does not become
// overladen and bogged down over time.

const { task } = require('./task');
const processBacklog = task.withContainer(({ Entities }) => Entities.processBacklog);
Expand Down

0 comments on commit d1ac135

Please sign in to comment.