# SPDX-FileCopyrightText: Copyright (c) OpenGeoSys Community (opengeosys.org)
# SPDX-License-Identifier: BSD-3-Clause
from pathlib import Path
import numpy as np
from lxml import etree as ET
from ogstools.ogs6py import build_tree
[docs]
class Curves(build_tree.BuildTree):
    """
    Class to create the curve section of the project file.

    A curve is stored either inline (numbers separated by spaces in the
    ``<coords>`` and ``<values>`` elements) or, when ``<read_from_file>`` is
    ``true``, as names of binary files. For file-based curves one
    ``ReferencedFile`` per ``coords``/``values`` element is kept in
    ``self.files``.
    """

    def __init__(
        self, tree: ET.ElementTree, input_file: Path | str | None = None
    ) -> None:
        """Attach to the project tree and collect file-based curves.

        :param tree: XML tree of the project file.
        :param input_file: Location of the project file, if known. Its parent
            directory is used to look up binary curve files referenced in
            the tree.
        """
        self.tree = tree
        self.root = self.tree.getroot()
        # Element under which add_curve()/add_curve_from_file() place new
        # <curve> entries.
        self.curves = self.populate_tree(self.root, "curves", overwrite=True)
        self._input_file = Path(input_file) if input_file else None
        # ReferencedFile objects for coords/values of file-based curves.
        self.files: list = []
        self._reload_curve_files()

    def _reload_curve_files(self) -> None:
        """Rebuild ``self.files`` from the current XML tree.

        Only curves whose ``<read_from_file>`` element is ``true`` are
        considered. For each of their non-empty ``<coords>`` and ``<values>``
        elements a ``ReferencedFile`` is created; if the input file location
        is known and the referenced file exists next to it, that path is
        recorded as the file's active target.
        """
        # NOTE(review): imported locally, presumably to avoid a circular
        # import -- confirm before moving it to module level.
        from ogstools.ogs6py.referenced_file import ReferencedFile

        self.files = []
        for curve in self.tree.findall("./curves/curve"):
            flag = curve.find("read_from_file")
            # An empty <read_from_file/> element has text None; treat it like
            # a missing flag instead of failing on None.strip().
            if flag is None or (flag.text or "").strip().lower() != "true":
                continue
            name = curve.findtext("name") or ""
            for tag in ("coords", "values"):
                elem = curve.find(tag)
                if elem is not None and elem.text:
                    xpath = f"./curves/curve[name='{name}']/{tag}"
                    rf = ReferencedFile(self.tree, xpath=xpath)
                    if self._input_file is not None and rf.filename:
                        src = self._input_file.parent / rf.filename
                        if src.exists():
                            rf._active_target = src
                    self.files.append(rf)

    def _get_array(self, name: str, tag: str) -> np.ndarray:
        """Return coords or values of a named curve as a numpy array.

        :param name: Curve name as defined in the <name> element.
        :param tag: Either ``"coords"`` or ``"values"``.
        :returns: 1-D array read from the binary file (file-based curve) or
            parsed from the element text (inline curve).
        :raises KeyError: If no curve with the given name exists.
        :raises FileNotFoundError: If the curve is file-based but no matching
            entry exists in ``self.files``.
        """
        curve = self.tree.find(f"./curves/curve[name='{name}']")
        if curve is None:
            raise KeyError(name)
        if curve.findtext("read_from_file", "").strip().lower() == "true":
            # File-based curve: find the tracked file via the same xpath that
            # was used when the ReferencedFile was registered.
            xpath = f"./curves/curve[name='{name}']/{tag}"
            rf = next((r for r in self.files if r._xpath == xpath), None)
            if rf is None:
                msg = f"Binary file for curve {name!r} ({tag}) is not resolved."
                raise FileNotFoundError(msg)
            # Binary curve data is little-endian float64.
            return np.fromfile(rf.active_target, dtype="<f8")
        # Inline curve: numbers in the element text; a missing or empty
        # element is parsed from the empty string.
        return np.fromstring(curve.findtext(tag) or "", dtype=float, sep=" ")

    def coords(self, name: str) -> np.ndarray:
        """Return the coords of a named curve as a numpy array.

        :param name: Curve name as defined in the <name> element.
        :returns: 1-D array of coordinate values.
        :raises KeyError: If no curve with the given name exists.
        :raises FileNotFoundError: If the binary file for a file-based curve
                                   is not resolved.
        """
        return self._get_array(name, "coords")

    def values(self, name: str) -> np.ndarray:
        """Return the values of a named curve as a numpy array.

        :param name: Curve name as defined in the <name> element.
        :returns: 1-D array of curve values corresponding to each coord.
        :raises KeyError: If no curve with the given name exists.
        :raises FileNotFoundError: If the binary file for a file-based curve
                                   is not resolved.
        """
        return self._get_array(name, "values")

    def add_curve_from_file(
        self, name: str, coords: str | Path, values: str | Path
    ) -> None:
        """Add a curve whose data is read from binary files.

        The binary files must be in little-endian double precision format.
        Only the file basenames are stored in the project file; if full paths
        are given, ``Project.save()`` copies the files to the project directory.

        :param name: Curve name.
        :param coords: Path to the binary coords file (full or basename only).
        :param values: Path to the binary values file (full or basename only).
        """
        # NOTE(review): imported locally, presumably to avoid a circular
        # import -- confirm before moving it to module level.
        from ogstools.ogs6py.referenced_file import ReferencedFile

        coords, values = Path(coords), Path(values)
        curve = self.populate_tree(self.curves, "curve")
        self.populate_tree(curve, "name", name)
        self.populate_tree(curve, "read_from_file", text="true")
        self.populate_tree(curve, "coords", text=coords.name)
        self.populate_tree(curve, "values", text=values.name)
        # _bind_to_path records the full source path so Project.save() can copy
        # the file to the project directory (only the basename is in the XML).
        # Binding happens only for absolute paths that exist on disk.
        for tag, src in (("coords", coords), ("values", values)):
            xpath = f"./curves/curve[name='{name}']/{tag}"
            rf = ReferencedFile(self.tree, xpath=xpath)
            if src.is_absolute() and src.exists():
                rf._bind_to_path(src)
            self.files.append(rf)

    def add_curve(self, name: str, coords: list, values: list) -> None:
        """
        Adds a new curve with inline data.

        :param name: Curve name.
        :param coords: Coordinate points of the curve.
        :param values: Curve values, one per coordinate point.
        :raises ValueError: If ``coords`` and ``values`` differ in length.
        """
        if len(coords) != len(values):
            msg = (
                "Number of time coordinate points differs "
                "from number of values"
            )
            raise ValueError(msg)
        curve = self.populate_tree(self.curves, "curve")
        self.populate_tree(curve, "name", name)
        # Items are written as str(item), separated by single spaces with no
        # trailing separator; empty input yields an empty string.
        coord_str = " ".join(map(str, coords))
        value_str = " ".join(map(str, values))
        self.populate_tree(curve, "coords", text=coord_str)
        self.populate_tree(curve, "values", text=value_str)