This repository has been archived by the owner on Aug 24, 2024. It is now read-only.

SaveConditionError when modifying PeriodicTask #78

Open
der-joel opened this issue Sep 30, 2023 · 1 comment

Comments

@der-joel

Currently the scheduler's sync() method saves every entry in its local copy of the schedule to the database:

def sync(self):
    for entry in self._schedule.values():
        entry.save()

The schedule property, which beat accesses internally, periodically refreshes this local copy via get_from_database().
However, get_from_database() calls sync() before it rebuilds the dictionary:

def get_from_database(self):
    self.sync()
    d = {}
    for doc in self.Model.objects():
        d[doc.name] = self.Entry(doc)
    return d

If any document is deleted from the PeriodicTask collection, a SaveConditionError is raised during the next get_from_database() call. MongoEngine raises this error when a conditional save does not update any document (it checks the count returned by the update). For a deleted document this always happens, because the local copy is saved before updates are fetched, even though its counterpart in the database no longer exists. The same ordering causes race conditions when documents in the PeriodicTask collection are updated: changes are sometimes overwritten by the stale local copy.
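The failure can be reproduced without MongoDB. This is a minimal in-memory sketch, assuming a dict stands in for the PeriodicTask collection and a hypothetical `conditional_save()` helper mimics MongoEngine raising SaveConditionError when a conditional save matches no document. It mirrors the sync-then-fetch ordering quoted above, not the library's actual code:

```python
class SaveConditionError(Exception):
    """Stand-in for mongoengine's SaveConditionError."""


def conditional_save(collection, doc_id, fields):
    """Hypothetical helper: update an existing document, raise if none matches."""
    if doc_id not in collection:
        raise SaveConditionError(f"no document with _id={doc_id!r}")
    collection[doc_id].update(fields)


class NaiveScheduler:
    """Mirrors the current ordering: sync() runs before the fresh fetch."""

    def __init__(self, collection):
        self.collection = collection
        self._schedule = {}
        self._schedule = self.get_from_database()

    def sync(self):
        # Writes every local entry back, including ones deleted from the database.
        for doc_id, entry in self._schedule.items():
            conditional_save(self.collection, doc_id, entry)

    def get_from_database(self):
        self.sync()
        return {doc_id: dict(doc) for doc_id, doc in self.collection.items()}


db = {1: {"name": "a", "total_run_count": 0}, 2: {"name": "b", "total_run_count": 0}}
scheduler = NaiveScheduler(db)

db[1]["total_run_count"] = 5  # another process updates task "a"...
del db[2]                     # ...and deletes task "b"

error = None
try:
    scheduler.get_from_database()
except SaveConditionError as exc:
    error = exc

print(db[1]["total_run_count"], error)
```

Both symptoms show up: the external change to task "a" is overwritten by the stale local copy, and the deleted task "b" aborts the refresh.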

Perhaps the local copy should never be modified and therefore never be saved to the database. Instead, it should only be refreshed from the database periodically, which also avoids too many database requests.
Fields like total_run_count could then be written with an atomic update, so no race conditions occur.
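One way to make the counter writes atomic, swapped in here as an alternative to comparing values in Python, is MongoDB's `$max` update operator: it only overwrites a field when the new value is greater than the stored one, and sets the field if it is missing. This minimal sketch only builds the update document; `build_atomic_update()` is a hypothetical helper, and its result would be passed as the second argument to pymongo's `Collection.update_one()`:

```python
from datetime import datetime, timezone


def build_atomic_update(total_run_count, last_run_at):
    """Hypothetical helper: build an update document that only moves counters forward.

    $max writes a field only if the given value is greater than the stored one,
    so concurrent writers cannot roll total_run_count or last_run_at back.
    """
    update = {
        "$max": {"total_run_count": total_run_count},
        "$set": {"run_immediately": False},
    }
    if last_run_at is not None:
        update["$max"]["last_run_at"] = last_run_at
    return update


doc = build_atomic_update(7, datetime(2023, 9, 30, 12, 0, tzinfo=timezone.utc))
print(doc)
```

Because the comparison happens on the server, there is no read-compare-write window, and an update against an `_id` that was deleted simply matches nothing.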

@der-joel

der-joel commented Sep 30, 2023

I extended @Jean-PhilippeD's approach (see #77) to solve this:

from celery.utils.log import get_logger

class MongodbScheduleEntry(ScheduleEntry):
    """Schedule entry that does not update total_run_count, last_run_at and run_immediately during save"""

    def save(self):
        """Save changes"""
        try:
            self._task.save(save_condition={})
        except Exception as exc:
            get_logger(__name__).exception("Exception during save", exc_info=exc)

class MongoScheduler(Scheduler):

    @property
    def schedule(self):
        """The current schedule"""
        return self._schedule

    def sync(self):
        """Synchronize schedule"""
        # update total_run_count, last_run_at and run_immediately of all docs in the db
        for entry in self._schedule.values():
            update = {}
            if entry.total_run_count > entry._task.total_run_count:
                update["total_run_count"] = entry.total_run_count
            if entry.last_run_at and entry._task.last_run_at and entry.last_run_at > entry._task.last_run_at:
                update["last_run_at"] = entry.last_run_at
            if entry._task.run_immediately:
                update["run_immediately"] = False
            if update:
                try:
                    # atomic $set on the raw pymongo collection; matches nothing if the doc was deleted
                    entry._task._get_collection().update_one(
                        filter={"_id": entry._task.id}, update={"$set": update}
                    )
                except Exception as exc:
                    get_logger(__name__).exception("Exception during sync", exc_info=exc)
        # replace the local copy with a fresh one built from the database
        self._schedule = {doc.name: self.Entry(doc) for doc in self.Model.objects}
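
The write-then-refetch order of this sync() can be exercised without MongoDB. The following is a minimal sketch, assuming a dict stands in for the PeriodicTask collection, a hypothetical `update_one()` helper mimics pymongo's behaviour of matching nothing (and raising nothing) when the `_id` no longer exists, and only total_run_count is modelled:

```python
from dataclasses import dataclass


@dataclass
class Entry:
    total_run_count: int       # counter as advanced in memory by beat
    task_total_run_count: int  # value loaded from the database at the last fetch


def update_one(collection, doc_id, fields):
    """Hypothetical stand-in for update_one with {"$set": fields}; returns the matched count."""
    doc = collection.get(doc_id)
    if doc is None:
        return 0  # no match: nothing written, no exception
    doc.update(fields)
    return 1


def sync(collection, schedule):
    """Write advanced counters back first, then rebuild the schedule from the collection."""
    for doc_id, entry in schedule.items():
        update = {}
        if entry.total_run_count > entry.task_total_run_count:
            update["total_run_count"] = entry.total_run_count
        if update:
            update_one(collection, doc_id, update)
    return {
        doc_id: Entry(doc["total_run_count"], doc["total_run_count"])
        for doc_id, doc in collection.items()
    }


db = {1: {"total_run_count": 3}, 2: {"total_run_count": 0}}
schedule = {1: Entry(4, 3), 2: Entry(1, 0)}  # beat ran both tasks once since the last fetch
del db[2]  # task 2 is deleted in the meantime
schedule = sync(db, schedule)
print(schedule)
```

The deleted task simply drops out of the rebuilt schedule instead of aborting the whole sync.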

This writes the necessary fields with an atomic update before fetching the new version of all schedule entries from the database. If no document with the given id exists, update_one simply matches nothing and does not raise an error.
Use beat_max_loop_interval and beat_sync_every to configure when sync should occur.
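For reference, both settings can go in a Celery configuration module. This is a minimal sketch with illustrative values, assuming a hypothetical `celeryconfig.py` loaded via `app.config_from_object("celeryconfig")`. With the default `beat_sync_every = 0`, Celery falls back to the scheduler's time-based `sync_every` interval (3 minutes by default):

```python
# celeryconfig.py: hypothetical module name, values are illustrative only

# Maximum number of seconds beat may sleep between checking the schedule.
beat_max_loop_interval = 30

# Sync after this many periodic tasks have been sent; 0 means time-based sync only.
beat_sync_every = 10
```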

I can create a PR for this if wanted.
