Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate input annotations to primary.cwlprov files #1678

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions cwltool/provenance_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,16 @@ def evaluate(
self.prospective_prov(job)
customised_job = copy_job_order(job, job_order_object)
self.used_artefacts(customised_job, self.workflow_run_uri)
# if CWLPROV['prov'].uri in job_order_object: # maybe move this to another place
# metadata = job_order_object[CWLPROV['prov'].uri] # change uri to CWLPROV['prov'].uri
# for item in metadata:
# # make a new entity with id
# # give it type additionalType value
# # add nested annotations
# # how much of this can we reuse from _add_nested_annotations?
# # how do we identify the correct file to write to? self.workflow_run_uri?
# #
# pass

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First step in propagating metadata under cwlprov:prov to provenance as well.

def record_process_start(
self, process: Process, job: JobsType, process_run_id: Optional[str] = None
Expand Down Expand Up @@ -301,6 +311,31 @@ def record_process_end(
self.generate_output_prov(outputs, process_run_id, process_name)
self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)

def _add_nested_annotations(
self, annotation_key: str, annotation_value: Any, e: ProvEntity
) -> ProvEntity:
"""Propagate input data annotations to provenance."""
# Change https:// into http:// first
schema2_uri = "https://schema.org/"
if schema2_uri in annotation_key:
annotation_key = SCHEMA[annotation_key.replace(schema2_uri, "")].uri

if not isinstance(annotation_value, (MutableSequence, MutableMapping)):
e.add_attributes({annotation_key: str(annotation_value)})
elif isinstance(annotation_value, MutableSequence):
for item_value in annotation_value:
e = self._add_nested_annotations(annotation_key, item_value, e)
else:
nested_id = uuid.uuid4().urn
nested_entity = self.document.entity(nested_id)
e.add_attributes({annotation_key: nested_entity.identifier})
for nested_key in annotation_value:
nested_value = annotation_value[nested_key]
nested_entity = self._add_nested_annotations(
nested_key, nested_value, nested_entity
)
return e

def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, str]:
if value["class"] != "File":
raise ValueError("Must have class:File: %s" % value)
Expand Down Expand Up @@ -361,6 +396,27 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st
)
self.document.specializationOf(file_entity, entity)

# Identify all schema annotations
schema_annotations = dict(
[(v, value[v]) for v in value.keys() if v.startswith("https://schema.org")]
Fixed Show fixed Hide fixed
)

# Transfer SCHEMA annotations to provenance
for s in schema_annotations:
if "additionalType" in s:
additional_type = cast(str, schema_annotations[s]).split(sep="/")[
-1
] # find better method?
file_entity.add_attributes({PROV_TYPE: SCHEMA[additional_type]})
else:
file_entity = self._add_nested_annotations(
s, schema_annotations[s], file_entity
)

# Transfer format annotations to provenance:
if "format" in value:
file_entity.add_attributes({SCHEMA["encodingFormat"]: value["format"]})

# Check for secondaries
for sec in cast(
MutableSequence[CWLObjectType], value.get("secondaryFiles", [])
Expand Down Expand Up @@ -406,6 +462,7 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
(PROV_TYPE, RO["Folder"]),
],
)

# ORE description of ro:Folder, saved separately
coll_b = dir_bundle.entity(
dir_id,
Expand Down Expand Up @@ -468,6 +525,21 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
coll.add_attributes(coll_attribs)
coll_b.add_attributes(coll_b_attribs)

# Identify all schema annotations
schema_annotations = dict(
[(v, value[v]) for v in value.keys() if v.startswith("https://schema.org")]
Fixed Show fixed Hide fixed
)

# Transfer SCHEMA annotations to provenance
for s in schema_annotations:
if "additionalType" in s:
additional_type = cast(str, schema_annotations[s]).split(sep="/")[
-1
] # find better method?
coll.add_attributes({PROV_TYPE: SCHEMA[additional_type]})
elif "hasPart" not in s:
coll = self._add_nested_annotations(s, schema_annotations[s], coll)

# Also Save ORE Folder as annotation metadata
ore_doc = ProvDocument()
ore_doc.add_namespace(ORE)
Expand Down
22 changes: 22 additions & 0 deletions tests/test_provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,28 @@ def test_revsort_workflow(tmp_path: Path) -> None:
check_provenance(folder)


@needs_docker
def test_revsort_label_annotations(tmp_path: Path) -> None:
"""Affirm that EDAM file formats in the input object make it into CWLProv."""
base_path = cwltool(
tmp_path,
get_data("tests/wf/revsort.cwl"),
get_data("tests/wf/revsort-job.json"),
)
prov_file = base_path / "metadata" / "provenance" / "primary.cwlprov.nt"
arcp_root = find_arcp(base_path)
g = Graph()
with open(prov_file, "rb") as f:
g.parse(file=f, format="nt", publicID=arcp_root)
mime_having_objects = list(g.subjects(SCHEMA.encodingFormat))
assert len(mime_having_objects) == 2
for obj in mime_having_objects:
assert (
cast(Literal, list(g.objects(obj, SCHEMA.encodingFormat))[0]).value
== "https://www.iana.org/assignments/media-types/text/plain"
)


@needs_docker
def test_nested_workflow(tmp_path: Path) -> None:
check_provenance(cwltool(tmp_path, get_data("tests/wf/nested.cwl")), nested=True)
Expand Down