-
Notifications
You must be signed in to change notification settings - Fork 3
/
tasks.py
313 lines (257 loc) · 10.7 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
"""gdrive_sync tasks"""
import logging
from collections import Counter
from datetime import datetime
from typing import Optional
import celery
from celery import chain, chord
from django.conf import settings
from mitol.common.utils import chunks, now_in_utc
from content_sync.api import upsert_content_sync_state
from content_sync.decorators import single_task
from content_sync.tasks import sync_website_content
from gdrive_sync import api, utils
from gdrive_sync.constants import (
DRIVE_FILE_FIELDS,
DRIVE_FOLDER_FILES_FINAL,
DRIVE_FOLDER_VIDEOS_FINAL,
DRIVE_MIMETYPE_FOLDER,
WebsiteSyncStatus,
)
from gdrive_sync.models import DriveFile
from main.celery import app
from main.s3_utils import get_boto3_resource
from main.tasks import chord_finisher
from websites.constants import CONTENT_TYPE_RESOURCE
from websites.models import Website, WebsiteContent
# pylint:disable=unused-argument, raising-format-tuple
log = logging.getLogger(__name__)
@app.task()
def process_drive_file(drive_file_id: str):
    """
    Stream a Google Drive file to S3 and, for videos, kick off transcoding.

    Args:
        drive_file_id (str): The Drive file id of the DriveFile to process.

    Returns:
        str | None: `drive_file_id` on success, None if processing failed.
    """
    drive_file = DriveFile.objects.get(file_id=drive_file_id)
    try:
        api.stream_to_s3(drive_file)
        if drive_file.is_video():
            api.transcode_gdrive_video(drive_file)
    # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit and
    # celery control-flow exceptions are not swallowed. Errors are logged,
    # not raised, so one bad file cannot abort the enclosing chord;
    # downstream tasks skip None results.
    except Exception:  # pylint:disable=broad-except
        log.exception("Error processing DriveFile %s", drive_file_id)
        return None
    return drive_file_id
@app.task()
def create_gdrive_resource_content_batch(drive_file_ids: list[Optional[str]]):
    """
    Create WebsiteContent resources for the Google Drive files in `drive_file_ids`.

    The ids are the results of upstream `process_drive_file` tasks; None entries
    (failed files) are skipped.
    """  # noqa: D401
    for file_id in drive_file_ids:
        if file_id is None:
            # Upstream processing failed for this file; nothing to create.
            continue
        try:
            matching_file = DriveFile.objects.get(file_id=file_id)
        except DriveFile.DoesNotExist as exc:
            log.exception(
                "Attempted to create resource for drive file %s which does not exist.",
                file_id,
                exc_info=exc,
            )
            continue
        api.create_gdrive_resource_content(matching_file)
@app.task()
def delete_drive_file(drive_file_id: str, sync_datetime: datetime):
    """
    Remove the DriveFile identified by `drive_file_id` unless it is still
    referenced by website page content (see api.delete_drive_file).
    """
    matching_file = DriveFile.objects.filter(file_id=drive_file_id).first()
    if matching_file is None:
        # Already gone (or never tracked) - nothing to delete.
        return
    api.delete_drive_file(matching_file, sync_datetime=sync_datetime)
def _get_gdrive_files(website: Website) -> tuple[dict[str, list[dict]], list[str]]:
    """
    Fetch the files under the website's final "files" and "videos" Drive subfolders.

    Args:
        website (Website): The website whose `gdrive_folder` is queried.

    Returns:
        tuple[dict[str, list[dict]], list[str]]: A (files, errors) pair where
            `files` maps each subfolder name to the list of Drive file objects
            found beneath it, and `errors` lists human-readable messages for
            subfolders that were missing or whose query failed.
    """  # noqa: D401
    errors = []
    gdrive_subfolder_files = {}
    for subfolder in [DRIVE_FOLDER_FILES_FINAL, DRIVE_FOLDER_VIDEOS_FINAL]:
        try:
            query = f'parents = "{website.gdrive_folder}" and name="{subfolder}" and mimeType = "{DRIVE_MIMETYPE_FOLDER}" and not trashed'  # noqa: E501
            subfolder_list = list(
                api.query_files(query=query, fields=DRIVE_FILE_FIELDS)
            )
            if not subfolder_list:
                # A missing subfolder is recorded but does not stop the
                # other subfolder from being processed.
                error_msg = f"Could not find drive subfolder {subfolder}"
                log.error("%s for %s", error_msg, website.short_id)
                errors.append(error_msg)
                continue
            gdrive_subfolder_files[subfolder] = list(
                api.walk_gdrive_folder(
                    subfolder_list[0]["id"],
                    DRIVE_FILE_FIELDS,
                )
            )
        # Narrowed from a bare `except:` so interpreter-level exceptions
        # (KeyboardInterrupt/SystemExit) propagate instead of being
        # recorded as sync errors.
        except Exception:  # pylint:disable=broad-except
            error_msg = f"An error occurred when querying the {subfolder} google drive subfolder"  # noqa: E501
            errors.append(error_msg)
            log.exception("%s for %s", error_msg, website.short_id)
    return gdrive_subfolder_files, errors
