Source code for ogstools.materiallib.core.material

# SPDX-FileCopyrightText: Copyright (c) OpenGeoSys Community (opengeosys.org)
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import copy
import logging
import warnings
from collections.abc import Iterator, Mapping
from pathlib import Path
from typing import Any

import yaml

from ogstools._internal import deprecated
from ogstools.materiallib.schema.required_properties import (
    required_property_names,
)

from .property import MaterialProperty

# Module-level logger named after this module; the Material class below uses
# it for debug-level diagnostics.
logger = logging.getLogger(__name__)


class Material(Mapping[str, MaterialProperty]):
    """
    Represents a single material.

    - Can be constructed directly from YAML raw data.
    - Provides access to all properties.
    - Supports filtering by process schemas or property names.

    The mapping interface is keyed by property name. Looking up a name
    returns the first stored property entry with that name.
    """

    __hash__ = None  # type: ignore[assignment]  # Mutable with __eq__

    def __init__(self, name: str, raw_data: dict[str, Any]) -> None:
        """
        Create a material and parse the properties found in ``raw_data``.

        :param name:     name of the material
        :param raw_data: raw material description (e.g. loaded from YAML);
                         its ``"properties"`` entry, if any, is parsed into
                         :class:`MaterialProperty` objects
        """
        self.name = name
        self.raw = raw_data  # full YAML (e.g. for debugging or export)
        self.properties: list[MaterialProperty] = []
        self._parse_properties()

    @classmethod
    def from_file(cls, file_path: str | Path) -> Material | None:
        """
        Create a Material from a YAML file or return None if invalid.

        A file is considered invalid if its top-level YAML node is not a
        mapping or if it has no string-valued ``"name"`` entry. Errors from
        opening or parsing the file are not caught here.

        :param file_path: path of the YAML file to read
        :returns: the parsed material, or None for an invalid file
        """
        with Path(file_path).open(encoding="utf-8") as file:
            raw_data = yaml.safe_load(file)

        if not isinstance(raw_data, dict):
            logger.debug("Skipping invalid YAML file: %s", file_path)
            return None

        name = raw_data.get("name")
        if not isinstance(name, str):
            logger.debug(
                "Skipping YAML file without valid 'name': %s", file_path
            )
            return None

        return cls(name=name, raw_data=raw_data)

    def to_file(self, file_path: str | Path) -> None:
        """
        Write this Material to a YAML file.

        The output is a shallow copy of the raw data with ``"name"`` set to
        the current :attr:`name`; key order is kept as is.

        :param file_path: path of the YAML file to write
        """
        output_data = dict(self.raw)
        output_data["name"] = self.name
        with Path(file_path).open("w", encoding="utf-8") as file:
            yaml.safe_dump(output_data, file, sort_keys=False)

    def _parse_properties(self) -> None:
        """
        Fill :attr:`properties` from the ``"properties"`` block of the raw
        data.

        Each property name maps either to a single entry or to a list of
        entries; every entry becomes one :class:`MaterialProperty`. Keys
        other than ``"type"`` and ``"value"`` are passed on as extra keyword
        arguments.
        """
        # An empty ``properties:`` key in YAML is loaded as None; treat every
        # falsy block like a missing one instead of failing on ``.items()``.
        block = self.raw.get("properties") or {}
        if not block:
            logger.debug("Material %s has no properties", self.name)
            return

        for prop_name, entries in block.items():
            entry_list = entries if isinstance(entries, list) else [entries]
            for entry in entry_list:
                # Todo - Error if 'type' not found
                type_ = entry.get("type", "Constant")
                value = entry.get("value", None)
                extra = {
                    k: v for k, v in entry.items() if k not in ("type", "value")
                }
                self.properties.append(
                    MaterialProperty(
                        name=prop_name, type_=type_, value=value, **extra
                    )
                )

    def __getitem__(self, key: str) -> MaterialProperty:
        """
        Return the first property whose name equals ``key``.

        :raises KeyError: if no property has that name; the message lists
                          the available property names
        """
        for p in self.properties:
            if p.name == key:
                return p
        msg = (
            f"No property with name {key} found. Available properties are: "
            + ", ".join(self)
        )
        raise KeyError(msg)

    def __iter__(self) -> Iterator[str]:
        """Iterate over the distinct property names in first-seen order."""
        return iter(dict.fromkeys(p.name for p in self.properties))

    def __len__(self) -> int:
        """Return the number of stored property entries."""
        # NOTE(review): this counts every entry, whereas __iter__ yields each
        # distinct name only once, so len(mat) can exceed len(list(mat)) when
        # one name has several entries. Confirm which count callers expect
        # before aligning the two.
        return len(self.properties)

    def __bool__(self) -> bool:
        """Return True if the material has a non-empty name."""
        # Truthiness is tied to the name, not to the number of properties.
        return bool(self.name)

    @property
    def property_names(self) -> list[str]:
        """Returns a list of all property names of this material."""
        return list(self)

    @deprecated(""": use mat[key] instead.""")
    def get_property(self, key: str) -> MaterialProperty:
        """
        Return the property called ``key`` (deprecated, use ``mat[key]``).

        :raises KeyError: if no property has that name
        """
        warnings.warn(
            "get_property() is deprecated, use mat[key] instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self[key]

    def filter_process(self, process_schema: dict[str, Any]) -> Material:
        """
        Return a new Material containing only properties required by a given
        process schema.

        :param process_schema: schema passed to ``required_property_names``
                               to determine the allowed property names
        """
        allowed = required_property_names(process_schema)
        return self.filter_properties(allowed)

    def filter_properties(
        self, allowed: set[str] | str, key: str = "name"
    ) -> Material:
        """
        Return a new Material containing only the properties in 'allowed',
        preserving all extra fields (e.g. scope, unit).

        :param allowed: values to filter for
        :param key:     attribute to filter for (e.g. 'name' or 'type')
        :raises KeyError: if ``key`` is neither 'name', 'type' nor 'value'
                          and a property has no extra field called ``key``
        """
        if isinstance(allowed, str):
            # A bare string means one allowed value, not a set of characters.
            allowed = {allowed}

        def prop_attr(p: MaterialProperty, key: str) -> Any:
            # Core attributes live on the property itself, everything else
            # is looked up in its extra fields.
            if key in ["name", "type", "value"]:
                return getattr(p, key)
            if key not in p.extra:
                msg = f"Property {p.name} has no attribute called '{key}'."
                raise KeyError(msg)
            return p.extra[key]

        filtered_props = [
            p for p in self.properties if prop_attr(p, key) in allowed
        ]

        logger.debug(
            "Material %s: filtered %d/%d properties (%s)",
            self.name,
            len(filtered_props),
            len(self.properties),
            ", ".join(p.name for p in filtered_props),
        )

        # Build a raw_data dict with lists if multiple entries share the same name
        raw_block: dict[str, list[dict[str, Any]]] = {}
        for p in filtered_props:
            entry = {"type": p.type, "value": p.value, **p.extra}
            raw_block.setdefault(p.name, []).append(entry)

        filtered_raw = {"name": self.name, "properties": raw_block}

        # Create a new Material that parses only the filtered_raw
        return Material(name=self.name, raw_data=filtered_raw)

    def __eq__(self, other: object) -> bool:
        """
        Compare two materials by name and by their properties.

        The order of the properties does not matter: the dictionaries
        returned by ``to_dict()`` are sorted before they are compared.
        """
        if not isinstance(other, Material):
            return NotImplemented

        def sort_key(d: dict) -> list:
            # Values are compared via their string form so that entries with
            # values of different types can still be ordered.
            return sorted((k, str(v)) for k, v in d.items())

        return self.name == other.name and sorted(
            [p.to_dict() for p in self.properties], key=sort_key
        ) == sorted([p.to_dict() for p in other.properties], key=sort_key)

    def copy(self) -> Material:
        """Return a deep copy"""
        return copy.deepcopy(self)

    # -----------------------
    # Representation
    # -----------------------
    def __repr__(self) -> str:
        """Return a one-line summary with the name and the property count."""
        return (
            f"<Material '{self.name}' with {len(self.properties)} properties>"
        )

    def __str__(self) -> str:
        """Return the summary line followed by one line per property."""
        lines = [repr(self)]
        lines.extend(f" {p}" for p in self.properties)
        return "\n".join(lines)