"""The FastMDXplora project-level orchestrator.
This module implements the central orchestrator class. Following the
phase-based orchestration pattern (Aina & Kwan, JCC 2026),
the orchestrator:
1. Holds shared project state (system input, output directory, options)
2. Knows its registered phases (setup, simulate, analyze, report)
3. Applies intelligent defaults and validates per-phase options
4. Executes phases in coordinated sequence
5. Consolidates outputs into a single project directory
Unlike a generic workflow engine (Snakemake, Nextflow, Galaxy), the workflow
is built-in and the user expresses intent through include/exclude and option
overrides, not by describing a DAG (directed acyclic graph: the
task-and-dependency model those engines use).
"""
from __future__ import annotations
import json
import logging
import os
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from fastmdxplora.utils.logging import get_logger
logger = get_logger("project")
# ---------------------------------------------------------------------------
# The canonical phase list. This is the only place phase order is defined.
# ---------------------------------------------------------------------------
PHASES: tuple[str, ...] = ("setup", "simulation", "analysis", "report")
@dataclass
class PhaseResult:
"""Lightweight record of a single phase invocation."""
name: str
status: str # "ok" | "skipped" | "error"
output_dir: Path | None = None
started_at: str = ""
finished_at: str = ""
message: str = ""
artifacts: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return {
"name": self.name,
"status": self.status,
"output_dir": str(self.output_dir) if self.output_dir else None,
"started_at": self.started_at,
"finished_at": self.finished_at,
"message": self.message,
"artifacts": self.artifacts,
}
@dataclass
class RunResult:
"""Result of one run within an exploration.
``explore()`` always returns a list of these — a single study is a
list of one, a sweep is a list of many. Each carries the run's
identity and the per-phase results inside ``phases``.
"""
run_id: str
system: str
status: str # "ok" | "error"
output_dir: Path | None = None
sweep_values: dict[str, Any] = field(default_factory=dict)
phases: list[PhaseResult] = field(default_factory=list)
message: str = ""
def to_dict(self) -> dict[str, Any]:
return {
"run_id": self.run_id,
"system": self.system,
"status": self.status,
"output_dir": str(self.output_dir) if self.output_dir else None,
"sweep_values": self.sweep_values,
"phases": [p.to_dict() for p in self.phases],
"message": self.message,
}
# Convenience: treat a RunResult a bit like its phase list, so common
# patterns (iterating phases, checking a phase status) stay ergonomic.
def phase(self, name: str) -> PhaseResult | None:
"""Return the PhaseResult for ``name``, or None if it didn't run."""
for p in self.phases:
if p.name == name:
return p
return None
[docs]
class FastMDXplora:
"""Project-level orchestrator for end-to-end MD studies.
Parameters
----------
system : str | os.PathLike
Input for a single study. Accepted forms (auto-detected):
- Path to a PDB / CIF file (e.g. ``"protein.pdb"``)
- 4-character PDB ID (e.g. ``"1L2Y"``), fetched from RCSB
- One-letter amino-acid sequence, if structure prediction is
available (future)
Mutually exclusive with ``config``.
config : str | os.PathLike | None
Path to a YAML config file. Drives one system or many (with an
optional parameter sweep and parallel execution); the interface
is the same either way. Mutually exclusive with ``system``.
output_dir : str | os.PathLike | None
Where to write project outputs. Defaults to
``./fastmdxplora_output_<timestamp>``.
options : dict[str, dict] | None
Per-phase keyword arguments, e.g.
``{"simulation": {"duration_ns": 100}}``.
verbose : bool
If True, log progress to stdout in addition to the project log file.
include, exclude : list[str] | None
Default phase selection (``explore()`` arguments still override).
Examples
--------
>>> fmdx = FastMDXplora(system="protein.pdb")
>>> fmdx.explore() # doctest: +SKIP
>>> fmdx = FastMDXplora(system="1L2Y") # PDB ID, fetched from RCSB
>>> fmdx.explore( # doctest: +SKIP
... include=["setup", "simulation"],
... options={"simulation": {"duration_ns": 50}},
... )
>>> # A config file: one system or many, same interface:
>>> fmdx = FastMDXplora(config="study.yml")
>>> fmdx.explore() # doctest: +SKIP
"""
def __init__(
self,
system: str | os.PathLike | None = None,
*,
config: str | os.PathLike | None = None,
config_data: dict[str, Any] | None = None,
output_dir: str | os.PathLike | None = None,
options: dict[str, dict[str, Any]] | None = None,
verbose: bool = False,
include: list[str] | None = None,
exclude: list[str] | None = None,
) -> None:
# FastMDXplora is the single user-facing entry point. Ways to
# construct it:
# - config=... : a YAML config path (one system or many, with
# optional sweep / parallel execution).
# - config_data=... : the same, as an already-parsed dict (used by
# the CLI, which assembles a config from flags).
# - system=... : a single concrete study, run directly. This is
# also the path the internal batch worker uses
# for each run, so it must not recurse.
# config/config_data execution is deferred to explore().
n_config = sum(x is not None for x in (config, config_data))
if n_config and system is not None:
raise ValueError(
"Pass either `system=` (a single study) or a config "
"(`config=` / `config_data=`), not both."
)
self._config_path: str | None = (
str(config) if config is not None else None
)
self._config_data: dict[str, Any] | None = config_data
self._deferred_output_dir = output_dir
self._deferred_verbose = verbose
if n_config:
# Config-driven: defer everything to explore(). We don't create
# an output directory or banner here because the batch machinery
# owns the layout (flat for one run, runs/<id>/ for many).
self.system = None # resolved per-run by the batch layer
self.options = options or {}
self.verbose = bool(verbose)
self._config_include = include
self._config_exclude = exclude
self.results = []
return
# ---- Direct single-study path -----------------------------------
if system is None:
raise ValueError(
"FastMDXplora requires either a `system` input (a PDB/CIF "
"file path, a 4-character PDB ID, or a one-letter sequence) "
"or a `config` file."
)
self.system: str = str(system)
# Phase selection (the batch layer passes the config's include/exclude)
self._config_include: list[str] | None = include
self._config_exclude: list[str] | None = exclude
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
self.output_dir: Path = (
Path(output_dir) if output_dir
else Path(f"fastmdxplora_output_{timestamp}")
)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.options: dict[str, dict[str, Any]] = options or {}
self.verbose: bool = bool(verbose)
# Per-phase output subdirectories (created lazily by each phase)
self._phase_dirs: dict[str, Path] = {
phase: self.output_dir / phase for phase in PHASES
}
# Record of phase executions in this session
self.results: list[PhaseResult] = []
self._configure_logging()
self._presenter = self._configure_presenter()
# Display the opening banner once the session is ready.
from fastmdxplora import __version__
self._presenter.banner(
System=self.system,
Output=str(self.output_dir),
Version=__version__,
)
# The banner already shows system/output to the user; this log
# is for the file/audit trail and verbose console only.
logger.debug(
"FastMDXplora initialized: system=%s output=%s", self.system, self.output_dir
)
# ------------------------------------------------------------------
# Orchestration entry point
# ------------------------------------------------------------------
[docs]
def explore(
self,
*,
include: list[str] | None = None,
exclude: list[str] | None = None,
options: dict[str, dict[str, Any]] | None = None,
report: bool = True,
dry_run: bool = False,
) -> list[RunResult]:
"""Run the full pipeline, end to end.
Parameters
----------
include : list of str, optional
Phases to run (subset of {"setup", "simulation", "analysis", "report"}).
If omitted, all phases run.
exclude : list of str, optional
Phases to skip. Mutually exclusive with ``include``.
options : dict, optional
Per-phase option overrides applied on top of the orchestrator's
``options`` attribute.
report : bool, default True
Convenience flag. If False, skip the report phase even when
``include``/``exclude`` would otherwise enable it.
dry_run : bool, default False
If True, print the plan — every run, its system, swept values,
output directory, and the phases that would execute — and
return without running anything.
Returns
-------
list[RunResult]
One :class:`RunResult` per run, always. A single study is a
list of one; a sweep is a list of many. Each ``RunResult``
carries its per-phase :class:`PhaseResult` list in ``.phases``.
Notes
-----
When ``include`` / ``exclude`` are omitted here but were set in a
config file passed to the constructor, the config-file values are
used. Explicit arguments to this method always win.
"""
# Config-driven runs (one system or many) go through the batch
# machinery internally. The user always sees the same FastMDXplora
# interface; the batch layer is an implementation detail.
if self._config_path is not None or self._config_data is not None:
return self._explore_config(
include=include, exclude=exclude, report=report, dry_run=dry_run,
)
# Config-file phase selection is the fallback when this call omits it.
if include is None and exclude is None:
include = self._config_include
exclude = self._config_exclude
plan = self._build_plan(include=include, exclude=exclude, want_report=report)
# Dry run: report the plan and return without executing.
if dry_run:
self._print_dry_run_single(plan)
return [RunResult(
run_id="s1", system=self.system, status="planned",
output_dir=self.output_dir, phases=[],
)]
merged_options = self._merge_options(options)
# Remember the resolved phase selection + merged options so the
# resolved_config.yml dump reflects what *actually* ran (including
# any per-call overrides), not just the construction-time config.
self._resolved_include = include
self._resolved_exclude = exclude
self._resolved_options = {
p: opts for p, opts in merged_options.items() if opts
}
# Plan goes to file/audit; the presenter shows headers visually.
logger.debug("Plan: %s", " -> ".join(plan))
for phase in plan:
self._presenter.phase_start(phase)
result = self._run_phase(phase, merged_options.get(phase, {}))
self.results.append(result)
self._presenter.phase_end(phase, status=result.status)
if result.status == "error":
logger.error("Phase '%s' failed: %s", phase, result.message)
break
self._write_manifest()
self._write_resolved_config()
self._presenter.done()
# Wrap the phase results into a single RunResult (a study of one).
status = "error" if any(r.status == "error" for r in self.results) else "ok"
return [RunResult(
run_id="s1",
system=self.system,
status=status,
output_dir=self.output_dir,
phases=list(self.results),
)]
def _print_dry_run_single(self, plan: list[str]) -> None:
"""Print the plan for a single study without running it."""
print("\nFastMDXplora dry run (no execution)")
print("=" * 40)
print(f" system: {self.system}")
print(f" output: {self.output_dir}")
print(f" phases: {' → '.join(plan) if plan else '(none)'}")
def _explore_config(
self,
*,
include: list[str] | None,
exclude: list[str] | None,
report: bool,
dry_run: bool = False,
) -> list[RunResult]:
"""Run a config-driven study through the internal batch machinery.
Handles one system or many identically. Exposed to the user only as
``FastMDXplora(config=...).explore()``; the batch layer underneath
is private.
"""
from fastmdxplora.batch import BatchExplorer
batch = BatchExplorer(
config=self._config_path,
config_data=self._config_data,
output_dir=self._deferred_output_dir,
verbose=self._deferred_verbose,
)
# explore()-level phase overrides win over the config file.
if include is not None:
batch._raw["include"] = include
batch._raw["exclude"] = None
elif exclude is not None:
batch._raw["exclude"] = exclude
batch._raw["include"] = None
if not report:
existing = batch._raw.get("exclude") or []
if "report" not in existing and not batch._raw.get("include"):
batch._raw["exclude"] = [*existing, "report"]
if dry_run:
run_results = batch.dry_run()
self.output_dir = batch.output_dir
self.results = run_results
return run_results
run_results = batch.run()
# Surface the resolved output location for callers that read it.
self.output_dir = batch.output_dir
self.results = run_results
return run_results
# Convenience: per-phase entry points (also called by the CLI)
[docs]
def setup(self, **kwargs: Any) -> PhaseResult:
"""Run only the setup phase."""
return self._run_phase("setup", kwargs)
[docs]
def simulate(self, **kwargs: Any) -> PhaseResult:
"""Run only the simulation phase."""
return self._run_phase("simulation", kwargs)
[docs]
def analyze(self, **kwargs: Any) -> PhaseResult:
"""Run only the analysis phase."""
return self._run_phase("analysis", kwargs)
[docs]
def report(self, **kwargs: Any) -> PhaseResult:
"""Run only the report phase."""
return self._run_phase("report", kwargs)
[docs]
def compare(self, *, output_dir: str | os.PathLike | None = None) -> Path | None:
"""(Re)build the cross-run comparison report for a multi-run study.
A multi-run ``explore()`` builds this automatically; call this to
regenerate it — for example after re-running some of the runs, or
to produce it for a batch that finished earlier.
Parameters
----------
output_dir : str | os.PathLike, optional
The batch output directory to read (the one containing
``batch_manifest.json``). Defaults to this object's
``output_dir`` — i.e. the study it just ran.
Returns
-------
Path or None
The ``comparison/`` directory, or None if there was nothing to
compare (fewer than two successful runs, or no analysis
outputs were found).
"""
from fastmdxplora.batch.compare import build_comparison_report
target = Path(output_dir) if output_dir is not None else getattr(
self, "output_dir", None
)
if target is None:
raise ValueError(
"compare() needs an output directory — pass output_dir=, or "
"call it after explore() so the run's output is known."
)
return build_comparison_report(target)
# ------------------------------------------------------------------
# Internals
# ------------------------------------------------------------------
def _build_plan(
self,
*,
include: list[str] | None,
exclude: list[str] | None,
want_report: bool,
) -> list[str]:
if include is not None and exclude is not None:
raise ValueError("Specify either `include` or `exclude`, not both.")
if include is not None:
unknown = set(include) - set(PHASES)
if unknown:
raise ValueError(f"Unknown phase(s): {sorted(unknown)}. Valid: {PHASES}")
plan = [p for p in PHASES if p in include]
elif exclude is not None:
unknown = set(exclude) - set(PHASES)
if unknown:
raise ValueError(f"Unknown phase(s): {sorted(unknown)}. Valid: {PHASES}")
plan = [p for p in PHASES if p not in exclude]
else:
plan = list(PHASES)
if not want_report and "report" in plan:
plan.remove("report")
return plan
def _merge_options(
self, override: dict[str, dict[str, Any]] | None
) -> dict[str, dict[str, Any]]:
merged: dict[str, dict[str, Any]] = {p: dict(self.options.get(p, {})) for p in PHASES}
if override:
for phase, opts in override.items():
if phase not in PHASES:
raise ValueError(
f"Unknown phase '{phase}' in options. Valid: {PHASES}"
)
merged[phase].update(opts)
return merged
def _run_phase(self, phase: str, kwargs: dict[str, Any]) -> PhaseResult:
phase_dir = self._phase_dirs[phase]
phase_dir.mkdir(parents=True, exist_ok=True)
started = datetime.now(timezone.utc).isoformat()
logger.debug("--> Phase '%s' starting (output=%s)", phase, phase_dir)
try:
run_fn = self._resolve_phase_runner(phase)
artifacts = run_fn(
orchestrator=self,
output_dir=phase_dir,
**kwargs,
)
finished = datetime.now(timezone.utc).isoformat()
return PhaseResult(
name=phase,
status="ok",
output_dir=phase_dir,
started_at=started,
finished_at=finished,
message=f"Phase '{phase}' completed.",
artifacts=list(artifacts or []),
)
except Exception as exc: # noqa: BLE001 -- we log and record
finished = datetime.now(timezone.utc).isoformat()
logger.exception("Phase '%s' raised an exception", phase)
return PhaseResult(
name=phase,
status="error",
output_dir=phase_dir,
started_at=started,
finished_at=finished,
message=str(exc),
)
@staticmethod
def _resolve_phase_runner(phase: str):
"""Look up the run() entry point for a given phase.
Each phase package exposes a ``run(orchestrator, output_dir, **kwargs)``
callable; the orchestrator imports it lazily so that an optional
backend (e.g. OpenMM) is only required when its phase is invoked.
"""
if phase == "setup":
from fastmdxplora.setup.pipeline import run
return run
if phase == "simulation":
from fastmdxplora.simulation.pipeline import run
return run
if phase == "analysis":
from fastmdxplora.analysis.analyze import run
return run
if phase == "report":
from fastmdxplora.report import run
return run
raise ValueError(f"Unknown phase: {phase}")
def _write_manifest(self) -> None:
"""Write a single JSON manifest summarizing this session."""
from fastmdxplora import __citation__, __doi__, __version__
manifest = {
"tool": "FastMDXplora",
"version": __version__,
"doi": __doi__,
"citation": __citation__,
"system": self.system,
"output_dir": str(self.output_dir),
"phases": [r.to_dict() for r in self.results],
"options": self.options,
}
manifest_path = self.output_dir / "manifest.json"
with manifest_path.open("w", encoding="utf-8") as fh:
json.dump(manifest, fh, indent=2)
logger.debug("Wrote manifest: %s", manifest_path)
def _write_resolved_config(self) -> None:
"""Write the fully-merged configuration for reproducibility.
Produces ``resolved_config.yml`` capturing the system, output,
phase selection, and per-phase options actually used. The file is
a valid FastMDXplora config — feeding it back to ``--config``
reproduces the run.
"""
from fastmdxplora.config import write_resolved_config
resolved = {
"system": self.system,
"output": str(self.output_dir),
"verbose": self.verbose,
"include": getattr(self, "_resolved_include", None) or self._config_include,
"exclude": getattr(self, "_resolved_exclude", None) or self._config_exclude,
"options": getattr(self, "_resolved_options", None) or self.options,
}
try:
path = write_resolved_config(resolved, self.output_dir)
logger.debug("Wrote resolved config: %s", path)
except Exception as exc: # noqa: BLE001 -- never fail a run over this
logger.debug("Could not write resolved config: %s", exc)
def _configure_logging(self) -> None:
"""Wire up console and file logging for this project session.
The root ``fastmdx`` logger is set to DEBUG so all records flow to
the handlers; each handler then applies its own level filter. The
file handler always captures at DEBUG (full audit trail). The
console handler defaults to INFO, raised to DEBUG when
``verbose=True`` or ``FASTMDX_LOGLEVEL=DEBUG`` is set.
"""
from fastmdxplora.utils.logging import attach_file_logger, set_level, setup_console
console_level = logging.DEBUG if self.verbose else logging.INFO
setup_console(level=console_level)
attach_file_logger(self.output_dir / "fastmdxplora.log", level=logging.DEBUG)
# Root logger must be at the lowest handler level so records flow.
set_level(logging.DEBUG)
# ...but the console handler still applies its own filter.
# set_level above promoted ALL handlers to DEBUG; re-apply the
# console-level filter so quiet mode stays quiet.
from fastmdxplora.utils.logging import _console_handler
if _console_handler is not None:
_console_handler.setLevel(console_level)
def _configure_presenter(self):
"""Create the session presenter for structural output.
The presenter is silent when ``FASTMDX_LOG_STYLE=plain`` (handled
internally by :class:`SessionPresenter`) or when stdout is not a
TTY (handled by color auto-detection). Users wanting different
behaviour can replace ``self._presenter`` after construction.
"""
from fastmdxplora.utils.presenter import SessionPresenter
return SessionPresenter()