Skip to content

Commit

Permalink
[GSoC] Parallelisation of AnalysisBase with multiprocessing and dask (#…
Browse files Browse the repository at this point in the history
…4162)

* add parallelisation to AnalysisBase
* fixes #4158 

DETAILED COMMENTS FROM COMMITS

* Remove _scheduler attribute and make dask-based tests run properly

* Refactor scheduler usage

* Add multiple workers in dask for testing

* Refactor _setup_bslices and add processes to dask scheduler kwargs

* Create frame_indices and trajectory for each bslice during _setup_bslices

* Use explicit initialisation of timeseries wiith zeros

* Add non-trivial _parallel_conclude function

* Fix tests for new dask fixture

* Add type-matching _parallel_conclude

* Add fixtures to test combinations of dask and multiprocessing

* dask and multiprocessing works in test_atomicdistances.py

* Fix bug in results is np.ndarray codepath

* Add _setup_scheduler raising NotImplemented error  in align.py::AverageStructure

* dask and multiprocessing schedulers to test_align.py

* dask scheduler for test_contacts.py and test for incompatibility with multiprocessing

* dask and multiprocessing scheduler for test_density.py

* Add _parallel_conclude implementation for dielectric

* dask and multiprocessing schedulers for test_dielectric.py

* dask and multiprocessing schedulers for test_diffusionmap.py

* Add NotImplementedError for parallel schedulers in dihedrals.py

* only current scheduler for test_dihedrals.py

* dask and multiprocessing tests for test_encore.py -- but some fail because of RMSF module

* Add NotImplementedError for _setup_scheduler in gnm.py

* Add NotImplementedError for _setup_scheduler in helix_analysis.py

* current process scheduler for test_helix_analysis.py

* dask and multiprocessing schedulers for test_hole2.py

* Add NotImplementedError in for not-None schedulers

* current process scheduler and test for failing non-current ones in test_hydrogendbonds_analysis.py

* current process only scheduler and failing test for others in test_lineardensity.py

* Add NotImplementedError for non-current process schedulers

* current process scheduler only and failing tests for non-current ones in test_msd.py

* Add NotImplementedError for non-current process schedulers

* Fix scope of fixtures

* Add NotImplemented error for all non-current process schedulers

* only current process scheduler and failing tests for test_nucleicacids.py

* dask and multiprocessing schedulers for test_persistentlength.py

* Add _parallel_conclude implementation

* dask and multiprocessing schedulers for test_psa.py

* Add _parallel_conclude implementation for RDF and RDF_S

* dask and multiprocessing schedulers for test_rdf_s.py

* dask and multiprocessing schedulers for test_rdf.py

* Add NotImplementedError for RMSD and RMSF classes

* only local process scheduler and failing tests for others for test_rms.py

* current process scheduler only and failing test for others for test_wbridge.py

* Add NotImplementedError in _setup_scheduler

* Add more clear message during exception

* Add timeseries aggregation function

* dask and multiprocessing scheduler for most of the test_base.py testcases

* dask and multiprocessing schedulers for test_rms.py::TestRMSD

* Add NotImplementedError for pca and rms

* dask and multiprocessing schedulers for test_bat

* dcurrent process scheduler for test_gnm.py

* dcurrent process scheduler for test_pca.py

* Fix rmsf-related scheduler usage to only current process scheduler

* remove fixme marks

* Switch to enumerate in _compute main loop and fix code review comments

* Add dask to CI setup actions

* Remove local scheduler for progressbar test

* Add installation with dask as  asetup option

* fix hole2 tests for -- implement only current scheduler and add failing test

* fix progressbar test by changing order of ProgressBar and enumerate

* use only frame indices and frames in _setup_bslices after writing a blogpost

* Refactor _setup_bslices: move enumerate to numpy and fuse logic in defining type of input

* Add documentation to AnalysisBase._parallel_conclude()

* add functional-like interface draft

* Implement proper Client class, separating computations from AnalysisBase

* FINALLY implement working one-time dask cluster setup in kwargs of a client

* Correct tests accordingly

* Separately process case of only one remote worker

* Add available_schedulers to AverageStructure

* Use automatic fixture for AverageStructure

* Add fixture for AverageStructure

* Add fixture for AtomicDistances

* Change default available_backends to all implemented in Client

* Limit available backends for AverageStructure

* Add fixture for BAT

* Add fixture tests to Contacts

* Fix n_workers check and boolean frames handling

* Fix performance of backend="dask"

* Add available_backends for Contacts

* Remove _setup_scheduler

* Use client fixture for Contacts

