Source code for ogstools.core.execution

# SPDX-FileCopyrightText: Copyright (c) OpenGeoSys Community (opengeosys.org)
# SPDX-License-Identifier: BSD-3-Clause

import copy
import inspect
import os
import shutil
import subprocess
import sys
import warnings
from pathlib import Path

import yaml
from typing_extensions import Self

from .storage import StorageBase


[docs] class Execution(StorageBase): """ Configuration for OGS simulation execution parameters. This class encapsulates all settings related to how an OGS simulation is executed, including parallelization, containerization, and logging options. **OGS binary / container selection** The ``ogs_path`` parameter accepts either: - A directory containing the OGS executable (``"/path/to/bin"``). - A container image path or URL (``.sif`` / ``.squashfs``), in which case OGS is run inside the container as ``ogs``. Use pre-built container image URLs:: Execution(ogs_path=Execution.CONTAINER_PARALLEL_V6_5_7) When ``ogs`` is not set, the executable is looked up via ``OGS_BIN_PATH`` and, if not defined, on PATH. **Site-wide defaults via** ``OGS_EXECUTION_DEFAULTS`` Point the environment variable to a YAML file (e.g. in ``.envrc``, ``activate``, or ``.bashrc``) to set site-wide defaults:: export OGS_EXECUTION_DEFAULTS=/path/to/my_execution_defaults.yaml A typical cluster defaults file:: ogs_path: "/scratch/containers/ogs-6.5.7-petsc.squashfs" mpi_wrapper: "srun --ntasks" write_logs: true A typical developer defaults file:: ogs_path: "ogs/build/release/bin" """ __hash__ = None _CONTAINER_EXEC: str = "apptainer exec" default: "Execution" CONTAINER_SERIAL = "https://vip.s3.ufz.de/ogs/public/binaries/ogs6/6.5.7/ogs-6.5.7-serial.squashfs" CONTAINER_PARALLEL = "https://vip.s3.ufz.de/ogs/public/binaries/ogs6/6.5.7/ogs-6.5.7-petsc.squashfs" """Default :class:`Execution` instance, created at import time via auto-detection. Use this to inspect or share the default execution configuration without creating a new object each time. Users can override the default for the entire session:: Execution.default = Execution(ogs_path="/path/to/bin") """
[docs] def __init__( self, interactive: bool = False, args: str | None = None, mpi_wrapper: str | None = "mpirun -np", wrapper: str | None = None, ogs_path: str | None = None, mpi_ranks: int | None = None, omp_num_threads: int | None = None, ogs_asm_threads: int | None = None, write_logs: bool = True, log_level: str | None = None, id: str | None = None, ) -> None: """ Initialize an Execution configuration. :param interactive: If True, use interactive mode for stepwise control. :param args: Extra OGS command-line flags appended verbatim (see ``ogs --help`` for the full list). Useful flags: Example: ``"--write-prj --log-parallel"``. :param mpi_wrapper: MPI launcher prefix, e.g. ``"mpirun -np"``, ``"mpiexec -n"``, ``"srun --ntasks"``. :param wrapper: Generic command prefix prepended before the full command, e.g. ``"valgrind"``, ``"perf stat"``. :param ogs_path: Directory containing the OGS executable, or a container image path/URL (``.sif`` / ``.squashfs``). When not set, looked up via ``OGS_BIN_PATH`` or PATH. :param mpi_ranks: Number of MPI ranks. ``None`` = serial run. :param omp_num_threads: OpenMP threads per MPI rank. ``None`` = let OGS decide. See `OpenMP parallelization <https://www.opengeosys.org/6.5.7/docs/userguide/basics/openmp/>`_. :param ogs_asm_threads: OGS assembly threads. ``None`` = let OGS decide. See `OpenMP parallelization <https://www.opengeosys.org/6.5.7/docs/userguide/basics/openmp/>`_. :param write_logs: If True, write OGS log output to a file. :param log_level: OGS log verbosity: ``none|error|warn|info|debug|all``. ``None`` omits the ``-l`` flag (OGS default is ``info``). :param id: Optional unique identifier for this execution config. """ super().__init__("Execution", file_ext="yaml", id=id) self.interactive = interactive self.args = args self.mpi_wrapper = mpi_wrapper self.wrapper = wrapper self.ogs_path = ( ogs_path if ogs_path is not None else self._detect_ogs(mpi_ranks) ) self.mpi_ranks = mpi_ranks self.omp_num_threads = omp_num_threads self.ogs_asm_threads = ogs_asm_threads self.write_logs = write_logs self.log_level = log_level
@classmethod def _detect_ogs(cls, mpi_ranks: int | None) -> str | None: """Detect which OGS binary or container to use based on environment and ``mpi_ranks``. Priority for parallel runs (``mpi_ranks`` is set): 1. ``CONTAINER_PARALLEL`` (container with PETSc support). Priority for serial runs: 1. ``OGS_BIN_PATH`` environment variable — directory containing the ``ogs`` binary. 2. OGS Python wheel (``pip install ogs``) or ``ogs`` on PATH — returns ``None`` so that ``ogs`` is resolved from PATH at runtime. 3. ``CONTAINER_SERIAL`` as fallback. """ from ogstools._find_ogs import has_ogs_wheel, read_ogs_path if mpi_ranks is not None: return cls.CONTAINER_PARALLEL ogs_path = read_ogs_path() if ogs_path is not None: return str(ogs_path) if has_ogs_wheel(): return None # ogs is on PATH via the wheel return cls.CONTAINER_SERIAL
[docs] @classmethod def from_file(cls, filepath: Path | str) -> Self: """ Restore an Execution object from an execution.yaml file. :param filepath: Path to execution.yaml file. :returns: Restored Execution instance. :raises FileNotFoundError: If the specified file does not exist. """ filepath = Path(filepath) if not filepath.exists(): msg = f"Execution file does not exist: {filepath}" raise FileNotFoundError(msg) with filepath.open("r") as f: data = yaml.safe_load(f) or {} execution = cls(**data) execution._bind_to_path(filepath) return execution
[docs] @classmethod def from_id(cls, execution_id: str) -> Self: """ Load Execution from the user storage path using its ID. StorageBase.Userpath must be set. :param execution_id: The unique ID of the Execution to load. :returns: An Execution instance restored from disk. """ execution_file = ( StorageBase.saving_path() / "Execution" / f"{execution_id}" / "meta.yaml" ) if not execution_file.exists(): msg = f"No execution file found at {execution_file}" raise FileNotFoundError(msg) execution = cls.from_file(execution_file) execution._id = execution_id return execution
def _save_impl(self, dry_run: bool = False) -> list[Path]: target = Path(self.next_target) self._validate_container() self._validate_ogs() self._validate_thread_count() if dry_run: return [target] target.parent.mkdir(parents=True, exist_ok=True) data = { "interactive": self.interactive, "args": self.args, "wrapper": self.wrapper, "mpi_wrapper": self.mpi_wrapper, "ogs_path": self.ogs_path, "mpi_ranks": self.mpi_ranks, "omp_num_threads": self.omp_num_threads, "ogs_asm_threads": self.ogs_asm_threads, "write_logs": self.write_logs, "log_level": self.log_level, } with target.open("w") as f: yaml.safe_dump(data, f, sort_keys=False) return [target]
[docs] def save( self, target: Path | str | None = None, overwrite: bool | None = None, dry_run: bool = False, archive: bool = False, id: str | None = None, ) -> list[Path]: user_defined = self._pre_save(target, overwrite, dry_run, id=id) files = self._save_impl(dry_run=dry_run) self._post_save(user_defined, archive, dry_run) return files
def _propagate_target(self) -> None: pass def __repr__(self) -> str: cls_name = self.__class__.__name__ attrs = self._non_default_attributes() base_repr = super().__repr__() if self.user_specified_id: construct = f'{cls_name}.from_id("{self._id}")' elif self.is_saved: construct = f"{cls_name}.from_file({str(self.active_target)!r})" else: attrs_filtered = {k: v for k, v in attrs.items() if k != "id"} inner = ", ".join(f"{k}={v!r}" for k, v in attrs_filtered.items()) construct = f"{cls_name}({inner})" return f"{construct}\n{base_repr}" def __str__(self) -> str: base_str = super().__str__() non_defaults = self._non_default_attributes() lines = [base_str, " Execution settings:"] signature = inspect.signature(self.__class__.__init__) for name, param in signature.parameters.items(): if name == "self" or param.default is inspect._empty: continue value = getattr(self, name) marker = "*" if name in non_defaults else " " lines.append(f" {marker} {name}: {value}") return "\n".join(lines) @property def env(self) -> dict[str, str]: """Process environment for OGS subprocess, with OMP/ASM thread vars injected.""" e = os.environ.copy() if self.omp_num_threads is not None: e["OMP_NUM_THREADS"] = str(self.omp_num_threads) else: e.pop("OMP_NUM_THREADS", None) if self.ogs_asm_threads is not None: e["OGS_ASM_THREADS"] = str(self.ogs_asm_threads) else: e.pop("OGS_ASM_THREADS", None) return e def _value_attrs(self) -> dict[str, object]: """Return attributes that define value (excludes StorageBase state and env).""" return { k: v for k, v in self.__dict__.items() if k not in self._SAVE_STATE_ATTRS } def __deepcopy__(self, memo: dict) -> "Execution": if id(self) in memo: return memo[id(self)] new = Execution() for k, v in self._value_attrs().items(): setattr(new, k, copy.deepcopy(v, memo)) memo[id(self)] = new return new def __eq__(self, other: object) -> bool: if not isinstance(other, Execution): return NotImplemented return self._value_attrs() == other._value_attrs() @staticmethod def _is_container(ogs: str) -> bool: """Return True if ``ogs`` refers to a container image rather than a binary.""" return ogs.startswith(("http://", "https://")) or Path( ogs ).suffix.lower() in (".sif", ".squashfs") @property def container_path(self) -> str | None: """Container reference for the active run, or ``None`` for native execution.""" return ( self.ogs_path if self.ogs_path is not None and self._is_container(self.ogs_path) else None ) @property def container_prefix(self) -> str | None: """Container launch prefix, e.g. 'apptainer exec /path/to.sif', or None.""" if self.container_path is None: return None is_url = self.container_path.startswith(("http://", "https://")) ref = ( self.container_path if is_url else str(Path(self.container_path).expanduser()) ) return f"{self._CONTAINER_EXEC} {ref}" def _validate_thread_count(self) -> None: """Warn if total thread count exceeds system CPU count.""" mpi = self.mpi_ranks or 1 cpu_count = os.cpu_count() or 1 omp = self.omp_num_threads or cpu_count if mpi * omp > cpu_count: warnings.warn( f"omp_num_threads={omp} and mpi_ranks={mpi}: " f"total threads ({mpi * omp}) exceeds system CPU count ({cpu_count}), " "which may cause resource over-subscription.", UserWarning, stacklevel=2, ) def _validate_container(self) -> None: """Validate container path and launcher availability.""" if self.container_path is None: return if Path(self.container_path).suffix.lower() not in ( ".sif", ".squashfs", ): msg = "Container must be a *.sif or *.squashfs file." raise RuntimeError(msg) if sys.platform == "win32": msg = "Running OGS in a container is only possible on Linux." raise RuntimeError(msg) launcher = self._CONTAINER_EXEC.split()[0] if shutil.which(launcher) is None: msg = f"Container launcher '{launcher}' was not found." raise RuntimeError(msg) if not self.container_path.startswith(("http://", "https://")): container = Path(self.container_path).expanduser() if not container.is_file(): msg = "Container path is not a file." raise RuntimeError(msg) @property def _ogs_resolved(self) -> str: """Resolved OGS binary path. Container: ogs, Path given: <Path>/ogs, Then try to find ogs on $PATH, falls back to ``"ogs"`` if not found. """ if self.container_path is not None: return "ogs" if self.ogs_path is not None: return str(Path(self.ogs_path) / "ogs") resolved = shutil.which("ogs") return resolved if resolved is not None else "ogs" @property def _ogs_call_cmd(self) -> str: parts = [] if self.wrapper: parts.append(self.wrapper) if prefix := self.container_prefix: parts.append(prefix) if ( self.mpi_ranks is not None and self.mpi_ranks >= 1 and self.mpi_wrapper is not None ): parts += [self.mpi_wrapper, str(self.mpi_ranks)] parts.append(self._ogs_resolved) return " ".join(parts) @property def cmd(self) -> str: """OGS invocation command without project file and meshes path.""" parts = [self._ogs_call_cmd] if self.log_level is not None: parts += ["-l", self.log_level] if self.args is not None: parts.append(str(self.args)) return " ".join(parts) def _validate_ogs(self) -> None: """Validate the OGS executable and check PETSc configuration. For native execution the executable must be locatable via PATH. Running ``ogs --version`` must succeed. For parallel execution (``mpi_ranks`` is set) the version output must contain ``-DOGS_USE_PETSC="ON"``. For sequential execution ``-DOGS_USE_PETSC="ON"`` must be absent. """ parallel = self.mpi_ranks is not None version_cmd = f"{self._ogs_call_cmd} --version".strip() result = subprocess.run( version_cmd, shell=True, capture_output=True, text=True, check=False, ) if result.returncode != 0: msg = ( f"OGS executable failed to run '--version' " f"(exit code {result.returncode}): {result.stderr or result.stdout}" ) raise RuntimeError(msg) has_petsc = '-DOGS_USE_PETSC="ON"' in result.stdout if parallel and not has_petsc: msg = ( "Parallel execution requires OGS built with PETSc " '(-DOGS_USE_PETSC="ON"), but it was not found in ' "'ogs --version' output." ) raise RuntimeError(msg) if not parallel and has_petsc: msg = ( "Sequential execution requires OGS built without PETSc " '(-DOGS_USE_PETSC must be absent or "OFF"), ' "but 'ogs --version' reports -DOGS_USE_PETSC=\"ON\". " "Use a non-PETSc OGS build for sequential execution." ) raise RuntimeError(msg) from ogstools._find_ogs import has_ogs_wheel if self.interactive and not has_ogs_wheel(): msg = ( "Interactive simulation requires the OGS Python wheel. " "Install it with: pip install ogstools[ogs]" ) raise RuntimeError(msg)
[docs] @classmethod def from_default(cls, file: str | Path | None = None) -> Self: """Create the default :class:`Execution` instance. If ``OGS_EXECUTION_DEFAULTS`` is set, load settings from that file and construct with those values. Otherwise use the empty constructor. """ if file is None: env_path = os.environ.get("OGS_EXECUTION_DEFAULTS") if not env_path: return cls() path = Path(env_path) if not path.exists(): msg = f"OGS_EXECUTION_DEFAULTS={env_path!r} does not exist." raise FileNotFoundError(msg) else: path = Path(file) with path.open("r") as f: data = yaml.safe_load(f) or {} return cls(**data)