Source code for ogstools.meshlib.meshes_from_yaml

# Copyright (c) 2012-2025, OpenGeoSys Community (http://www.opengeosys.org)
#            Distributed under a Modified BSD License.
#            See accompanying file LICENSE.txt or
#            http://www.opengeosys.org/project/license
#

import math
from pathlib import Path
from tempfile import mkdtemp
from typing import Any

import gmsh
import yaml  # type: ignore[import]
from simpleeval import simple_eval


def _load_geometry_from_yaml(path: Path) -> dict[str, Any]:
    """Load geometry definition from YAML file."""
    with path.open("r", encoding="utf-8") as f:
        return yaml.safe_load(f)


def _make_eval_context(
    params: dict[str, Any]
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Prepare context for simple_eval with math constants and functions."""
    names = {
        **params,
        **{k: v for k, v in vars(math).items() if not k.startswith("_")},
    }
    functions = {k: v for k, v in vars(math).items() if callable(v)}
    return names, functions


def _evaluate_expr(
    expr: Any, params: dict[str, Any]
) -> int | float | str | list:
    """
    Evaluates an expression (str, list, int, float).
    Allows mathematical functions and variables.
    """
    names, functions = _make_eval_context(params)

    if isinstance(expr, (int | float)):
        return expr

    if isinstance(expr, str):
        try:
            return simple_eval(expr, names=names, functions=functions)
        except Exception as e:
            msg = f"Error evaluating expression '{expr}': {e}"
            raise ValueError(msg) from e

    if isinstance(expr, list):
        results = []
        for item in expr:
            if isinstance(item, str):
                try:
                    results.append(
                        simple_eval(item, names=names, functions=functions)
                    )
                except Exception as e:
                    msg = f"Error evaluating expression '{item}': {e}"
                    raise ValueError(msg) from e
            else:
                results.append(item)
        return results

    msg = f"Unsupported type for expression: {type(expr)}. Expected str, list, int, or float."
    raise TypeError(msg)


def _evaluate_parameters(params: dict[str, Any]) -> dict[str, Any]:
    """
    Evaluate all string expressions in the params dict using _evaluate_expr.
    Updates the dict in-place with numeric results.
    Raises ValueError with a consistent message if evaluation fails.
    """
    for key, value in list(params.items()):
        if isinstance(value, str):
            try:
                params[key] = _evaluate_expr(value, params)
            except Exception as e:
                msg = f"Failed to evaluate parameter '{key}': {e}"
                raise ValueError(msg) from e
    return params


def _validate_block(geometry: dict[str, Any], block_name: str) -> None:
    if block_name not in geometry:
        msg = f"Missing top-level block: '{block_name}'"
        raise ValueError(msg)

    block = geometry[block_name]
    if not isinstance(block, dict):
        msg = f"Block '{block_name}' must be a mapping, got {type(block).__name__}"
        raise ValueError(msg)

    for name, definition in block.items():
        if not isinstance(definition, dict):
            msg = f"{block_name[:-1].capitalize()} '{name}' must be a mapping, got {type(definition).__name__}"
            raise ValueError(msg)


def _validate_geometry_schema(geometry: dict[str, Any]) -> None:
    """Ensure required blocks exist and have the correct types."""
    required_blocks = ["points", "lines", "surfaces", "groups"]
    for key in required_blocks:
        _validate_block(geometry, key)


[docs] def meshes_from_yaml( geometry_file: Path, output_dir: Path | None = None ) -> Path: """ Generate a 2D mesh from a YAML geometry file using gmsh. """ output_dir = output_dir or Path(mkdtemp()) geometry = _load_geometry_from_yaml(geometry_file) _validate_geometry_schema(geometry) params = _evaluate_parameters(geometry.get("parameters") or {}) gmsh.initialize() gmsh.model.add("simple_mesh") # --- Points --- gmsh_points: dict[str, int] = {} for name, val in geometry["points"].items(): if "coords" not in val: msg = f"Incomplete point definition for {name}" raise ValueError(msg) coords = val["coords"] # make sure coords is a list or a tuple. if not isinstance(coords, (list | tuple)): msg = f"Point {name}: 'coords' must be a list of [x, y], got {type(coords).__name__}" raise ValueError(msg) if len(coords) != 2: msg = f"Point {name}: 'coords' must have exactly 2 entries [x, y], but got {len(coords)} values: {coords}" raise ValueError(msg) x, y = coords x = _evaluate_expr(x, params) y = _evaluate_expr(y, params) char_length = _evaluate_expr(val["char_length"], params) gmsh_points[name] = gmsh.model.geo.addPoint(x, y, 0.0, char_length) # --- Lines & arcs --- gmsh_lines: dict[str, int] = {} for name, val in geometry["lines"].items(): start = gmsh_points[val["start"]] end = gmsh_points[val["end"]] if "center" in val: center = gmsh_points[val["center"]] gmsh_lines[name] = gmsh.model.geo.addCircleArc(start, center, end) else: gmsh_lines[name] = gmsh.model.geo.addLine(start, end) # --- Surfaces --- gmsh_surfaces: dict[str, int] = {} for sname, sdef in geometry["surfaces"].items(): if "loop" in sdef: loop = [ ( -gmsh_lines[lref[1:]] if lref.startswith("-") else gmsh_lines[lref] ) for lref in sdef["loop"] ] loops = [gmsh.model.geo.addCurveLoop(loop)] elif "loops" in sdef: loops = [] for subloop in sdef["loops"]: loop = [ ( -gmsh_lines[lref[1:]] if lref.startswith("-") else gmsh_lines[lref] ) for lref in subloop ] loops.append(gmsh.model.geo.addCurveLoop(loop)) else: msg = f"Surface {sname} has no 'loop' or 'loops'." raise ValueError(msg) gmsh_surfaces[sname] = gmsh.model.geo.addPlaneSurface(loops) gmsh.model.geo.synchronize() # --- Groups --- for gname, gdef in geometry["groups"].items(): if gdef["dim"] == 2: entity_ids = [gmsh_surfaces[e] for e in gdef["entities"]] elif gdef["dim"] == 1: entity_ids = [gmsh_lines[e] for e in gdef["entities"]] elif gdef["dim"] == 0: entity_ids = [gmsh_points[e] for e in gdef["entities"]] else: msg = f"Unsupported group dimension: {gdef['dim']}" raise NotImplementedError(msg) tag = gmsh.model.addPhysicalGroup(gdef["dim"], entity_ids) gmsh.model.setPhysicalName(gdef["dim"], tag, gname) # --- Mesh output --- gmsh.model.mesh.generate(2) logs = gmsh.logger.get() for msg in logs: print("GMSH:", msg) output_file = Path(output_dir) / "mesh.msh" output_file.parent.mkdir(parents=True, exist_ok=True) gmsh.write(str(output_file)) gmsh.finalize() return output_file
__all__ = ["meshes_from_yaml"]