Merge pull request #961 from ImageMarkup/refactor-log-processing
Stop using pandas for log processing
danlamanna committed Sep 13, 2024
2 parents 911ab68 + 930c8f9 commit 6f04cf2
Showing 3 changed files with 58 additions and 57 deletions.
1 change: 0 additions & 1 deletion isic/settings.py
@@ -205,7 +205,6 @@ class DevelopmentConfiguration(IsicMixin, DevelopmentBaseConfiguration):
"from isic.stats.tasks import *",
"from isic.studies.tasks import *",
"from opensearchpy import OpenSearch",
"import pandas as pd",
]
SHELL_PLUS_PRINT_SQL_TRUNCATE = None
RUNSERVER_PLUS_PRINT_SQL_TRUNCATE = None
104 changes: 55 additions & 49 deletions isic/stats/tasks.py
@@ -1,5 +1,7 @@
from collections import defaultdict
from collections.abc import Iterable
import csv
import datetime
from datetime import timedelta
import gzip
from io import BytesIO
@@ -123,49 +125,34 @@ def _cdn_log_objects(s3) -> Iterable[dict]:
yield from page.get("Contents", [])


def _cdn_access_log_records(s3, s3_log_object_key: str) -> Iterable[dict]:
import pandas as pd

try:
data = s3.get_object(Bucket=settings.CDN_LOG_BUCKET, Key=s3_log_object_key)
except s3.exceptions.NoSuchKey:
# ignore the case where it was already processed and deleted by another task
return

with gzip.GzipFile(fileobj=BytesIO(data["Body"].read())) as stream:
def _cdn_access_log_records(log_file_bytes: BytesIO) -> Iterable[dict]:
# See https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#access-logs-timing
# for the format of the log file.
with gzip.GzipFile(fileobj=log_file_bytes) as stream:
version_line, headers_line = stream.readlines()[0:2]
if not version_line.startswith(b"#Version: 1.0"):
raise Exception(f"Unknown version in log file {s3_log_object_key}")
raise Exception("Unknown version in log file")

headers = headers_line.decode("utf-8").replace("#Fields:", "").strip().split()
stream.seek(0)
logs = pd.read_table(
stream,
skiprows=2,
names=headers,
usecols=[
"date",
"time",
"cs-uri-stem",
"c-ip",
"cs(User-Agent)",
"x-edge-request-id",
"sc-status",
],
delimiter="\\s+",
)

logs["download_time"] = pd.to_datetime(logs["date"] + " " + logs["time"], utc=True)

for _, row in logs.iterrows():
yield {
"download_time": row["download_time"],
"path": row["cs-uri-stem"].lstrip("/"),
"ip_address": row["c-ip"],
"user_agent": urllib.parse.unquote(row["cs(User-Agent)"]),
"request_id": row["x-edge-request-id"],
"status": row["sc-status"],
}
reader = csv.DictReader(
(line.decode() for line in stream.readlines()[2:]),
fieldnames=headers,
delimiter="\t",
strict=True,
)
for row in reader:
yield {
"download_time": timezone.datetime.strptime(
f"{row['date']} {row['time']}", "%Y-%m-%d %H:%M:%S"
).replace(tzinfo=datetime.UTC),
"path": row["cs-uri-stem"].lstrip("/"),
"ip_address": row["c-ip"],
"user_agent": urllib.parse.unquote(row["cs(User-Agent)"]),
"request_id": row["x-edge-request-id"],
"status": int(row["sc-status"]),
}
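
For reference, the refactored parser can now be exercised without any S3 access or mocking. A minimal sketch, assuming the project's environment (the function lives in isic.stats.tasks); the log contents and field values below are fabricated for illustration:

import gzip
import io

from isic.stats.tasks import _cdn_access_log_records

# A fabricated CloudFront-style log: a version line, a "#Fields:" header, then
# tab-separated data rows matching the seven fields the parser reads.
fake_log = (
    "#Version: 1.0\n"
    "#Fields: date time cs-uri-stem c-ip cs(User-Agent) x-edge-request-id sc-status\n"
    "2024-09-13\t12:00:00\t/images/example.jpg\t203.0.113.5\tcurl%2F8.0\treq-1\t200\n"
    "2024-09-13\t12:00:01\t/thumbnails/example.jpg\t203.0.113.5\tcurl%2F8.0\treq-2\t404\n"
)

for record in _cdn_access_log_records(io.BytesIO(gzip.compress(fake_log.encode()))):
    # Each record is a plain dict: a timezone-aware download_time, an int status,
    # the path with its leading slash stripped, and a URL-decoded user agent.
    print(record["download_time"], record["path"], record["status"])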


@shared_task(soft_time_limit=60, time_limit=120)
@@ -182,27 +169,53 @@ def collect_image_download_records_task():
def process_s3_log_file_task(s3_log_object_key: str):
s3 = _s3_client()

try:
data = s3.get_object(Bucket=settings.CDN_LOG_BUCKET, Key=s3_log_object_key)
except s3.exceptions.NoSuchKey:
# ignore the case where it was already processed and deleted by another task
return

log_file_bytes = BytesIO(data["Body"].read())

_process_s3_log_file_task(log_file_bytes)

delete = s3.delete_object(
Bucket=settings.CDN_LOG_BUCKET,
Key=s3_log_object_key,
)

if delete["ResponseMetadata"]["HTTPStatusCode"] != 204:
raise Exception(f"Failed to delete s3 log file {s3_log_object_key}")


def _process_s3_log_file_task(log_file_bytes: BytesIO):
# This batch size is important because it bounds the size of the bulk creations
# AND (implicitly) the number of items in the "where in" clause.
BATCH_SIZE = 1_000 # noqa: N806

with transaction.atomic():
# go through only the images that mapped onto request paths (this ignores thumbnails and
# other files). this can create a query with tens of thousands of elements in the "where in"
# clause, so it needs to be batched.
# note that the COG images return a 206 partial content status, so this doesn't count
# the individual tiles that are downloaded.
for download_logs in itertools.batched(
filter(lambda r: r["status"] == 200, _cdn_access_log_records(s3, s3_log_object_key)),
1_000,
filter(lambda r: r["status"] == 200, _cdn_access_log_records(log_file_bytes)),
BATCH_SIZE,
):
# if any request_id has already been processed, it means the entire file has been.
# this means the task is being executed again, and should avoid processing log files
# but needs to delete the log file from s3.
if ImageDownload.objects.filter(request_id=download_logs[0]["request_id"]).exists():
logger.info("Skipping already processed log file %s", s3_log_object_key)
logger.info("Skipping already processed log file")
break

downloaded_paths_to_image_id: dict[str, int] = dict(
Image.objects.filter(
accession__blob__in=[download_log["path"] for download_log in download_logs]
).values_list("accession__blob", "id")
)
.order_by()
.values_list("accession__blob", "id")
)

image_downloads: list[ImageDownload] = []
@@ -220,17 +233,10 @@ def process_s3_log_file_task(s3_log_object_key: str):
)

try:
ImageDownload.objects.bulk_create(image_downloads)
ImageDownload.objects.bulk_create(image_downloads, batch_size=BATCH_SIZE)
except IntegrityError as e:
# Ignore duplicate entries, this is necessary because another transaction can be
# committed between the time of the earlier check and now.
# See https://www.postgresql.org/docs/current/errcodes-appendix.html
if e.__cause__.pgcode != "23505":
raise

delete = s3.delete_object(
Bucket=settings.CDN_LOG_BUCKET,
Key=s3_log_object_key,
)
if delete["ResponseMetadata"]["HTTPStatusCode"] != 204:
raise Exception(f"Failed to delete s3 log file {s3_log_object_key}")
10 changes: 3 additions & 7 deletions isic/stats/tests/test_tasks.py
@@ -56,13 +56,8 @@ def test_collect_google_analytics_task(mocker, settings):


def test_cdn_access_log_parsing(mocker):
def get_object(*args, **kwargs):
with pathlib.Path(data_dir / "cloudfront_log.gz").open("rb") as f:
return {"Body": io.BytesIO(f.read())}

records = list(
_cdn_access_log_records(mocker.MagicMock(get_object=get_object), mocker.MagicMock())
)
with pathlib.Path(data_dir / "cloudfront_log.gz").open("rb") as f:
records = list(_cdn_access_log_records(io.BytesIO(f.read())))

assert len(records) == 24
assert records[0] == {
@@ -92,6 +87,7 @@ def mock_client(*args, **kwargs):

mocker.patch("isic.stats.tasks.boto3", mocker.MagicMock(client=mock_client))
mocker.patch("isic.stats.tasks._cdn_log_objects", return_value=[{"Key": "foo"}])
mocker.patch("isic.stats.tasks.BytesIO", mocker.MagicMock())
mocker.patch(
"isic.stats.tasks._cdn_access_log_records",
return_value=[
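
For completeness, the batching primitive the refactored task leans on is itertools.batched, added in Python 3.12. A toy, runnable illustration (the record dicts are made up) of how it groups a lazily filtered iterator into tuples of at most the given size, which is what bounds each bulk_create and "where in" query:

import itertools

records = [{"request_id": f"req-{i}", "status": 200 if i % 3 else 404} for i in range(7)]

# Keep only successful downloads, then split them into tuples of at most two.
for batch in itertools.batched(filter(lambda r: r["status"] == 200, records), 2):
    print([r["request_id"] for r in batch])
# -> ['req-1', 'req-2'] then ['req-4', 'req-5']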
