Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ channels:
dependencies:
- click
- MDAnalysis
- MDAnalysisTests
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add this to the test dependencies in the pyproject.toml.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

- netCDF4
- openff-units
- pip
Expand Down
274 changes: 179 additions & 95 deletions src/openfe_analysis/rmsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
import MDAnalysis as mda
import netCDF4 as nc
import numpy as np
import tqdm
from MDAnalysis.analysis import rms
from MDAnalysis.analysis import diffusionmap, rms
from MDAnalysis.analysis.base import AnalysisBase
from MDAnalysis.transformations import unwrap
from numpy import typing as npt

from .reader import FEReader
from .transformations import Aligner, ClosestImageShift, NoJump
Expand Down Expand Up @@ -100,8 +99,141 @@ def make_Universe(top: pathlib.Path, trj: nc.Dataset, state: int) -> mda.Univers
return u


class Protein2DRMSD(AnalysisBase):
"""
Flattened 2D RMSD matrix

For all unique frame pairs ``(i, j)`` with ``i < j``, this function
computes the RMSD between atomic coordinates after optimal alignment.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you maybe expand this to mention you're doing a center of geometry fit as well as a rotational and translational superposition usingg QCP?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this!

"""

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explicitly define _analysis_algorithm_is_parallelizable = False (it's inherited by default, but it would be good to have it explicitly defined here) in these classes and then raise an issue about looking into parallism?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this and also opened an issue.

def __init__(self, atomgroup, weights=None, **kwargs):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and everywhere else, add typing where possible?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

"""
Parameters
----------
atomgroup: AtomGroup
Protein atoms (e.g. CA selection)
weights: np.ndarray, optional
Per-atom weights to use in the RMSD calculation. If ``None``,
all atoms are weighted equally.
"""
super().__init__(atomgroup.universe.trajectory, **kwargs)
self._weights = weights
self._ag = atomgroup

def _prepare(self):
self._coords = []
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you pre-allocate numpy arrays here instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

self.results.rmsd2d = []

def _single_frame(self):
self._coords.append(self._ag.positions.copy())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this is effectively the same as putting the whole trajectory into memory. I'm saying this as an FYI that it will be a possible failure mode when someone runs a really long simulation and then tries to use this class.

Might be good to document in the docstring notes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a note in the doc string

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the copy necessary?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since now we're doing the pre-allocation, the copy is not necessary any more (not fully sure if it was before, but I wanted to be safe =)


def _conclude(self):
positions = np.asarray(self._coords)
nframes = positions.shape[0]

output = []
for i, j in itertools.combinations(range(nframes), 2):
posi, posj = positions[i], positions[j]
pair_rmsd = rms.rmsd(
posi,
posj,
self._weights,
center=True,
superposition=True,
)
output.append(pair_rmsd)

self.results.rmsd2d = np.asarray(output)


class RMSDAnalysis(AnalysisBase):
"""
1D RMSD time series for an AtomGroup.

Parameters
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If possible, try to make where you put parameters consistent between the docstrings of different classes. I prefer it in this location (since it reflects what MDAnalysis does), but Protein2DRMSD has it in the __init__.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved Parameters in the Protein2DRMSD to the class.

----------
atomgroup : MDAnalysis.AtomGroup
Atoms to compute RMSD for.
reference: Optional[MDAnalysis.AtomGroup]
Reference AtomGroup. If ``None``, the reference positions are captured
from the mobile AtomGroup at the start of the run (i.e. whatever frame
the trajectory is on when ``.run()`` is called).
mass_weighted : bool, optional
If True, compute mass-weighted RMSD.
superposition : bool, optional
If ``True``, perform rotational superposition before computing RMSD.
"""

def __init__(
self,
atomgroup,
reference=None,
mass_weighted=False,
superposition=False,
**kwargs,
):
super().__init__(atomgroup.universe.trajectory, **kwargs)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_analysis_algorithm_is_parallelizable

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remember to add.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this!

self._ag = atomgroup
self._reference = reference if reference is not None else self._ag
self._mass_weighted = mass_weighted
self._superposition = superposition

def _prepare(self):
self.results.rmsd = []

self._reference_pos = self._reference.positions.copy()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that if you call .run(start=10), this will mean your reference is frame 10 not frame 0. This should probably be documented.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the doc string and also inline comment


if self._mass_weighted:
self._weights = self._ag.masses / np.mean(self._ag.masses)
else:
self._weights = None

def _single_frame(self):
frame_rmsd = rms.rmsd(
self._ag.positions,
self._reference_pos,
self._weights,
center=False,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you document why this is False here but not in 2D RMSD? Are there situations where you would want this to be True? (i.e. should it be a kwarg option?)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point. I had done it this way just to match previous behaviour (I just restructured the code without changing it). I think in 2D RMSD it needs to do the superposition of the different frame pairs each time, however the centering argument doesn't seem to do anything when superposition is True (https://github.com/MDAnalysis/mdanalysis/blob/13e8664f62b536f5530f8498d7878cb61b3a0d23/package/MDAnalysis/analysis/rms.py#L264)?
For now I decided to expose the center argument in the RMSD function, leaving the defaults as is.

superposition=self._superposition,
)
self.results.rmsd.append(frame_rmsd)

def _conclude(self):
self.results.rmsd = np.asarray(self.results.rmsd)


class LigandCOMDrift(AnalysisBase):
"""
Ligand center-of-mass displacement from initial position.
"""

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above re: parallelizable

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

def __init__(self, atomgroup, **kwargs):
super().__init__(atomgroup.universe.trajectory, **kwargs)
self._ag = atomgroup

def _prepare(self):
self.results.com_drift = []
self._initial_com = self._ag.center_of_mass()

def _single_frame(self):
# distance between start and current ligand position
# ignores PBC, but we've already centered the traj
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why ignore PBC? Could you not just pass the box kwarg argument along? Or is the box distorted because of the transformation?

Please document this in the docstring.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a note to the doc string, I think that if, e.g. the ligand drifted in the simulation away by more than half the box size, but stayed in the same box, applying the minimum image convention (by passing through the box) would actually make the drift look smaller than it really was. Or would it in that case still identify that the ligand stayed in the same box and not apply the minimum image convention?

drift = mda.lib.distances.calc_bonds(
self._ag.center_of_mass(),
self._initial_com,
)
self.results.com_drift.append(drift)

def _conclude(self):
self.results.com_drift = np.asarray(self.results.com_drift)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: it may be more ever so slightly efficient to just pre-allocate the array ahead of time in _prepare by defining a numpy array of length self.n_frames. This also has the nice side effect of not needing a _conclude definition.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!



def gather_rms_data(
pdb_topology: pathlib.Path, dataset: pathlib.Path, skip: Optional[int] = None
pdb_topology: pathlib.Path,
dataset: pathlib.Path,
skip: Optional[int] = None,
) -> dict[str, list[float]]:
"""
Compute structural RMSD-based metrics for a multistate BFE simulation.
Expand Down Expand Up @@ -161,105 +293,57 @@ def gather_rms_data(
# max against 1 to avoid skip=0 case
skip = max(n_frames // 500, 1)

pb = tqdm.tqdm(total=int(n_frames / skip) * n_lambda)

u_top = mda.Universe(pdb_topology)

for i in range(n_lambda):
for state_idx in range(n_lambda):
# cheeky, but we can read the PDB topology once and reuse per universe
# this then only hits the PDB file once for all replicas
u = make_Universe(u_top._topology, ds, state=i)
u = make_Universe(u_top._topology, ds, state=state_idx)

prot = u.select_atoms("protein and name CA")
ligand = u.select_atoms("resname UNK")

# save coordinates for 2D RMSD matrix
# TODO: Some smart guard to avoid allocating a silly amount of memory?
prot2d = np.empty((len(u.trajectory[::skip]), len(prot), 3), dtype=np.float32)

prot_start = prot.positions
ligand_start = ligand.positions
ligand_initial_com = ligand.center_of_mass()
ligand_weights = ligand.masses / np.mean(ligand.masses)

this_protein_rmsd = []
this_ligand_rmsd = []
this_ligand_wander = []

for ts_i, ts in enumerate(u.trajectory[::skip]):
pb.update()

if prot:
prot2d[ts_i, :, :] = prot.positions
this_protein_rmsd.append(
rms.rmsd(
prot.positions,
prot_start,
None, # prot_weights,
center=False,
superposition=False,
)
)
if ligand:
this_ligand_rmsd.append(
rms.rmsd(
ligand.positions,
ligand_start,
ligand_weights,
center=False,
superposition=False,
)
)
this_ligand_wander.append(
# distance between start and current ligand position
# ignores PBC, but we've already centered the traj
mda.lib.distances.calc_bonds(ligand.center_of_mass(), ligand_initial_com)
)

if prot:
# can ignore weights here as it's all Ca
rmsd2d = twoD_RMSD(prot2d, w=None) # prot_weights)
output["protein_RMSD"].append(this_protein_rmsd)
output["protein_2D_RMSD"].append(rmsd2d)
if ligand:
output["ligand_RMSD"].append(this_ligand_rmsd)
output["ligand_wander"].append(this_ligand_wander)

output["time(ps)"] = list(np.arange(len(u.trajectory))[::skip] * u.trajectory.dt)

return output


def twoD_RMSD(positions, w: Optional[npt.NDArray]) -> list[float]:
"""
Compute a flattened 2D RMSD matrix from a trajectory.

For all unique frame pairs ``(i, j)`` with ``i < j``, this function
computes the RMSD between atomic coordinates after optimal alignment.

Parameters
----------
positions : np.ndarray
Atomic coordinates for all frames in the trajectory.
w : np.ndarray, optional
Per-atom weights to use in the RMSD calculation. If ``None``,
all atoms are weighted equally.

Returns
-------
list of float
Flattened list of RMSD values corresponding to all frame pairs
``(i, j)`` with ``i < j``.
"""
nframes, _, _ = positions.shape

output = []

for i, j in itertools.combinations(range(nframes), 2):
posi, posj = positions[i], positions[j]

rmsd = rms.rmsd(posi, posj, w, center=True, superposition=True)

output.append(rmsd)
prot_rmsd = RMSDAnalysis(prot).run(step=skip)
output["protein_RMSD"].append(prot_rmsd.results.rmsd)
# # Using the MDAnalysis RMSD class instead
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remember to remove the commented out regions.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this!

# gs = ["protein and name CA"]
# prot_rmsd = rms.RMSD(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two RMSD classes are approximately equal in timing (on the test data)

# u, select="protein and name CA", groupselections=gs, weights="mass")
# prot_rmsd.run(step=skip)
# # The results contain:
# # - frame number
# # - time
# # - RMSD based on select (after superimposing)
# # - RMSD based on groupselections, one array per selection
# output["protein_RMSD"].append(prot_rmsd.results.rmsd.T[3])

prot_rmsd2d = Protein2DRMSD(prot).run(step=skip)
output["protein_2D_RMSD"].append(prot_rmsd2d.results.rmsd2d)
# # Using the MDAnalysis DistanceMatrix class
# prot_rmsd2d = diffusionmap.DistanceMatrix(u, select="protein and name CA")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This MDA code is much slower, on the test data 10s vs. 0.4s.

# prot_rmsd2d.run(step=skip)
# dist_mat = prot_rmsd2d.results.dist_matrix
# i, j = np.triu_indices_from(dist_mat, k=1)
# flattened = dist_mat[i, j]
# output["protein_2D_RMSD"].append(flattened)

if ligand.n_atoms > 0:
Comment thread
hannahbaumann marked this conversation as resolved.
Outdated
lig_rmsd = RMSDAnalysis(ligand, mass_weighted=True).run(step=skip)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ligand RMSD is currently calculated on the hybrid topology, which may not be what we want long term.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a separate PR - the atom selection (or atomgroup) should really be user defined rather than defaulting to UNK.

This might be a good argument for letting Protocols deal with this rather than making it uniform.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened an issue here: #103

output["ligand_RMSD"].append(lig_rmsd.results.rmsd)
# # Using the MDAnalysis RMSD class instead
# groupselections = ["resname UNK"]
# lig_rmsd = rms.RMSD(
# u,
# select="protein and name CA",
# groupselections=groupselections,
# weights="mass",
# )
# lig_rmsd.run(step=skip)
# output["ligand_RMSD"].append(lig_rmsd.results.rmsd.T[3])
lig_com_drift = LigandCOMDrift(ligand).run(step=skip)
output["ligand_wander"].append(lig_com_drift.results.com_drift)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is historical, so it doesn't have to be here, but can we please renamed this to ligand_com_drift or anything else? wander is such an unspecific name 😅

Copy link
Copy Markdown
Contributor Author

@hannahbaumann hannahbaumann May 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would do this in a separate PR, since it would require an update in openfe? Raised an issue here #104


output["time(ps)"] = np.arange(len(u.trajectory))[::skip] * u.trajectory.dt

return output
80 changes: 80 additions & 0 deletions src/openfe_analysis/tests/test_rmsd_mda_data.py
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No tests for Protein2DRMSD or LigandCOMDrift?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original tests in test_rmsd.py are already covering Protein2DRMSD and LigandCOMDrift. I had added the MDA tests since in one of our meetings you had mentioned that it could make sense to just use those test data, however, for now I still kept all the original tests, also for RMSD, so some things are double tested right now. Should I try to move as much of the testing to use the MDA data, and only have a regression test on our own trajectories or what would you suggest?

Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import MDAnalysis as mda
import pytest
from MDAnalysisTests.datafiles import DCD, PSF
from numpy.testing import assert_allclose, assert_almost_equal

from openfe_analysis.rmsd import RMSDAnalysis


@pytest.fixture
def mda_universe():
return mda.Universe(PSF, DCD)


@pytest.fixture()
def correct_values():
return [0, 4.68953]


@pytest.fixture()
def correct_values_mass():
return [0, 4.74920]


def test_rmsd(mda_universe, correct_values):
prot = mda_universe.select_atoms("name CA")
prot_rmsd = RMSDAnalysis(prot, superposition=True).run(step=49)
assert_almost_equal(
prot_rmsd.results.rmsd,
correct_values,
4,
err_msg="error: rmsd profile should match" + "test values",
)


def test_rmsd_frames(mda_universe, correct_values):
prot = mda_universe.select_atoms("name CA")
prot_rmsd = RMSDAnalysis(prot, superposition=True).run(frames=[0, 49])
assert_almost_equal(
prot_rmsd.results.rmsd,
correct_values,
4,
err_msg="error: rmsd profile should match" + "test values",
)


def test_rmsd_single_frame(mda_universe):
prot = mda_universe.select_atoms("name CA")
prot_rmsd = RMSDAnalysis(prot, superposition=True).run(start=5, stop=6)
single_frame = [0.91544906]
assert_almost_equal(
prot_rmsd.results.rmsd,
single_frame,
4,
err_msg="error: rmsd profile should match" + "test values",
)


def test_mass_weighted(mda_universe, correct_values):
# mass weighting the CA should give the same answer as weighing
# equally because all CA have the same mass
prot = mda_universe.select_atoms("name CA")
prot_rmsd = RMSDAnalysis(prot, superposition=True, mass_weighted=True).run(step=49)

assert_almost_equal(
prot_rmsd.results.rmsd,
correct_values,
4,
err_msg="error: rmsd profile should matchtest values",
)


def test_custom_weighted(mda_universe, correct_values_mass):
prot = mda_universe.select_atoms("all")
prot_rmsd = RMSDAnalysis(prot, superposition=True, mass_weighted=True).run(step=49)
assert_almost_equal(
prot_rmsd.results.rmsd,
correct_values_mass,
4,
err_msg="error: rmsd profile should matchtest values",
)
Loading