Skip to content

Commit

Permalink
Merge pull request #29 from monarch-initiative/ftp-ingest
Browse files Browse the repository at this point in the history
Added ftp download feature with glob capability
  • Loading branch information
hrshdhgd committed Mar 20, 2024
2 parents 9b15f47 + 2f533a7 commit 1f438ef
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 2 deletions.
2 changes: 2 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ For an example, see [example/download.yaml](example/download.yaml)
Available options are:
- \***url**: The URL to download from. Currently supported:
- `http(s)`
- `ftp`
- with the optional `glob:` setting to download only files whose names match a glob pattern, e.g. `*.txt` (currently supported for `ftp` only; sub-directories are searched recursively).
- Google Cloud Storage (`gs://`)
- Google Drive (`gdrive://` or https://drive.google.com/...). The file must be publicly accessible.
- Amazon AWS S3 bucket (`s3://`)
Expand Down
68 changes: 67 additions & 1 deletion kghub_downloader/download_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import ftplib
import json
import logging
import os
import pathlib
import re
from fnmatch import fnmatch
from ftplib import error_perm
from multiprocessing.sharedctypes import Value
from typing import List, Optional
from urllib.error import URLError
Expand Down Expand Up @@ -42,14 +45,14 @@ def download_from_yaml(
snippet_only: Downloads only the first 5 kB of each uncompressed source, for testing and file checks
tags: Limit to only downloads with this tag
mirror: Optional remote storage URL to mirror download to. Supported buckets: Google Cloud Storage
glob: Optional glob pattern to limit downloading to
Returns:
None.
"""

pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
with open(yaml_file) as f:
data = yaml.load(f, Loader=yaml.FullLoader)

# Limit to only tagged downloads, if tags are passed in
if tags:
data = [
Expand Down Expand Up @@ -110,6 +113,21 @@ def download_from_yaml(
bucket_name = url.split("/")[2]
remote_file = "/".join(url.split("/")[3:])
s3.download_file(bucket_name, remote_file, outfile)
elif url.startswith("ftp"):
glob = None
if "glob" in item:
glob = item["glob"]
ftp_username = (
os.getenv("FTP_USERNAME") if os.getenv("FTP_USERNAME") else None
)
ftp_password = (
os.getenv("FTP_PASSWORD") if os.getenv("FTP_PASSWORD") else None
)
host = url.split("/")[0]
path = "/".join(url.split("/")[1:])
ftp = ftplib.FTP(host)
ftp.login(ftp_username, ftp_password)
download_via_ftp(ftp, path, outfile, glob)
elif any(
url.startswith(str(i))
for i in list(GDOWN_MAP.keys()) + list(GDOWN_MAP.values())
Expand Down Expand Up @@ -292,3 +310,51 @@ def parse_url(url: str):
)
url = url.replace("{" + i + "}", secret)
return url


def download_via_ftp(ftp_server, current_dir, local_dir, glob_pattern=None):
    """Recursively download files from an FTP server matching the glob pattern.

    Changes into ``current_dir`` on the server, lists it, and copies every
    file whose name matches ``glob_pattern`` into ``local_dir``. Each
    sub-directory found is downloaded the same way into a sub-directory of
    ``local_dir`` with the same name.

    Args:
        ftp_server: Connected and logged-in ``ftplib.FTP``-like object
            (needs ``cwd``, ``pwd``, ``nlst`` and ``retrbinary``).
        current_dir: Remote directory to download, absolute or relative to
            the connection's current working directory.
        local_dir: Local directory receiving the files; created on demand.
        glob_pattern: Optional pattern passed to ``is_matching_filename``
            for every file name.

    Returns:
        None. Permanent FTP errors (``ftplib.error_perm``) are logged and the
        affected directory or entry is skipped; they are never raised.
    """
    try:
        # Change to the requested directory on the FTP server and list it
        ftp_server.cwd(current_dir)
        entries = ftp_server.nlst()
    except error_perm as e:
        logging.warning("FTP error while listing %s: %s", current_dir, e)
        return

    for item in entries:
        # A listing may contain the dot entries; treating them as
        # sub-directories would recurse without end.
        if item in (".", ".."):
            continue
        try:
            if is_directory(ftp_server, item):
                # Remember the absolute location instead of relying on
                # cwd(".."): if the recursive call cannot enter the
                # sub-directory, ".." would move one level too far up.
                parent_dir = ftp_server.pwd()
                download_via_ftp(
                    ftp_server, item, os.path.join(local_dir, item), glob_pattern
                )
                ftp_server.cwd(parent_dir)
            elif is_matching_filename(item, glob_pattern):
                _retrieve_file(ftp_server, item, os.path.join(local_dir, item))
        except error_perm as e:
            # One unreadable entry must not abort the rest of the directory.
            logging.warning("FTP error while processing %s: %s", item, e)


def _retrieve_file(ftp_server, remote_name, local_filepath):
    """Download one remote file to ``local_filepath``; drop partial output on failure."""
    target_dir = os.path.dirname(local_filepath)
    if target_dir:  # os.makedirs("") raises FileNotFoundError
        os.makedirs(target_dir, exist_ok=True)
    try:
        with open(local_filepath, "wb") as f:
            ftp_server.retrbinary(f"RETR {remote_name}", f.write)
    except error_perm:
        # Do not leave a truncated file that looks like a finished download.
        try:
            os.remove(local_filepath)
        except OSError:
            pass
        raise


def is_directory(ftp_server, name):
    """Report whether ``name`` can be entered as a directory on the FTP server.

    The check is done by attempting ``cwd(name)``; a permanent FTP error
    (``error_perm``) means "not a directory". The working directory that was
    active before the call is re-selected afterwards in either case.
    """
    origin = ftp_server.pwd()
    try:
        ftp_server.cwd(name)  # Probe: only succeeds for directories
    except error_perm:
        entered = False
    else:
        entered = True
    finally:
        ftp_server.cwd(origin)  # Undo the probe regardless of its outcome
    return entered


def is_matching_filename(filename, glob_pattern):
    """Tell whether ``filename`` satisfies ``glob_pattern``.

    A falsy pattern (``None`` or an empty string) accepts every file name;
    otherwise the decision is delegated to ``fnmatch``.
    """
    if not glob_pattern:
        return True
    return fnmatch(filename, glob_pattern)
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

93 changes: 93 additions & 0 deletions test/unit/test_ftp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import ftplib
import os
import unittest
from ftplib import error_perm
from pathlib import Path
from unittest.mock import MagicMock, patch

from kghub_downloader.download_utils import (
download_via_ftp,
is_directory,
is_matching_filename,
)


class TestFTPDownload(unittest.TestCase):
    """Tests for the FTP helpers of ``kghub_downloader.download_utils``."""

    def setUp(self):
        # Set up a mock FTP server object
        self.mock_ftp = MagicMock()

    @patch("ftplib.FTP")  # Mock the FTP class
    def test_download_files(self, mock_ftp):
        """Files from the root and from a sub-directory are written locally."""
        # Set up the mock FTP instance
        ftp_instance = mock_ftp.return_value
        ftp_instance.nlst.side_effect = [
            ["file1.txt", "dir1"],  # Root directory listing
            ["file2.txt", "file3.txt"],  # dir1 directory listing
        ]
        ftp_instance.pwd.side_effect = ["/", "/dir1"]
        ftp_instance.cwd.side_effect = lambda x: x

        # is_directory is mocked so that only "dir1" counts as a directory;
        # os.makedirs and open are mocked so nothing touches the file system.
        # new_callable must be the factory itself (not a mock_open() instance)
        # so that mock_file is the mock that replaces builtins.open.
        with patch(
            "kghub_downloader.download_utils.is_directory",
            side_effect=lambda ftp, name: name == "dir1",
        ), patch("os.makedirs") as makedirs_mock, patch(
            "builtins.open", new_callable=unittest.mock.mock_open
        ) as mock_file:
            # Call the function to be tested
            download_via_ftp(ftp_instance, "/", "local_dir", "*.txt")

        # Expected paths are built with os.path.join so the assertions do not
        # depend on the platform's path separator.
        dir1 = os.path.join("local_dir", "dir1")

        # Check that makedirs was called for the local directory structure
        makedirs_mock.assert_called_with(dir1, exist_ok=True)

        # Check that each file was opened for writing
        mock_file.assert_any_call(os.path.join("local_dir", "file1.txt"), "wb")
        mock_file.assert_any_call(os.path.join(dir1, "file2.txt"), "wb")
        mock_file.assert_any_call(os.path.join(dir1, "file3.txt"), "wb")

        # Check that the correct number of files were attempted to be downloaded
        self.assertEqual(mock_file.call_count, 3)

    def test_is_directory_true(self):
        """A name the server lets us ``cwd`` into is reported as a directory."""
        # Mock the pwd and cwd methods for a directory
        self.mock_ftp.pwd.return_value = "/"
        self.mock_ftp.cwd.side_effect = lambda x: x

        # Assert that is_directory returns True for a directory
        self.assertTrue(is_directory(self.mock_ftp, "some_directory"))

    def test_is_directory_false(self):
        """A name whose ``cwd`` fails with error_perm is not a directory."""
        self.mock_ftp.pwd.return_value = "/"

        def fake_cwd(path):
            if path == "some_file.txt":
                raise error_perm("550 Not a directory")
            return path

        self.mock_ftp.cwd.side_effect = fake_cwd

        self.assertFalse(is_directory(self.mock_ftp, "some_file.txt"))
        # The original working directory must be selected again afterwards
        self.mock_ftp.cwd.assert_called_with("/")

    def test_is_matching_filename(self):
        """Glob matching on file names, including the "no pattern" case."""
        # Test with matching pattern
        self.assertTrue(is_matching_filename("file.txt", "*.txt"))

        # Test with non-matching pattern
        self.assertFalse(is_matching_filename("file.jpg", "*.txt"))

        # Test with no pattern provided (should always return True)
        self.assertTrue(is_matching_filename("file.jpg", None))

    # Skip instead of failing with KeyError when the credentials are not
    # configured, and keep skipping on GitHub Actions as before.
    @unittest.skipIf(
        os.getenv("GITHUB_ACTIONS") == "true"
        or not (os.getenv("FTP_USERNAME") and os.getenv("FTP_PASSWORD")),
        "This test needs credentials to run.",
    )
    def test_actual_upload_download(self):
        """Round-trip a file through a real FTP server (needs network access)."""
        # Credentials available at: https://dlptest.com/ftp-test/
        pwd = Path.cwd()
        output_dir = pwd / "test/output"
        resources_dir = pwd / "test/resources"
        # Set up a real FTP server; the context manager closes the connection
        with ftplib.FTP("ftp.dlptest.com") as ftp:
            ftp.login(os.environ["FTP_USERNAME"], os.environ["FTP_PASSWORD"])
            # Upload test/resources/testfile.txt to the server as test_file.txt
            with open(f"{resources_dir}/testfile.txt", "rb") as source:
                ftp.storbinary("STOR test_file.txt", source)
            # download the file test_file.txt from the server
            download_via_ftp(ftp, "/", f"{output_dir}", "*.txt")
        # Check that the file was downloaded correctly
        self.assertTrue(os.path.exists(f"{output_dir}/test_file.txt"))
        os.remove(f"{output_dir}/test_file.txt")

0 comments on commit 1f438ef

Please sign in to comment.