Source code for ogstools.feflowlib._feflowlib

# SPDX-FileCopyrightText: Copyright (c) OpenGeoSys Community (opengeosys.org)
# SPDX-License-Identifier: BSD-3-Clause

import logging as log
from pathlib import Path

import ifm_contrib as ifm
import numpy as np
import pyvista as pv

ifm.forceLicense("Viewer")

# log configuration
logger = log.getLogger(__name__)


def points_and_cells(doc: ifm.FeflowDoc) -> tuple[np.ndarray, list, list]:
    """
    Get points and cells in a pyvista compatible format.

    :param doc: The FEFLOW data.
    :returns: pts, cells, celltypes (points, cells, celltypes)
    """
    # Define variables
    cell_type_dict = {
        2: {3: pv.CellType.TRIANGLE, 4: pv.CellType.QUAD},
        3: {
            4: pv.CellType.TETRA,
            5: pv.CellType.PYRAMID,
            6: pv.CellType.WEDGE,
            8: pv.CellType.HEXAHEDRON,
        },
    }
    dimension = doc.getNumberOfDimensions()
    # Get a list of all cells/elements and reverse it for correct node order in OGS
    elements = doc.c.mesh.get_imatrix()
    # Write the amount of nodes per element to the first entry of each list
    # of nodes. This is needed for pyvista !
    # Could also be done with np.hstack([len(ele)]*len(elements,elements))
    # Also define the celltypes.
    celltypes = []
    for i, element in enumerate(elements):
        nElement = len(element)
        celltype = cell_type_dict[dimension][nElement]
        celltypes.append(celltype)
        if celltype == pv.CellType.PYRAMID:
            elements[i] = element[:4][::-1] + element[4:]
        elements[i].insert(0, nElement)
    # Write the list for all points and their global coordinates.
    if dimension == 2:
        points = doc.c.mesh.df.nodes(global_cos=True)
        pts = points[["X", "Y"]].to_numpy()
        # A 0 is appended since in pyvista points must be given in 3D.
        # So we set the Z-coordinate to 0.
        pts = np.pad(pts, [(0, 0), (0, 1)])
    elif dimension == 3:
        points = doc.c.mesh.df.nodes(
            global_cos=True, par={"Z": ifm.Enum.P_ELEV}
        )
        pts = points[["X", "Y", "Z"]].to_numpy()
    else:
        msg = "The input data is neither 2D nor 3D, which it needs to be."
        raise ValueError(msg)

    # Log information
    logger.info(
        "There are %s number of points and %s number of cells to be converted.",
        len(pts),
        len(celltypes),
    )
    cells = [point for cell in elements for point in cell]
    return pts, cells, celltypes


def _material_ids_from_selections(
    doc: ifm.FeflowDoc,
) -> np.ndarray:
    """
    Get MaterialIDs from the FEFLOW data. Only applicable if they are
    saved in doc.c.sel.selections().

    :param doc: The FEFLOW data.
    :returns: MaterialIDs
    """
    # Note: an error occurs if there are no elements defined to the selection

    # 1. define necessary variables
    mat_ids = []
    elements = []
    dict_matid = {}
    mat_id = 0

    # 2. try to overcome ValueError for selec not referring to element
    for selec in doc.c.sel.selections():
        try:
            # 2.1 write a list of elements that refer to a material that
            # is defined in selections (mat__elements)
            # this call can cause an ValueError since not all selections refer to elements
            # but all elemental selection refer to material_ids
            elements.append(
                doc.c.mesh.df.elements(selection=selec).index.values
            )
            # 2.2 write a list with a corresponding id for
            # that material (mat_id)
            mat_ids.append(int(mat_id))
            # 2.3 write a dictionary to understand which id refers
            # to which selection
            dict_matid[mat_id] = selec
            mat_id += 1
        except ValueError:
            pass

    # 3. write a list of material ids. The id is written to the corresponding
    # entry in the list.
    mat_ids_mesh = [0] * doc.getNumberOfElements()

    for count, value in enumerate(mat_ids):
        for element in elements[count]:
            mat_ids_mesh[element] = value

    # 4. log the dictionary of the MaterialIDs
    logger.info("MaterialIDs refer to: %s", dict_matid)
    # MaterialIDs must be int32
    return np.array(mat_ids_mesh).astype(np.int32)


