# Source code for ogstools.core.simulation

# SPDX-FileCopyrightText: Copyright (c) OpenGeoSys Community (opengeosys.org)
# SPDX-License-Identifier: BSD-3-Clause

import typing
from copy import deepcopy
from pathlib import Path
from typing import overload

from typing_extensions import Self

from ogstools.core.model import Model
from ogstools.core.result import Result
from ogstools.core.simulation_controller import SimulationStatus
from ogstools.core.storage import StorageBase
from ogstools.logparser.log import Log
from ogstools.meshseries._meshseries import MeshSeries
from ogstools.ogs6py.project import Project


class Simulation(StorageBase):
    """
    A completed or ongoing OGS simulation with associated model and results.

    Combines a Model (project setup) with a Result (output data) and provides
    convenient access to simulation outputs like the mesh series and the log
    file. Simulations can be saved, loaded, and analyzed after completion.
    """

    # __eq__ is overridden below (comparison by model and result), so
    # instances are explicitly marked as unhashable.
    __hash__ = None
    Status = SimulationStatus

    @classmethod
    def from_id(cls, simulation_id: str) -> Self:
        """
        Load a Simulation from the user storage path using its ID.

        :param simulation_id: The unique identifier of the
                              :class:`ogstools.Simulation` to load.
        :returns: A :class:`ogstools.Simulation` instance restored from disk.
        :raises FileNotFoundError: If no simulation with the given ID exists.
        """
        sim_folder = (
            StorageBase.saving_path() / "Simulation" / str(simulation_id)
        )
        if not sim_folder.exists():
            msg = f"No simulation found at {sim_folder}"
            raise FileNotFoundError(msg)

        simulation = cls.from_folder(sim_folder)
        simulation._id = simulation_id
        return simulation

    @classmethod
    def from_folder(cls, sim_folder: Path | str) -> Self:
        """
        Load a Simulation from a folder following OGSTools conventions.

        Expects the folder structure created by :meth:`save`:

        - ``model/``: Subfolder containing the Model
        - ``result/``: Subfolder containing the simulation results

        :param sim_folder: Path to the folder containing the simulation.
        :returns: A :class:`ogstools.Simulation` instance loaded from the
                  folder.
        :raises FileNotFoundError: If required components are not found.
        """
        sim_folder = Path(sim_folder)
        model = Model.from_folder(sim_folder / "model")
        result = Result(sim_folder / "result")
        return cls(model, result)

    def __init__(
        self,
        model: Model,
        result: Result,
    ) -> None:
        """
        Initialize a Simulation object.

        :param model:  The :class:`ogstools.Model` used for this simulation.
        :param result: The Result object containing simulation output. It is
                       bound to its own ``next_target`` path here.
        """
        super().__init__("Simulation")
        self.model = model
        self.result = result
        result._bind_to_path(result.next_target)
        # Both are loaded lazily by the `log` / `meshseries` properties.
        self._log: Log | None = None
        self._meshseries: MeshSeries | None = None

    def __deepcopy__(self, memo: dict) -> "Simulation":
        """
        Create a full deep copy of this Simulation, including model and result.

        :param memo: The memo dictionary handed in by :func:`copy.deepcopy`.
        :returns: A new Simulation built from deep copies of model and result,
                  carrying copies of the cached mesh series and log (if any).
        """
        new_model = deepcopy(self.model, memo)
        new_result = deepcopy(self.result, memo)
        new_sim = Simulation(new_model, new_result)
        # Register the copy so that other objects referring to this
        # simulation within the same deepcopy pass reuse it.
        memo[id(self)] = new_sim
        new_sim._meshseries = (
            deepcopy(self._meshseries, memo) if self._meshseries else None
        )
        new_sim._log = deepcopy(self._log, memo) if self._log else None
        return new_sim

    def _component_repr(
        self, obj: typing.Any, name: str, load_method: str | None = None
    ) -> str:
        """
        Generate the repr string for a component (model/result).

        :param obj:         The component object.
        :param name:        Class name (e.g., "Model", "Result").
        :param load_method: Class method name (e.g., "from_folder") or None
                            to use the constructor directly.
        :returns: An expression-like string showing how to obtain the
                  component, or ``"<name>(...)"`` if no path is known.
        """
        if getattr(obj, "user_specified_id", False):
            return f'{name}.from_id("{obj.id}")'

        if getattr(obj, "is_saved", False):
            path = str(obj.active_target)
        elif (next_t := getattr(obj, "next_target", None)) is not None:
            path = str(next_t)
        else:
            return f"{name}(...)"

        if load_method:
            return f"{name}.{load_method}({path!r})"
        # Use constructor directly (e.g., Result(sim_output=...))
        return f"{name}({path!r})"

    def __repr__(self) -> str:
        cls = self.__class__.__name__
        base_repr = super().__repr__()

        if self.user_specified_id:
            construct = f'{cls}.from_id("{self._id}")'
        elif self.is_saved:
            construct = f"{cls}.from_folder({str(self.active_target)!r})"
        else:
            # Show how to reconstruct from its children
            model_construct = self._component_repr(
                self.model, "Model", "from_folder"
            )
            result_construct = self._component_repr(self.result, "Result")
            construct = (
                f"{cls}(model={model_construct}, "
                f"result={result_construct})\n"
                f"log_file={str(self.log_file)!r}\n"
                f"meshseries_file={str(self.meshseries_file)!r}"
            )

        return f"{construct}\nstatus={self.status_str}\n{base_repr}"

    def __str__(self) -> str:
        base_str = super().__str__()
        lines = [
            base_str,
            f" {self._component_status_str(self.model, 'Model')}",
            f" {self._component_status_str(self.result, 'Result')}",
            f" Log file: {self._format_path(self.log_file)}",
            f" MeshSeries: {self._format_path(self.meshseries_file)}",
            f" {self.status_str}",
        ]
        return "\n".join(lines)

    def __eq__(self, other: object) -> bool:
        """Two simulations are equal if their models and results are equal."""
        if not isinstance(other, Simulation):
            # Let Python try the reflected comparison instead of deciding
            # on behalf of an unrelated type.
            return NotImplemented
        return self.model == other.model and self.result == other.result

    @property
    def status(self) -> SimulationStatus:
        """
        Get the current simulation status.

        If a ``returncode`` file exists in the result's output folder, it
        decides between done (content ``"0"``) and error (anything else).
        Otherwise the log file is inspected: no log file means unknown; the
        markers "Error"/"Critical", "Simulation completed" and
        "OGS started on" map to error, done and running (checked in that
        order); a log without any marker means not started.

        :returns: SimulationStatus enum value (done, error, running,
                  not_started or unknown).
        """
        returncode_file = self.result.sim_output / "returncode"
        if returncode_file.exists():
            return (
                SimulationStatus.done
                if returncode_file.read_text().strip() == "0"
                else SimulationStatus.error
            )

        if not self.log_file.exists():
            return SimulationStatus.unknown

        # errors="replace": a partially written log must not break decoding.
        log_text = self.log_file.read_text(errors="replace")
        if "Error" in log_text or "Critical" in log_text:
            return SimulationStatus.error
        if "Simulation completed" in log_text:
            return SimulationStatus.done
        if "OGS started on" in log_text:
            return SimulationStatus.running
        return SimulationStatus.not_started

    @property
    def status_str(self) -> str:
        """
        Get a human-readable status description of the simulation.

        Includes information about completion status and result availability.
        Only an actual error status is reported as an error (together with
        the tail of the log); running, not started and unknown simulations
        get their own description.

        :returns: String describing the simulation state.
        """
        status = self.status
        if status == SimulationStatus.done:
            if self.meshseries_file.exists():
                return "Status: completed successfully (results available)"
            return "Status: completed successfully (results pending)"
        if status == SimulationStatus.running:
            return "Status: running"
        if status == SimulationStatus.not_started:
            return "Status: not started"
        if status == SimulationStatus.unknown:
            return "Status: unknown (no returncode or log file found)"

        last_lines = Project._failed_run_print_log_tail(
            self.model.execution.write_logs, self.log_file
        )
        return f"Status: terminated with error\n{last_lines}"

    @property
    def cmd(self) -> str:
        """Get the full command used to run the simulation."""
        return f"{self.model.cmd} -o {self.result.next_target}"

    @property
    def log_file(self) -> Path:
        """
        Get the path to the log file.

        For a saved simulation this is the log inside the ``result``
        subfolder of the active target; otherwise the result's own log file.
        """
        if self.is_saved and self.active_target is not None:
            return self.active_target / "result" / self.result._log_filename
        return self.result.log_file

    @property
    def log(self) -> Log:
        """
        Access the parsed log file of this simulation.

        Lazily loads and parses the log file on first access. The log is
        parsed again as long as the cached one has no records.

        :returns: A Log object for querying simulation log data.
        """
        if self._log is None or self._log.df_records.empty:
            self._log = Log(self.log_file)
        return self._log

    @property
    def meshseries_file(self) -> Path:
        """
        Get the path to the mesh series output file.

        :returns: Path to the mesh series file (pvd, xdmf, etc.).
        """
        return self.result.next_target / self.model.project.meshseries_file()

    @property
    def meshseries(self) -> MeshSeries:
        """
        Access the mesh series of this simulation.

        Lazily loads the mesh series on first access. A notice is printed if
        the simulation is still running.

        :returns: A MeshSeries containing the simulation results.
        :raises ValueError: If the mesh series file does not exist.
        """
        if self.status == SimulationStatus.running:
            print("Simulation still running. MeshSeries can be incomplete.")

        if not self.meshseries_file.exists():
            msg = f"Can not find simulation result for a MeshSeries. Status of the simulation: {self.status_str}."
            raise ValueError(msg)

        if not self._meshseries:
            self._meshseries = MeshSeries(self.meshseries_file)
        return self._meshseries

    def _propagate_target(self) -> None:
        """
        Hand this simulation's next target down to model and result.

        Children with a user specified target keep their own target.
        """
        if not self.model.user_specified_target:
            self.model._next_target = self.next_target / "model"
            self.model._propagate_target()
        if not self.result.user_specified_target:
            self.result._next_target = self.next_target / "result"
            self.result._propagate_target()

    def _save_impl(
        self, dry_run: bool = False, overwrite: bool | None = None
    ) -> list[Path]:
        """
        Save (or link) model and result below the next target.

        :param dry_run:   Forwarded to the child save/link step.
        :param overwrite: Forwarded to the child save/link step.
        :returns: The paths reported for the model followed by those
                  reported for the result.
        """
        files: list[Path] = []
        files += self._save_or_link_child(
            self.model, self.next_target / "model", dry_run, overwrite
        )
        files += self._save_or_link_child(
            self.result, self.next_target / "result", dry_run, overwrite
        )
        return files

    def save(
        self,
        target: Path | str | None = None,
        overwrite: bool | None = None,
        dry_run: bool = False,
        archive: bool = False,
        id: str | None = None,
    ) -> list[Path]:
        """
        Save the Simulation to disk, including Model and Result.

        Creates a folder structure containing:

        - ``model/``: The Model (project, meshes, execution config)
        - ``result/``: The simulation results

        :param target:    Path to save the :class:`ogstools.Simulation`.
                          If None, uses a default location.
        :param overwrite: If True, overwrite existing files. Defaults to
                          None, which is passed on unchanged to the storage
                          layer.
        :param dry_run:   If True, simulate save without writing files.
        :param archive:   If True, materialize all symlinks by copying data.
        :param id:        Optional identifier. Mutually exclusive with target.
        :returns:         List of Paths to saved files.
        """
        user_defined = self._pre_save(target, overwrite, dry_run, id=id)
        files = self._save_impl(dry_run=dry_run, overwrite=overwrite)
        self._post_save(user_defined, archive, dry_run)
        return files

    @overload
    def restart(
        self,
        restart_suffix: str = "_restart",
        zero_displacement: bool = False,
        initialize_porosity_from_medium_property: bool = False,
        *,
        t_initial: float,
        t_end: float,
        initial_dt: float | None = None,
    ) -> Model: ...

    @overload
    def restart(
        self,
        restart_suffix: str = "_restart",
        zero_displacement: bool = False,
        initialize_porosity_from_medium_property: bool = False,
        *,
        timevalues: list,
    ) -> Model: ...

    def restart(
        self,
        restart_suffix: str = "_restart",
        zero_displacement: bool = False,
        initialize_porosity_from_medium_property: bool = False,
        *,
        t_initial: float | None = None,
        t_end: float | None = None,
        initial_dt: float | None = None,
        timevalues: list | None = None,
    ) -> Model:
        """
        Prepare a copy of the model for a restart run.

        Supports two mutually exclusive modes:

        - Time-range mode: use ``t_initial`` and ``t_end`` (optionally
          ``initial_dt``).
        - Explicit-times mode: use ``timevalues`` only.

        The restart time is ``t_initial`` (time-range mode) or the first
        entry of ``timevalues`` (explicit-times mode). The mesh series of
        this simulation must contain a time step within 1e-6 of that time;
        its mesh becomes the domain mesh of the returned model. On the
        copied project the time scheme is set, ``restart_suffix`` is
        appended to the output prefix and the initial conditions are
        processed for the restart. This simulation's own model is left
        untouched. All time-scheme control arguments are keyword-only.

        :param restart_suffix:  appended to the output prefix.
                                Default: "_restart"
        :param zero_displacement:   set displacement initial conditions to
                                    zero. Default is False.
        :param initialize_porosity_from_medium_property: forwarded to the
                                project's initial condition processing.
                                Default is False.
        :param t_initial:   restart interval start time (time-range mode)
        :param t_end:       restart interval end time (time-range mode)
        :param initial_dt:  initial time step (time-range mode)
        :param timevalues:  explicit list of restart times
                            (explicit-times mode)
        :returns: A new :class:`ogstools.Model` set up for the restart run.
        :raises TypeError: If the two modes are mixed, none is given, or
                           ``t_initial`` is missing in time-range mode.
        :raises ValueError: If ``timevalues`` is empty or the project lacks
                            a mesh or output prefix definition (also raised
                            by :attr:`meshseries` if no results exist).
        :raises FileNotFoundError: If no output exists for the restart time.
        """
        # Validate the arguments before copying the model or loading the
        # mesh series, so that misuse fails fast and without side effects.
        if (t_initial is None and t_end is None) == (timevalues is None):
            msg = "Exactly one of timevalues or (t_initial, t_end, Optional[initial_dt]) must be provided."
            raise TypeError(msg)

        if timevalues is None:
            if t_initial is None:
                msg = "t_initial must be provided in time-range mode."
                raise TypeError(msg)
            restart_time = t_initial
        else:
            if len(timevalues) == 0:
                msg = "timevalues must contain at least one time value."
                raise ValueError(msg)
            restart_time = timevalues[0]

        model_restart = self.model.copy()
        prj = model_restart.project
        mesh_series = self.meshseries
        new_meshes = model_restart.meshes

        def _mesh_at(time: float) -> typing.Any:
            """
            Return the mesh series entry for the given time.

            NOTE(review): presumably a mesh object (it is assigned to
            ``meshes.domain`` below) - confirm against MeshSeries indexing.
            """
            index = mesh_series.closest_timestep(time)
            target_diff = abs(mesh_series.timevalues[index] - time)
            if target_diff > 1e-6:
                msg = f"Output data file corresponding to timestep: {time} not found! Target: {target_diff} away"
                raise FileNotFoundError(msg)
            return mesh_series[index]

        if timevalues is None:
            prj._set_timescheme(
                t_initial=t_initial, t_end=t_end, initial_dt=initial_dt
            )
        else:
            prj._set_timescheme(timevalues=timevalues)

        new_bulk_mesh = _mesh_at(restart_time)

        root = prj._get_root()
        old_bulk_mesh = root.findtext("./mesh") or root.findtext(
            "./meshes/mesh"
        )
        output_prefix = root.findtext("./time_loop/output/prefix")
        # Explicit raises instead of asserts: these checks must also hold
        # when Python runs with -O.
        if old_bulk_mesh is None:
            msg = "Expected <mesh> definition."
            raise ValueError(msg)
        if output_prefix is None:
            msg = "Expected time_loop/output/prefix definition."
            raise ValueError(msg)

        prj.set(output_prefix=output_prefix + restart_suffix)
        prj._process_initial_conditions_for_restart(
            old_bulk_mesh,
            zero_displacement,
            initialize_porosity_from_medium_property,
        )

        new_meshes.domain = new_bulk_mesh
        return model_restart