* Use client fixture for RMSD/RMSF

* Revert files to their state in develop

* Delete files_for_undoing_changes.txt

* Delete conftest.py

* Delete parallel_analysis_demo.ipynb

* Clean up notebook

* Limit available schedulers in RMSF

* Split test in two due to failing with "expectation" parametrization

* Add fixture generator and fixtures for test_base and test_rms

* Add dask to pyproject.toml

* Return computation groups explicitly

* Fix dask position in setup-deps/action.yaml

* Add dask[distributed] to mdanalysis[parallel] installation

* Undo autoformatter

* Manually define available_backends for RMSD class

* Create separate "parallel" entry

* Add is_installed function to utils

* Add dict-based validatdion and computation logic for ParallelExecutor

* Add tests for ParallelExecutor

* Add documentation for "apply" method of ParallelExecutor

* Correct dask.distributed name

* Use chunksize=1 instead of explicit Pool in _compute_with_dask

* Remove unnecessary function in conftest

* Fix function to retrieve dask client if dask is not installed

* Fix base tests when dask is not installed

* Use new LocalCluster every time

* Fix client/backend logic

* Add documentation to a silly square function

* Switch to package-wise autouse fixture for dask.distributed.Client

* Add explicit result() when computing with cluster

* Fix codereview

* Replace list with tuple in available_backends for RMSD

* Remove unnecessary get_running_dask_client

* Implement fixture injection for subclasses testing

* Add warnings filters

* Fix backend check when client is present

* Return get_runnning_dask_client function

* Change dask fixture scope

* Close LocalCluster to avoid trillions of logs

* Implement ResultsGroup based aggregation instead of type matching

* Add non-default _get_aggregator() to RMS and Base classes

* Mark test_multiprocessing.py::test_creating_multiple_universe_without_offset as skipped

* Restore failing test

* Make aggregation functions static methods of ResultsGroup

* Remove test skip

* Move parallel part into a separate file

* Fix imports

* Proof of concept for duck-typed backends

* Remove unused code

* Replace ParallelExecutor with multiple backend classes and add duck-typing backend in AnalysisBase.run()

* Add all tests for analysis/parallel.py and fix bug in ResultsGroup.ndarray_mean

* Change typing to py3.9 compatible syntax

* Add _is_parallelizable to AnalysisFromFunction

* Remove dask[distributed] even as an optional dependency

* Update documentation

* Remove function to get running dask client

* Remove unused code from analysis/conftest.py

* Fix documentation and minor issues from codereview

* Update package/MDAnalysis/analysis/rms.py

Co-authored-by: Irfan Alibay <[email protected]>

* Add more backend validation tests and fix autoformatter issues

* Start implementing correct result sizes in separate computation groups

* Continue working: diffusionmap and PCA tests fail

* Fix bug in PCA trajectory iteration -- avoid explicit usage of self.start

* update changelog and tests for PCA fix

* Fix diffusionmap and pca

* Make sure not to reset self.{start,stop,step} during self._compute

* Change iteration pattern to sliced trajectory

* Change iteration pattern to sliced trajectory

* Update package/MDAnalysis/analysis/parallel.py

Co-authored-by: Yuxuan Zhuang <[email protected]>

* Apply suggestions from code review

Co-authored-by: Rocco Meli <[email protected]>
Co-authored-by: Yuxuan Zhuang <[email protected]>

* Split _setup_frames into two separate functions

* Add docstrings for _prepare_sliced_trajectory and _define_run_frames

* Remove dask-distributed from dependencies

* Test only 2 processors with parallelizable backends

* Rename available_backends and safe

* Apply codereview changes

* Make tests for AnalysisBase subclasses explicit

* Exclude "multiprocessing" from analysis_class function available backends

* Split parallel.py into results.py and parallel.py

* Finalize separation of results and backends

* Rename parallel.py to backends.py

* Add results and backends to analysis/__init__.py

* Fix pep8 errors in docstrings and code

* Add versionadded to documentation

* Update sphinx documentation with backends and results

* Add parallelization reference to base.py

* Switch to relative imports

* Update documentation, adding introduced changes

* Update documentation adding parallelization support for rms

* Add module documentation to results and backends

* Fix BackendSerial validation and add its tests

* Fix calling of self._is_paralellizable()

* Add tests on is_parallelizable and get_supported_backends

* Fix bug with default progressbar_kwargs being dict

* Apply suggestions from code review

Co-authored-by: Rocco Meli <[email protected]>
Co-authored-by: Yuxuan Zhuang <[email protected]>

