# Copyright (c) 2012-2024, OpenGeoSys Community (http://www.opengeosys.org)
# Distributed under a Modified BSD License.
# See accompanying file LICENSE.txt or
# http://www.opengeosys.org/project/license
#
"""
This file provides an override to meshios XDMF Reader since it misses a feature
to handle hyperslabs (there are two ways to handle hyperslab:
the common documented `here <https://www.xdmf.org/index.php/XDMF_Model_and_Format#HyperSlab>`_
and the way paraview supports it (documentation missing).
Example::
2D_single_fracture_HT.h5:/meshes/2D_single_fracture/temperature|0 0:1 1:1 190:97 190
to be read like::
| start : stride : count : end
"""
from abc import ABC, abstractmethod
from pathlib import Path
from xml.etree.ElementTree import Element
import h5py
import meshio
import numpy as np
from meshio._mesh import CellBlock
from meshio.xdmf import common, time_series
from meshio.xdmf.time_series import (
ReadError,
cell_data_from_raw,
xdmf_to_numpy_type,
)
def _translate_mixed_cells_patched(data: list) -> list:
# Translate it into the cells dictionary.
# `data` is a one-dimensional vector with
# (cell_type1, p0, p1, ... ,pk, cell_type2, p10, p11, ..., p1k, ...
# https://xdmf.org/index.php/XDMF_Model_and_Format#Arbitrary
# https://gitlab.kitware.com/xdmf/xdmf/blob/master/XdmfTopologyType.hpp#L394
xdmf_idx_to_num_nodes = {
1: 1, # POLYVERTEX
2: 2, # POLYLINE
4: 3, # TRIANGLE
5: 4, # QUADRILATERAL
6: 4, # TETRAHEDRON
7: 5, # PYRAMID
8: 6, # WEDGE
9: 8, # HEXAHEDRON
35: 9, # QUADRILATERAL_9
36: 6, # TRIANGLE_6
37: 8, # QUADRILATERAL_8
38: 10, # TETRAHEDRON_10
39: 13, # PYRAMID_13
40: 15, # WEDGE_15
41: 18, # WEDGE_18
48: 20, # HEXAHEDRON_20
49: 24, # HEXAHEDRON_24
50: 27, # HEXAHEDRON_27
}
# collect types and offsets
types = []
_offsets = []
r = 0
while r < len(data):
xdmf_type = data[r]
types.append(xdmf_type)
_offsets.append(r)
if xdmf_type == 2: # line
if data[r + 1] != 2: # polyline
msg = "XDMF reader: Only supports 2-point lines for now"
raise ReadError(msg)
r += 1
r += 1
r += xdmf_idx_to_num_nodes[xdmf_type]
types = np.asarray(types)
offsets = np.asarray(_offsets)
b = np.concatenate(
[[0], np.where(types[:-1] != types[1:])[0] + 1, [len(types)]]
)
cells = []
for start, end in zip(b[:-1], b[1:], strict=False): # noqa: RUF007
meshio_type = common.xdmf_idx_to_meshio_type[types[start]]
n = xdmf_idx_to_num_nodes[types[start]]
point_offsets = offsets[start:end] + (2 if types[start] == 2 else 1)
indices = np.asarray([np.arange(n) + o for o in point_offsets])
cells.append(CellBlock(meshio_type, data[indices]))
return cells
time_series.translate_mixed_cells = _translate_mixed_cells_patched
[docs]
class DataItem(ABC):
"""
Abstract base class for all classes that end with DataItem.
"""
rawdata_path: Path
[docs]
@abstractmethod
def __getitem__(self, args: tuple | int | slice | np.ndarray) -> np.ndarray:
pass
[docs]
class H5DataItem(DataItem):
"""
A class to handle the data item in the xdmf file that references to a h5 file.
With init only the xdmf meta data is read. (light computation)
With selected_values the requested values are read from h5 file (heavy computation)
"""
[docs]
def __init__(self, file_info: str, xdmf_path: Path):
"""
:param file_info: The file_info string from the XDMF file
example: 2D_single_fracture_HT.h5:/meshes/2D_single_fracture/geometry|0 0 0:1 1 1:1 190 3:97 190 3
:param xdmf_path: Path to the xdmf file that references to the h5 file
"""
file_h5path__selections = file_info.split("|")
file_h5path = file_h5path__selections[0]
selections = (
file_h5path__selections[1]
if len(file_h5path__selections) > 1
else None
)
filename, h5path = file_h5path.split(":")
if selections:
# offsets, slices, current_data_extends, global_data_extends by dimension
m = [
list(map(int, att.split(" "))) for att in selections.split(":")
]
t = np.transpose(m)
selection = tuple(
slice(start, start + extend, step)
for start, step, extend, _ in t
)
else:
selection = ()
self.selection = selection
self.h5path = h5path
self.key = h5path[1:].split("/")[-1]
# The HDF5 file path is given with respect to the XDMF (XML) file.
dirpath = xdmf_path.resolve().parent
self.rawdata_path = dirpath / filename
[docs]
def __getitem__(self, args: tuple | int | slice | np.ndarray) -> np.ndarray:
"""
Reads value from HDF5 file based on given selection.
param args: See numpy array indexing https://numpy.org/doc/stable/user/basics.indexing.html#
:returns: A numpy array (sliced) of the requested data for all timesteps
"""
with h5py.File(self.rawdata_path, "r") as file:
f = file
if self.h5path[0] != "/":
raise ReadError()
for key in self.h5path[1:].split("/"):
f = f[key]
return f[args]
[docs]
def selected_values(self) -> np.ndarray:
"""
Returns all values of the DataItem that are selected in the xdmf file.
An empty selection means all values are read.
In the xdmf file with <DataItem> tag you optionally find a string containing
the 1.h5filename, 2. name of h5 group, 3. selection
e.g. 2D_single_fracture_HT.h5:/meshes/2D_single_fracture/geometry|0 0 0:1 1 1:1 190 3:97 190 3
The selection here is:
0 0 0:1 1 1:1 190 3:97 190 3
The meaning is: [(offset(0,0,0): step(1,1,1) : end(1,190,3) : of_data_with_size(97,190,30))]
:returns: A numpy array (sliced) of the requested data for all timesteps
"""
# should always be just one timestep
return self[self.selection].squeeze()
[docs]
class XMLDataItem(DataItem):
[docs]
def __init__(
self, name: str, dims: list[int], data_type: str | None, precision: str
):
self.key = name
self._data_type = data_type
self._precision = precision
self._dims = dims
[docs]
def __getitem__(self, args: tuple | int | slice | np.ndarray) -> np.ndarray:
return np.fromstring(
self.key,
dtype=xdmf_to_numpy_type[(self._data_type, self._precision)],
sep=" ",
).reshape(self._dims)
[docs]
class BinaryDataItem(DataItem):
# BinaryDataItem( data_item.text.strip(), dims, data_type, precision )
[docs]
def __init__(
self, name: str, dims: list[int], data_type: str | None, precision: str
):
self.key = name
self._data_type = data_type
self._precision = precision
self._dims = dims
[docs]
def __getitem__(self, args: tuple | int | slice | np.ndarray) -> np.ndarray:
return np.fromfile(
self.key,
dtype=xdmf_to_numpy_type[(self._data_type, self._precision)],
).reshape(self._dims)
[docs]
class DataItems:
[docs]
def __init__(self, items: list[DataItem], center: str):
self.items = items
self.center = center
# Actually, `NumberType` is XDMF2 and `DataType` XDMF3, but many files out there
# use both keys interchangeably.
all_in_h5 = np.all(
[isinstance(item, H5DataItem) for item in self.items]
)
if not all_in_h5:
self.fast_access = False
return
all_in_same_file = (
# can ignore lint because isinstance is checked
len(np.unique([item.h5path for item in self.items])) # type: ignore[attr-defined]
== 1
)
if not all_in_same_file:
self.fast_access = False
return
self.fast_access = all_in_h5 and all_in_same_file
[docs]
def __getitem__(self, args: tuple | slice | int) -> np.ndarray:
key = args if isinstance(args, tuple) else (args,)
if not self.fast_access:
all_time_steps = self.items[key[0]]
if not isinstance(all_time_steps, list):
return self.items[key[0]][key[1:]]
arrays = [item[key[1:]] for item in all_time_steps]
return np.stack(arrays)
# If all items are stored within same h5 file, take info from 1st time step
return self.items[0][key]
[docs]
class XDMFReader(meshio.xdmf.TimeSeriesReader):
[docs]
def __init__(self, filename: str):
super().__init__(filename)
### extension for indexing
self.filename: Path = Path(self.filename)
data_items: dict[str, list[DataItem]] = {}
self.data_items: dict[str, DataItems] = {}
data_attribute: dict[str, str] = {}
self.t = None
for grid in self.collection:
for item in grid:
if item.tag == "Time":
self.t = float(item.attrib["Value"])
elif item.tag == "Attribute":
name = item.get("Name")
if len(list(item)) != 1:
raise ReadError()
data_item = next(iter(item))
if item.get("Center") not in [
"Node",
"Cell",
"Other",
]:
raise ReadError()
center = item.get("Center")
data = self.select_item(data_item)
if name in data_items:
data_items[name].append(data)
else:
data_items[name] = [data]
data_attribute[name] = center
for key, value in data_items.items():
self.data_items[key] = DataItems(value, data_attribute[key])
[docs]
def has_fast_access(self, key: str | None = None) -> bool:
if len(self.data_items) == 0:
return False # if there is no data, there is no fast access
if key is None:
all_fast = {
key: item.fast_access for key, item in self.data_items.items()
}
return all(all_fast.values())
key = next(iter(self.data_items)) # checked for len > 0
return self.data_items[key].fast_access
[docs]
def rawdata_path(self, key: str | None = None) -> Path:
# This function should usually work for OGS Simulation result in XDMF [single hdf5 file]
# To be used in combination with h5py to read/save/manipulate of the hdf5file
if self.has_fast_access(key):
if key is None:
key = next(
iter(self.data_items)
) # checked for len > 0 in has_fast_access
return self.data_items[key].items[0].rawdata_path
return self.filename
[docs]
def read_data(self, k: int) -> tuple[float, dict, dict, dict]:
point_data = {}
cell_data_raw: dict = {}
other_data = {}
t = None
cell_data = cell_data_from_raw(self.cells, cell_data_raw)
for c in list(self.collection[k]):
if c.tag == "Time":
t = float(c.attrib["Value"])
elif c.tag == "Attribute":
name = c.get("Name")
if len(list(c)) != 1:
raise ReadError()
data_item = next(iter(c))
data = self._read_data_item(data_item)
if c.get("Center") == "Node":
point_data[name] = data
elif c.get("Center") == "Cell":
cell_data_raw[name] = data
elif c.get("Center") == "Other":
other_data[name] = data
else:
raise ReadError()
else:
# skip the xi:included mesh
continue
if self.cells is None:
raise ReadError()
cell_data = cell_data_from_raw(self.cells, cell_data_raw)
if t is None:
raise ReadError()
return t, point_data, cell_data, other_data
[docs]
def select_item(self, data_item: Element) -> np.ndarray:
dims = [int(d) for d in data_item.get("Dimensions", "").split()]
# Actually, `NumberType` is XDMF2 and `DataType` XDMF3, but many files out there
# use both keys interchangeably.
if data_item.get("DataType"):
if data_item.get("NumberType"):
raise ReadError()
data_type = data_item.get("DataType")
elif data_item.get("NumberType"):
if data_item.get("DataType"):
raise ReadError()
data_type = data_item.get("NumberType")
else:
# Default, see
# <https://xdmf.org/index.php/XDMF_Model_and_Format#XML_Element_.28Xdmf_ClassName.29_and_Default_XML_Attributes>
data_type = "Float"
try:
precision = data_item.attrib["Precision"]
except KeyError:
precision = "4"
data_format = data_item.attrib["Format"]
assert isinstance(data_item.text, str)
if data_format == "XML":
return XMLDataItem(data_item.text, dims, data_type, precision)
if data_format == "Binary":
return BinaryDataItem(
data_item.text.strip(), dims, data_type, precision
)
if data_format == "HDF":
return H5DataItem(
file_info=data_item.text.strip(), xdmf_path=self.filename
)
msg = f"Unknown XDMF Format '{data_format}'."
raise ReadError(msg)
# Copy of _read_data_item of meshio with fix for slices
def _read_data_item(self, data_item: Element) -> np.ndarray:
dims = [int(d) for d in data_item.get("Dimensions", "").split()]
# Actually, `NumberType` is XDMF2 and `DataType` XDMF3, but many files out there
# use both keys interchangeably.
if data_item.get("DataType"):
if data_item.get("NumberType"):
raise ReadError()
data_type = data_item.get("DataType")
elif data_item.get("NumberType"):
if data_item.get("DataType"):
raise ReadError()
data_type = data_item.get("NumberType")
else:
# Default, see
# <https://xdmf.org/index.php/XDMF_Model_and_Format#XML_Element_.28Xdmf_ClassName.29_and_Default_XML_Attributes>
data_type = "Float"
try:
precision = data_item.attrib["Precision"]
except KeyError:
precision = "4"
data_format = data_item.attrib["Format"]
assert isinstance(data_item.text, str)
if data_format == "XML":
return np.fromstring(
data_item.text,
dtype=xdmf_to_numpy_type[(data_type, precision)],
sep=" ",
).reshape(dims)
if data_format == "Binary":
return np.fromfile(
data_item.text.strip(),
dtype=xdmf_to_numpy_type[(data_type, precision)],
).reshape(dims)
if data_format != "HDF":
msg = f"Unknown XDMF Format '{data_format}'."
raise ReadError(msg)
file_info = data_item.text.strip()
file_h5path__selections = file_info.split("|")
file_h5path = file_h5path__selections[0]
selections = (
file_h5path__selections[1]
if len(file_h5path__selections) > 1
else None
)
filename, h5path = file_h5path.split(":")
if selections:
# offsets, slices, current_data_extends, global_data_extends by dimension
m = [
list(map(int, att.split(" "))) for att in selections.split(":")
]
t = np.transpose(m)
selection = tuple(
slice(start, start + extend, step)
for start, step, extend, _ in t
)
else:
selection = ()
# The HDF5 file path is given with respect to the XDMF (XML) file.
dirpath = self.filename.resolve().parent
full_hdf5_path = dirpath / filename
if full_hdf5_path in self.hdf5_files:
f = self.hdf5_files[full_hdf5_path]
else:
import h5py
f = h5py.File(full_hdf5_path, "r")
self.hdf5_files[full_hdf5_path] = f
if h5path[0] != "/":
raise ReadError()
for key in h5path[1:].split("/"):
f = f[key]
# `[()]` gives a np.ndarray
return f[selection].squeeze()