@app.task(bind=True, acks_late=True, autoretry_for=(BlockingIOError,), retry_backoff=30)
@single_task(30)
def import_website_files(self, name: str):
    """Query the Drive API for all children of a website folder and import the files"""
    if not api.is_gdrive_enabled():
        return
    website = Website.objects.get(name=name)
    # Mark the sync as in-progress and reset any errors from the prior run.
    website.sync_status = WebsiteSyncStatus.PROCESSING
    website.synced_on = now_in_utc()
    website.sync_errors = []
    gdrive_subfolder_files, errors = _get_gdrive_files(website)
    # Files tracked locally but no longer present in any Drive subfolder
    # are queued for deletion.
    deleted_drive_files = api.find_missing_files(
        sum(gdrive_subfolder_files.values(), []),  # noqa: RUF017
        website,
    )
    delete_file_tasks = [
        delete_drive_file.si(drive_file.file_id, website.synced_on)
        for drive_file in deleted_drive_files
    ]
    file_tasks = []
    for gdrive_files in gdrive_subfolder_files.values():
        # Count duplicate filenames within the subfolder: only a uniquely
        # named file may replace an existing resource's file.
        occurrences = Counter([file.get("name") for file in gdrive_files])
        for gdfile in gdrive_files:
            try:
                drive_file = api.process_file_result(
                    gdfile,
                    website=website,
                    sync_date=website.synced_on,
                    replace_file=occurrences[gdfile.get("name")] == 1,
                )
                if drive_file:
                    file_tasks.append(process_drive_file.s(drive_file.file_id))
            except:  # pylint:disable=bare-except  # noqa: E722
                # Record the failure but keep importing the remaining files.
                errors.append(f"Error processing gdrive file {gdfile.get('name')}")
                log.exception(
                    "Error processing gdrive file %s for %s",
                    gdfile.get("name"),
                    website.short_id,
                )
    # Persist collected errors before handing off to the async workflow.
    website.sync_errors = errors
    website.save()
    # Build the follow-up workflow: process new/changed files, then deletions,
    # then update status and re-sync the site content.
    workflow_steps = []
    if file_tasks:
        # Chord: all file-processing tasks in parallel, then one batch task
        # that creates the WebsiteContent resources from the results.
        step = chord(
            celery.group(*file_tasks), create_gdrive_resource_content_batch.s()
        )
        workflow_steps.append(step)
    if delete_file_tasks:
        # chord_finisher is a no-op callback that lets the chord complete.
        step = chord(celery.group(*delete_file_tasks), chord_finisher.si())
        workflow_steps.append(step)
    if workflow_steps:
        workflow_steps.append(update_website_status.si(website.pk, website.synced_on))
        workflow_steps.append(sync_website_content.si(name))
        workflow = chain(*workflow_steps)
        # Replace this task with the workflow (celery's documented
        # `raise self.replace(...)` idiom); nothing below runs in this case.
        raise self.replace(celery.group(workflow))
    # No file work was queued: finalize the sync status synchronously.
    update_website_status(website.pk, website.synced_on)
@app.task()
def create_gdrive_folders(website_short_id: str):
    """Ensure the Google Drive folder structure exists for the given website."""
    if not api.is_gdrive_enabled():
        return
    api.create_gdrive_folders(website_short_id)