def fetch_user_data(
    user_data: np.ndarray, geom_type: str, val_type: str
) -> list:
    return [
        data[1]
        for data in user_data
        if data[3] == geom_type and data[2] == val_type
    ]


def _point_and_cell_data(
    MaterialIDs: dict, doc: ifm.FeflowDoc
) -> tuple[dict, dict]:
    """
    Get point and cell data from Feflow data. Also write the MaterialIDs to the
    cell data.

    :param doc: The FEFLOW data.
    :param MaterialIDs:
    :returns: pt_data, cell_data (point and cell data)
    """

    # 1. create a dictionary to filter all nodal and elemental values
    #  according to their name and ifm.Enum value
    items_pts = doc.c.mesh.df.get_available_items(Type="nodal")[
        "Name"
    ].to_dict()
    items_cell = doc.c.mesh.df.get_available_items(Type="elemental")[
        "Name"
    ].to_dict()
    # Get user data, which can be assigned manually in the FEFLOW file.
    # The exception is only needed, if there are no user data assigned.
    try:
        user_data = ifm.contrib_lib.user.UserPd(doc).distributions().to_numpy()
    except KeyError:
        user_data = []
    # 2. swap key and values in these dictionaries to have
    #  the name of the ifm.Enum value as key
    pts_dict = {y: x for x, y in items_pts.items()}
    cell_dict = {y: x for x, y in items_cell.items()}

    # 3. get all the nodal and elemental properties of the mesh as pandas
    # Dataframe and drop nans if a column is full of nans for all the properties
    pt_prop_all = doc.c.mesh.df.nodes(
        global_cos=True,
        par=pts_dict,
        distr=fetch_user_data(user_data, "NODAL", "DISTRIBUTION"),
        expr=fetch_user_data(user_data, "NODAL", "EXPRESSION"),
    )
    pt_prop = pt_prop_all.dropna(axis=1, how="all")
    pt_prop_objects = pt_prop.select_dtypes(include="object")
    for column in pt_prop_objects:
        is_dropable = all(
            np.isnan(sublist).any() for sublist in pt_prop_objects[column]
        )
        if is_dropable:
            pt_prop = pt_prop.drop(labels=column, axis=1)

    cell_prop = doc.c.mesh.df.elements(
        global_cos=True,
        par=cell_dict,
        distr=fetch_user_data(user_data, "ELEMENTAL", "DISTRIBUTION"),
        expr=fetch_user_data(user_data, "ELEMENTAL", "EXPRESSION"),
    ).dropna(axis=1, how="all")
    cell_prop_objects = cell_prop.select_dtypes(include="object")

    for column in cell_prop_objects:
        is_dropable = all(
            np.isnan(sublist).any() for sublist in cell_prop_objects[column]
        )
        if is_dropable:
            cell_prop = cell_prop.drop(labels=column, axis=1)

    # 4. write the pandas Dataframe of nodal and elemental properties to
    #  a dictionary
    pt_data = pt_prop.to_dict("series")
    cell_data = cell_prop.to_dict("series")

    # 5. change format of cell data to a dictionary of lists
    cell_data = {key: [cell_data[key]] for key in cell_data}
    # 6. add MaterialIDs to cell data
    if "MaterialIDs" not in cell_data:
        cell_data["MaterialIDs"] = MaterialIDs
    else:
        # This special treatmant is only to be consistent with the data type MaterialIDs usually have
        # if not coming from user-data.
        cell_data["MaterialIDs"] = np.array(cell_data["MaterialIDs"][0]).astype(
            np.int32
        )
    # if P_LOOKUP_REGION is given and there are more different MaterialIDs given
    # than defined in selections, use P_LOOKUP_REGION for MaterialIDs
    if (
        "P_LOOKUP_REGION" in cell_data
        and len(np.unique(MaterialIDs))
        < len(np.unique(cell_data["P_LOOKUP_REGION"]))
        and "MaterialIDs" not in user_data
    ):
        cell_data["MaterialIDs"] = np.array(
            cell_data.pop("P_LOOKUP_REGION")
        ).astype(np.int32)

    # 7. write a list of all properties that have been dropped due to nans
    nan_arrays = [
        x
        for x in list(pts_dict.keys()) or list(cell_dict.keys())
        if x not in list(pt_data.keys()) and list(cell_data.keys())
    ]

    # 8. log the data arrays
    logger.info(
        "These data arrays refer to point data: %s", list(pt_data.keys())
    )
    logger.info(
        "These data arrays refer to cell data: %s", list(cell_data.keys())
    )
    logger.info(
        "These data arrays have been neglected as they are full of nans: %s",
        nan_arrays,
    )

    return (pt_data, cell_data)


