Source code for ogstools.meshlib.mesh_series

"""A class to handle Meshseries data."""

from pathlib import Path
from typing import Union

import meshio
import numpy as np
import pyvista as pv
from meshio.xdmf.time_series import (
    ReadError,
    cell_data_from_raw,
    xdmf_to_numpy_type,
)


[docs]class TimeSeriesReader(meshio.xdmf.TimeSeriesReader):
[docs] def __init__(self, filename): super().__init__(filename)
[docs] def read_data(self, k: int): point_data = {} cell_data_raw = {} other_data = {} t = None for c in list(self.collection[k]): if c.tag == "Time": t = float(c.attrib["Value"]) elif c.tag == "Attribute": name = c.get("Name") if len(list(c)) != 1: raise ReadError() data_item = list(c)[0] data = self._read_data_item(data_item) if c.get("Center") == "Node": point_data[name] = data elif c.get("Center") == "Cell": cell_data_raw[name] = data elif c.get("Center") == "Other": other_data[name] = data else: raise ReadError() else: # skip the xi:included mesh continue if self.cells is None: raise ReadError() cell_data = cell_data_from_raw(self.cells, cell_data_raw) if t is None: raise ReadError() return t, point_data, cell_data, other_data
def _read_data_item(self, data_item): dims = [int(d) for d in data_item.get("Dimensions").split()] # Actually, `NumberType` is XDMF2 and `DataType` XDMF3, but many files out there # use both keys interchangeably. if data_item.get("DataType"): if data_item.get("NumberType"): raise ReadError() data_type = data_item.get("DataType") elif data_item.get("NumberType"): if data_item.get("DataType"): raise ReadError() data_type = data_item.get("NumberType") else: # Default, see # <https://xdmf.org/index.php/XDMF_Model_and_Format#XML_Element_.28Xdmf_ClassName.29_and_Default_XML_Attributes> data_type = "Float" try: precision = data_item.attrib["Precision"] except KeyError: precision = "4" data_format = data_item.attrib["Format"] if data_format == "XML": return np.fromstring( data_item.text, dtype=xdmf_to_numpy_type[(data_type, precision)], sep=" ", ).reshape(dims) if data_format == "Binary": return np.fromfile( data_item.text.strip(), dtype=xdmf_to_numpy_type[(data_type, precision)], ).reshape(dims) if data_format != "HDF": msg = f"Unknown XDMF Format '{data_format}'." raise ReadError(msg) file_info = data_item.text.strip() file_h5path__selections = file_info.split("|") file_h5path = file_h5path__selections[0] selections = ( file_h5path__selections[1] if len(file_h5path__selections) > 1 else None ) filename, h5path = file_h5path.split(":") if selections: # offsets, slices, current_data_extends, global_data_extends by dimension m = [ list(map(int, att.split(" "))) for att in selections.split(":") ] t = np.transpose(m) selection = tuple( slice(start, start + extend, step) for start, step, extend, _ in t ) else: selection = () # The HDF5 file path is given with respect to the XDMF (XML) file. dirpath = self.filename.resolve().parent full_hdf5_path = dirpath / filename if full_hdf5_path in self.hdf5_files: f = self.hdf5_files[full_hdf5_path] else: import h5py f = h5py.File(full_hdf5_path, "r") self.hdf5_files[full_hdf5_path] = f if h5path[0] != "/": raise ReadError() for key in h5path[1:].split("/"): f = f[key] # `[()]` gives a np.ndarray return f[selection].squeeze()
[docs]class MeshSeries: """ A wrapper around pyvista and meshio for reading of pvd and xdmf timeseries. Will be replaced by own module in ogstools with similar interface. """
[docs] def __init__(self, filepath: Union[str, Path]) -> None: if isinstance(filepath, Path): filepath = str(filepath) self._data: dict[int, pv.UnstructuredGrid] = {} self._data_type = filepath.split(".")[-1] if self._data_type == "pvd": self._pvd_reader = pv.PVDReader(filepath) elif self._data_type == "xdmf": self._xdmf_reader = TimeSeriesReader(filepath) elif self._data_type == "vtu": self._vtu_reader = pv.XMLUnstructuredGridReader(filepath) else: msg = "Can only read 'pvd', 'xdmf' or 'vtu' files." raise TypeError(msg)
def _read_pvd(self, timestep: int) -> pv.UnstructuredGrid: self._pvd_reader.set_active_time_point(timestep) return self._pvd_reader.read()[0] def _read_xdmf(self, timestep: int) -> pv.UnstructuredGrid: points, cells = self._xdmf_reader.read_points_cells() _, point_data, cell_data, field_data = self._xdmf_reader.read_data( timestep ) meshio_mesh = meshio.Mesh(points, cells, point_data, cell_data) return pv.from_meshio(meshio_mesh)
[docs] def read( self, timestep: int, lazy_eval: bool = True ) -> pv.UnstructuredGrid: """Lazy read function.""" if timestep in self._data: return self._data[timestep] if self._data_type == "pvd": mesh = self._read_pvd(timestep) elif self._data_type == "xdmf": mesh = self._read_xdmf(timestep) elif self._data_type == "vtu": mesh = self._vtu_reader.read() if lazy_eval: self._data[timestep] = mesh return mesh
[docs] def clear(self) -> None: self._data.clear()
@property def timesteps(self) -> range: """Return the timesteps of the timeseries data.""" if self._data_type == "vtu": return range(0) if self._data_type == "pvd": return range(self._pvd_reader.number_time_points) # elif self._data_type == "xdmf": return range(len(self.timevalues)) @property def timevalues(self) -> list[float]: """Return the timevalues of the timeseries data.""" if self._data_type == "vtu": return [0] if self._data_type == "pvd": return self._pvd_reader.time_values # elif self._data_type == "xdmf": time_values = [] for collection_i in self._xdmf_reader.collection: for element in collection_i: if element.tag == "Time": time_values += [float(element.attrib["Value"])] return time_values
[docs] def closest_timestep(self, timevalue: float) -> int: """Return the corresponding timestep from a timevalue.""" return int(np.argmin(np.abs(np.array(self.timevalues) - timevalue)))
[docs] def closest_timevalue(self, timevalue: float) -> float: """Return the closest timevalue to a timevalue.""" return self.timevalues[self.closest_timestep(timevalue)]
[docs] def read_closest(self, timevalue: float) -> pv.UnstructuredGrid: """Return the closest timestep in the data for a given timevalue.""" return self.read(self.closest_timestep(timevalue))
[docs] def read_interp( self, timevalue: float, lazy_eval: bool = True ) -> pv.UnstructuredGrid: """Return the temporal interpolated mesh for a given timevalue.""" t_vals = np.array(self.timevalues) ts1 = int(t_vals.searchsorted(timevalue, "right") - 1) ts2 = min(ts1 + 1, len(t_vals) - 1) if np.isclose(timevalue, t_vals[ts1]): return self.read(ts1, lazy_eval) mesh1 = self.read(ts1, lazy_eval) mesh2 = self.read(ts2, lazy_eval) mesh = mesh1.copy(deep=True) for key in mesh1.point_data: if np.all(mesh1.point_data[key] == mesh2.point_data[key]): continue dt = t_vals[ts2] - t_vals[ts1] slope = (mesh2.point_data[key] - mesh1.point_data[key]) / dt mesh.point_data[key] = mesh1.point_data[key] + slope * ( timevalue - t_vals[ts1] ) return mesh