* Add docstrings to apply() in backends

* Add double n_worker check

* Apply suggestions from code review

Co-authored-by: Paul Smith <[email protected]>

* Fix hasattr in double n_worker check

* Revert test `with expectation` in test_align

* Update testsuite/MDAnalysisTests/analysis/test_pca.py

Co-authored-by: Irfan Alibay <[email protected]>

* Update package/MDAnalysis/lib/util.py

Co-authored-by: Irfan Alibay <[email protected]>

* Update changelog

* Apply suggestions from code review

* Add parallelization section to the documentation

* Fix versionadded in new classes

* Finish parallelization section for documentation

* Fix typos

* Apply suggestions from code review

Co-authored-by: Rocco Meli <[email protected]>

* Apply suggestions from code review

Co-authored-by: Rocco Meli <[email protected]>

* Refactor TreadsBackend example and add a warning

* Add n_workers instantiation from backend argument

* Update package/MDAnalysis/analysis/backends.py

Co-authored-by: Yuxuan Zhuang <[email protected]>

* Update package/doc/sphinx/source/documentation_pages/analysis/parallelization.rst

Co-authored-by: Yuxuan Zhuang <[email protected]>

* Add remark about RMSF parallelization

* Apply suggestions from codereview

* Apply suggestions from code review

* Fix documentation typo

* Update dask installation test after exception text changed

* edited documentation for parallelization

- add reST/sphinx markup for methods and classes and ensure that (most of them)
  resolve; add intersphinx mapping to dask docs
- added cross referencing between parallelization and backends docs
- restructured analysis landing page with additional numbered headings for
  general use and parallelization
- add citation for PMDA
- fixed links
- edited text for flow and readability
- added SeeAlsos (eg for User Guide)
- added notes/warnings

* analysis top level docs fixes

- mark analysis docs as documenting MDAnalysis.analysis so that references resolve
  properly
- link fixes

* Added comments regarding `_is_parallelizable` (and fixed documentation), fixed tests for `is_installed`

* Rename AnalysisBase.parallelizable and fix parallelizable transformations

* Remove explicit parallelizable=True in NoJump test call

* Apply suggestions from code review

* add explicit comment to AnalysisBase._analysis_algorithm_is_parallelizable

* Add client_RMSD explanation

* versioninformation markup fix in base.py

* Apply suggestions from code review

Co-authored-by: Irfan Alibay <[email protected]>
Co-authored-by: Rocco Meli <[email protected]>

* Apply suggestions from code review

Co-authored-by: Irfan Alibay <[email protected]>
Co-authored-by: Rocco Meli <[email protected]>

* Add comments explaining client_... fixtures

* Move class properties to the top of the class

* Undo accidental versionadded change

* Remove duplicating versionadded

* Add versionadded for backend

* Add link to github profile

* Update package/doc/sphinx/source/documentation_pages/analysis/parallelization.rst

Co-authored-by: Irfan Alibay <[email protected]>

* Update testsuite/MDAnalysisTests/analysis/test_backends.py

Co-authored-by: Rocco Meli <[email protected]>

* minor text fixes

* Update package/MDAnalysis/analysis/base.py

Co-authored-by: Oliver Beckstein <[email protected]>

* Update package/MDAnalysis/analysis/base.py

Co-authored-by: Oliver Beckstein <[email protected]>

* Remove issubclass check

---------

Co-authored-by: Egor Marin <[email protected]>
Co-authored-by: Egor Marin <[email protected]>
Co-authored-by: Irfan Alibay <[email protected]>
Co-authored-by: Yuxuan Zhuang <[email protected]>
Co-authored-by: Rocco Meli <[email protected]>
Co-authored-by: Paul Smith <[email protected]>
Co-authored-by: Yuxuan Zhuang <[email protected]>
Co-authored-by: Oliver Beckstein <[email protected]>
  • Loading branch information
