Source code for ogstools.core.interactive_simulation_controller

# SPDX-FileCopyrightText: Copyright (c) OpenGeoSys Community (opengeosys.org)
# SPDX-License-Identifier: BSD-3-Clause

import os
import threading
import time
import typing
from collections.abc import Sequence
from pathlib import Path
from time import sleep

from pyvista import UnstructuredGrid

from .model import Model
from .simulation_controller import SimulationController, SimulationStatus

if typing.TYPE_CHECKING:
    from .simulation import Simulation


[docs] class OGSSimulationInitializationError(Exception): """ Exception raised when OGSSimulation initialization fails. This can occur when: - OGSSimulation returns None (initialization failed) - Multiple OGSSimulation instances are created in parallel (Issue #3589) """
class _NativeOutputCapture: # TODO:: Issue #3589 - Not thread safe! Multiple interactive simulations cannot run in parallel """ Temporary solution for interactive SimulationController to capture the log output into a file. It is not thread safe! Logfile can be empty/corrupted when running multiple interactive simulations in parallel """ def __init__(self, file_path: Path): self._r_fd, self._w_fd = os.pipe() self._old_stdout_fd = os.dup(1) self._old_stderr_fd = os.dup(2) self.file_path = file_path def start(self) -> None: os.dup2(self._w_fd, 1) os.dup2(self._w_fd, 2) self._thread = threading.Thread(target=self._reader) self._thread.daemon = True self._thread.start() def _reader(self) -> None: with ( Path(self.file_path).open("w") as f, os.fdopen(self._r_fd, "r") as r, ): f.writelines(r) def stop(self) -> None: # Restore original stdout/stderr os.dup2(self._old_stdout_fd, 1) os.dup2(self._old_stderr_fd, 2) os.close(self._w_fd) self._thread.join()
[docs] class OGSInteractiveController(SimulationController): """ Controller for interactive stepwise execution of OGS simulations. Allows fine-grained control over simulation execution including: - Executing individual time steps - Inspecting intermediate mesh states - Querying current simulation time - Pausing and resuming execution Requires OGS to be built with interactive mode support. """
[docs] def __init__( self, model_ref: "Model", sim_output: Path | str | None = None, overwrite: bool | None = None, ) -> None: """ Initialize an interactive simulation controller. :param model_ref: The :class:`ogstools.Model` to simulate. :param sim_output: Optional path for simulation output directory. :param overwrite: If True, overwrite existing output directory. """ super().__init__( model_ref=model_ref, sim_output=sim_output, overwrite=overwrite ) from ogs.OGSSimulator import OGSSimulation self._capture = ( _NativeOutputCapture( file_path=Path(self.result.next_target) / "log.txt" ) if model_ref.execution.write_logs else None ) # TODO: Apply all model execution parameters log_level = model_ref.execution.log_level extra_args = model_ref.execution.args self._args_list = [ "", str(model_ref.project.prjfile), "-m", str(model_ref.meshes.active_target), "-o", str(self.result.next_target), *(["-l", log_level] if log_level is not None else []), *(str(extra_args).split() if extra_args is not None else []), ] try: if self._capture: self._capture.start() self.sim = OGSSimulation(self._args_list) if self.sim is None: # TODO:: Issue #3589 - OGSSimulation cannot be initialized multiple times in parallel msg = ( "OGSSimulation initialization failed (returned None). " "This may occur when multiple interactive simulations " "are initialized in parallel (Issue #3589)." ) raise OGSSimulationInitializationError(msg) self._status = SimulationController.Status.running except OGSSimulationInitializationError: self._status = SimulationController.Status.error raise except Exception: self._status = SimulationController.Status.error print(self.status_str()) self.runtime_start = time.time() self.runtime_end: float | None = None
@property def status(self) -> SimulationStatus: """Get the current simulation status.""" return self._status
[docs] def terminate(self) -> bool: """ Terminate the simulation immediately. Closes the OGS simulator and stops log capture if active. :returns: True if termination was successful. """ self.runtime_end = time.time() ret = self.sim.close() if self._capture: self._capture.stop() self._status = SimulationStatus.done return ret
[docs] def run( self, target: Path | str | None = None, id: str | None = None ) -> "Simulation": """ Run the simulation to completion. Executes time steps until the simulation reaches end_time or encounters an error. After completion, closes the simulator and returns a Simulation object. :param target: Optional path for the simulation output directory. :param id: Optional identifier for the resulting Simulation. :returns: A :class:`ogstools.Simulation` object containing the completed simulation. """ while ( self.current_time < self.end_time and not self.is_interrupted and self._status == self.Status.running ): self._status = self.execute_time_step() sleep(0.01) # Must have, if we want to pause the simulation self.sim.close() if self._capture: self._capture.stop() self._status = ( self.Status.done if self._status == self.Status.running else self.Status.error ) self.runtime_end = time.time() ret_code = 0 if self._status == self.Status.done else 1 (self.result.next_target / "returncode").write_text(str(ret_code)) return self._create_simulation(target=target, id=id)
@property def current_time(self) -> float: """ Get the current model time. :returns: Current time value in s. """ return self.sim.current_time() @property def end_time(self) -> float: """ Get the configured model end time. :returns: End time value in s. """ return self.sim.end_time()
[docs] def execute_time_step(self) -> "SimulationStatus": """ Execute a single time step of the simulation. Advances the simulation by one time step and updates the status. :returns: The updated SimulationStatus after executing the time step. """ status = self.sim.execute_time_step() if status == 0: # SUCCESS self._status = SimulationController.Status.running else: self._status = SimulationController.Status.error return self._status
[docs] def mesh( self, name: str, variables: Sequence[str] | None = None ) -> UnstructuredGrid: """ Retrieve the current mesh state during simulation. :param name: Name of the mesh to retrieve. :param variables: Optional list of variable names to include. If None, includes all variables. :returns: UnstructuredGrid containing the mesh and data. """ from ogstools.mesh.cosim import from_simulator return from_simulator(self.sim, name, variables)
[docs] def status_str(self) -> str: """ Get a human-readable status description. :returns: String describing the current simulation state and runtime. """ match self._status: case SimulationController.Status.running: runtime = time.time() - self.runtime_start stat_str = f"running for {runtime} s." case SimulationController.Status.done: assert self.runtime_end runtime = self.runtime_end - self.runtime_start stat_str = "finished successfully." if runtime > 0: stat_str += f"\nExecution took {runtime} s." case SimulationController.Status.error: stat_str = "terminated with error." stat_str += self.error_report() case SimulationController.Status.not_started: stat_str = "not started." case _: stat_str = "unknown." return "Status: " + stat_str