Skip to content

Commit

Permalink
explicit check if this is directory or not
Browse files Browse the repository at this point in the history
  • Loading branch information
jrybicki-jsc committed Sep 12, 2023
1 parent 533b53a commit 1a03ca4
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 10 deletions.
9 changes: 5 additions & 4 deletions tests/test_webdavs.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ def test_noforce_runs(self, cpy, f_exist, walker, get_conn, get_prefix, get_webd
@patch('utils.get_webdav_prefix')
@patch('decors.get_connection')
@patch('utils.walk_dir')
def test_stageout(self, walk, g, get_prefix, getwebdav):
@patch('utils.is_dir', return_value=False)
def test_stageout(self, is_dir, walk, g, get_prefix, getwebdav):
getwebdav.return_value = MagicMock()
get_prefix.retrun_value = '/prefix/'
sft_client = MagicMock()
Expand All @@ -274,6 +275,7 @@ def test_stageout(self, walk, g, get_prefix, getwebdav):
tbl=['/home/foo/path/to/file.txt', '/home/foo/other/file.txt']

walk.return_value = tbl
#is_dir.return_value
dagbag = DagBag(".")

dag = dagbag.get_dag(dag_id="webdav_stageout")
Expand All @@ -298,16 +300,15 @@ def test_stageout(self, walk, g, get_prefix, getwebdav):
@patch('utils.get_webdav_prefix')
@patch('decors.get_connection')
@patch('utils.walk_dir', side_effect=IOError)
@patch('utils.file_exist')
def test_stageout_file(self, exists, walk, g, get_prefix, getwebdav):
@patch('utils.is_dir', return_value=True)
def test_stageout_file(self, isdir, walk, g, get_prefix, getwebdav):
getwebdav.return_value = MagicMock()
get_prefix.retrun_value = '/prefix/'
sft_client = MagicMock()

g.get_conn().__enter__().open_sftp().return_value = sft_client
tbl=['/home/foo/path/to/file.txt', '/home/foo/other/file.txt']

exists.return_value = True
dagbag = DagBag(".")

dag = dagbag.get_dag(dag_id="webdav_stageout")
Expand Down
4 changes: 4 additions & 0 deletions utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from airflow.settings import Session
from sqlalchemy import update

from stat import S_ISDIR

def get_mlflow_client():
try:
Expand Down Expand Up @@ -91,6 +92,9 @@ def file_exist(sftp, name):
return r.st_size
except:
return -1

def is_dir(sftp, name):
return S_ISDIR(sftp.stat(name).st_mode)


def http2ssh(url: str, ssh_client, remote_name: str, force=True, auth=None):
Expand Down
12 changes: 6 additions & 6 deletions webdav_stageout.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
resolve_oid,
get_webdav_client,
get_webdav_prefix,
walk_dir,
clean_up_vaultid,
file_exist
file_exist,
is_dir
)


Expand Down Expand Up @@ -69,14 +69,14 @@ def copy(**context):

copied = {}


try:
mappings = list(walk_dir(client=sclient, path=params["path"], prefix=""))
except IOError:
# single file?
if file_exist(sftp=sftp_client, name=params['path']):
if is_dir(sftp=sftp_client, name=params['path']):
mappings = [params['path']]
params['path'] = os.path.dirname(params['path'])
else:
mappings = walk_dir(client=sclient, path=params["path"], prefix="")
except IOError:
print("Invalid path or file name")
return -1

Expand Down

0 comments on commit 1a03ca4

Please sign in to comment.