Source code for ogstools.materiallib.core.component
# Copyright (c) 2012-2025, OpenGeoSys Community (http://www.opengeosys.org)
#            Distributed under a Modified BSD License.
#            See accompanying file LICENSE.txt or
#            http://www.opengeosys.org/project/license
#
import logging
from typing import Any
from ogstools.materiallib.schema.process_schema import PROCESS_SCHEMAS
from .material import Material
from .property import MaterialProperty
logger = logging.getLogger(__name__)
[docs]
class Component:
[docs]
    def __init__(
        self,
        material: Material,
        phase_type: str,
        role: str,  # This materials role in the phase, e.g. 'solute' or 'solvent, etc.
        process: str,
        diffusion_coefficient: float,
    ):
        self.material = material
        self.phase_type = phase_type
        self.role = role
        self.name = material.name
        schema = PROCESS_SCHEMAS.get(process)
        if not schema:
            msg = f"No process schema found for '{process}'."
            raise ValueError(msg)
        self.schema: dict[str, Any] = schema
        if self.phase_type == "Gas" and self.role == "Vapour":
            self.D = diffusion_coefficient
            logger.info(
                "Binary diffusion coefficient (Component '%s'): %s",
                self.name,
                self.D,
            )
        else:
            self.D = 0.0
        self.properties: list[MaterialProperty] = (
            self._get_filtered_properties()
        ) 
    def _get_filtered_properties(self) -> list[MaterialProperty]:
        """
        This method filters the material properties based on the process schema
        and the role (gas or liquid).
        """
        required_properties = set()
        logger.debug("===============================================")
        # Process schema check and filter required properties
        if self.schema:
            logger.debug(
                "Processing schema for phase type: [bold green]%s[/bold green], for [bold green]%s[/bold green] as: [bold green]%s[/bold green]",
                self.phase_type,
                self.name,
                self.role,
            )
            for phase_def in self.schema.get("phases", []):
                if phase_def.get("type") == self.phase_type:
                    logger.debug(
                        "Found phase definition for %s", self.phase_type
                    )
                    components = phase_def.get("components", {})
                    if self.role in components:
                        logger.debug(
                            "Found component role '%s': %s",
                            self.role,
                            components[self.role],
                        )
                        required_properties.update(components[self.role])
            filtered_properties = []
            for name in required_properties:
                if name == "diffusion":
                    logger.debug(
                        "Inserting binary diffusion coefficient for '%s': D = %s",
                        self.name,
                        self.D,
                    )
                    prop = MaterialProperty(
                        name="diffusion", type_="Constant", value=self.D
                    )
                    filtered_properties.append(prop)
                else:
                    for prop in self.material.properties:
                        if prop.name == name:
                            filtered_properties.append(prop)
                            break
        loaded = {prop.name for prop in filtered_properties}
        missing = required_properties - loaded
        if missing:
            msg = f"Missing required Component properties in material '{self.material.name}': {missing}"
            raise ValueError(msg)
        logger.debug("Loaded %s properties", len(filtered_properties))
        logger.debug(filtered_properties)
        logger.debug("===============================================\n")
        return filtered_properties
[docs]
    def validate(self) -> bool:
        # Look up phase schema
        schema_phases = self.schema.get("phases", [])
        matching_phase = next(
            (p for p in schema_phases if p.get("type") == self.phase_type),
            None,
        )
        if not matching_phase:
            msg = f"Component '{self.name}' is in a phase '{self.phase_type}' not allowed for process."
            raise ValueError(msg)
        # Role check
        components_schema = matching_phase.get("components", {})
        if self.role not in components_schema:
            msg = f"Component '{self.name}' with role '{self.role}' not allowed in phase '{self.phase_type}'."
            raise ValueError(msg)
        # Allowed property names for this role
        allowed_props = set(
            components_schema[self.role] or []
        )  # ← may be empty
        actual_props = {p.name for p in self.properties}
        missing = allowed_props - actual_props
        extra = actual_props - allowed_props
        if missing or extra:
            msg = f"Component '{self.name}' in phase '{self.phase_type}' (role '{self.role}') is invalid.\n"
            if missing:
                msg += f"  Missing properties: {sorted(missing)}\n"
            if extra:
                msg += f"  Unexpected properties: {sorted(extra)}"
            raise ValueError(msg)
        return True 
    # -----------------------
    # Representation
    # -----------------------
    def __repr__(self) -> str:
        lines = [
            f"<Component '{self.name}' (Role: {self.role}, Phase: {self.phase_type})>"
        ]
        if self.properties:
            lines.append(f"  ├─ {len(self.properties)} properties:")
            for prop in self.properties:
                lines.append("  │   " + repr(prop))
        else:
            lines.append("  └─ no properties")
        return "\n".join(lines)