# Source code for ogstools.core.model

# SPDX-FileCopyrightText: Copyright (c) OpenGeoSys Community (opengeosys.org)
# SPDX-License-Identifier: BSD-3-Clause


import copy
import typing
from pathlib import Path
from tempfile import mkdtemp

import numpy as np
from matplotlib import pyplot as plt

from ogstools.meshes._meshes import Meshes
from ogstools.ogs6py.project import Project

from .execution import Execution
from .simulation_controller import SimulationController
from .storage import StorageBase

if typing.TYPE_CHECKING:
    from .simulation import Simulation


class Model(StorageBase):
    """
    A complete OGS model combining project file, meshes, and execution settings.

    The Model class integrates all components needed to run an OGS simulation:

    - A project file (prj) defining the simulation setup
    - Meshes required by the simulation
    - Execution parameters (parallelization, logging, etc.)

    Models can be created from individual components, loaded from disk,
    or initialized from existing folder structures following OGSTools
    conventions.
    """

    # __eq__ is defined below; instances are explicitly marked unhashable.
    __hash__ = None

    def __init__(
        self,
        project: Project | Path | str,
        meshes: Meshes | Path | str | None = None,
        execution: Execution | Path | str | None = None,
        id: str | None = None,
    ) -> None:
        """
        Initialize a Model object.

        :param project:     Project object or path to a .prj file. If a path
                            to a folder is given, the Project is loaded via
                            ``Project.from_folder``; any other path is passed
                            to ``Project(input_file=...)``.
        :param meshes:      Meshes object or path to a folder containing mesh
                            files. If None, attempts to locate meshes in
                            standard locations relative to the project file
                            (same folder or 'meshes' subfolder).
        :param execution:   Execution object or path to an execution.yaml
                            file. If None, creates a default Execution
                            configuration.
        :param id:          Optional unique identifier for this model.

        :raises ValueError: If meshes cannot be found or located
                            automatically.
        :raises FileNotFoundError: If specified paths do not exist.
        """
        super().__init__("Model", id=id)

        # --- project ------------------------------------------------------
        if isinstance(project, Project):
            self.project = project
        else:
            project_path = Path(project)
            if project_path.is_dir():
                self.project = Project.from_folder(project_path)
            else:
                self.project = Project(input_file=project_path)

        # --- meshes -------------------------------------------------------
        if isinstance(meshes, Meshes):
            self.meshes = meshes
        elif isinstance(meshes, Path | str):
            meshes = Path(meshes)
            # A meta.yaml marks a folder written in the Meshes layout;
            # otherwise fall back to the mesh files named by the project.
            meta_file = meshes / "meta.yaml"
            if meta_file.exists():
                self.meshes = Meshes.from_folder(meshes)
            else:
                meshes_files = self.project.meshpaths(meshes)
                self.meshes = Meshes.from_files(meshes_files)
        else:  # None
            # Explicit check instead of `assert`: asserts are stripped under
            # `python -O`, which would turn this into an obscure
            # AttributeError on `None.parent` further down.
            if not self.project.input_file:
                msg = (
                    "Cannot locate meshes automatically: the project has no "
                    "input file. Define Meshes when initializing Model."
                )
                raise ValueError(msg)

            # Last resort - Possible conventions to try
            mesh_locations = [
                self.project.input_file.parent,
                self.project.input_file.parent / "meshes",
            ]

            for base in mesh_locations:
                meshes_files = self.project.meshpaths(base)
                # Only accept a location if all meshes exist there.
                if not bool(np.all([p.exists() for p in meshes_files])):
                    continue
                meta_file = base / "meta.yaml"
                if meta_file.exists():
                    self.meshes = Meshes.from_folder(base)
                else:
                    self.meshes = Meshes.from_files(
                        meshes_files, domain_key=meshes_files[0].stem
                    )
                break
            else:
                loc_str = ", ".join(str(b) for b in mesh_locations)
                msg = f"Not all meshes found. Tried: {loc_str}. Put the meshes in these locations or define Meshes when initializing Model."
                raise ValueError(msg)

        # --- execution ----------------------------------------------------
        if isinstance(execution, Execution):
            self.execution = execution
        elif isinstance(execution, Path | str):
            self.execution = Execution.from_file(execution)
        else:  # None
            self.execution = Execution.from_default()

        # Already initialized as not saved and user_specified_target=False

    @classmethod
    def from_folder(cls, folder: Path | str) -> "Model":
        """
        Initialize a Model from a folder following OGSTools conventions.

        Expects the folder structure created by Model.save():

        - project/: Subfolder containing project.prj and associated files
        - meshes/: Subfolder containing mesh files
        - execution.yaml: Execution configuration

        If ``project/`` does not exist, ``default.prj`` inside the folder is
        used instead (backward compatibility).

        :param folder: Path to the folder containing the model files.
        :returns:      A :class:`ogstools.Model` object initialized from the
                       folder contents.
        :raises FileNotFoundError: If required files are not found in the
                                   folder.
        """
        folder = Path(folder)
        if not folder.exists():
            msg = f"The folder {folder!r} to load the model does not exist."
            raise FileNotFoundError(msg)

        project = folder / "project"
        if not project.exists():
            project = folder / "default.prj"  # backward compat
        meshes = folder / "meshes"
        execution = folder / "execution.yaml"

        model = cls(project=project, meshes=meshes, execution=execution)
        model._bind_to_path(folder)
        return model

    @classmethod
    def from_id(cls, model_id: str) -> "Model":
        """
        Load a Model from the user storage path using its ID.

        :param model_id: The unique ID of the :class:`ogstools.Model` to load.
        :returns:        A :class:`ogstools.Model` instance with Project and
                         Meshes loaded from disk.
        :raises FileNotFoundError: If no folder for this ID exists below the
                                   storage path.
        """
        model_folder = StorageBase.saving_path() / "Model" / model_id
        if not model_folder.exists():
            msg = f"No model found at {model_folder}"
            raise FileNotFoundError(msg)

        model = cls.from_folder(model_folder)
        model._id = model_id
        return model

    def _propagate_target(self) -> None:
        """
        If for this object a saving location was given but for the subobjects
        (Meshes, Project and Execution) not, the saving location for the
        subobjects is derived from the saving location of this object.
        """
        if not self.meshes.user_specified_target:
            self.meshes._next_target = self.next_target / "meshes"
            self.meshes._propagate_target()

        if not self.project.user_specified_target:
            self.project._next_target = self.next_target / "project"
            self.project._propagate_target()

        if not self.execution.user_specified_target:
            self.execution._next_target = self.next_target / "execution.yaml"
            self.execution._propagate_target()

    def __eq__(self, other: object) -> bool:
        """Compare two models component-wise (execution, project, meshes)."""
        if not isinstance(other, Model):
            return NotImplemented
        # most expensive at last
        return (
            self.execution == other.execution
            and self.project == other.project
            and self.meshes == other.meshes
        )

    def _save_impl(
        self, dry_run: bool = False, overwrite: bool | None = None
    ) -> list[Path]:
        """
        Save (or link) meshes, project and execution below ``next_target``
        and write an ``.envrc`` file exporting the OGS command.

        :param dry_run:   If True, the ``.envrc`` file is not written; the
                          flag is also forwarded to the child save calls.
        :param overwrite: Forwarded to the child save calls.
        :returns:         List of the files reported by the children plus the
                          ``.envrc`` path.
        """
        # Partitions are added before saving; note this happens regardless
        # of `dry_run`.
        if self.execution.mpi_ranks is not None:
            self.meshes.add_partitions(self.execution.mpi_ranks)

        files: list[Path] = []
        files += self._save_or_link_child(
            self.meshes, self.next_target / "meshes", dry_run, overwrite
        )
        files += self._save_or_link_child(
            self.project, self.next_target / "project", dry_run, overwrite
        )
        files += self._save_or_link_child(
            self.execution,
            self.next_target / "execution.yaml",
            dry_run,
            overwrite,
        )

        envrc = self.next_target / ".envrc"
        if not dry_run:
            envrc.write_text(f'export cmd="{self.cmd}"\n')
        files.append(envrc)
        return files

    def save(
        self,
        target: Path | str | None = None,
        overwrite: bool | None = None,
        dry_run: bool = False,
        archive: bool = False,
        id: str | None = None,
    ) -> list[Path]:
        """
        Save the Model to disk, including Project, Meshes, and Execution
        config.

        Creates a folder structure containing:

        - project/: Subfolder with project.prj and associated files
        - meshes/: Subfolder with mesh files
        - execution.yaml: Execution configuration

        By default, uses symlinks for efficiency. Use archive=True to create
        a standalone copy with all data materialized.

        :param target:    Path to the folder where the model should be saved.
                          If None, uses a default location based on the model
                          ID.
        :param overwrite: If True, existing files are overwritten. If False,
                          raises an error if files already exist.
        :param dry_run:   If True, simulates the save without writing files,
                          but returns the list of files that would be
                          created.
        :param archive:   If True, materializes all symlinks by copying
                          referenced data (may be time and space intensive).
        :param id:        Optional identifier. Mutually exclusive with target.

        :returns:         List of Paths to saved files (including meshes,
                          project, and execution configuration).
        """
        user_defined = self._pre_save(target, overwrite, dry_run, id=id)
        files = self._save_impl(dry_run=dry_run, overwrite=overwrite)
        self._post_save(user_defined, archive, dry_run)
        return files

    def run(
        self,
        target: Path | str | None = None,
        overwrite: bool | None = None,
        id: str | None = None,
    ) -> "Simulation":
        """
        Run a simulation to completion and wait for it to finish.

        This is a convenience method that starts the simulation and blocks
        until it completes. For stepwise control, use start() instead.

        :param target:    Optional path for the resulting Simulation.
                          If None and ``id`` is given, the path is derived
                          from the storage path and the id; otherwise a
                          default location is used.
        :param overwrite: If True, overwrite existing output directory.
        :param id:        Optional identifier for the resulting Simulation.

        :returns:         A :class:`ogstools.Simulation` object containing the
                          completed simulation results and metadata.
        """
        if id is not None and target is None:
            target = StorageBase.saving_path() / "Simulation" / id
        target_result = Path(target) if target else None

        sim_controller = self.controller(
            sim_output=target_result, overwrite=overwrite
        )
        return sim_controller.run(target=target, id=id)

    def controller(
        self,
        sim_output: Path | str | None = None,
        dry_run: bool = False,
        overwrite: bool | None = None,
    ) -> SimulationController:
        """
        Start a simulation and return a controller for execution management.

        The type of controller returned depends on the execution
        configuration:

        - OGSInteractiveController: Allows stepwise control
          (execute_time_step, inspect intermediate results) when
          execution.interactive is True
        - OGSNativeController: Runs to completion when execution.interactive
          is False

        If the model has not been saved yet, it is saved first (below
        ``sim_output / "model"`` when ``sim_output`` is given).

        :param sim_output: Optional path where simulation should be written.
                           If None, uses a default location.
        :param dry_run:    Not implemented yet; passing True raises
                           NotImplementedError before anything is saved or
                           modified.
        :param overwrite:  Forwarded to the model save step and to the
                           controller.

        :returns:          A SimulationController for managing the
                           simulation.
        :raises NotImplementedError: If ``dry_run`` is True.
        """
        # Reject dry runs up front. The save sequence below assigns
        # `_next_target`, propagates it to the children and may add mesh
        # partitions, so failing only afterwards would leave side effects
        # behind for a call that raises anyway.
        if dry_run:
            msg = "Dry run not implemented"
            raise NotImplementedError(msg)

        sim_result = (
            Path(sim_output) / "result" if sim_output is not None else None
        )
        sim_model = (
            Path(sim_output) / "model" if sim_output is not None else None
        )

        # ToDo Could also check if Model differs between the saved Model and
        # the provided object
        if not self.is_saved:
            self._next_target, _ = self._target_for_save(sim_model)
            self._propagate_target()
            self._save_impl(overwrite=overwrite, dry_run=False)
            self._post_save(False, False, False)

        # Controllers are imported lazily, as in the original code.
        if self.execution.interactive:
            from .interactive_simulation_controller import (
                OGSInteractiveController,
            )

            return OGSInteractiveController(
                model_ref=self, sim_output=sim_result, overwrite=overwrite
            )

        from .native_simulation_controller import OGSNativeController

        return OGSNativeController(
            model_ref=self, sim_output=sim_result, overwrite=overwrite
        )

    def _component_repr(
        self, obj: typing.Any, name: str, load_method: str
    ) -> str:
        """
        Helper to generate repr string for a component
        (project/meshes/execution).

        :param obj:         The component; must provide user_specified_id,
                            id, is_saved, active_target and next_target.
        :param name:        Class name to show in the expression.
        :param load_method: Name of the loading classmethod to show.
        :returns:           A string resembling the expression that would
                            load the component.
        """
        if obj.user_specified_id:
            return f'{name}.from_id("{obj.id}")'
        # Saved components point at their active target, unsaved ones at
        # the location they would be saved to next.
        path = str(obj.active_target if obj.is_saved else obj.next_target)
        return f"{name}.{load_method}({path!r})"

    def __repr__(self) -> str:
        """Return a reconstruction expression followed by the base repr."""
        cls = self.__class__.__name__
        base_repr = super().__repr__()

        if self.user_specified_id:
            construct = f'{cls}.from_id("{self._id}")'
        elif self.is_saved:
            construct = f"{cls}.from_folder({str(self.active_target)!r})"
        else:
            # Show how to reconstruct children
            project_construct = self._component_repr(
                self.project, "Project", "from_file"
            )
            meshes_construct = self._component_repr(
                self.meshes, "Meshes", "from_folder"
            )
            execution_construct = self._component_repr(
                self.execution, "Execution", "from_file"
            )
            construct = (
                f"{cls}(\n"
                f" project={project_construct},\n"
                f" meshes={meshes_construct},\n"
                f" execution={execution_construct}\n"
                f")"
            )

        return f"{construct}\n{base_repr}"

    def __str__(self) -> str:
        """Return the base description plus one status line per component."""
        base_str = super().__str__()
        lines = [
            base_str,
            f" {self._component_status_str(self.project, 'Project')}",
            f" {self._component_status_str(self.meshes, 'Meshes')}",
            f" {self._component_status_str(self.execution, 'Execution')}",
        ]
        return "\n".join(lines)

    def __deepcopy__(self, memo: dict) -> "Model":
        """
        Build a new Model from deep copies of project, meshes and execution.

        The copy is created through the constructor without an ``id``
        argument.
        """
        # Avoid duplicate copies
        if id(self) in memo:
            return memo[id(self)]

        project = copy.deepcopy(self.project, memo)
        meshes = copy.deepcopy(self.meshes, memo)
        execution = copy.deepcopy(self.execution, memo)

        new = self.__class__(
            project=project,
            meshes=meshes,
            execution=execution,
        )
        memo[id(self)] = new
        return new

    @property
    def cmd(self) -> str:
        """Get the full OGS command (without output path)."""
        exe = self.execution
        meshes_path = self.meshes.active_target or self.meshes.next_target
        # With more than one MPI rank the partition subfolder for that rank
        # count is passed instead of the plain mesh folder.
        if exe.mpi_ranks is not None and exe.mpi_ranks > 1:
            meshes_path = meshes_path / "partition" / str(exe.mpi_ranks)
        return f"{exe.cmd} -m {meshes_path} {self.project.prjfile}"

    def plot_constraints(self, **kwargs: typing.Any) -> plt.Figure:
        """
        Plot the meshes with annotated boundary conditions and source terms.

        Legend related keyword arguments handled here: ``loc`` (default
        "upper left"), ``bbox_to_anchor`` (default (1.05, 1)), ``fontsize``
        and ``leg_fontsize`` (default 0.9 * fontsize).

        keyword arguments: see :func:`~ogstools.plot.contourf`

        :returns: The figure produced by ``Meshes.plot`` with a relabelled
                  legend.
        """
        # NOTE(review): `meshes` aliases `self.meshes`, so the subdomain
        # changes below appear to modify the model itself - confirm that
        # this is intended.
        meshes = self.meshes

        if (
            self.project.geometry
            and self.project.geometry.active_target is not None
        ):
            gml_file = self.project.geometry.active_target
            assert gml_file is not None
            if not meshes.is_saved:
                meshes.save()
            # Create the temporary folder only when it is actually needed,
            # so calls without a geometry do not leave an empty directory
            # behind.
            tmp_path = Path(mkdtemp(prefix="plot_constraints"))
            meshes.add_gml_subdomains(
                self.project.meshpaths(self.meshes.active_target)[0],
                gml_file,
                tmp_path,
            )

        # Drop every subdomain that carries no constraint label.
        constraints = self.project.constraints_labels()
        unused = set(meshes.subdomains.keys()) - set(constraints.keys())
        for subdomain in unused:
            meshes.pop(subdomain)

        from ogstools.plot import setup

        loc = kwargs.pop("loc", "upper left")
        bbox = kwargs.pop("bbox_to_anchor", (1.05, 1))
        fontsize = kwargs.get("fontsize", setup.fontsize)
        leg_fontsize = kwargs.get("leg_fontsize", 0.9 * fontsize)

        fig = meshes.plot(**kwargs, loc=loc, bbox_to_anchor=bbox)
        ax: plt.Axes = fig.axes[0]

        # Replace the mesh names in the legend by the constraint labels.
        handles, labels = ax.get_legend_handles_labels()
        for meshname, label in constraints.items():
            idx = labels.index(meshname)
            labels[idx] = label
        ax.legend(
            handles, labels, loc=loc, fontsize=leg_fontsize,
            bbox_to_anchor=bbox, borderaxespad=0.0, numpoints=1,
        )  # fmt: skip
        return fig