Source code for ogstools.core.simulation_controller

# SPDX-FileCopyrightText: Copyright (c) OpenGeoSys Community (opengeosys.org)
# SPDX-License-Identifier: BSD-3-Clause


import abc
import signal
import typing
from enum import Enum
from pathlib import Path

from .result import Result

if typing.TYPE_CHECKING:
    from .model import Model
    from .simulation import Simulation


[docs] class SimulationStatus(Enum): """ Enumeration of possible simulation states. Attributes: not_started: Simulation has not been started yet. running: Simulation is currently executing. paused: Simulation is paused (interactive mode only). done: Simulation completed successfully. error: Simulation terminated with an error. """ not_started = 0 # open running = 1 paused = 2 done = 3 # reached end_time error = 4 # with error unknown = 5
[docs] class SimulationController(abc.ABC): """ Abstract base class for controlling OGS simulation execution. Provides a unified interface for running simulations, whether in interactive stepwise mode or batch mode. Handles signal interruption (SIGINT, SIGTERM) and manages simulation status. Concrete implementations: - OGSInteractiveController: For stepwise execution control - OGSNativeController: For batch execution """ Status = SimulationStatus
[docs] def __init__( self, model_ref: "Model", sim_output: Path | str | None = None, overwrite: bool | None = None, ) -> None: """ Initialize a SimulationController. :param model_ref: The :class:`ogstools.Model` to simulate. :param sim_output: Optional path for simulation output directory. If None, uses a default location. :param overwrite: If True, overwrite existing output directory. """ self.model_ref = model_ref self._args_list: list[str] = [] self.result = Result(sim_output) self.result._pre_save(overwrite=overwrite) self.result.next_target.mkdir(parents=True, exist_ok=True) self._interrupted = False signal.signal(signal.SIGINT, self._handler) signal.signal(signal.SIGTERM, self._handler)
def _handler(self, signum: int, _: typing.Any) -> None: self._interrupted = True print(f"Received signal {signum}, stopping...") @property def is_interrupted(self) -> bool: """ Check if an interrupt signal was received and reset the flag. :returns: True if SIGINT or SIGTERM was received, False otherwise. """ interrupted = self._interrupted self._interrupted = False return interrupted
[docs] @abc.abstractmethod def terminate(self) -> bool: """ Terminate the simulation immediately. :returns: True if termination was successful, False otherwise. """
[docs] @abc.abstractmethod def run( self, target: Path | str | None = None, id: str | None = None ) -> "Simulation": """ Run the simulation to completion. :param target: Optional path for the simulation output directory. :param id: Optional identifier for the resulting Simulation. :returns: A :class:`ogstools.Simulation` object containing the completed simulation. """
def _create_simulation( self, target: Path | str | None = None, id: str | None = None ) -> "Simulation": """ Create a Simulation object with optional id and target. :param target: Optional path for the simulation output directory. :param id: Optional identifier for the Simulation. :returns: A configured :class:`ogstools.Simulation` object. """ from .simulation import Simulation sim = Simulation(self.model_ref, result=self.result) if id: sim.id = id return sim if target: # This is basically self.save(target) but without pre_save sim._next_target = Path(target) sim.user_specified_target = True sim._save_impl() sim._post_save(user_defined=True) sim._propagate_target() return sim @property @abc.abstractmethod def status(self) -> SimulationStatus: """ Get the current simulation status. :returns: Current SimulationStatus. """
[docs] @abc.abstractmethod def status_str(self) -> str: """ Get a human-readable status string. :returns: String describing the current simulation state. """
@property def log_file(self) -> Path: """Get the path to the log file.""" return self.result.log_file @property def meshseries_file(self) -> Path: """Get the path to the mesh series file.""" return ( self.result.next_target / self.model_ref.project.meshseries_file() ) @property def cmd(self) -> str: """Get the full command used to run the simulation.""" return f"{self.model_ref.cmd} -o {self.result.next_target}"
[docs] def error_report(self) -> str: """ Generate an error report if the simulation failed. Includes the last lines of the log file if available. :returns: A formatted error report string. """ msg = "" if self.status == SimulationController.Status.not_started: msg += "OGS not (yet) started." return msg if self.status != SimulationStatus.error: msg += "Still running." return msg msg += "An error occurred." if not self.result.log_file.exists(): msg += f"No log file written to: {self.result.log_file}." return msg msg += f"Last lines of {self.result.log_file} are:" with self.result.log_file.open() as lf: last_lines = "\n".join(lf.readlines()[-10:]) msg += last_lines return msg
def __repr__(self) -> str: from .storage import StorageBase model_target = StorageBase._format_path( self.model_ref.next_target, for_repr=True ) result_target = StorageBase._format_path( self.result.next_target, for_repr=True ) meshseries = StorageBase._format_path( self.meshseries_file, for_repr=True ) logfile = StorageBase._format_path(self.log_file, for_repr=True) return ( f"Model.from_folder({model_target}).controller(sim_output={result_target}, overwrite=True)\n" f"meshseries_file={meshseries}\n" f"logfile={logfile}\n" f"status={self.status_str()}\n" f"execution.interactive={self.model_ref.execution.interactive}" ) def __str__(self) -> str: from .storage import StorageBase mode = ( "Interactive" if self.model_ref.execution.interactive else "Native" ) return ( f"SimulationController ({mode})\n" f"Model: {StorageBase._format_path(self.model_ref.next_target)}\n" f"Result: {StorageBase._format_path(self.result.next_target)}\n" f"MeshSeries: {StorageBase._format_path(self.meshseries_file)}\n" f"Logfile: {StorageBase._format_path(self.log_file)}\n" f"{self.status_str()}\n" )