9 people committed Aug 16, 2024
1 parent f6a2c29 commit 481e36a
Show file tree
Hide file tree
Showing 25 changed files with 2,373 additions and 362 deletions.
3 changes: 3 additions & 0 deletions .github/actions/setup-deps/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ inputs:
default: 'chemfiles-python>=0.9'
clustalw:
default: 'clustalw=2.1'
dask:
default: 'dask'
distopia:
default: 'distopia>=0.2.0'
h5py:
Expand Down Expand Up @@ -134,6 +136,7 @@ runs:
${{ inputs.biopython }}
${{ inputs.chemfiles-python }}
${{ inputs.clustalw }}
${{ inputs.dask }}
${{ inputs.distopia }}
${{ inputs.gsd }}
${{ inputs.h5py }}
Expand Down
2 changes: 2 additions & 0 deletions package/CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ Fixes
* Fix groups.py doctests using sphinx directives (Issue #3925, PR #4374)

Enhancements
* Introduce parallelization API to `AnalysisBase` and to `analysis.rms.RMSD` class
(Issue #4158, PR #4304)
* Improve error message for `AtomGroup.unwrap()` when bonds are not present.(Issue #4436, PR #4642)
* Add `analysis.DSSP` module for protein secondary structure assignment, based on [pydssp](https://github.com/ShintaroMinami/PyDSSP)
* Added a tqdm progress bar for `MDAnalysis.analysis.pca.PCA.transform()`
Expand Down
2 changes: 2 additions & 0 deletions package/MDAnalysis/analysis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

__all__ = [
'align',
'backends',
'base',
'contacts',
'density',
Expand All @@ -45,6 +46,7 @@
'pca',
'psa',
'rdf',
'results',
'rms',
'waterdynamics',
]
333 changes: 333 additions & 0 deletions package/MDAnalysis/analysis/backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
"""Analysis backends --- :mod:`MDAnalysis.analysis.backends`
============================================================
.. versionadded:: 2.8.0
The :mod:`backends` module provides :class:`BackendBase` base class to
implement custom execution backends for
:meth:`MDAnalysis.analysis.base.AnalysisBase.run` and its
subclasses.
.. SeeAlso:: :ref:`parallel-analysis`
.. _backends:
Backends
--------
Three built-in backend classes are provided:
* *serial*: :class:`BackendSerial`, that is equivalent to using no
parallelization and is the default
* *multiprocessing*: :class:`BackendMultiprocessing` that supports
parallelization via standard Python :mod:`multiprocessing` module
and uses default :mod:`pickle` serialization
* *dask*: :class:`BackendDask`, that uses the same process-based
parallelization as :class:`BackendMultiprocessing`, but different
serialization algorithm via `dask <https://dask.org/>`_ (see `dask
serialization algorithms
<https://distributed.dask.org/en/latest/serialization.html>`_ for details)
Classes
-------
"""
import warnings
from typing import Callable
from MDAnalysis.lib.util import is_installed


class BackendBase:
"""Base class for backend implementation.
Initializes an instance and performs checks for its validity, such as
``n_workers`` and possibly other ones.
Parameters
----------
n_workers : int
number of workers (usually, processes) over which the work is split
Examples
--------
.. code-block:: python
from MDAnalysis.analysis.backends import BackendBase
class ThreadsBackend(BackendBase):
def apply(self, func, computations):
from multiprocessing.dummy import Pool
with Pool(processes=self.n_workers) as pool:
results = pool.map(func, computations)
return results
import MDAnalysis as mda
from MDAnalysis.tests.datafiles import PSF, DCD
from MDAnalysis.analysis.rms import RMSD
u = mda.Universe(PSF, DCD)
ref = mda.Universe(PSF, DCD)
R = RMSD(u, ref)
n_workers = 2
backend = ThreadsBackend(n_workers=n_workers)
R.run(backend=backend, unsupported_backend=True)
.. warning::
Using `ThreadsBackend` above will lead to erroneous results, since it
is an educational example. Do not use it for real analysis.
.. versionadded:: 2.8.0
"""

def __init__(self, n_workers: int):
self.n_workers = n_workers
self._validate()

def _get_checks(self):
"""Get dictionary with ``condition: error_message`` pairs that ensure the
validity of the backend instance
Returns
-------
dict
dictionary with ``condition: error_message`` pairs that will get
checked during ``_validate()`` run
"""
return {
isinstance(self.n_workers, int) and self.n_workers > 0:
f"n_workers should be positive integer, got {self.n_workers=}",
}

def _get_warnings(self):
"""Get dictionary with ``condition: warning_message`` pairs that ensure
the good usage of the backend instance
Returns
-------
dict
dictionary with ``condition: warning_message`` pairs that will get
checked during ``_validate()`` run
"""
return dict()

def _validate(self):
"""Check correctness (e.g. ``dask`` is installed if using ``backend='dask'``)
and good usage (e.g. ``n_workers=1`` if backend is serial) of the backend
Raises
------
ValueError
if one of the conditions in :meth:`_get_checks` is ``True``
"""
for check, msg in self._get_checks().items():
if not check:
raise ValueError(msg)
for check, msg in self._get_warnings().items():
if not check:
warnings.warn(msg)

def apply(self, func: Callable, computations: list) -> list:
"""map function `func` to all tasks in the `computations` list
Main method that will get called when using an instance of
``BackendBase``. It is equivalent to running ``[func(item) for item in
computations]`` while using the parallel backend capabilities.
Parameters
----------
func : Callable
function to be called on each of the tasks in computations list
computations : list
computation tasks to apply function to
Returns
-------
list
list of results of the function
"""
raise NotImplementedError


class BackendSerial(BackendBase):
"""A built-in backend that does serial execution of the function, without any
parallelization.
Parameters
----------
n_workers : int
Is ignored in this class, and if ``n_workers`` > 1, a warning will be
given.
.. versionadded:: 2.8.0
"""

def _get_warnings(self):
"""Get dictionary with ``condition: warning_message`` pairs that ensure
the good usage of the backend instance. Here, it checks if the number
of workers is not 1, otherwise gives warning.
Returns
-------
dict
dictionary with ``condition: warning_message`` pairs that will get
checked during ``_validate()`` run
"""
return {
self.n_workers == 1:
"n_workers is ignored when executing with backend='serial'"
}

def apply(self, func: Callable, computations: list) -> list:
"""
Serially applies `func` to each task object in ``computations``.
Parameters
----------
func : Callable
function to be called on each of the tasks in computations list
computations : list
computation tasks to apply function to
Returns
-------
list
list of results of the function
"""
return [func(task) for task in computations]


class BackendMultiprocessing(BackendBase):
"""A built-in backend that executes a given function using the
:meth:`multiprocessing.Pool.map <multiprocessing.pool.Pool.map>` method.
Parameters
----------
n_workers : int
number of processes in :class:`multiprocessing.Pool
<multiprocessing.pool.Pool>` to distribute the workload
between. Must be a positive integer.
Examples
--------
.. code-block:: python
from MDAnalysis.analysis.backends import BackendMultiprocessing
import multiprocessing as mp
backend_obj = BackendMultiprocessing(n_workers=mp.cpu_count())
.. versionadded:: 2.8.0
"""

def apply(self, func: Callable, computations: list) -> list:
"""Applies `func` to each object in ``computations`` using `multiprocessing`'s `Pool.map`.
Parameters
----------
func : Callable
function to be called on each of the tasks in computations list
computations : list
computation tasks to apply function to
Returns
-------
list
list of results of the function
"""
from multiprocessing import Pool

with Pool(processes=self.n_workers) as pool:
results = pool.map(func, computations)
return results


class BackendDask(BackendBase):
"""A built-in backend that executes a given function with *dask*.
Execution is performed with the :func:`dask.compute` function of
:class:`dask.delayed.Delayed` object (created with
:func:`dask.delayed.delayed`) with ``scheduler='processes'`` and
``chunksize=1`` (this ensures uniform distribution of tasks among
processes). Requires the `dask package <https://docs.dask.org/en/stable/>`_
to be `installed <https://docs.dask.org/en/stable/install.html>`_.
Parameters
----------
n_workers : int
number of processes in to distribute the workload
between. Must be a positive integer. Workers are actually
:class:`multiprocessing.pool.Pool` processes, but they use a different and
more flexible `serialization protocol
<https://docs.dask.org/en/stable/phases-of-computation.html#graph-serialization>`_.
Examples
--------
.. code-block:: python
from MDAnalysis.analysis.backends import BackendDask
import multiprocessing as mp
backend_obj = BackendDask(n_workers=mp.cpu_count())
.. versionadded:: 2.8.0
"""

def apply(self, func: Callable, computations: list) -> list:
"""Applies `func` to each object in ``computations``.
Parameters
----------
func : Callable
function to be called on each of the tasks in computations list
computations : list
computation tasks to apply function to
Returns
-------
list
list of results of the function
"""
from dask.delayed import delayed
import dask

computations = [delayed(func)(task) for task in computations]
results = dask.compute(computations,
scheduler="processes",
chunksize=1,
num_workers=self.n_workers)[0]
return results

def _get_checks(self):
"""Get dictionary with ``condition: error_message`` pairs that ensure the
validity of the backend instance. Here checks if ``dask`` module is
installed in the environment.
Returns
-------
dict
dictionary with ``condition: error_message`` pairs that will get
checked during ``_validate()`` run
"""
base_checks = super()._get_checks()
checks = {
is_installed("dask"):
("module 'dask' is missing. Please install 'dask': "
"https://docs.dask.org/en/stable/install.html")
}
return base_checks | checks
Loading

0 comments on commit 481e36a

Please sign in to comment.