Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No listing workflow #1861

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
8 changes: 8 additions & 0 deletions cwltool.code-workspace
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"folders": [
{
"path": "."
}
],
"settings": {}
}
18 changes: 18 additions & 0 deletions cwltool/argparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,24 @@ def arg_parser() -> argparse.ArgumentParser:
type=str,
)

# TO DO: Not yet implemented
provgroup.add_argument(
"--no-data", # Maybe change to no-input and no-intermediate to ignore those kind of files?...
default=False,
action="store_true",
help="Disables the storage of input and output data files",
dest="no_data",
)

# TO DO: Not yet implemented
provgroup.add_argument(
"--no-input", # Maybe change to no-input and no-intermediate to ignore those kind of files?...
default=False,
action="store_true",
help="Disables the storage of input data files",
dest="no_input",
)

printgroup = parser.add_mutually_exclusive_group()
printgroup.add_argument(
"--print-rdf",
Expand Down
4 changes: 4 additions & 0 deletions cwltool/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,10 @@ def addsf(
datum = cast(CWLObjectType, datum)
ll = schema.get("loadListing") or self.loadListing
if ll and ll != "no_listing":
# Debug show
for k in datum:
_logger.debug("Datum: %s: %s" % (k, datum[k]))
_logger.debug("----------------------------------------")
get_listing(
self.fs_access,
datum,
Expand Down
37 changes: 35 additions & 2 deletions cwltool/cwlprov/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
import re
import uuid
from getpass import getuser
from typing import IO, Any, Callable, Dict, List, Optional, Tuple, Union
from typing import IO, Any, Dict, List, Optional, Tuple, Union

from typing_extensions import TypedDict

from cwltool.cwlprov.provenance_constants import Hasher

from ..loghandler import _logger


def _whoami() -> Tuple[str, str]:
"""Return the current operating system account as (username, fullname)."""
Expand Down Expand Up @@ -135,7 +139,7 @@
def checksum_copy(
src_file: IO[Any],
dst_file: Optional[IO[Any]] = None,
hasher: Optional[Callable[[], "hashlib._Hash"]] = None,
hasher=Hasher, # type: Callable[[], hashlib._Hash]
buffersize: int = 1024 * 1024,
) -> str:
"""Compute checksums while copying a file."""
Expand All @@ -158,6 +162,35 @@
pass
if os.path.exists(temp_location):
os.rename(temp_location, dst_file.name) # type: ignore

return content_processor(contents, src_file, dst_file, checksum, buffersize)

Check warning on line 166 in cwltool/cwlprov/__init__.py

View check run for this annotation

Codecov / codecov/patch

cwltool/cwlprov/__init__.py#L166

Added line #L166 was not covered by tests


def checksum_only(
src_file: IO[Any],
dst_file: Optional[IO[Any]] = None,
hasher=Hasher, # type: Callable[[], hashlib._Hash]
buffersize: int = 1024 * 1024,
) -> str:
"""Calculate the checksum only, does not copy the data files."""
if dst_file is not None:
_logger.error("Destination file should be None but it is %s", dst_file)
"""Compute checksums while copying a file."""

Check warning on line 178 in cwltool/cwlprov/__init__.py

View check run for this annotation

Codecov / codecov/patch

cwltool/cwlprov/__init__.py#L177-L178

Added lines #L177 - L178 were not covered by tests
# TODO: Use hashlib.new(Hasher_str) instead?
checksum = hasher()
contents = src_file.read(buffersize)

Check warning on line 181 in cwltool/cwlprov/__init__.py

View check run for this annotation

Codecov / codecov/patch

cwltool/cwlprov/__init__.py#L180-L181

Added lines #L180 - L181 were not covered by tests
# TODO Could be a function for both checksum_only and checksum_copy?
return content_processor(contents, src_file, dst_file, checksum, buffersize)

Check warning on line 183 in cwltool/cwlprov/__init__.py

View check run for this annotation

Codecov / codecov/patch

cwltool/cwlprov/__init__.py#L183

Added line #L183 was not covered by tests


def content_processor(
contents: Any,
src_file: IO[Any],
dst_file: Optional[IO[Any]],
checksum: "hashlib._Hash",
buffersize: int,
) -> str:
"""Calculate the checksum based on the content."""
while contents != b"":
if dst_file is not None:
dst_file.write(contents)
Expand Down
9 changes: 6 additions & 3 deletions cwltool/cwlprov/provenance_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from ..loghandler import _logger
from ..process import Process, shortname
from ..stdfsaccess import StdFsAccess
from ..utils import CWLObjectType, JobsType, get_listing, posix_path, versionstring
from ..utils import CWLObjectType, JobsType, posix_path, versionstring
from ..workflow_job import WorkflowJob
from .provenance_constants import (
ACCOUNT_UUID,
Expand Down Expand Up @@ -243,6 +243,7 @@ def evaluate(
# record provenance of workflow executions
self.prospective_prov(job)
customised_job = copy_job_order(job, job_order_object)
# Note to self: Listing goes ok here
self.used_artefacts(customised_job, self.workflow_run_uri)

def record_process_start(
Expand Down Expand Up @@ -287,6 +288,7 @@ def record_process_end(
process_run_id: str,
outputs: Union[CWLObjectType, MutableSequence[CWLObjectType], None],
when: datetime.datetime,
# load_listing: None,
) -> None:
self.generate_output_prov(outputs, process_run_id, process_name)
self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)
Expand Down Expand Up @@ -408,8 +410,8 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
# a later call to this method will sort that
is_empty = True

if "listing" not in value:
get_listing(self.fsaccess, value)
# if "listing" not in value:
# get_listing(self.fsaccess, value)
for entry in cast(MutableSequence[CWLObjectType], value.get("listing", [])):
is_empty = False
# Declare child-artifacts
Expand Down Expand Up @@ -604,6 +606,7 @@ def used_artefacts(
job_order: Union[CWLObjectType, List[CWLObjectType]],
process_run_id: str,
name: Optional[str] = None,
load_listing=None,
) -> None:
"""Add used() for each data artefact."""
if isinstance(job_order, list):
Expand Down
51 changes: 42 additions & 9 deletions cwltool/cwlprov/ro.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@
posix_path,
versionstring,
)
from . import Aggregate, Annotation, AuthoredBy, _valid_orcid, _whoami, checksum_copy
from . import (
Aggregate,
Annotation,
AuthoredBy,
_valid_orcid,
_whoami,
checksum_copy,
checksum_only,
)
from .provenance_constants import (
ACCOUNT_UUID,
CWLPROV_VERSION,
Expand Down Expand Up @@ -66,6 +74,8 @@
temp_prefix_ro: str = "tmp",
orcid: str = "",
full_name: str = "",
no_data: bool = False,
no_input: bool = False,
) -> None:
"""Initialize the ResearchObject."""
self.temp_prefix = temp_prefix_ro
Expand All @@ -88,6 +98,8 @@
self.cwltool_version = f"cwltool {versionstring().split()[-1]}"
self.has_manifest = False
self.relativised_input_object: CWLObjectType = {}
self.no_data = no_data
self.no_input = no_input

self._initialize()
_logger.debug("[provenance] Temporary research object: %s", self.folder)
Expand Down Expand Up @@ -180,13 +192,22 @@
# Below probably OK for now as metadata files
# are not too large..?

checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1)
if self.no_input:
_logger.debug("NO INPUT DATA TO BE CAPTURED!!!")

Check warning on line 196 in cwltool/cwlprov/ro.py

View check run for this annotation

Codecov / codecov/patch

cwltool/cwlprov/ro.py#L196

Added line #L196 was not covered by tests

checksums[SHA1] = checksum_only(tag_file, hasher=hashlib.sha1)
tag_file.seek(0)
checksums[SHA256] = checksum_only(tag_file, hasher=hashlib.sha256)
tag_file.seek(0)
checksums[SHA512] = checksum_only(tag_file, hasher=hashlib.sha512)

Check warning on line 202 in cwltool/cwlprov/ro.py

View check run for this annotation

Codecov / codecov/patch

cwltool/cwlprov/ro.py#L198-L202

Added lines #L198 - L202 were not covered by tests
else:
checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1)

Check warning on line 204 in cwltool/cwlprov/ro.py

View check run for this annotation

Codecov / codecov/patch

cwltool/cwlprov/ro.py#L204

Added line #L204 was not covered by tests

tag_file.seek(0)
checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256)
tag_file.seek(0)
checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256)

