Skip to content

Commit

Permalink
Merge pull request #924 from ImageMarkup/fix-table-locking
Browse files Browse the repository at this point in the history
Properly lock longitudinal tables during metadata changes
  • Loading branch information
danlamanna committed Jul 2, 2024
2 parents 2aeb298 + c03e851 commit 316a1d9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 26 deletions.
4 changes: 2 additions & 2 deletions isic/core/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@


@contextmanager
def lock_table(cls: type[models.Model]):
def lock_table_for_writes(cls: type[models.Model]):
with transaction.atomic():
cursor = transaction.get_connection().cursor()
cursor.execute(f"LOCK TABLE {cls._meta.db_table}")
cursor.execute(f"LOCK TABLE {cls._meta.db_table} IN SHARE MODE")
try:
yield
finally:
Expand Down
4 changes: 2 additions & 2 deletions isic/ingest/services/cohort/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from isic.core.services.collection.image import collection_add_images
from isic.core.services.image import image_create
from isic.core.tasks import sync_elasticsearch_index_task
from isic.core.utils.db import lock_table
from isic.core.utils.db import lock_table_for_writes
from isic.ingest.models.accession import Accession
from isic.ingest.models.cohort import Cohort
from isic.ingest.models.metadata_file import MetadataFile
Expand Down Expand Up @@ -54,7 +54,7 @@ def cohort_publish(
)

# this creates a transaction
with lock_table(IsicId):
with lock_table_for_writes(IsicId):
for accession in cohort.accessions.publishable().iterator():
image = image_create(creator=publisher, accession=accession, public=public)
collection_add_images(collection=cohort.collection, image=image, ignore_lock=True)
Expand Down
46 changes: 24 additions & 22 deletions isic/ingest/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from django.template.loader import render_to_string
from isic_metadata.utils import get_unstructured_columns

from isic.core.utils.db import lock_table_for_writes
from isic.ingest.models import (
Accession,
AccessionStatus,
Expand Down Expand Up @@ -152,29 +153,30 @@ def update_metadata_task(user_pk: int, metadata_file_pk: int):
metadata_file = MetadataFile.objects.get(pk=metadata_file_pk)
user = User.objects.get(pk=user_pk)

with transaction.atomic():
with (
transaction.atomic(),
# Lock the longitudinal tables during metadata assignment
(_ for _ in Lesion.objects.select_for_update().all())
(_ for _ in Patient.objects.select_for_update().all())
(_ for _ in RcmCase.objects.select_for_update().all())

with metadata_file.blob.open("rb") as blob:
rows = MetadataFile.to_dict_reader(blob)

for chunk in itertools.batched(rows, 1_000):
accessions: QuerySet[Accession] = metadata_file.cohort.accessions.select_related(
"image", "review", "lesion", "patient", "rcm_case", "cohort"
).filter(original_blob_name__in=[row["filename"] for row in chunk])
accessions_by_filename: dict[str, Accession] = {
accession.original_blob_name: accession for accession in accessions
}

for row in chunk:
# filename doesn't need to be stored in the metadata since it's equal to
# original_blob_name
accession = accessions_by_filename[row["filename"]]
del row["filename"]
accession.update_metadata(user, row)
lock_table_for_writes(Lesion),
lock_table_for_writes(Patient),
lock_table_for_writes(RcmCase),
metadata_file.blob.open("rb") as blob,
):
rows = MetadataFile.to_dict_reader(blob)

for chunk in itertools.batched(rows, 1_000):
accessions: QuerySet[Accession] = metadata_file.cohort.accessions.select_related(
"image", "review", "lesion", "patient", "rcm_case", "cohort"
).filter(original_blob_name__in=[row["filename"] for row in chunk])
accessions_by_filename: dict[str, Accession] = {
accession.original_blob_name: accession for accession in accessions
}

for row in chunk:
# filename doesn't need to be stored in the metadata since it's equal to
# original_blob_name
accession = accessions_by_filename[row["filename"]]
del row["filename"]
accession.update_metadata(user, row)


@shared_task(soft_time_limit=3600, time_limit=3660)
Expand Down

0 comments on commit 316a1d9

Please sign in to comment.