@app.task()
def create_gdrive_folders_batch(short_ids: list[str]):
    """
    Create Google Drive folders for a batch of websites.

    Args:
        short_ids (list[str]): short_ids of the websites to create folders for.

    Returns:
        list[str] | bool: The short_ids that failed, or True when all succeeded.
    """
    errors = []
    for short_id in short_ids:
        try:
            api.create_gdrive_folders(short_id)
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are not recorded as per-site failures.
        except Exception:  # pylint:disable=broad-except
            log.exception("Could not create google drive folders for %s", short_id)
            errors.append(short_id)
    return errors or True
@app.task(bind=True)
def create_gdrive_folders_chunked(self, short_ids: list[str], chunk_size=500):
    """
    Fan out `create_gdrive_folders_batch` tasks over chunks of sites.

    Args:
        short_ids (list[str]): short_ids of all websites to process.
        chunk_size (int): Maximum number of short_ids per batch task.
    """
    # Comprehension instead of an append loop (ruff PERF401); sorting keeps
    # chunk membership deterministic across runs.
    batch_tasks = [
        create_gdrive_folders_batch.s(website_subset)
        for website_subset in chunks(sorted(short_ids), chunk_size=chunk_size)
    ]
    # Replace this task with the parallel group (celery's documented
    # `raise self.replace(...)` idiom).
    raise self.replace(celery.group(batch_tasks))
@app.task
def update_website_status(website_pk: str, sync_dt: datetime):
    """Refresh the gdrive sync status of the website identified by `website_pk`."""
    website = Website.objects.get(pk=website_pk)
    api.update_sync_status(website, sync_dt)
@app.task
def populate_file_sizes(  # noqa: C901
    website_name: str,
    override_existing: bool = False,  # noqa: FBT001, FBT002
):
    """Populate all resource content of `website` with the `file_size` metadata field."""  # noqa: E501
    website = Website.objects.get(name=website_name)
    log.info("Starting file size population for %s.", website_name)
    updated_drive_files = []
    updated_contents = []
    s3 = get_boto3_resource("s3")
    bucket = s3.Bucket(settings.AWS_STORAGE_BUCKET_NAME)
    for content in website.websitecontent_set.filter(type=CONTENT_TYPE_RESOURCE):
        # Skip resources that already have a size unless explicitly overriding.
        if not override_existing and content.metadata.get("file_size"):
            continue
        try:
            content.metadata["file_size"] = utils.fetch_content_file_size(
                content, bucket=bucket
            )
        except Exception as ex:  # pylint:disable=broad-except  # noqa: BLE001
            # Best-effort: a failed lookup stores None rather than aborting
            # the whole site.
            log.warning("Could not fetch file size for %s. %s", content, ex)
            content.metadata["file_size"] = None
        else:
            if content.metadata["file_size"] is None:
                log.info("Content %s has no file associated with it.", content)
        log.debug(
            "WebsiteContent %s now has file_size %s.",
            content,
            content.metadata["file_size"],
        )
        # Also refresh the size on each DriveFile linked to this resource.
        drive_files = content.drivefile_set.all()
        for drive_file in drive_files:
            if not override_existing and drive_file.size:
                continue
            try:
                drive_file.size = utils.fetch_drive_file_size(drive_file, bucket)
            except Exception as ex:  # pylint:disable=broad-except  # noqa: BLE001
                log.warning("Could not fetch file size for %s. %s", drive_file, ex)
            else:
                if drive_file.size is None:
                    log.info("DriveFile %s has no file associated to it.", drive_file)
            log.debug("DriveFile %s now has size %s.", drive_file, drive_file.size)
        updated_drive_files.extend(drive_files)
        updated_contents.append(content)
    # Persist all changes in two bulk queries instead of per-row saves.
    DriveFile.objects.bulk_update(updated_drive_files, ["size"])
    WebsiteContent.objects.bulk_update(updated_contents, ["metadata"])
    # bulk_update does not call pre/post_save signals.
    # So we'll do the sync state update ourselves.
    for content in updated_contents:
        upsert_content_sync_state(content)
    # Flag the site so the new metadata gets republished.
    website.has_unpublished_draft = True
    website.has_unpublished_live = True
    website.save()
@app.task(bind=True)
def populate_file_sizes_bulk(
    self,
    website_names: list[str],
    override_existing: bool = False,  # noqa: FBT001, FBT002
):
    """Chain `populate_file_sizes` tasks so the sites run one after another."""
    steps = (
        populate_file_sizes.si(name, override_existing) for name in website_names
    )
    # Replace this task with the sequential chain of per-site tasks.
    raise self.replace(chain(*steps))