Source code for ogstools.core.native_simulation_controller

# SPDX-FileCopyrightText: Copyright (c) OpenGeoSys Community (opengeosys.org)
# SPDX-License-Identifier: BSD-3-Clause


import contextlib
import subprocess
import time
import typing
from pathlib import Path

import psutil

from .simulation_controller import SimulationController, SimulationStatus

if typing.TYPE_CHECKING:
    from .model import Model
    from .simulation import Simulation


[docs] class OGSNativeController(SimulationController): """ Controller for batch execution of OGS simulations. Runs OGS as a separate process and waits for completion. Does not support stepwise execution or intermediate state inspection. Suitable for standard production runs. Objects should be used as read-only. """
[docs] def __init__( self, model_ref: "Model", sim_output: Path | str | None = None, overwrite: bool | None = None, ) -> None: """ Initialize a native simulation controller. :param model_ref: The :class:`ogstools.Model` to simulate. :param sim_output: Optional path for simulation output directory. :param overwrite: If True, overwrite existing output directory. """ super().__init__( model_ref=model_ref, sim_output=sim_output, overwrite=overwrite ) exe = model_ref.execution logfile = self.result.next_target / "log.txt" if exe.write_logs: with logfile.open("w") as logf: self.process = subprocess.Popen( self.cmd, shell=True, stdout=logf, stderr=subprocess.STDOUT, env=exe.env, ) else: self.process = subprocess.Popen( self.cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT, env=exe.env, ) self.runtime_start = time.time() self.runtime_end: float | None = None
[docs] def terminate(self) -> bool: """ Terminate the simulation if it is running. Attempts to gracefully terminate the OGS process and all child processes. If graceful termination fails, forcefully kills them. :returns: True if the run was terminated successfully, False otherwise. """ timeout = 3 proc = self.process if proc.poll() is not None: print( "Requested termination - but the Simulation is already finished." ) return True try: p = psutil.Process(proc.pid) children = p.children(recursive=True) for c in children: with contextlib.suppress(psutil.NoSuchProcess): c.terminate() psutil.wait_procs(children, timeout=timeout) for c in children: if c.is_running(): with contextlib.suppress(psutil.NoSuchProcess): c.kill() psutil.wait_procs(children, timeout=timeout) if p.is_running(): try: p.terminate() p.wait(timeout) except psutil.TimeoutExpired: p.kill() p.wait() return not p.is_running() except psutil.NoSuchProcess: with contextlib.suppress(Exception): proc.wait(timeout=timeout) return True
[docs] def run( self, target: Path | str | None = None, id: str | None = None ) -> "Simulation": """ Wait for the simulation to complete and return a Simulation object. Blocks until the OGS process finishes. :param target: Optional path for the simulation output directory. :param id: Optional identifier for the resulting Simulation. :returns: A :class:`ogstools.Simulation` object containing the completed simulation. """ self.ret_code = self.process.wait() self.runtime_end = time.time() (self.result.next_target / "returncode").write_text(str(self.ret_code)) return self._create_simulation(target=target, id=id)
[docs] def status_str(self) -> str: """ Get a human-readable status description. :returns: String describing the current simulation state and runtime. """ match self.process.poll(): case None: runtime = time.time() - self.runtime_start stat_str = f"running for {runtime} s." case 0: stat_str = "finished successfully." if self.runtime_end: runtime = self.runtime_end - self.runtime_start stat_str += f"\nExecution took {runtime} s" elif self.result.log_file and self.result.log_file.exists(): stat = self.result.log_file.stat() runtime = stat.st_mtime - stat.st_ctime stat_str += f"\nExecution took {runtime} s" case code: stat_str = f"terminated with error code {code}." if self.process.returncode != 0: stat_str += self.error_report() return "Status: " + stat_str
@property def status(self) -> SimulationStatus: """ Get the current simulation status. Queries the process state to determine if the simulation is running, completed, or encountered an error. :returns: Current SimulationStatus. """ if not self.process: return SimulationStatus.not_started match self.process.poll(): case None: return SimulationStatus.running case 0: return SimulationStatus.done case _: return SimulationStatus.error
# paused can not be reached, if suspended process sim is in State running