Source code for ogstools.simulation.simulation_controller

# Copyright (c) 2012-2025, OpenGeoSys Community (http://www.opengeosys.org)
#            Distributed under a Modified BSD License.
#            See accompanying file LICENSE.txt or
#            http://www.opengeosys.org/project/license
#


import signal
from time import sleep  # For simulation pause / interrupt
from typing import Any

from ogs.OGSSimulator import OGSSimulation


[docs] class SimulationController(OGSSimulation):
[docs] def __init__(self, args: list[str]) -> None: super().__init__(args) signal.signal(signal.SIGINT, self._handler) signal.signal(signal.SIGTERM, self._handler) self._interrupted = False
def _handler(self, signum: int, _: Any) -> None: self._interrupted = True print(f"Received signal {signum}, stopping...")
[docs] def is_interrupted(self) -> bool: interrupted = self._interrupted self._interrupted = False return interrupted
[docs] def wait(self, close: bool = False) -> None: """ Wait until the simulation is finished. """ while ( self.current_time() < self.end_time() and not self.is_interrupted() ): self.execute_time_step() sleep(0.01) # Must have, if we want to pause the simulation if close: self.close()