Check warning on line 207 in cwltool/cwlprov/ro.py

View check run for this annotation

Codecov / codecov/patch

cwltool/cwlprov/ro.py#L206-L207

Added lines #L206 - L207 were not covered by tests

tag_file.seek(0)
checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512)
tag_file.seek(0)
checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512)

Check warning on line 210 in cwltool/cwlprov/ro.py

View check run for this annotation

Codecov / codecov/patch

cwltool/cwlprov/ro.py#L209-L210

Added lines #L209 - L210 were not covered by tests

rel_path = posix_path(os.path.relpath(path, self.folder))
self.tagfiles.add(rel_path)
Expand Down Expand Up @@ -469,10 +490,14 @@
content_type: Optional[str] = None,
) -> str:
"""Copy inputs to data/ folder."""
# TODO Skip if no-input or no-data is used...?
self.self_check()
tmp_dir, tmp_prefix = os.path.split(self.temp_prefix)
with tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir, delete=False) as tmp:
checksum = checksum_copy(from_fp, tmp)
if self.no_data:
checksum = checksum_only(from_fp)

Check warning on line 498 in cwltool/cwlprov/ro.py

View check run for this annotation

Codecov / codecov/patch

cwltool/cwlprov/ro.py#L498

Added line #L498 was not covered by tests
else:
checksum = checksum_copy(from_fp, tmp)