def _convert_to_SI_units(mesh: pv.UnstructuredGrid) -> None:
    """
    FEFLOW often uses days as unit for time. In OGS SI-units are used.
    This is why days must be converted to seconds for properties to work
    correctly in OGS.

    :param mesh: mesh
    """

    arrays_to_be_converted = ["TRAF", "IOFLOW", "P_COND", "P_DIFF"]
    for data in list(mesh.point_data) + list(mesh.cell_data):
        if any(
            to_be_converted in data
            for to_be_converted in arrays_to_be_converted
        ):
            mesh[data] *= 1 / 86400
        elif "4TH" in data or "2ND" in data:
            mesh[data] *= -1 / 86400
        elif "HEAT" in data or "TEMP" in data:
            mesh[data] = mesh[data] + [273.15] * len(mesh[data])
    return mesh


def get_species_parameter(
    doc: ifm.FeflowDoc, mesh: pv.UnstructuredGrid
) -> tuple[dict, dict]:
    """
    Retrieve species parameters from FEFLOW data for points and cells.

    :param doc: The FEFLOW data.
    :param mesh: mesh
    :returns: Dictionaries of point and cell species-specific data.
    """

    # Define common species parameters in FEFLOW.
    species_parameters = [
        "P_BC_MASS",
        "P_BCMASS_2ND",
        "P_BCMASS_3RD",
        "P_BCMASS_4TH",
        "P_CONC",
        "P_DECA",
        "P_DIFF",
        "P_LDIS",
        "P_PORO",
        "P_SORP",
        "P_TDIS",
        "P_TRAT_IN",
        "P_TRAT_OUT",
    ]

    species_point_dict: dict = {}
    species_cell_dict: dict = {}
    obsolete_data = {}

    data_dict = {"point": mesh.point_data, "cell": mesh.cell_data}
    species_dict = {"point": species_point_dict, "cell": species_cell_dict}
    for point_or_cell in ["point", "cell"]:
        for data in data_dict[point_or_cell]:
            if data in species_parameters:
                obsolete_data[data] = point_or_cell
                # If there is only a single species in the model, doc.getSpeciesName(i) throws a
                # RunTimeError.
                number_of_species = doc.getNumberOfSpecies()
                for species_id in range(number_of_species):
                    if number_of_species > 1:
                        species = doc.getSpeciesName(species_id)
                    else:
                        species = "single_species"
                    par = (
                        doc.getParameter(getattr(ifm.Enum, data), species)
                        if species != "single_species"
                        else doc.getParameter(getattr(ifm.Enum, data))
                    )
                    species_dict[point_or_cell][species + "_" + data] = (
                        np.array(doc.getParamValues(par))
                    )

    return species_dict, obsolete_data


def _calculate_retardation_factor(mesh: pv.UnstructuredGrid) -> None:
    """
    Calculates the retardation factor from the absorption coefficient, which is called
    Henry constant in FEFLOW, according to the formula: R = 1 + (1-p)/p * S. With R
    the retardation factor, p the porosity, S the absorption coefficient. Further details
    can be found in the FEFLOW book by Diersch in chapter 5.4.1.4 equation 5.70.

    :param mesh: mesh
    """
    species_porosities = [
        porosity
        for porosity in mesh.cell_data
        if "PORO" in porosity and porosity != "P_POROH"
    ]
    for species_porosity in species_porosities:
        porosity = mesh.cell_data[species_porosity]
        species = species_porosity.replace("P_PORO", "")
        # calculation of the retardation factor
        mesh.cell_data[species + "retardation_factor"] = (
            1 + mesh.cell_data[species + "P_SORP"] * (1 - porosity) / porosity
        )


