Skip to content

Commit

Permalink
fix: cascade delete permissions (closes #200) (#215)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonas-brr authored Aug 28, 2024
1 parent 098a1d1 commit ec959b1
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 4 deletions.
26 changes: 23 additions & 3 deletions backend/auth/models.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from sqlalchemy import Column, Integer, String, ForeignKey, Boolean, Enum, Table
from sqlalchemy.orm import backref, relationship
from sqlalchemy import Column, Integer, String, ForeignKey, Boolean, Enum, text
from sqlalchemy.event import listens_for
from sqlalchemy.orm import relationship
from sqlalchemy.sql.schema import UniqueConstraint

from ..base_models import Base
from .enum import RecipientType, PermissionTargetType, PermissionType
from ..base_models import Base


class Group(Base):
Expand Down Expand Up @@ -104,3 +105,22 @@ class UserGroup(Base):
__table_args__ = (
UniqueConstraint('user_id', 'group_id'),
)


def delete_dangling_permissions(connection, recipient_type: RecipientType, recipient_id: int):
connection.execute(
text("DELETE FROM permissions WHERE recipient_type = :type AND recipient_id = :id"),
{"type": recipient_type.name, "id": recipient_id}
)


# Since permissions table doesn't relate to its recipient tables we can't use sqlalchemy's cascade
# setting, but we have to delete those manually using event listeners.
@listens_for(User, "after_delete")
def delete_dangling_user_permissions(_, connection, target):
delete_dangling_permissions(connection, RecipientType.USER, target.id)


@listens_for(Group, "after_delete")
def delete_dangling_group_permissions(_, connection, target):
delete_dangling_permissions(connection, RecipientType.GROUP, target.id)
26 changes: 25 additions & 1 deletion backend/tests/test_crud_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ def test_has_permission(self, dbsession: Session, testuser: User):
add_members(su_testgroup.id, [other_user.id], dbsession)

assert has_permission(other_user, req_perm, dbsession) is True


def test_revoke_permission(self, dbsession: Session):
user, group, parent_group = self._create_user_group_with_perm(dbsession)
Expand All @@ -320,3 +319,28 @@ def test_revoke_permission__not_existent(self, dbsession: Session):
result = revoke_permissions([9999], dbsession)
assert result is False
assert dbsession.query(Permission.id).count() == 5

@pytest.mark.parametrize(["recipient_type", "attr_name"], [
(RecipientType.USER.name, 'user'),
(RecipientType.GROUP.name, 'group'),
])
def test_cascade_delete_permissions(self, dbsession: Session, recipient_type, attr_name):
user, group, _ = self._create_user_group_with_perm(dbsession)

permissions_count = (
dbsession.query(Permission)
.filter_by(recipient_type=recipient_type, recipient_id=locals().get(attr_name).id)
.count()
)
assert permissions_count > 0

dbsession.delete(locals().get(attr_name))
dbsession.commit()

permissions_count = (
dbsession.query(Permission)
.filter_by(recipient_type=recipient_type, recipient_id=locals().get(attr_name).id)
.count()
)
assert permissions_count == 0

0 comments on commit ec959b1

Please sign in to comment.