Check warning on line 500 in cwltool/cwlprov/ro.py

View check run for this annotation

Codecov / codecov/patch

cwltool/cwlprov/ro.py#L500

Added line #L500 was not covered by tests

# Calculate hash-based file path
folder = os.path.join(self.folder, DATA, checksum[0:2])
Expand All @@ -493,7 +518,12 @@
_logger.warning("[provenance] Unknown hash method %s for bagit manifest", Hasher)
# Inefficient, bagit support need to checksum again
self._add_to_bagit(rel_path)
_logger.debug("[provenance] Added data file %s", path)
if "dir" in self.relativised_input_object:
_logger.debug(

Check warning on line 522 in cwltool/cwlprov/ro.py

View check run for this annotation

Codecov / codecov/patch

cwltool/cwlprov/ro.py#L522

Added line #L522 was not covered by tests
"[provenance] Directory :%s", self.relativised_input_object["dir"]["basename"]
)
else:
_logger.debug("[provenance] Added data file %s", path)

Check warning on line 526 in cwltool/cwlprov/ro.py

View check run for this annotation

Codecov / codecov/patch

cwltool/cwlprov/ro.py#L526

Added line #L526 was not covered by tests
if timestamp is not None:
createdOn, createdBy = self._self_made(timestamp)
self._file_provenance[rel_path] = cast(
Expand Down Expand Up @@ -557,7 +587,10 @@
checksums = dict(checksums)
with open(lpath, "rb") as file_path:
# FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile?
checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1)
if self.data_option:
checksums[SHA1] = checksum_only(file_path, hasher=hashlib.sha1)

Check warning on line 591 in cwltool/cwlprov/ro.py

View check run for this annotation

Codecov / codecov/patch

