Source code for ogstools.logparser.monitor

# SPDX-FileCopyrightText: Copyright (c) OpenGeoSys Community (opengeosys.org)
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import time
from pathlib import Path
from queue import Empty, Queue

try:
    from bokeh.io import push_notebook
    from bokeh.io.notebook import CommsHandle
    from bokeh.models import ColumnDataSource
    from bokeh.plotting import figure

except ImportError as e:
    msg = "Monitor() requires extra dependency 'bokeh'. Install with: pip install ogstools[monitor] or pip install bokeh"
    raise RuntimeError(msg) from e
from watchdog.observers import Observer

from ogstools.logparser import regexes as log_regex
from ogstools.logparser.log_file_handler import LogFileHandler


[docs] class Monitor: """ A class to manage the data source for monitoring logs in Bokeh. """
[docs] def __init__(self, notebook_execution: bool = True) -> None: self.data_source = ColumnDataSource( data={ "time_step": [], "step_size": [], "assembly_time": [], "linear_solver_time": [], "step_start_time": [], "iteration_number": [], } ) self.data_source_iter = ColumnDataSource( data={ "iteration_number": [], "vspan": [], "line_width": [], "dx_x": [], "dx_x_0": [], "dx_x_1": [], "dx_x_2": [], "dx_x_3": [], "dx_x_4": [], "dx_x_5": [], } ) self._records: Queue = Queue() self._status: log_regex.Context = log_regex.Context() self._observer: Observer | None = None self._log_file_handler: LogFileHandler | None = None self.time_step_based_data = [ "step_start_time", "step_size", "assembly_time", "linear_solver_time", "iteration_number", ] self.iteration_based_data = [ "dx_x", "dx_x_0", "dx_x_1", "dx_x_2", "dx_x_3", "dx_x_4", "dx_x_5", ] self.ylabels = { "step_start_time": "time (s)", "step_size": "time step size (s)", "assembly_time": "assembly time (s)", "linear_solver_time": "linear solver time (s)", "iteration_number": "iteration number", "dx_x": "dx_x", "dx_x_0": "dx_x_0", "dx_x_1": "dx_x_1", "dx_x_2": "dx_x_2", "dx_x_3": "dx_x_3", "dx_x_4": "dx_x_4", "dx_x_5": "dx_x_5", } self.titles = { "step_start_time": "Simulation Time", "step_size": "Step Size", "assembly_time": "Assembly Time per time step", "linear_solver_time": "Linear Solver Time per time step", "iteration_number": "Iteration Number per time step", "dx_x": "Relative convergence dx_x", "dx_x_0": "Relative convergence dx_x_0", "dx_x_1": "Relative convergence dx_x_1", "dx_x_2": "Relative convergence dx_x_2", "dx_x_3": "Relative convergence dx_x_3", "dx_x_4": "Relative convergence dx_x_4", "dx_x_5": "Relative convergence dx_x_5", } self.notebook_execution = notebook_execution
[docs] def start_log_file_handler(self, log_file: Path) -> None: """ Set up the log file handler to monitor the log file. :param log_file: The path to the log file to monitor. """ # Stop any previous observer before creating a new one if hasattr(self, "_observer") and self._observer is not None: self._observer.stop() self._observer.join() # Reset context for fresh run self._status = log_regex.Context() observer = Observer() self._observer = observer self._log_file_handler = LogFileHandler( log_file, queue=self._records, status=self._status, stop_callback=lambda: ( print("Stop Observer"), observer.stop(), ), ) self._observer.schedule( self._log_file_handler, path=str(log_file.parent), recursive=False, ) print("Starting observer...") self._observer.start()
[docs] def generate_figure(self, log_data: str, time_y_axis_type: str) -> figure: """Generates a Bokeh figure for the given log data.""" if log_data not in self.ylabels: msg = f"Log data '{log_data}' is not recognized." raise ValueError(msg) def axis_type(log_data: str, time_y_axis_type: str) -> str: if log_data in ["step_start_time", "step_size"]: return time_y_axis_type if log_data in [ "assembly_time", "linear_solver_time", "iteration_number", ]: return "linear" if log_data in self.iteration_based_data: return "log" msg = f"Log data '{log_data}' is not recognized." raise ValueError(msg) fig = figure( width=500, height=450, tooltips=[ (log_data, f"@{log_data}"), ], title=f"OGS Log Monitor: {self.titles[log_data]}", y_axis_type=axis_type(log_data, time_y_axis_type), ) if log_data in self.time_step_based_data: fig.line( x="time_step", y=log_data, line_color="blue", line_width=3.0, source=self.data_source, ) fig.xaxis.axis_label = "Time Step" fig.yaxis.axis_label = self.ylabels[log_data] print(f"Plotting {log_data} against time_step") elif log_data in self.iteration_based_data: fig.line( x="iteration_number", y=log_data, line_color="blue", line_width=3.0, source=self.data_source_iter, ) fig.vspan( x="vspan", line_width="line_width", line_color="tomato", source=self.data_source_iter, ) fig.xaxis.axis_label = "Iteration Number" fig.yaxis.axis_label = self.ylabels[log_data] print(f"Plotting {log_data} against iteration_number") return fig
[docs] def update_data( self, handle_line_chart: CommsHandle, time_window_length: int, iteration_window_length: int, update_interval: int = 2, ) -> None: """Update the data source with new records from the queue. :param handle_line_chart: The handle for the Bokeh line chart. :param time_window_length: The length of the time window for the data. :param iteration_window_length: The length of the iteration window for the data. :param update_interval: The interval in seconds to update the plot. """ def clip(data: list, window_length: int) -> list: if len(data) > window_length: return data[-window_length:] return data t0 = time.time() while True: try: item = self._records.get_nowait() except Empty: break if time.time() - t0 > update_interval: break if isinstance(item, log_regex.Termination): print( f"Consumer: Termination signal ({item}) received. Exiting." ) break if isinstance(item, log_regex.TimeStepStart): if time_window_length == 0: new_row = { "step_size": [item.step_size], "time_step": [item.time_step], "assembly_time": [0], "linear_solver_time": [0], "step_start_time": [item.step_start_time], "iteration_number": [0], } self.data_source.stream(new_row) else: step_size = self.data_source.data["step_size"] + [ item.step_size ] time_step = self.data_source.data["time_step"] + [ item.time_step ] # data needs to have correct size, but some fields are updated later assembly_time = self.data_source.data["assembly_time"] + [0] linear_solver_time = self.data_source.data[ "linear_solver_time" ] + [0] step_start_time = self.data_source.data[ "step_start_time" ] + [item.step_start_time] iteration_number = self.data_source.data[ "iteration_number" ] + [0] self.data_source.data = { "step_size": clip(step_size, time_window_length), "time_step": clip(time_step, time_window_length), "assembly_time": clip( assembly_time, time_window_length ), "linear_solver_time": clip( linear_solver_time, time_window_length ), "step_start_time": clip( step_start_time, time_window_length ), "iteration_number": clip( iteration_number, time_window_length ), } elif isinstance(item, log_regex.AssemblyTime): index = len(self.data_source.data["assembly_time"]) - 1 new_time = ( self.data_source.data["assembly_time"][index] + item.assembly_time ) self.data_source.patch({"assembly_time": [(index, new_time)]}) elif isinstance(item, log_regex.LinearSolverTime): index = len(self.data_source.data["linear_solver_time"]) - 1 new_time = ( self.data_source.data["linear_solver_time"][index] + item.linear_solver_time ) self.data_source.patch( {"linear_solver_time": [(index, new_time)]} ) elif isinstance(item, log_regex.IterationEnd): index = len(self.data_source.data["iteration_number"]) - 1 iteration = item.iteration_number self.data_source.patch( {"iteration_number": [(index, iteration)]} ) elif isinstance(item, log_regex.IterationStart): iteration_offset = ( self.data_source_iter.data["iteration_number"][-1] if self.data_source_iter.data["iteration_number"] else 0 ) line_width_value = 0 if item.iteration_number == 1: line_width_value = 1 if iteration_window_length == 0: new_row = { "iteration_number": [iteration_offset + 1], "vspan": [iteration_offset + 0.75], "line_width": [line_width_value], "dx_x": [1], "dx_x_0": [1], "dx_x_1": [1], "dx_x_2": [1], "dx_x_3": [1], "dx_x_4": [1], "dx_x_5": [1], } self.data_source_iter.stream(new_row) else: iteration_number = self.data_source_iter.data[ "iteration_number" ] + [iteration_offset + 1] vspan = self.data_source_iter.data["vspan"] + [ iteration_offset + 0.75 ] line_width = self.data_source_iter.data["line_width"] + [ line_width_value ] dx_x = self.data_source_iter.data["dx_x"] + [1] dx_x_0 = self.data_source_iter.data["dx_x_0"] + [1] dx_x_1 = self.data_source_iter.data["dx_x_1"] + [1] dx_x_2 = self.data_source_iter.data["dx_x_2"] + [1] dx_x_3 = self.data_source_iter.data["dx_x_3"] + [1] dx_x_4 = self.data_source_iter.data["dx_x_4"] + [1] dx_x_5 = self.data_source_iter.data["dx_x_5"] + [1] self.data_source_iter.data = { "iteration_number": clip( iteration_number, iteration_window_length ), "vspan": clip(vspan, iteration_window_length), "line_width": clip(line_width, iteration_window_length), "dx_x": clip(dx_x, iteration_window_length), "dx_x_0": clip(dx_x_0, iteration_window_length), "dx_x_1": clip(dx_x_1, iteration_window_length), "dx_x_2": clip(dx_x_2, iteration_window_length), "dx_x_3": clip(dx_x_3, iteration_window_length), "dx_x_4": clip(dx_x_4, iteration_window_length), "dx_x_5": clip(dx_x_5, iteration_window_length), } elif isinstance(item, log_regex.TimeStepConvergenceCriterion): index = len(self.data_source_iter.data["iteration_number"]) - 1 self.data_source_iter.patch({"dx_x": [(index, item.dx_x)]}) elif isinstance(item, log_regex.ComponentConvergenceCriterion): index = len(self.data_source_iter.data["iteration_number"]) - 1 self.data_source_iter.patch( {f"dx_x_{item.component}": [(index, item.dx_x)]} ) if self.notebook_execution is True: push_notebook(handle=handle_line_chart)