Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add individual download progress #42

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 85 additions & 32 deletions kghub_downloader/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,54 +13,99 @@
"""

import ftplib
import logging
import os
import sys
from contextlib import contextmanager
from fnmatch import fnmatch
from pathlib import Path
from urllib.error import URLError
from urllib.request import Request, urlopen

import boto3 # type: ignore
import gdown # type: ignore
import requests
from google.cloud import storage # type: ignore
from google.cloud.storage.blob import Blob # type: ignore
from tqdm.auto import tqdm
from tqdm import tqdm

from kghub_downloader.model import DownloadableResource
from kghub_downloader.schemes import register_scheme

GOOGLE_DRIVE_PREFIX = "https://drive.google.com/uc?id="

SNIPPET_SIZE = 1024 * 5
CHUNK_SIZE = 1024


def log_result(fn):
    """Decorate a download function so its success or failure is logged.

    The wrapped function is expected to take a ``DownloadableResource`` as its
    first argument; its ``expanded_url`` is echoed via ``tqdm.write`` so the
    message does not corrupt any active progress bars.
    """
    import functools

    @functools.wraps(fn)  # preserve the wrapped function's name/docstring
    def wrapped(item: DownloadableResource, *args, **kwargs):
        try:
            ret = fn(item, *args, **kwargs)
        except BaseException:
            # BaseException (not Exception) so the failure line is printed
            # even on KeyboardInterrupt/SystemExit before re-raising.
            tqdm.write(f"ERROR: Failed to download {item.expanded_url}")
            # Bare `raise` keeps the original traceback intact.
            raise
        tqdm.write(f"OK: Downloaded {item.expanded_url}")
        return ret

    return wrapped


@contextmanager
def open_with_write_progress(item: DownloadableResource, outfile_path: Path, size: int = 0, open_mode: str = "wb"):
    """Yield a file object for ``outfile_path`` whose ``write`` calls advance a tqdm bar.

    The bar is labelled with the resource's expanded URL, counts bytes
    (scaled to human units), and is removed from the screen when done.
    The underlying file is closed when the context exits.
    """
    with outfile_path.open(open_mode) as raw_file:
        with tqdm.wrapattr(
            raw_file,
            "write",
            desc=f"{item.expanded_url}",
            total=size,
            leave=False,
            unit="B",
            unit_scale=True,
        ) as progress_file:
            yield progress_file


@register_scheme("gs")
@log_result
def google_cloud_storage(item: DownloadableResource, outfile_path: Path, snippet_only: bool) -> None:
    """Download a ``gs://`` URL from Google Cloud Storage to ``outfile_path``.

    ``snippet_only`` is accepted for scheme-handler interface parity but is
    not honored here — the full object is always downloaded.
    """
    url = item.expanded_url
    blob = Blob.from_string(url, client=storage.Client())
    # Blob.from_string only parses the URL; it does not fetch object
    # metadata, so blob.size would be None and the progress bar would have
    # no total. reload() populates the metadata from the server.
    blob.reload()
    with open_with_write_progress(item, outfile_path, blob.size) as outfile:
        blob.download_to_file(outfile)


@register_scheme("gdrive")
@log_result
def google_drive(item: DownloadableResource, outfile_path: Path, snippet_only: bool) -> None:
    """Download a Google Drive file to ``outfile_path`` via gdown.

    ``snippet_only`` is accepted for scheme-handler interface parity but is
    not honored here — gdown always fetches the whole file.
    """
    url = item.expanded_url
    # Translate the "gdrive:<file-id>" shorthand into a full uc?id= URL.
    # Guarded so a URL that is already in Drive form (e.g. when called from
    # the http handler) is not mangled by blindly stripping 7 characters.
    if url.startswith("gdrive:"):
        url = GOOGLE_DRIVE_PREFIX + url[len("gdrive:"):]
    gdown.download(url, output=str(outfile_path))


@register_scheme("s3")
@log_result
def s3(item: DownloadableResource, outfile_path: Path, snippet_only: bool) -> None:
    """Download an ``s3://`` URL to ``outfile_path`` with a progress bar.

    ``snippet_only`` is accepted for scheme-handler interface parity but is
    not honored here — the full object is always downloaded.
    """
    url = item.expanded_url
    # Named s3_resource to avoid shadowing this function's own name.
    s3_resource = boto3.resource("s3")
    # s3://bucket/key/parts...  ->  split on "/": ["s3:", "", bucket, key parts...]
    bucket_name = url.split("/")[2]
    remote_file = "/".join(url.split("/")[3:])

    s3_object = s3_resource.Object(bucket_name, remote_file)
    # content_length issues a HEAD request so the progress bar gets a total.
    object_size = s3_object.content_length

    with open_with_write_progress(item, outfile_path, object_size) as outfile:
        s3_object.download_fileobj(outfile)


@register_scheme("ftp")
@log_result
def ftp(item: DownloadableResource, outfile_path: Path, snippet_only: bool) -> None:
"""Download from an FTP server."""
url = item.expanded_url
Expand All @@ -82,6 +127,7 @@ def ftp(item: DownloadableResource, outfile_path: Path, snippet_only: bool) -> N


@register_scheme("git")
@log_result
def git(item: DownloadableResource, outfile_path: Path, snippet_only: bool) -> None:
"""Download from Git."""
url = item.url
Expand Down Expand Up @@ -133,40 +179,47 @@ def git(item: DownloadableResource, outfile_path: Path, snippet_only: bool) -> N
# Download the asset
response = requests.get(asset_url, stream=True, timeout=10)
response.raise_for_status()
with open(str(outfile_path), "wb") as file:
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
print(f"Downloaded {asset_name}")
size = int(response.headers.get("Content-Length", 0))
with open_with_write_progress(item, outfile_path, size) as outfile:
for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
outfile.write(chunk)


@register_scheme("http")
@register_scheme("https")
@log_result
def http(item: DownloadableResource, outfile_path: Path, snippet_only: bool) -> None:
    """Download via HTTP(S); Google Drive ``uc?id=`` links are delegated to gdown.

    When ``snippet_only`` is True, only roughly the first SNIPPET_SIZE bytes
    are written, and the (likely truncated) final line is then stripped from
    the output file.

    Raises:
        requests.HTTPError: if the server responds with an error status.
    """
    url = item.expanded_url

    if url.startswith(GOOGLE_DRIVE_PREFIX):
        gdown.download(url, output=str(outfile_path))
        return

    response = requests.get(url, headers={"User-Agent": "Mozilla/5.0"}, stream=True, timeout=10)
    response.raise_for_status()

    # Progress-bar total: cap at the snippet size when only a snippet is wanted.
    total = int(response.headers.get("Content-Length", 0))
    if snippet_only and total > SNIPPET_SIZE:
        total = SNIPPET_SIZE

    downloaded = 0
    with open_with_write_progress(item, outfile_path, total) as outfile:
        for chunk in response.iter_content(CHUNK_SIZE):
            outfile.write(chunk)
            if snippet_only:
                # Count actual bytes received; the final chunk may be
                # shorter than CHUNK_SIZE.
                downloaded += len(chunk)
                if downloaded >= SNIPPET_SIZE:
                    # Abandon the rest of the response body.
                    response.close()
                    break

    # A snippet almost certainly ends mid-line; drop the partial last line.
    if snippet_only:
        with open(str(outfile_path), "r+") as fd:
            lines = fd.readlines()
            fd.seek(0)
            # readlines() keeps each line's "\n", so write the lines back
            # verbatim rather than re-joining with "\n" (which would double
            # every newline).
            fd.writelines(lines[:-1])
            fd.truncate()


def is_directory(ftp_server, name):
Expand Down
55 changes: 49 additions & 6 deletions kghub_downloader/download_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import logging
import pathlib
import time
import traceback
from typing import List, Optional
from urllib.parse import urlparse

Expand All @@ -12,14 +14,13 @@
from kghub_downloader.elasticsearch import download_from_elastic_search
from kghub_downloader.model import DownloadableResource

# from compress_json import compress_json


def download_from_yaml(
yaml_file: str,
output_dir: str,
ignore_cache: Optional[bool] = False,
snippet_only: bool = False,
verbose: bool = False,
tags: Optional[List] = None,
mirror: Optional[str] = None,
) -> None:
Expand All @@ -31,12 +32,15 @@ def download_from_yaml(
output_dir: A string pointing to where to write out downloaded files.
ignore_cache: Ignore cache and download files even if they exist. [false]
snippet_only: Downloads only the first 5 kB of each uncompressed source, for testing and file checks
verbose: Show verbose output
tags: Limit to only downloads with this tag
mirror: Optional remote storage URL to mirror download to. Supported buckets: Google Cloud Storage
Returns:
None.

"""
start_time = time.time()

pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

with open(yaml_file) as f:
Expand All @@ -48,7 +52,21 @@ def download_from_yaml(
if tags:
resources = [item for item in resources if item.tag in tags]

for item in tqdm(resources, desc="Downloading files"):
successful_ct = 0
unsuccessful_ct = 0
skipped_ct = 0

pbar = tqdm(
resources,
position=2,
leave=False,
bar_format="Downloading {n_fmt}/{total_fmt} [{bar:20}]",
ascii=".█",
)

for item in resources:
pbar.update()
pbar.refresh()
url = item.expanded_url
outfile_path = output_dir / item.path
outfile_dir = outfile_path.parent
Expand All @@ -70,6 +88,8 @@ def download_from_yaml(
outfile_path.unlink()
else:
logging.info("Using cached version of {outfile_path")
tqdm.write(f"SKIPPING: {outfile_path} already exists")
skipped_ct += 1
continue

# Download file
Expand All @@ -88,14 +108,37 @@ def download_from_yaml(

try:
download_fn(item, outfile_path, snippet_only)
successful_ct += 1
except BaseException as e:
unsuccessful_ct += 1
if outfile_path.exists():
outfile_path.unlink()

raise e
# If this was cancelled with Ctrl-C, re-raise the exception and let Typer handle it
if isinstance(e, KeyboardInterrupt):
pbar.close()
raise e

if verbose:
message = traceback.format_exception(e)[-1]
tqdm.write(f"{message}")

# Otherwise, continue downloading
# logging.error(f"Failed to download {item.expanded_url}")
continue

# If mirror, upload to remote storage
if mirror:
upload.mirror_to_bucket(outfile_path, mirror, item.path)

return None
pbar.close()
exec_time = time.time() - start_time

tqdm.write(
f"\n\nDownload completed in {exec_time:.2f} seconds.\n\n"
f" successful: {successful_ct}\n"
f" skipped: {skipped_ct}\n"
f" unsuccessful: {unsuccessful_ct}\n"
)

if (not verbose) and unsuccessful_ct > 0:
tqdm.write("Some downloads were unsuccessful. Run with --verbose to see errors\n")
2 changes: 2 additions & 0 deletions kghub_downloader/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def main(
yaml_file: Optional[str] = typer.Argument("download.yaml", help="List of files to download in YAML format"),
output_dir: Optional[str] = typer.Option(".", help="Path to output directory"),
ignore_cache: Optional[bool] = typer.Option(False, help="Ignoring already downloaded files and download again"),
verbose: Optional[bool] = typer.Option(False, help="Show verbose output"),
tags: Optional[List[str]] = typer.Option(None, help="Optional list of tags to limit downloading to"), # noqa: B008
mirror: Optional[str] = typer.Option(
None,
Expand All @@ -25,6 +26,7 @@ def main(
yaml_file=yaml_file,
output_dir=output_dir,
ignore_cache=ignore_cache,
verbose=verbose,
tags=tags,
mirror=mirror,
)
Expand Down