Source code for ogstools.materiallib.core.component

# Copyright (c) 2012-2025, OpenGeoSys Community (http://www.opengeosys.org)
#            Distributed under a Modified BSD License.
#            See accompanying file LICENSE.txt or
#            http://www.opengeosys.org/project/license
#

import logging
from typing import Any

from ogstools.materiallib.schema.process_schema import PROCESS_SCHEMAS

from .material import Material
from .property import MaterialProperty

logger = logging.getLogger(__name__)


[docs] class Component:
[docs] def __init__( self, material: Material, phase_type: str, role: str, # This materials role in the phase, e.g. 'solute' or 'solvent, etc. process: str, diffusion_coefficient: float, ): self.material = material self.phase_type = phase_type self.role = role self.name = material.name schema = PROCESS_SCHEMAS.get(process) if not schema: msg = f"No process schema found for '{process}'." raise ValueError(msg) self.schema: dict[str, Any] = schema if self.phase_type == "Gas" and self.role == "Vapour": self.D = diffusion_coefficient logger.info( "Binary diffusion coefficient (Component '%s'): %s", self.name, self.D, ) else: self.D = 0.0 self.properties: list[MaterialProperty] = ( self._get_filtered_properties() )
def _get_filtered_properties(self) -> list[MaterialProperty]: """ This method filters the material properties based on the process schema and the role (gas or liquid). """ required_properties = set() logger.debug("===============================================") # Process schema check and filter required properties if self.schema: logger.debug( "Processing schema for phase type: [bold green]%s[/bold green], for [bold green]%s[/bold green] as: [bold green]%s[/bold green]", self.phase_type, self.name, self.role, ) for phase_def in self.schema.get("phases", []): if phase_def.get("type") == self.phase_type: logger.debug( "Found phase definition for %s", self.phase_type ) components = phase_def.get("components", {}) if self.role in components: logger.debug( "Found component role '%s': %s", self.role, components[self.role], ) required_properties.update(components[self.role]) filtered_properties = [] for name in required_properties: if name == "diffusion": logger.debug( "Inserting binary diffusion coefficient for '%s': D = %s", self.name, self.D, ) prop = MaterialProperty( name="diffusion", type_="Constant", value=self.D ) filtered_properties.append(prop) else: for prop in self.material.properties: if prop.name == name: filtered_properties.append(prop) break loaded = {prop.name for prop in filtered_properties} missing = required_properties - loaded if missing: msg = f"Missing required Component properties in material '{self.material.name}': {missing}" raise ValueError(msg) logger.debug("Loaded %s properties", len(filtered_properties)) logger.debug(filtered_properties) logger.debug("===============================================\n") return filtered_properties
[docs] def validate(self) -> bool: # Look up phase schema schema_phases = self.schema.get("phases", []) matching_phase = next( (p for p in schema_phases if p.get("type") == self.phase_type), None, ) if not matching_phase: msg = f"Component '{self.name}' is in a phase '{self.phase_type}' not allowed for process." raise ValueError(msg) # Role check components_schema = matching_phase.get("components", {}) if self.role not in components_schema: msg = f"Component '{self.name}' with role '{self.role}' not allowed in phase '{self.phase_type}'." raise ValueError(msg) # Allowed property names for this role allowed_props = set( components_schema[self.role] or [] ) # ← may be empty actual_props = {p.name for p in self.properties} missing = allowed_props - actual_props extra = actual_props - allowed_props if missing or extra: msg = f"Component '{self.name}' in phase '{self.phase_type}' (role '{self.role}') is invalid.\n" if missing: msg += f" Missing properties: {sorted(missing)}\n" if extra: msg += f" Unexpected properties: {sorted(extra)}" raise ValueError(msg) return True
# ----------------------- # Representation # ----------------------- def __repr__(self) -> str: lines = [ f"<Component '{self.name}' (Role: {self.role}, Phase: {self.phase_type})>" ] if self.properties: lines.append(f" ├─ {len(self.properties)} properties:") for prop in self.properties: lines.append(" │ " + repr(prop)) else: lines.append(" └─ no properties") return "\n".join(lines)