"""
Copyright (c) 2012-2024, OpenGeoSys Community (http://www.opengeosys.org)
Distributed under a Modified BSD License.
See accompanying file LICENSE or
http://www.opengeosys.org/project/license
"""
from typing import Any
from lxml import etree as ET
from ogstools.ogs6py import build_tree
[docs]
class TimeLoop(build_tree.BuildTree):
"""
Class managing the time loop in the project file.
"""
[docs]
def __init__(self, tree: ET.ElementTree) -> None:
self.tree = tree
self.root = self.tree.getroot()
self.time_loop = self.populate_tree(
self.root, "time_loop", overwrite=True
)
self.gpc = self.populate_tree(
self.time_loop, "global_process_coupling", overwrite=True
)
self.processes = self.populate_tree(
self.time_loop, "processes", overwrite=True
)
self.output = self.populate_tree(
self.time_loop, "output", overwrite=True
)
[docs]
def add_process(self, **args: Any) -> None:
"""
Add a process section to timeloop.
Parameters
----------
process : `str`
convergence_type : `str`
abstol : `str`
abstols : `str`
reltol : `str`
reltols : `str`
norm_type : `str`
nonlinear_solver_name : `str`
time_discretization : `str`
"""
self._convertargs(args)
if "process" not in args:
msg = "No process referenced"
raise KeyError(msg)
process_name = args["process"]
process = self.populate_tree(
self.processes, "process", attr={"ref": process_name}
)
if "nonlinear_solver_name" not in args:
msg = """Please specify a name (nonlinear_solver_name) \
for the nonlinear solver."""
raise KeyError(msg)
self.populate_tree(
process, "nonlinear_solver", text=args["nonlinear_solver_name"]
)
if "convergence_type" not in args:
msg = """No convergence criterion given. \
Specify convergence_type."""
raise KeyError(msg)
conv_crit = self.populate_tree(process, "convergence_criterion")
self.populate_tree(conv_crit, "type", text=args["convergence_type"])
if "norm_type" not in args:
msg = "No norm_type given."
raise KeyError(msg)
self.populate_tree(conv_crit, "norm_type", text=args["norm_type"])
if (args["convergence_type"] == "DeltaX") or (
args["convergence_type"] == "Residual"
):
if ("abstols" in args) or ("reltols" in args):
msg = "Plural tolerances only available for PerComponent conv. types"
raise KeyError(msg)
if "abstol" in args:
self.populate_tree(conv_crit, "abstol", text=args["abstol"])
if "reltol" in args:
self.populate_tree(conv_crit, "reltol", text=args["reltol"])
elif (args["convergence_type"] == "PerComponentDeltaX") or (
args["convergence_type"] == "PerComponentResidual"
):
if ("abstol" in args) or ("reltol" in args):
msg = (
"Singular tolerances only available for scalar conv. types"
)
raise KeyError(msg)
if "abstols" in args:
self.populate_tree(conv_crit, "abstols", text=args["abstols"])
if "reltols" in args:
self.populate_tree(conv_crit, "reltols", text=args["reltols"])
else:
msg = "No convergence_type given."
raise KeyError(msg)
if "time_discretization" not in args:
msg = "No time_discretization specified."
raise KeyError(msg)
td = self.populate_tree(process, "time_discretization")
self.populate_tree(td, "type", text=args["time_discretization"])
[docs]
def set_stepping(self, **args: Any) -> None:
"""
Sets the time stepping.
Parameters
----------
type : `str`
process : `str`
process_count : `int` for staggered coupling
t_initial : `int` or `str`
initial_dt : `float` or `str`
t_end : `int` or `str`
repeat : `int` or `str`
delta_t : `float` or `str`
minimum_dt : `float` or `str`
maximum_dt : `float` or `str`
number_iterations : `list`
multiplier : `list`
dt_guess : `float` or `str`
dt_min : `float` or `str`
dt_max : `float` or `str`
rel_dt_max : `float` or `str`
rel_dt_min : `float` or `str`
tol : `float` or `str`
"""
self._convertargs(args)
if "process" not in args:
msg = "Process reference missing"
raise KeyError(msg)
procs = self.processes.findall("./process")
process = None
procs_sub = []
for proc in procs:
if args["process"] == proc.get("ref"):
procs_sub.append(proc)
if "process_count" in args:
try:
process = procs_sub[int(args["process_count"])]
except KeyError:
msg = "Process count out of bounds."
KeyError(msg)
else:
try:
process = procs_sub[-1]
except KeyError:
msg = "Process reference not found."
KeyError(msg)
if "type" not in args:
msg = "No type given."
raise KeyError(msg)
time_stepping = self.populate_tree(process, "time_stepping")
self.populate_tree(time_stepping, "type", text=args["type"])
if args["type"] == "FixedTimeStepping":
self.populate_tree(
time_stepping, "t_initial", text=args["t_initial"]
)
self.populate_tree(time_stepping, "t_end", text=args["t_end"])
if "repeat" in args and "delta_t" in args:
ts = self.populate_tree(time_stepping, "timesteps")
if isinstance(args["repeat"], str) and isinstance(
args["delta_t"], str
):
pair = self.populate_tree(ts, "pair")
self.populate_tree(pair, "repeat", text=args["repeat"])
self.populate_tree(pair, "delta_t", text=args["delta_t"])
else:
for i, entry in enumerate(args["repeat"]):
pair = self.populate_tree(ts, "pair")
self.populate_tree(pair, "repeat", text=entry)
self.populate_tree(
pair, "delta_t", text=args["delta_t"][i]
)
else:
msg = """No proper time stepping defined. \
Please specify repeat and delta_t."""
raise KeyError(msg)
elif args["type"] == "SingleStep":
pass
elif args["type"] == "IterationNumberBasedTimeStepping":
self.populate_tree(
time_stepping, "t_initial", text=args["t_initial"]
)
self.populate_tree(time_stepping, "t_end", text=args["t_end"])
self.populate_tree(
time_stepping, "initial_dt", text=args["initial_dt"]
)
self.populate_tree(
time_stepping, "minimum_dt", text=args["minimum_dt"]
)
self.populate_tree(
time_stepping, "maximum_dt", text=args["maximum_dt"]
)
if isinstance(args["number_iterations"], str) and isinstance(
args["multiplier"], str
):
self.populate_tree(
time_stepping,
"number_iterations",
text=args["number_iterations"],
)
self.populate_tree(
time_stepping, "multiplier", text=args["multiplier"]
)
else:
self.populate_tree(
time_stepping,
"number_iterations",
text=" ".join(str(x) for x in args["number_iterations"]),
)
self.populate_tree(
time_stepping,
"multiplier",
text=" ".join(str(x) for x in args["multiplier"]),
)
elif args["type"] == "EvolutionaryPIDcontroller":
self.populate_tree(
time_stepping, "t_initial", text=args["t_initial"]
)
self.populate_tree(time_stepping, "t_end", text=args["t_end"])
self.populate_tree(time_stepping, "dt_guess", text=args["dt_guess"])
self.populate_tree(time_stepping, "dt_min", text=args["dt_min"])
self.populate_tree(time_stepping, "dt_max", text=args["dt_max"])
self.populate_tree(
time_stepping, "rel_dt_max", text=args["rel_dt_max"]
)
self.populate_tree(
time_stepping, "rel_dt_min", text=args["rel_dt_min"]
)
self.populate_tree(time_stepping, "tol", text=args["tol"])
else:
msg = "Specified time stepping scheme not valid."
raise KeyError(msg)
[docs]
def add_output(self, **args: Any) -> None:
"""
Add output section.
Parameters
----------
type : `str`
prefix : `str`
suffix : `str`
variables : `list`
data_mode : `str`
compress_output : `str`
output_iteration_results: `bool` or `str`
meshes : `list` or `str`
repeat : `list` or `str`
each_steps : `list` or `str`
fixed_output_times : `list` or `str`
"""
if "type" not in args:
msg = """If you want to specify an output method, \
you need to provide type, \
prefix and a list of variables."""
raise KeyError(msg)
self.populate_tree(self.output, "type", text=args["type"])
if "prefix" in args:
self.populate_tree(self.output, "prefix", text=args["prefix"])
if "suffix" in args:
self.populate_tree(self.output, "suffix", text=args["suffix"])
if "data_mode" in args:
self.populate_tree(self.output, "data_mode", text=args["data_mode"])
if "compress_output" in args:
if isinstance(args["compress_output"], bool):
if args["compress_output"] is True:
self.populate_tree(
self.output, "compress_output", text="true"
)
else:
self.populate_tree(
self.output, "compress_output", text="false"
)
else:
self.populate_tree(
self.output, "compress_output", text=args["compress_output"]
)
if "output_iteration_results" in args:
if isinstance(args["output_iteration_results"], bool):
if args["output_iteration_results"] is True:
self.populate_tree(
self.output, "output_iteration_results", text="true"
)
else:
self.populate_tree(
self.output, "output_iteration_results", text="false"
)
else:
self.populate_tree(
self.output,
"output_iteration_results",
text=args["output_iteration_results"],
)
if "meshes" in args:
meshes = self.populate_tree(self.output, "meshes")
if isinstance(args["meshes"], str):
self.populate_tree(meshes, "mesh", text=args["meshes"])
else:
for mesh in args["meshes"]:
self.populate_tree(meshes, "mesh", text=mesh)
if "repeat" in args:
timesteps = self.populate_tree(self.output, "timesteps")
if "each_steps" not in args:
msg = "each_steps is a required tag if repeat is given."
raise KeyError(msg)
if isinstance(args["repeat"], list) and isinstance(
args["each_steps"], list
):
for i, entry in enumerate(args["repeat"]):
pair = self.populate_tree(timesteps, "pair")
self.populate_tree(pair, "repeat", text=entry)
self.populate_tree(
pair, "each_steps", text=args["each_steps"][i]
)
else:
pair = self.populate_tree(timesteps, "pair")
self.populate_tree(pair, "repeat", text=args["repeat"])
self.populate_tree(pair, "each_steps", text=args["each_steps"])
variables = self.populate_tree(self.output, "variables")
if "variables" in args:
if isinstance(args["variables"], list):
for var in args["variables"]:
self.populate_tree(variables, "variable", text=var)
else:
msg = "parameter variables needs to be a list"
raise KeyError(msg)
if "fixed_output_times" in args:
if isinstance(args["fixed_output_times"], list):
self.populate_tree(
self.output,
"fixed_output_times",
text=" ".join(str(x) for x in args["fixed_output_times"]),
)
else:
self.populate_tree(
self.output,
"fixed_output_times",
text=args["fixed_output_times"],
)
[docs]
def add_time_stepping_pair(self, **args: Any) -> None:
"""
Add a time stepping pair.
Parameters
----------
process : `str`
process_count : `int` optional, for staggered coupling
repeat : `int` or `str` or `list`
delta_t : `int` or `str` or `list`
"""
self._convertargs(args)
if "process" not in args:
msg = "No process referenced"
raise KeyError(msg)
procs = self.processes.findall("./process")
process = None
procs_sub = []
for proc in procs:
if args["process"] == proc.get("ref"):
procs_sub.append(proc)
if "process_count" in args:
try:
process = procs_sub[int(args["process_count"])]
except KeyError:
msg = "Process count out of bounds."
KeyError(msg)
else:
try:
process = procs_sub[-1]
except KeyError:
msg = "Process reference not found."
KeyError(msg)
if process is None:
msg = "Could not find any associated process"
raise AttributeError(msg)
ts = process.find("./time_stepping/timesteps")
if ts is None:
msg = "Cannot find time stepping section in the input file."
raise RuntimeError(msg)
if "repeat" in args and "delta_t" in args:
if isinstance(args["repeat"], str) and isinstance(
args["delta_t"], str
):
pair = self.populate_tree(ts, "pair")
self.populate_tree(pair, "repeat", text=args["repeat"])
self.populate_tree(pair, "delta_t", text=args["delta_t"])
else:
for i, entry in enumerate(args["repeat"]):
pair = self.populate_tree(ts, "pair")
self.populate_tree(pair, "repeat", text=entry)
self.populate_tree(pair, "delta_t", text=args["delta_t"][i])
else:
msg = """You muss provide repeat and delta_t attributes to \
define additional time stepping pairs."""
raise KeyError(msg)
[docs]
def add_output_pair(self, **args: Any) -> None:
"""
Add an output pair.
Parameters
----------
repeat : `int` or `str` or `list`
each_steps : `int` or `str` or `list`
"""
self._convertargs(args)
timesteps = self.populate_tree(self.output, "timesteps", overwrite=True)
if "repeat" in args and "each_steps" in args:
if isinstance(args["repeat"], list) and isinstance(
args["each_steps"], list
):
for i, entry in enumerate(args["repeat"]):
pair = self.populate_tree(timesteps, "pair")
self.populate_tree(pair, "repeat", text=entry)
self.populate_tree(
pair, "each_steps", text=args["each_steps"][i]
)
else:
pair = self.populate_tree(timesteps, "pair")
self.populate_tree(pair, "repeat", text=args["repeat"])
self.populate_tree(pair, "each_steps", text=args["each_steps"])
else:
msg = """You muss provide repeat and each_steps attributes \
to define additional output pairs."""
raise KeyError(msg)
[docs]
def add_global_process_coupling(self, **args: Any) -> None:
"""
Add a process section to the timeloop.
Parameters
----------
max_iter : `str` optional, needs to be specified once
convergence_type : `str`
abstol : `str`
abstols : `str`
reltol : `str`
reltols : `str`
norm_type : `str`
local_coupling_processes : `list` with names
local_coupling_processes_max_iter : `str`
"""
self._convertargs(args)
if "max_iter" in args:
self.populate_tree(
self.gpc, "max_iter", text=args["max_iter"], overwrite=True
)
convergence_criteria = self.populate_tree(
self.gpc, "convergence_criteria", overwrite=True
)
if "convergence_type" not in args:
msg = """No convergence criterion given. \
Specify convergence_type."""
raise KeyError(msg)
conv_crit = self.populate_tree(
convergence_criteria, "convergence_criterion"
)
self.populate_tree(conv_crit, "type", text=args["convergence_type"])
if "norm_type" not in args:
msg = "No norm_type given."
raise KeyError(msg)
self.populate_tree(conv_crit, "norm_type", text=args["norm_type"])
if (args["convergence_type"] == "DeltaX") or (
args["convergence_type"] == "Residual"
):
if ("abstols" in args) or ("reltols" in args):
msg = "Plural tolerances only available for PerComponent conv. types"
raise KeyError(msg)
if "abstol" in args:
self.populate_tree(conv_crit, "abstol", text=args["abstol"])
if "reltol" in args:
self.populate_tree(conv_crit, "reltol", text=args["reltol"])
elif (args["convergence_type"] == "PerComponentDeltaX") or (
args["convergence_type"] == "PerComponentResidual"
):
if ("abstol" in args) or ("reltol" in args):
msg = (
"Singular tolerances only available for scalar conv. types"
)
raise KeyError(msg)
if "abstols" in args:
self.populate_tree(conv_crit, "abstols", text=args["abstols"])
if "reltols" in args:
self.populate_tree(conv_crit, "reltols", text=args["reltols"])
else:
msg = "No convergence_type given."
raise KeyError(msg)
if "local_coupling_processes" in args:
if "local_coupling_processes_max_iter" not in args:
msg = "local_coupling_processes_max_iter parameter is missing"
raise KeyError(msg)
lcp = self.populate_tree(self.gpc, "local_coupling_processes")
self.populate_tree(
lcp, "max_iter", text=args["local_coupling_processes_max_iter"]
)
for name in args["local_coupling_processes"]:
self.populate_tree(lcp, "process_name", text=name)