Source code for ogstools.logparser.log_file_handler

# Copyright (c) 2012-2025, OpenGeoSys Community (http://www.opengeosys.org)
#            Distributed under a Modified BSD License.
#            See accompanying file LICENSE.txt or
#            http://www.opengeosys.org/project/license
#


from collections.abc import Callable
from pathlib import Path
from queue import Queue
from typing import Any

from watchdog.events import (
    DirModifiedEvent,
    FileCreatedEvent,
    FileModifiedEvent,
    FileSystemEventHandler,
)

from ogstools.logparser.log_parser import (
    normalize_regex,
    parse_line,
    select_regex,
)
from ogstools.logparser.regexes import Context, Log, Termination


[docs] class LogFileHandler(FileSystemEventHandler):
[docs] def __init__( self, file_name: str | Path, queue: Queue, status: Context, stop_callback: Callable[[], tuple[None, Any]], force_parallel: bool = False, line_limit: int = 0, ): """ :param file_name: The location of the log file to monitor. :param queue: The queue where log entries are put and to be consumed. :status: The status of the simulation (e.g. current time step). :stop_callback: A callback function to stop the simulation. :force_parallel: Only needed for MPI run with 1 process. Then it must be set to True. :line_limit: The number of lines to read before stopping the simulation. 0 means no limit. """ self.file_name = Path(file_name) self.queue = queue self.status = status self.stop_callback = stop_callback self.force_parallel = force_parallel self.line_limit = line_limit self._file_read: bool = False self.num_lines_read: int = 0 # real time monitoring is only working for log version 2 and serial logs or parallel sim without (ogs --parallel_log) self.patterns: list = normalize_regex( select_regex(version=2), parallel_log=False )
[docs] def on_created(self, event: FileCreatedEvent) -> None: if event.src_path != str(self.file_name): return print(f"{self.file_name} has been created.") self.process()
[docs] def on_modified(self, event: FileModifiedEvent | DirModifiedEvent) -> None: if event.src_path != str(self.file_name): return self.process()
[docs] def process(self) -> None: if not self._file_read: try: self._file: Any = self.file_name.open("r") self._file.seek(0, 0) self._file_read = True except FileNotFoundError: print(f"File not found yet: {self.file_name}") return print(f"{self.file_name} has been modified.") while True: line = self._file.readline() num_lines_current = self.num_lines_read + 1 if not line or not line.endswith("\n"): break # Wait for complete line before processing log_entry: Log | Termination | None = parse_line( self.patterns, line, parallel_log=False, number_of_lines_read=num_lines_current, ) if log_entry: assert isinstance(log_entry, Log | Termination) self.queue.put(log_entry) self.status.update(log_entry) if isinstance(log_entry, Termination): print("===== Termination =====") self.queue.put(log_entry) self.status.update(log_entry) self.stop_callback() self._file.close() break if self.line_limit > 0 and self.num_lines_read > self.line_limit: self.stop_callback() self._file.close() break self.num_lines_read = self.num_lines_read + 1