cwltool/cwlprov/ro.py#L591

Added line #L591 was not covered by tests
else:
checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1)

Check warning on line 593 in cwltool/cwlprov/ro.py

View check run for this annotation

Codecov / codecov/patch

cwltool/cwlprov/ro.py#L593

Added line #L593 was not covered by tests

self.add_to_manifest(rel_path, checksums)

Expand Down
4 changes: 3 additions & 1 deletion cwltool/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@
):
process_run_id: Optional[str] = None
name = "primary"
process.parent_wf.generate_output_prov(self.final_output[0], process_run_id, name)
process.parent_wf.generate_output_prov(

Check warning on line 175 in cwltool/executors.py

View check run for this annotation

Codecov / codecov/patch

cwltool/executors.py#L175

Added line #L175 was not covered by tests
self.final_output[0], process_run_id, name
) # Note to self... # , "generate_output_prov")
process.parent_wf.document.wasEndedBy(
process.parent_wf.workflow_run_uri,
None,
Expand Down
6 changes: 5 additions & 1 deletion cwltool/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,10 @@ def _execute(
and isinstance(job_order, (list, dict))
):
runtimeContext.prov_obj.used_artefacts(
job_order, runtimeContext.process_run_id, str(self.name)
job_order,
runtimeContext.process_run_id,
str(self.name),
load_listing=self.builder.loadListing,
)
else:
_logger.warning(
Expand Down Expand Up @@ -411,6 +414,7 @@ def stderr_stdout_log_path(
runtimeContext.process_run_id,
outputs,
datetime.datetime.now(),
# builder.loadListing # TODO FIX THIS
)
if processStatus != "success":
_logger.warning("[job %s] completed %s", self.name, processStatus)
Expand Down
25 changes: 22 additions & 3 deletions cwltool/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import signal
import subprocess # nosec
import sys
import tempfile
import time
import urllib
import warnings
Expand Down Expand Up @@ -693,6 +694,8 @@
temp_prefix_ro=args.tmpdir_prefix,
orcid=args.orcid,
full_name=args.cwl_full_name,
no_data=args.no_data,
no_input=args.no_input,
)
runtimeContext.research_obj = ro
log_file_io = open_log_file_for_activity(ro, ro.engine_uuid)
Expand Down Expand Up @@ -1138,12 +1141,28 @@
print(f"{args.workflow} is valid CWL.", file=stdout)
return 0

if args.print_rdf:
if args.print_rdf or args.provenance:
output = stdout
if args.provenance:
# Write workflow to temp directory
temp_workflow_dir = tempfile.TemporaryDirectory()
os.makedirs(temp_workflow_dir.name, exist_ok=True)
workflow_provenance = temp_workflow_dir.name + "/workflow.ttl"

Check warning on line 1150 in cwltool/main.py

View check run for this annotation

Codecov / codecov/patch

cwltool/main.py#L1148-L1150

Added lines #L1148 - L1150 were not covered by tests
# Sets up a turtle file for the workflow information
# (not yet in the provenance folder as it does
# not exist and creating it will give issues).
output = open(workflow_provenance, "w")
_logger.info("Writing workflow rdf to %s", workflow_provenance)

Check warning on line 1155 in cwltool/main.py

View check run for this annotation

Codecov / codecov/patch

cwltool/main.py#L1154-L1155

Added lines #L1154 - L1155 were not covered by tests
print(
printrdf(tool, loadingContext.loader.ctx, args.rdf_serializer),
file=stdout,
file=output,
)
return 0
# close the output
if args.provenance:
output.close()

Check warning on line 1162 in cwltool/main.py

View check run for this annotation

Codecov / codecov/patch

cwltool/main.py#L1162

Added line #L1162 was not covered by tests
# Only print_rdf exits this way
if args.print_rdf:
return 0

if args.print_dot:
printdot(tool, loadingContext.loader.ctx, stdout)
Expand Down
Loading
Loading