Source code for ogstools.simulation.simulation_controller
# Copyright (c) 2012-2025, OpenGeoSys Community (http://www.opengeosys.org)
#            Distributed under a Modified BSD License.
#            See accompanying file LICENSE.txt or
#            http://www.opengeosys.org/project/license
#
import signal
from time import sleep  # For simulation pause / interrupt
from typing import Any
from ogs.OGSSimulator import OGSSimulation
[docs]
class SimulationController(OGSSimulation):
[docs]
    def __init__(self, args: list[str]) -> None:
        super().__init__(args)
        signal.signal(signal.SIGINT, self._handler)
        signal.signal(signal.SIGTERM, self._handler)
        self._interrupted = False 
    def _handler(self, signum: int, _: Any) -> None:
        self._interrupted = True
        print(f"Received signal {signum}, stopping...")
[docs]
    def is_interrupted(self) -> bool:
        interrupted = self._interrupted
        self._interrupted = False
        return interrupted 
[docs]
    def wait(self, close: bool = False) -> None:
        """
        Wait until the simulation is finished.
        """
        while (
            self.current_time() < self.end_time() and not self.is_interrupted()
        ):
            self.execute_time_step()
            sleep(0.01)  # Must have, if we want to pause the simulation
        if close:
            self.close()