def _deactivate_cells(mesh: pv.UnstructuredGrid) -> int:
    """
    Multiplies the MaterialID of all cells that are inactive in FEFLOW by -1.
    Therefore, the input mesh is modified.
    :param mesh: mesh
    :return: 0 for no cells have been deactivated and 1 for cells have been deactivated
    """
    inactive_cells = np.where(mesh.cell_data["P_INACTIVE_ELE"] == 0)
    if len(inactive_cells[0]) == 0:
        return_int = 0
    else:
        mesh.cell_data["MaterialIDs"][inactive_cells] = -(
            mesh.cell_data["MaterialIDs"][inactive_cells] + 1
        )
        return_int = 1
        log.info(
            "There are inactive cells in FEFLOW, which are assigned to a MaterialID multiplied by -1 in the converted bulk mesh."
        )
    return return_int


def convert_geometry_mesh(doc: ifm.FeflowDoc) -> pv.UnstructuredGrid:
    """
    Get the geometric construction of the mesh.

    :param doc: The FEFLOW data.
    :returns: mesh
    """
    points, cells, celltypes = points_and_cells(doc)
    return pv.UnstructuredGrid(cells, celltypes, points)


def update_geometry(
    mesh: pv.UnstructuredGrid, doc: ifm.FeflowDoc
) -> pv.UnstructuredGrid:
    """
    Update the geometric construction of the mesh with point and cell data.

    :param mesh: The mesh to be updated.
    :param doc: The FEFLOW data.
    :returns: mesh
    """
    MaterialIDs = _material_ids_from_selections(doc)
    point_data, cell_data = _point_and_cell_data(MaterialIDs, doc)
    for pt_data in point_data:
        arr = point_data[pt_data].tolist()
        mesh.point_data.update({pt_data: arr})
    for c_data in cell_data:
        values = (
            cell_data[c_data][0]
            if c_data != "MaterialIDs"
            else cell_data[c_data]
        )
        mesh.cell_data.update({c_data: values.tolist()})
    # If the FEFLOW problem class refers to a mass problem,
    # the following if statement will be true.
    if doc.getProblemClass() in [1, 3, 5, 7]:
        (
            species_dict,
            obsolete_data,
        ) = get_species_parameter(doc, mesh)
        for point_data in species_dict["point"]:
            mesh.point_data.update(
                {point_data: species_dict["point"][point_data]}
            )
        for cell_data in species_dict["cell"]:
            mesh.cell_data.update(
                {cell_data: species_dict["cell"][cell_data][0]}
            )
        for data, geometry in obsolete_data.items():
            if geometry == "point":
                mesh.point_data.remove(data)
            elif geometry == "cell":
                mesh.cell_data.remove(data)
            else:
                logger.error(
                    "Unknown geometry to remove obsolet data after conversion of chemical species."
                )
        _calculate_retardation_factor(mesh)
    return _convert_to_SI_units(mesh)


def convert_properties_mesh(doc: ifm.FeflowDoc) -> pv.UnstructuredGrid:
    """
    Get the mesh with point and cell properties.

    :param doc: The FEFLOW data.
    :returns: mesh
    """
    mesh = convert_geometry_mesh(doc)
    update_geometry(mesh, doc)
    _deactivate_cells(mesh)
    return mesh


[docs] def read_feflow(feflow_file: Path | str) -> pv.UnstructuredGrid: """ Initialize a Mesh object read from a FEFLOW file. This mesh stores all model specific information such as boundary conditions or material parameters. :param feflow_file: Path to the feflow file. :returns: A Mesh object """ doc = ifm.loadDocument(str(feflow_file)) return convert_properties_mesh(doc)