From f501617dcad2ce8dbb8e015e9ef79b222777d344 Mon Sep 17 00:00:00 2001
From: orionarcher
Date: Thu, 5 Sep 2024 14:07:25 -0400
Subject: [PATCH] Add Solvation and Benchmarking builders with their
 supporting utility files.

---
 emmet-builders/emmet/builders/openmm/core.py  | 361 ++++++++++++++++++
 .../emmet/builders/openmm/openmm_utils.py     | 130 +++++++
 emmet-builders/emmet/builders/openmm/utils.py | 341 +++++++++++++++++
 3 files changed, 832 insertions(+)
 create mode 100644 emmet-builders/emmet/builders/openmm/core.py
 create mode 100644 emmet-builders/emmet/builders/openmm/openmm_utils.py
 create mode 100644 emmet-builders/emmet/builders/openmm/utils.py

diff --git a/emmet-builders/emmet/builders/openmm/core.py b/emmet-builders/emmet/builders/openmm/core.py
new file mode 100644
index 0000000000..f71db9227e
--- /dev/null
+++ b/emmet-builders/emmet/builders/openmm/core.py
@@ -0,0 +1,361 @@
+from typing import Optional, List, Union
+from pathlib import Path
+
+import numpy as np
+
+from maggma.core import Builder, Store
+from maggma.stores import MemoryStore
+from emmet.builders.openmm.utils import (
+    create_solute,
+    identify_solute,
+    identify_networking_solvents,
+)
+from emmet.core.openff.solvation import SolvationDoc
+from emmet.core.openff.benchmarking import SolventBenchmarkingDoc
+from emmet.core.openmm import OpenMMTaskDocument
+from emmet.core.openmm.calculations import CalculationsDoc
+from emmet.core.utils import jsanitize
+
+from emmet.builders.openmm.openmm_utils import (
+    insert_blobs,
+    instantiate_universe,
+    resolve_traj_path,
+    task_doc_to_universe,
+)
+
+
+class ElectrolyteBuilder(Builder):
+    """
+    Builder to create solvation and calculations documents from OpenMM task documents.
+
+    This class processes molecular dynamics (MD) simulations and generates
+    comprehensive reports including solvation properties and calculation results.
+    It leverages the OpenFF toolkit and MDAnalysis for molecular topology and trajectory
+    handling, respectively.
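+
+    Example:
+        A minimal usage sketch, assuming ``md_doc_store`` and ``blob_store``
+        are maggma stores already populated by OpenMM jobflow runs (the store
+        names here are illustrative)::
+
+            builder = ElectrolyteBuilder(md_docs=md_doc_store, blobs=blob_store)
+            builder.run()  # populates the solute and calculations stores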
+    """
+
+    def __init__(
+        self,
+        md_docs: Store,
+        blobs: Store,
+        solute: Optional[Store] = None,
+        calculations: Optional[Store] = None,
+        query: Optional[dict] = None,
+        solute_analysis_classes: Union[List[str], str] = "all",
+        solvation_fallback_radius: float = 3,
+        chunk_size: int = 10,
+    ):
+        self.md_docs = md_docs
+        self.blobs = blobs
+        self.solute = solute or MemoryStore()
+        self.calculations = calculations or MemoryStore()
+        self.query = query or {}
+        self.solute_analysis_classes = solute_analysis_classes
+        self.solvation_fallback_radius = solvation_fallback_radius
+
+        self.md_docs.key = "uuid"
+        self.blobs.key = "blob_uuid"
+        if self.solute:
+            self.solute.key = "job_uuid"
+        if self.calculations:
+            self.calculations.key = "job_uuid"
+
+        super().__init__(
+            sources=[md_docs, blobs],
+            targets=[self.solute, self.calculations],
+            chunk_size=chunk_size,
+        )
+
+    # def prechunk(self, number_splits: int):  # pragma: no cover
+    #     """
+    #     Prechunk method to perform chunking by the key field
+    #     """
+    #     q = dict(self.query)
+    #     keys = self.electronic_structure.newer_in(
+    #         self.materials, criteria=q, exhaustive=True
+    #     )
+    #     N = ceil(len(keys) / number_splits)
+    #     for split in grouper(keys, N):
+    #         yield {"query": {self.materials.key: {"$in": list(split)}}}
+
+    def get_items(self, local_trajectories=False):
+        self.logger.info("Electrolyte builder started.")
+
+        hosts = self.md_docs.query(self.query, ["hosts"])
+        flow_ids = {doc["hosts"][-1] for doc in hosts}  # top level flows
+
+        job_groups = []
+        for flow_id in flow_ids:
+            # the last item in hosts should be the top level workflow
+            host_match = {"$expr": {"$eq": [{"$arrayElemAt": ["$hosts", -1]}, flow_id]}}
+            job_groups.append(list(self.md_docs.query(criteria=host_match)))
+
+        items = []
+        for jobs in job_groups:
+            # find the job with the most calcs in the flow, presumably the last
+            len_calcs = [len(job["output"]["calcs_reversed"] or []) for job in jobs]
+            last_job = jobs[np.argmax(len_calcs)]
+
+            insert_blobs(
+                self.blobs, last_job["output"], include_traj=not local_trajectories
+            )
+
+            items.append(last_job)
+
+        return items
+
+    def get_items_from_directories(self):
+        # not yet implemented; the query will be ignored
+        return
+
+    def process_items(
+        self,
+        items: List,
+        local_trajectories: bool = False,
+        rebase_traj_path: Optional[tuple[Path, Path]] = None,
+    ):
+        """
+        Process task documents into solvation and calculations documents.
+
+        Parameters
+        ----------
+        items : List
+            The task documents from get_items.
+        local_trajectories : bool
+            Whether to look for trajectory files locally in lieu of downloading.
+        rebase_traj_path : tuple[Path, Path], optional
+            An (old_root, new_root) pair of paths; useful if the launch
+            directory has moved.
+
+        Returns
+        -------
+        list[dict]
+            Dictionaries holding the "solute" and "calculations" documents.
+        """
+        self.logger.info(f"Processing {len(items)} materials for electrolyte builder.")
+
+        processed_items = []
+        for item in items:
+            # create task_doc
+            task_doc = OpenMMTaskDocument.parse_obj(item["output"])
+
+            # the second return value must stay in scope because traj_path
+            # may point to a temporary file that is deleted on garbage collection
+            traj_path, _ = resolve_traj_path(
+                task_doc, local_trajectories, rebase_traj_path
+            )
+
+            u = task_doc_to_universe(task_doc, traj_path)
+
+            # create solute_doc; create_solute runs the analyses internally
+            solute = create_solute(
+                u,
+                solute_name=identify_solute(u),
+                networking_solvents=identify_networking_solvents(u),
+                fallback_radius=self.solvation_fallback_radius,
+                analysis_classes=self.solute_analysis_classes,
+            )
+            solvation_doc = SolvationDoc.from_solute(
+                solute, job_uuid=item["uuid"], flow_uuid=item["hosts"][-1]
+            )
+            calculations_doc = CalculationsDoc.from_calcs_reversed(
+                task_doc.calcs_reversed,
+                job_uuid=item["uuid"],
+                flow_uuid=item["hosts"][-1],
+            )
+
+            # create docs
+            # TODO: what cleanup do I need?
+            docs = {
+                "solute": jsanitize(solvation_doc.model_dump()),
+                "calculations": jsanitize(calculations_doc.model_dump()),
+            }
+
+            processed_items.append(docs)
+
+        return processed_items
+
+    def update_targets(self, items: List):
+        if len(items) > 0:
+            self.logger.info(f"Found {len(items)} electrolyte docs to update.")
+
+            solutes = [item["solute"] for item in items]
+            self.solute.update(solutes)
+
+            calculations = [item["calculations"] for item in items]
+            self.calculations.update(calculations)
+
+        else:
+            self.logger.info("No items to update.")
+
+    def instantiate_universe(
+        self,
+        job_uuid: str,
+        traj_directory: Union[str, Path] = ".",
+        overwrite_local_traj: bool = True,
+    ):
+        """
+        Instantiate an MDAnalysis universe from a task document.
+
+        This is useful if you want to analyze a small number of systems
+        without running the whole build pipeline.
+
+        To get a solute, call create_solute using the universe. See
+        the body of process_items for the appropriate syntax.
+
+        Args:
+            job_uuid: str
+                The UUID of the job.
+            traj_directory: str
+                Directory where the trajectory file is written.
+            overwrite_local_traj: bool
+                Whether to overwrite the local trajectory if it exists.
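+
+        Example:
+            A sketch of downstream analysis, mirroring the body of
+            process_items (the ``job_uuid`` value is illustrative)::
+
+                u = builder.instantiate_universe(job_uuid, "./trajectories")
+                solute = create_solute(
+                    u,
+                    solute_name=identify_solute(u),
+                    networking_solvents=identify_networking_solvents(u),
+                )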
+        """
+        return instantiate_universe(
+            self.md_docs, self.blobs, job_uuid, traj_directory, overwrite_local_traj
+        )
+
+
+class BenchmarkingBuilder(Builder):
+    """
+    Builder to create solvent benchmarking documents from OpenMM task documents.
+
+    This class processes molecular dynamics (MD) simulations and generates
+    benchmarking reports on computed solvent properties. It leverages the
+    OpenFF toolkit and MDAnalysis for molecular topology and trajectory
+    handling, respectively.
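+
+    Example:
+        A minimal sketch, assuming the same (illustratively named) stores
+        used for ElectrolyteBuilder::
+
+            builder = BenchmarkingBuilder(md_docs=md_doc_store, blobs=blob_store)
+            builder.run()  # populates the benchmarking store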
+    """
+
+    def __init__(
+        self,
+        md_docs: Store,
+        blobs: Store,
+        benchmarking: Optional[Store] = None,
+        query: Optional[dict] = None,
+        chunk_size: int = 10,
+    ):
+        self.md_docs = md_docs
+        self.blobs = blobs
+        self.benchmarking = benchmarking or MemoryStore()
+        self.query = query or {}
+
+        self.md_docs.key = "uuid"
+        self.blobs.key = "blob_uuid"
+        self.benchmarking.key = "job_uuid"
+
+        super().__init__(
+            sources=[md_docs, blobs],
+            targets=[self.benchmarking],
+            chunk_size=chunk_size,
+        )
+
+    # def prechunk(self, number_splits: int):  # pragma: no cover
+    #     """
+    #     Prechunk method to perform chunking by the key field
+    #     """
+    #     q = dict(self.query)
+    #     keys = self.electronic_structure.newer_in(
+    #         self.materials, criteria=q, exhaustive=True
+    #     )
+    #     N = ceil(len(keys) / number_splits)
+    #     for split in grouper(keys, N):
+    #         yield {"query": {self.materials.key: {"$in": list(split)}}}
+
+    def get_items(self, local_trajectories=False):
+        self.logger.info("Benchmarking builder started.")
+
+        hosts = self.md_docs.query(self.query, ["hosts"])
+        flow_ids = {doc["hosts"][-1] for doc in hosts}  # top level flows
+
+        job_groups = []
+        for flow_id in flow_ids:
+            # the last item in hosts should be the top level workflow
+            host_match = {"$expr": {"$eq": [{"$arrayElemAt": ["$hosts", -1]}, flow_id]}}
+            job_groups.append(list(self.md_docs.query(criteria=host_match)))
+
+        items = []
+        for jobs in job_groups:
+            # find the job with the most calcs in the flow, presumably the last
+            len_calcs = [len(job["output"]["calcs_reversed"] or []) for job in jobs]
+            last_job = jobs[np.argmax(len_calcs)]
+
+            insert_blobs(
+                self.blobs, last_job["output"], include_traj=not local_trajectories
+            )
+
+            items.append(last_job)
+
+        return items
+
+    def get_items_from_directories(self):
+        # not yet implemented; the query will be ignored
+        return
+
+    def process_items(
+        self,
+        items: List,
+        local_trajectories: bool = False,
+        rebase_traj_path: Optional[tuple[Path, Path]] = None,
+        **benchmarking_kwargs,
+    ):
+        self.logger.info(f"Processing {len(items)} materials for benchmarking builder.")
+
+        processed_items = []
+        for item in items:
+            # create task_doc
+            task_doc = OpenMMTaskDocument.parse_obj(item["output"])
+
+            # the second return value must stay in scope because traj_path
+            # may point to a temporary file that is deleted on garbage collection
+            traj_path, _ = resolve_traj_path(
+                task_doc, local_trajectories, rebase_traj_path
+            )
+
+            u = task_doc_to_universe(task_doc, traj_path)
+
+            benchmarking_doc = SolventBenchmarkingDoc.from_universe(
+                u,
+                temperature=task_doc.calcs_reversed[0].input.temperature,
+                density=task_doc.calcs_reversed[0].output.density[-1],
+                job_uuid=item["uuid"],
+                flow_uuid=item["hosts"][-1],
+                tags=task_doc.tags,
+                **benchmarking_kwargs,
+            )
+
+            # release the universe and its trajectory reader before the next item
+            del u
+
+            docs = {
+                "benchmarking": jsanitize(benchmarking_doc.model_dump()),
+            }
+
+            processed_items.append(docs)
+
+        return processed_items
+
+    def update_targets(self, items: List):
+        if len(items) > 0:
+            self.logger.info(f"Found {len(items)} benchmarking docs to update.")
+
+            benchmarking_docs = [item["benchmarking"] for item in items]
+            self.benchmarking.update(benchmarking_docs)
+
+        else:
+            self.logger.info("No items to update.")
+
+    def instantiate_universe(
+        self,
+        job_uuid: str,
+        traj_directory: Union[str, Path] = ".",
+        overwrite_local_traj: bool = True,
+    ):
+        """
+        Instantiate an MDAnalysis universe from a task document.
+
+        This is useful if you want to analyze a small number of systems
+        without running the whole build pipeline.
+
+        Args:
+            job_uuid: str
+                The UUID of the job.
+            traj_directory: str
+                Directory where the trajectory file is written.
+            overwrite_local_traj: bool
+                Whether to overwrite the local trajectory if it exists.
+        """
+        return instantiate_universe(
+            self.md_docs, self.blobs, job_uuid, traj_directory, overwrite_local_traj
+        )
diff --git a/emmet-builders/emmet/builders/openmm/openmm_utils.py b/emmet-builders/emmet/builders/openmm/openmm_utils.py
new file mode 100644
index 0000000000..0da25d12b4
--- /dev/null
+++ b/emmet-builders/emmet/builders/openmm/openmm_utils.py
@@ -0,0 +1,130 @@
+import tempfile
+from typing import Union
+from pathlib import Path
+
+from openff.interchange import Interchange
+from maggma.core import Store
+
+from emmet.core.openmm import OpenMMTaskDocument, OpenMMInterchange
+from emmet.builders.openmm.utils import create_universe
+
+
+def insert_blobs(blobs_store: Store, task_doc: dict, include_traj: bool = True):
+    """Insert blobs into a task document."""
+    interchange_uuid = task_doc["interchange"]["blob_uuid"]
+    interchange_blob = blobs_store.query_one({"blob_uuid": interchange_uuid})
+    task_doc["interchange"] = interchange_blob["data"]
+
+    if not task_doc["calcs_reversed"] or len(task_doc["calcs_reversed"]) == 0:
+        raise ValueError("No calculations found in job output.")
+
+    for calc in task_doc["calcs_reversed"]:
+        if not include_traj:
+            calc["output"]["traj_blob"] = None
+
+        traj_blob = calc["output"]["traj_blob"]
+
+        if traj_blob:
+            traj_uuid = calc["output"]["traj_blob"]["blob_uuid"]
+            traj_blob = blobs_store.query_one({"blob_uuid": traj_uuid})
+            calc["output"]["traj_blob"] = traj_blob["data"]
+
+
+def instantiate_universe(
+    md_docs: Store,
+    blobs: Store,
+    job_uuid: str,
+    traj_directory: Union[str, Path] = ".",
+    overwrite_local_traj: bool = True,
+):
+    """
+    Instantiate an MDAnalysis universe from a task document.
+
+    This is useful if you want to analyze a small number of systems
+    without running the whole build pipeline.
+
+    Args:
+        md_docs: Store
+            The store containing the task documents.
+        blobs: Store
+            The store containing the blobs.
+        job_uuid: str
+            The UUID of the job.
+        traj_directory: str
+            Directory where the trajectory file is written.
+        overwrite_local_traj: bool
+            Whether to overwrite the local trajectory if it exists.
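+
+    Example:
+        A sketch with illustrative store objects and job UUID::
+
+            u = instantiate_universe(md_docs, blobs, job_uuid, "./trajectories")
+            print(u.atoms.n_atoms)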
+    """
+    # pull job
+    docs = list(md_docs.query(criteria={"uuid": job_uuid}))
+    if len(docs) != 1:
+        raise ValueError(
+            f"The job_uuid, {job_uuid}, must be unique. Found {len(docs)} documents."
+        )
+    task_doc = docs[0]["output"]
+    traj_file_type = task_doc["calcs_reversed"][0]["input"]["traj_file_type"]
+
+    # define path to trajectory
+    traj_directory = Path(traj_directory)
+    traj_directory.mkdir(parents=True, exist_ok=True)
+    traj_path = traj_directory / f"{job_uuid}.{traj_file_type}"
+
+    # download and insert blobs if necessary
+    # they must be included before task_doc is instantiated
+    new_traj = not traj_path.exists() or overwrite_local_traj
+    insert_blobs(blobs, task_doc, include_traj=new_traj)
+    task_doc = OpenMMTaskDocument.parse_obj(task_doc)
+    if new_traj:
+        with open(traj_path, "wb") as f:
+            f.write(task_doc.calcs_reversed[0].output.traj_blob)
+
+    # create interchange
+    interchange_str = task_doc.interchange.decode("utf-8")
+    interchange = Interchange.parse_raw(interchange_str)
+
+    return create_universe(
+        interchange,
+        task_doc.interchange_meta,
+        traj_path,
+        traj_format=traj_file_type,
+    )
+
+
+def resolve_traj_path(task_doc, local_trajectories, rebase_traj_path):
+    """Return the trajectory path and the object that keeps it alive.
+
+    When the trajectory is written to a temporary file, the file object is
+    returned alongside the path and must be kept in scope by the caller.
+    """
+    calc = task_doc.calcs_reversed[0]
+
+    if local_trajectories:
+        traj_file = calc.output.traj_file
+        traj_path = Path(calc.output.dir_name) / traj_file
+        if rebase_traj_path:
+            old, new = rebase_traj_path
+            traj_path = new / traj_path.relative_to(old)
+    else:
+        traj_file = tempfile.NamedTemporaryFile()
+        traj_path = Path(traj_file.name)
+        with open(traj_path, "wb") as f:
+            if calc.output.traj_blob is None:
+                raise ValueError("No trajectory blob included in the task doc.")
+            f.write(calc.output.traj_blob)
+    return traj_path, traj_file
+
+
+def task_doc_to_universe(task_doc, traj_path):
+    """Create an MDAnalysis universe from a task document and a trajectory path."""
+    calc = task_doc.calcs_reversed[0]
+
+    # create interchange
+    interchange_str = task_doc.interchange.decode("utf-8")
+    try:
+        interchange = Interchange.parse_raw(interchange_str)
+    except Exception:
+        # fall back to parsing with the OpenMM-native interchange
+        interchange = OpenMMInterchange.parse_raw(interchange_str)
+
+    return create_universe(
+        interchange,
+        task_doc.interchange_meta,
+        traj_path,
+        traj_format=calc.input.traj_file_type,
+    )
diff --git a/emmet-builders/emmet/builders/openmm/utils.py b/emmet-builders/emmet/builders/openmm/utils.py
new file mode 100644
index 0000000000..08a0e54dc1
--- /dev/null
+++ b/emmet-builders/emmet/builders/openmm/utils.py
@@ -0,0 +1,341 @@
+import io
+import warnings
+from typing import Optional, Union
+from pathlib import Path
+
+import numpy as np
+from MDAnalysis import Universe
+from solvation_analysis.solute import Solute
+
+from openmm.app.pdbfile import PDBFile
+
+from openff.interchange import Interchange
+import openff.toolkit as tk
+
+from maggma.core import Store
+
+from emmet.core.openff import MoleculeSpec
+from emmet.core.openmm import OpenMMTaskDocument, OpenMMInterchange
+
+
+def create_universe(
+    interchange: Union[Interchange, OpenMMInterchange],
+    mol_specs: Optional[list[MoleculeSpec]],
+    traj_file: Union[Path, str],
+    traj_format=None,
+):
+    """
+    Create a Universe object from an Interchange object and a trajectory file.
+
+    Parameters
+    ----------
+    interchange : Interchange or OpenMMInterchange
+        The Interchange object containing the topology and parameters.
+    mol_specs : list[MoleculeSpec] or None
+        A list of MoleculeSpec objects or None.
+    traj_file : Path or str
+        The path to the trajectory file.
+    traj_format : str, optional
+        The format of the trajectory file.
+
+    Returns
+    -------
+    Universe
+        The created Universe object.
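+
+    Examples
+    --------
+    A sketch assuming ``interchange`` and ``mol_specs`` come from a parsed
+    OpenMMTaskDocument, as in ``task_doc_to_universe`` (the file name is
+    illustrative)::
+
+        u = create_universe(interchange, mol_specs, "traj.dcd", traj_format="DCD")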
+    """
+    if isinstance(interchange, Interchange):
+        openff_topology = interchange.topology
+        openmm_topology = openff_topology.to_openmm()
+    else:
+        with io.StringIO(interchange.topology) as s:
+            pdb = PDBFile(s)
+            openmm_topology = pdb.getTopology()
+        if mol_specs is None:
+            raise ValueError(
+                "Molecule specs must be provided if using OpenMMInterchange."
+            )
+        unique_molecules = [
+            tk.Molecule.from_json(spec.openff_mol) for spec in mol_specs
+        ]
+        openff_topology = tk.Topology.from_openmm(
+            openmm_topology,
+            unique_molecules=unique_molecules,
+        )
+
+    mols = list(openff_topology.molecules)
+
+    u = Universe(
+        openmm_topology,
+        str(traj_file),
+        format=traj_format,
+    )
+
+    label_types(u, mols)
+    label_resnames(u, mols, mol_specs)
+    label_charges(u, mols, mol_specs)
+
+    return u
+
+
+def label_types(u: Universe, mols: list[tk.Molecule]):
+    """
+    Label atoms in the Universe with unique types based on the molecules.
+
+    Parameters
+    ----------
+    u : Universe
+        The Universe object to label.
+    mols : list[tk.Molecule]
+        The list of Molecule objects.
+    """
+    # assign a distinct block of type indices to each unique molecule
+    offset = 0
+    mol_types = {}
+    for mol in set(mols):
+        mol_types[mol] = range(offset, offset + mol.n_atoms)
+        offset += mol.n_atoms
+    all_types = np.concatenate([mol_types[mol] for mol in mols])
+    u.add_TopologyAttr("types", all_types)
+
+
+def label_resnames(
+    u: Universe, mols: list[tk.Molecule], mol_specs: Optional[list[MoleculeSpec]]
+):
+    """
+    Label atoms in the Universe with residue names.
+
+    Parameters
+    ----------
+    u : Universe
+        The Universe object to label.
+    mols : list[tk.Molecule]
+        The list of Molecule objects.
+    mol_specs : list[MoleculeSpec] or None
+        A list of MoleculeSpec objects or None.
+    """
+    if mol_specs:
+        resname_list = [[spec.name] * spec.count for spec in mol_specs]
+        resnames = np.concatenate(resname_list)
+    else:
+        resname_list = [mol.to_smiles() for mol in mols]
+        resnames = np.array(resname_list)
+    u.add_TopologyAttr("resnames", resnames)
+
+
+def label_charges(
+    u: Universe, mols: list[tk.Molecule], mol_specs: Optional[list[MoleculeSpec]]
+):
+    """
+    Label atoms in the Universe with partial charges.
+
+    Parameters
+    ----------
+    u : Universe
+        The Universe object to label.
+    mols : list[tk.Molecule]
+        The list of Molecule objects.
+    mol_specs : list[MoleculeSpec] or None
+        A list of MoleculeSpec objects or None.
+    """
+    charge_arrays = []
+    if mol_specs:
+        for spec in mol_specs:
+            mol = tk.Molecule.from_json(spec.openff_mol)
+            charge_arr = np.tile(mol.partial_charges / spec.charge_scaling, spec.count)
+            charge_arrays.append(charge_arr)
+    else:
+        warnings.warn(
+            "`mol_specs` are not present so charges cannot be unscaled. "
+            "If charges were scaled, conductivity calculations will be inaccurate."
+        )
+        for mol in mols:
+            charge_arrays.append(mol.partial_charges)
+    charges = np.concatenate(charge_arrays).magnitude
+    u.add_TopologyAttr("charges", charges)
+
+
+def create_solute(
+    u: Universe,
+    solute_name: str,
+    networking_solvents: Optional[list[str]] = None,
+    fallback_radius: Optional[float] = None,
+    include_solute_in_solvents: bool = False,
+    analysis_classes: Union[list[str], str] = (
+        "coordination",
+        "pairing",
+        "speciation",
+        "networking",
+    ),
+    step: int = 1,
+):
+    """
+    Create a Solute object from a Universe object.
+
+    Parameters
+    ----------
+    u : Universe
+        The Universe object containing the solute and solvent atoms.
+    solute_name : str
+        The residue name of the solute.
+    networking_solvents : list[str] or None, optional
+        A list of residue names of networking solvents or None.
+    fallback_radius : float or None, optional
+        The fallback radius for kernel calculations or None.
+    include_solute_in_solvents : bool, optional
+        Whether to include the solute in the solvents dictionary. Default is False.
+    analysis_classes : list[str] or str, optional
+        The analysis classes to run. Default is
+        ("coordination", "pairing", "speciation", "networking").
+    step : int, optional
+        The step size for the analysis. Default is 1.
+
+    Returns
+    -------
+    Solute
+        The created Solute object.
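+
+    Examples
+    --------
+    A typical call, mirroring the builder pipeline in core.py::
+
+        solute = create_solute(
+            u,
+            solute_name=identify_solute(u),
+            networking_solvents=identify_networking_solvents(u),
+            fallback_radius=3,
+        )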
+    """
+    solute_atoms = u.residues[u.residues.resnames == solute_name].atoms
+
+    unique_resnames = np.unique(u.atoms.residues.resnames)
+
+    solvents = {}
+    for resname in unique_resnames:
+        atoms = u.residues[u.residues.resnames == resname].atoms
+        solvents[resname] = atoms
+
+    if not include_solute_in_solvents:
+        solvents.pop(solute_name, None)
+
+    solute = Solute.from_atoms(
+        solute_atoms,
+        solvents,
+        solute_name=solute_name,
+        analysis_classes=analysis_classes,
+        networking_solvents=networking_solvents,
+        kernel_kwargs={"default": fallback_radius},
+    )
+    solute.run(step=step)
+    return solute
+
+
+def identify_solute(u: Universe):
+    """
+    Identify the solute in a Universe object.
+
+    Currently this just finds the name of a single cation based on the
+    partial charges in the universe.
+
+    Parameters
+    ----------
+    u : Universe
+        The Universe object.
+
+    Returns
+    -------
+    str
+        The residue name of the solute.
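+
+    Examples
+    --------
+    ::
+
+        solute_name = identify_solute(u)  # e.g. the cation's residue name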
+    """
+    cation_residues = u.residues[u.residues.charges > 0.01]
+    unique_names = np.unique(cation_residues.resnames)
+    if len(unique_names) > 1:
+        # TODO: fail gracefully?
+        raise ValueError("Multiple cationic species detected, not yet supported.")
+    return unique_names[0]
+
+
+def identify_networking_solvents(u: Universe):
+    """
+    Identify the networking solvents in a Universe object.
+
+    Currently this just finds the names of all anions based on the
+    partial charges in the universe.
+
+    Parameters
+    ----------
+    u : Universe
+        The Universe object.
+
+    Returns
+    -------
+    list[str]
+        The residue names of the networking solvents.
+    """
+    # currently just anions
+    anion_residues = u.residues[u.residues.charges < -0.01]
+    unique_names = np.unique(anion_residues.resnames)
+    return list(unique_names)
+
+
+def insert_blobs(blobs_store: Store, task_doc: dict, include_traj: bool = True):
+    """Insert blobs into a task document."""
+    interchange_uuid = task_doc["interchange"]["blob_uuid"]
+    interchange_blob = blobs_store.query_one({"blob_uuid": interchange_uuid})
+    task_doc["interchange"] = interchange_blob["data"]
+
+    if len(task_doc["calcs_reversed"]) == 0:
+        raise ValueError("No calculations found in job output.")
+
+    for calc in task_doc["calcs_reversed"]:
+        if not include_traj:
+            calc["output"]["traj_blob"] = None
+
+        traj_blob = calc["output"]["traj_blob"]
+
+        if traj_blob:
+            traj_uuid = calc["output"]["traj_blob"]["blob_uuid"]
+            traj_blob = blobs_store.query_one({"blob_uuid": traj_uuid})
+            calc["output"]["traj_blob"] = traj_blob["data"]
+
+
+def instantiate_universe(
+    md_docs: Store,
+    blobs: Store,
+    job_uuid: str,
+    traj_directory: Union[str, Path] = ".",
+    overwrite_local_traj: bool = True,
+):
+    """
+    Instantiate an MDAnalysis universe from a task document.
+
+    This is useful if you want to analyze a small number of systems
+    without running the whole build pipeline.
+
+    Args:
+        md_docs: Store
+            The store containing the task documents.
+        blobs: Store
+            The store containing the blobs.
+        job_uuid: str
+            The UUID of the job.
+        traj_directory: str
+            Directory where the trajectory file is written.
+        overwrite_local_traj: bool
+            Whether to overwrite the local trajectory if it exists.
+    """
+
+    # pull job
+    docs = list(md_docs.query(criteria={"uuid": job_uuid}))
+    if len(docs) != 1:
+        raise ValueError(
+            f"The job_uuid, {job_uuid}, must be unique. Found {len(docs)} documents."
+        )
+    task_doc = docs[0]["output"]
+    traj_file_type = task_doc["calcs_reversed"][0]["input"]["traj_file_type"]
+
+    # define path to trajectory
+    traj_directory = Path(traj_directory)
+    traj_directory.mkdir(parents=True, exist_ok=True)
+    traj_path = traj_directory / f"{job_uuid}.{traj_file_type}"
+
+    # download and insert blobs if necessary
+    new_traj = not traj_path.exists() or overwrite_local_traj
+    insert_blobs(blobs, task_doc, include_traj=new_traj)
+    task_doc = OpenMMTaskDocument.parse_obj(task_doc)
+    if new_traj:
+        with open(traj_path, "wb") as f:
+            f.write(task_doc.calcs_reversed[0].output.traj_blob)
+
+    # create interchange
+    interchange_str = task_doc.interchange.decode("utf-8")
+    interchange = Interchange.parse_raw(interchange_str)
+
+    return create_universe(
+        interchange,
+        task_doc.interchange_meta,
+        traj_path,
+        traj_format=traj_file_type,
+    )