# SPDX-FileCopyrightText: Copyright (c) OpenGeoSys Community (opengeosys.org)
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import time
from pathlib import Path
from queue import Empty, Queue
try:
from bokeh.io import push_notebook
from bokeh.io.notebook import CommsHandle
from bokeh.models import ColumnDataSource
from bokeh.plotting import figure
except ImportError as e:
msg = "Monitor() requires extra dependency 'bokeh'. Install with: pip install ogstools[monitor] or pip install bokeh"
raise RuntimeError(msg) from e
from watchdog.observers import Observer
from ogstools.logparser import regexes as log_regex
from ogstools.logparser.log_file_handler import LogFileHandler
[docs]
class Monitor:
"""
A class to manage the data source for monitoring logs in Bokeh.
"""
[docs]
def __init__(self, notebook_execution: bool = True) -> None:
self.data_source = ColumnDataSource(
data={
"time_step": [],
"step_size": [],
"assembly_time": [],
"linear_solver_time": [],
"step_start_time": [],
"iteration_number": [],
}
)
self.data_source_iter = ColumnDataSource(
data={
"iteration_number": [],
"vspan": [],
"line_width": [],
"dx_x": [],
"dx_x_0": [],
"dx_x_1": [],
"dx_x_2": [],
"dx_x_3": [],
"dx_x_4": [],
"dx_x_5": [],
}
)
self._records: Queue = Queue()
self._status: log_regex.Context = log_regex.Context()
self._observer: Observer | None = None
self._log_file_handler: LogFileHandler | None = None
self.time_step_based_data = [
"step_start_time",
"step_size",
"assembly_time",
"linear_solver_time",
"iteration_number",
]
self.iteration_based_data = [
"dx_x",
"dx_x_0",
"dx_x_1",
"dx_x_2",
"dx_x_3",
"dx_x_4",
"dx_x_5",
]
self.ylabels = {
"step_start_time": "time (s)",
"step_size": "time step size (s)",
"assembly_time": "assembly time (s)",
"linear_solver_time": "linear solver time (s)",
"iteration_number": "iteration number",
"dx_x": "dx_x",
"dx_x_0": "dx_x_0",
"dx_x_1": "dx_x_1",
"dx_x_2": "dx_x_2",
"dx_x_3": "dx_x_3",
"dx_x_4": "dx_x_4",
"dx_x_5": "dx_x_5",
}
self.titles = {
"step_start_time": "Simulation Time",
"step_size": "Step Size",
"assembly_time": "Assembly Time per time step",
"linear_solver_time": "Linear Solver Time per time step",
"iteration_number": "Iteration Number per time step",
"dx_x": "Relative convergence dx_x",
"dx_x_0": "Relative convergence dx_x_0",
"dx_x_1": "Relative convergence dx_x_1",
"dx_x_2": "Relative convergence dx_x_2",
"dx_x_3": "Relative convergence dx_x_3",
"dx_x_4": "Relative convergence dx_x_4",
"dx_x_5": "Relative convergence dx_x_5",
}
self.notebook_execution = notebook_execution
[docs]
def start_log_file_handler(self, log_file: Path) -> None:
"""
Set up the log file handler to monitor the log file.
:param log_file: The path to the log file to monitor.
"""
# Stop any previous observer before creating a new one
if hasattr(self, "_observer") and self._observer is not None:
self._observer.stop()
self._observer.join()
# Reset context for fresh run
self._status = log_regex.Context()
observer = Observer()
self._observer = observer
self._log_file_handler = LogFileHandler(
log_file,
queue=self._records,
status=self._status,
stop_callback=lambda: (
print("Stop Observer"),
observer.stop(),
),
)
self._observer.schedule(
self._log_file_handler,
path=str(log_file.parent),
recursive=False,
)
print("Starting observer...")
self._observer.start()
[docs]
def update_data(
self,
handle_line_chart: CommsHandle,
time_window_length: int,
iteration_window_length: int,
update_interval: int = 2,
) -> None:
"""Update the data source with new records from the queue.
:param handle_line_chart: The handle for the Bokeh line chart.
:param time_window_length: The length of the time window for the data.
:param iteration_window_length: The length of the iteration window for the data.
:param update_interval: The interval in seconds to update the plot.
"""
def clip(data: list, window_length: int) -> list:
if len(data) > window_length:
return data[-window_length:]
return data
t0 = time.time()
while True:
try:
item = self._records.get_nowait()
except Empty:
break
if time.time() - t0 > update_interval:
break
if isinstance(item, log_regex.Termination):
print(
f"Consumer: Termination signal ({item}) received. Exiting."
)
break
if isinstance(item, log_regex.TimeStepStart):
if time_window_length == 0:
new_row = {
"step_size": [item.step_size],
"time_step": [item.time_step],
"assembly_time": [0],
"linear_solver_time": [0],
"step_start_time": [item.step_start_time],
"iteration_number": [0],
}
self.data_source.stream(new_row)
else:
step_size = self.data_source.data["step_size"] + [
item.step_size
]
time_step = self.data_source.data["time_step"] + [
item.time_step
]
# data needs to have correct size, but some fields are updated later
assembly_time = self.data_source.data["assembly_time"] + [0]
linear_solver_time = self.data_source.data[
"linear_solver_time"
] + [0]
step_start_time = self.data_source.data[
"step_start_time"
] + [item.step_start_time]
iteration_number = self.data_source.data[
"iteration_number"
] + [0]
self.data_source.data = {
"step_size": clip(step_size, time_window_length),
"time_step": clip(time_step, time_window_length),
"assembly_time": clip(
assembly_time, time_window_length
),
"linear_solver_time": clip(
linear_solver_time, time_window_length
),
"step_start_time": clip(
step_start_time, time_window_length
),
"iteration_number": clip(
iteration_number, time_window_length
),
}
elif isinstance(item, log_regex.AssemblyTime):
index = len(self.data_source.data["assembly_time"]) - 1
new_time = (
self.data_source.data["assembly_time"][index]
+ item.assembly_time
)
self.data_source.patch({"assembly_time": [(index, new_time)]})
elif isinstance(item, log_regex.LinearSolverTime):
index = len(self.data_source.data["linear_solver_time"]) - 1
new_time = (
self.data_source.data["linear_solver_time"][index]
+ item.linear_solver_time
)
self.data_source.patch(
{"linear_solver_time": [(index, new_time)]}
)
elif isinstance(item, log_regex.IterationEnd):
index = len(self.data_source.data["iteration_number"]) - 1
iteration = item.iteration_number
self.data_source.patch(
{"iteration_number": [(index, iteration)]}
)
elif isinstance(item, log_regex.IterationStart):
iteration_offset = (
self.data_source_iter.data["iteration_number"][-1]
if self.data_source_iter.data["iteration_number"]
else 0
)
line_width_value = 0
if item.iteration_number == 1:
line_width_value = 1
if iteration_window_length == 0:
new_row = {
"iteration_number": [iteration_offset + 1],
"vspan": [iteration_offset + 0.75],
"line_width": [line_width_value],
"dx_x": [1],
"dx_x_0": [1],
"dx_x_1": [1],
"dx_x_2": [1],
"dx_x_3": [1],
"dx_x_4": [1],
"dx_x_5": [1],
}
self.data_source_iter.stream(new_row)
else:
iteration_number = self.data_source_iter.data[
"iteration_number"
] + [iteration_offset + 1]
vspan = self.data_source_iter.data["vspan"] + [
iteration_offset + 0.75
]
line_width = self.data_source_iter.data["line_width"] + [
line_width_value
]
dx_x = self.data_source_iter.data["dx_x"] + [1]
dx_x_0 = self.data_source_iter.data["dx_x_0"] + [1]
dx_x_1 = self.data_source_iter.data["dx_x_1"] + [1]
dx_x_2 = self.data_source_iter.data["dx_x_2"] + [1]
dx_x_3 = self.data_source_iter.data["dx_x_3"] + [1]
dx_x_4 = self.data_source_iter.data["dx_x_4"] + [1]
dx_x_5 = self.data_source_iter.data["dx_x_5"] + [1]
self.data_source_iter.data = {
"iteration_number": clip(
iteration_number, iteration_window_length
),
"vspan": clip(vspan, iteration_window_length),
"line_width": clip(line_width, iteration_window_length),
"dx_x": clip(dx_x, iteration_window_length),
"dx_x_0": clip(dx_x_0, iteration_window_length),
"dx_x_1": clip(dx_x_1, iteration_window_length),
"dx_x_2": clip(dx_x_2, iteration_window_length),
"dx_x_3": clip(dx_x_3, iteration_window_length),
"dx_x_4": clip(dx_x_4, iteration_window_length),
"dx_x_5": clip(dx_x_5, iteration_window_length),
}
elif isinstance(item, log_regex.TimeStepConvergenceCriterion):
index = len(self.data_source_iter.data["iteration_number"]) - 1
self.data_source_iter.patch({"dx_x": [(index, item.dx_x)]})
elif isinstance(item, log_regex.ComponentConvergenceCriterion):
index = len(self.data_source_iter.data["iteration_number"]) - 1
self.data_source_iter.patch(
{f"dx_x_{item.component}": [(index, item.dx_x)]}
)
if self.notebook_execution is True:
push_notebook(handle=handle_line_chart)